Pyspark + Spark Operator + Amazon EKS = Big Data On Steroids!

3Bit Technologies
Published in Nerd For Tech
7 min read · Mar 11, 2021

By the end of this post, you’ll be able to:

  • Create an EKS cluster as code, using the tools recommended by AWS
  • Reduce costs and improve availability by using Spot Instances the right way
  • Reduce costs even more and improve scalability by installing some powerful add-ons on the EKS cluster
  • Run any Spark Job on EKS
  • Create a simple but powerful PySpark application that reads from and writes to resilient object storage, applying some cool techniques to prepare and optimize your data lake.

Introduction

In this post, we will show you how to install and run Spark Jobs in a Kubernetes cluster. More precisely, we will use Amazon EKS as our Kubernetes ‘flavor’.

We will also create a very simple Python job (PySpark) that reads JSON events from an S3 bucket, repartitions these events by year/month/day/hour, converts them to an optimized columnar format called Apache Parquet, and finally writes them back to another S3 bucket.

As our Spark runner, we will use the Spark on K8s Operator. We could use EMR on EKS, for example, but for this tutorial the Spark Operator was our choice because it provides good control over Spark jobs, restart policies and the ability to preserve execution logs, and it’s free =).

Before we get our hands dirty, let’s just talk a little bit about these two amazing technologies (Spark and Amazon EKS).

Apache Spark, usually known as the successor of MapReduce, is a framework for parallel distributed data processing, being capable of executing concurrent Jobs (with support for the programming languages: Python, Java, Scala, SQL and R) to easily process huge amounts of data, with resilience, speed and scalability. Spark is used mostly for batch processing use cases (bounded data such as ETL jobs, analytics, data integration, etc.), but it also provides support for streaming use cases (unbounded data, like consuming messages from an Apache Kafka topic to train a ML model, performing streaming ETL, etc.).

Amazon EKS is the managed Kubernetes offering from Amazon Web Services. It provides a fully managed Kubernetes control plane with many security and management features, and it integrates easily with other AWS services. The Kubernetes distribution used by Amazon EKS is also available as open source, known as Amazon EKS Distro (EKS-D).

Helm is a package manager for Kubernetes that helps manage Kubernetes workloads: sharing applications, templating, performing upgrades, installing third-party software, etc. Without Helm, we need to create our manifests manually and share them with other teams/environments by copy-pasting. With Helm, we can have distinct repositories with secure and validated versions of our applications.

To follow this post you will need an AWS account and the command-line tools used in the commands below: eksctl, kubectl, helm and the AWS CLI.

Unfortunately, Amazon doesn’t offer a free tier version of EKS, so be aware that creating an EKS cluster will incur charges for your AWS account!

Create an EKS cluster

To create our cluster, we will use the really handy eksctl tool, which is the official CLI (Command line interface) to provision EKS clusters.

Eksctl supports both imperative and declarative ways of creating/configuring clusters, so let’s define our configuration file that will specify every important configuration of our new cluster:

A few things/tricks to consider about our configuration file:

  • We are using EKS version 1.18
  • Our cluster and all the necessary resources (VPC, subnets, NAT Gateway, etc.) will be created in the us-east-2 (Ohio) region, because Spot Instances are even cheaper there.
  • We enabled OIDC so we can use IRSA (IAM Roles for Service Accounts) with the Cluster Autoscaler component. The Cluster Autoscaler will enable our cluster to, well, automatically scale in and out : )
  • Because we will be running resilient workloads, we can use Spot Instances with mixed pools to get a good balance between cost and availability.
  • We tag everything accordingly so the Cluster Autoscaler can scale our cluster in and out as needed.
  • Notice that we are using only one availability zone. Let me explain: our cluster was designed to be ephemeral, in other words, to be created, run our jobs and be destroyed, which decreases the risk of running in a single AZ. Running in one availability zone also avoids network transfer costs between AZs. We also recommend using a VPC S3 Endpoint to avoid unnecessary NAT Gateway traffic costs.
  • We installed the SSM agent on our worker nodes, so we can access them without an SSH key pair!
  • Notice that we attached some managed policies to our worker group, one of them being arn:aws:iam::XXXXXXXXXXX:policy/backend-AnalyticsPolicy-XXXXXXX. Obviously this is a fake policy ARN; replace it with the ARN of a valid policy granting access to whichever AWS services our Spark Jobs need, for example to read/write files from/to Amazon S3 or to use it as the Spark checkpoint location. But why is that? Because this is (still) the easiest secure way to provide access to AWS resources for Spark Jobs managed by the Spark Operator on EKS: Hadoop (used by the Spark s3a filesystem) comes with its own bundled version of the AWS SDK, so we can’t use IRSA to enable the WebIdentity credentials provider, and using fixed access/secret keys is never a good idea.
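
To make the points above concrete, here is a minimal sketch of how such an eksctl configuration could be generated from Python. It is not the exact file used for this post: the node group name, sizes, instance types, the us-east-2a zone and the service account layout are all assumptions for illustration, and the field names follow the eksctl.io/v1alpha5 schema as we understand it, so validate the output against your eksctl version. Since JSON is a subset of YAML, the printed document can be saved as eks-cluster.yaml.

```python
import json

CLUSTER_NAME = "analytics-k8s"  # same name passed to the Cluster Autoscaler chart later
REGION = "us-east-2"


def build_cluster_config():
    """Return an eksctl ClusterConfig as a plain dict (illustrative values only)."""
    node_group = {
        "name": "spark-spot-workers",           # hypothetical node group name
        "minSize": 1,                           # hypothetical sizes
        "maxSize": 10,
        "desiredCapacity": 1,
        "availabilityZones": ["us-east-2a"],    # single AZ; the zone itself is an assumption
        "instancesDistribution": {
            # Several similar instance types give Spot more pools to draw from.
            "instanceTypes": ["m5.xlarge", "m5a.xlarge", "m4.xlarge"],  # hypothetical
            "onDemandBaseCapacity": 0,
            "onDemandPercentageAboveBaseCapacity": 0,  # 100% Spot
            "spotInstancePools": 3,
        },
        "ssh": {"enableSsm": True},             # SSM access instead of an SSH key pair
        "iam": {
            "attachPolicyARNs": [
                "arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy",
                "arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy",
                "arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly",
                "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore",
                # Placeholder from the post: replace with your own policy ARN.
                "arn:aws:iam::XXXXXXXXXXX:policy/backend-AnalyticsPolicy-XXXXXXX",
            ],
        },
        "tags": {
            # Tags the Cluster Autoscaler uses for auto-discovery.
            "k8s.io/cluster-autoscaler/enabled": "true",
            f"k8s.io/cluster-autoscaler/{CLUSTER_NAME}": "owned",
        },
    }
    return {
        "apiVersion": "eksctl.io/v1alpha5",
        "kind": "ClusterConfig",
        "metadata": {"name": CLUSTER_NAME, "region": REGION, "version": "1.18"},
        "iam": {
            "withOIDC": True,  # required for IRSA
            "serviceAccounts": [
                {
                    # Assumed service account for the Cluster Autoscaler.
                    "metadata": {"name": "cluster-autoscaler", "namespace": "kube-system"},
                    "wellKnownPolicies": {"autoScaler": True},
                }
            ],
        },
        "nodeGroups": [node_group],
    }


if __name__ == "__main__":
    print(json.dumps(build_cluster_config(), indent=2))
```

Generating the file from code is optional; the same structure can of course be written by hand as YAML.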

To create our cluster (I’ll presume you already checked all the requirements mentioned before), execute the following command:

eksctl create cluster -f eks-cluster.yaml

If everything’s alright, congratulations! You now have a working (and cost effective) EKS cluster!

Install Spark Operator

We can now install Spark Operator in our brand new cluster with the following commands:

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm repo update
helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace --set sparkJobNamespace=default

So here we just:

  • Used helm CLI to add a new public repository
  • Updated all our local configured repositories
  • Used the helm CLI again to install the Spark Operator chart (think of a chart as a packaged application, in this case the Spark Operator itself) in the Kubernetes namespace spark-operator, using the latest available version of the chart (the default behavior when we don’t specify a version), and configured it to watch our Spark Jobs (which we will create very soon) in the namespace default.

After a couple of minutes, run the following command to check if Spark Operator was installed successfully (you should see the spark-operator Pod listed):

kubectl get po -n spark-operator

Install Cluster Autoscaler

Let’s install the Cluster Autoscaler, so our cluster will scale to meet our BigData needs!

Just like we did earlier, run the following commands to install the Cluster Autoscaler (CA) using helm:

helm repo add autoscaler https://kubernetes.github.io/autoscaler
helm repo update
helm install cluster-autoscaler autoscaler/cluster-autoscaler --namespace kube-system --set 'autoDiscovery.clusterName'=analytics-k8s --version 1.1.1

Install and run a Python Spark Job

By now we should be ready to install and run our first Spark Job! So let’s do it.

Do you remember when we mentioned a policy that gives our Spark Jobs access to AWS services? For our example, assuming that we want our Spark Job to read files from one S3 bucket and publish files to another S3 bucket, let’s pretend that we created and attached the following IAM Managed Policy to our EKS worker nodes (this is just a simple snippet from CloudFormation):

This policy grants full access to the buckets taxi-events (our source bucket) and taxi-stage (our destination bucket).
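
As a rough sketch of what such a policy document might contain, the snippet below builds the JSON policy body in Python. It is not the CloudFormation resource itself, only the policy document that would go inside it; the helper name build_bucket_policy is ours, and "full access" is interpreted here as s3:* on each bucket and its objects, which is an assumption.

```python
import json


def build_bucket_policy(buckets):
    """Build an IAM policy document granting full S3 access to the given buckets."""
    resources = []
    for bucket in buckets:
        resources.append(f"arn:aws:s3:::{bucket}")      # bucket-level actions (e.g. ListBucket)
        resources.append(f"arn:aws:s3:::{bucket}/*")    # object-level actions (e.g. GetObject)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": "s3:*", "Resource": resources},
        ],
    }


if __name__ == "__main__":
    # Source and destination buckets used in this post.
    print(json.dumps(build_bucket_policy(["taxi-events", "taxi-stage"]), indent=2))
```

In a production setup you would probably narrow s3:* down to the read actions on the source bucket and the write actions on the destination bucket.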

Our Spark job will repartition JSON-formatted events coming from taxi drivers by year, month, day and hour. It will also convert these events to a more optimized columnar format called Apache Parquet.

Here’s an example of our JSON event:
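
As a rough illustration only, the sketch below uses a hypothetical event shape (the field names driver_id, latitude, longitude and timestamp are placeholders of ours, not the real schema) and shows the Hive-style directory that a year/month/day/hour partitioning would place such an event under. The helper partition_path is plain Python and does not need Spark.

```python
import json
from datetime import datetime

# Hypothetical taxi event; the real schema may differ.
SAMPLE_EVENT = (
    '{"driver_id": "d-001", "latitude": -23.55, "longitude": -46.63, '
    '"timestamp": "2021-03-11T14:05:00"}'
)


def partition_path(event_json, ts_field="timestamp"):
    """Return the key=value directory layout for one JSON event."""
    event = json.loads(event_json)
    ts = datetime.fromisoformat(event[ts_field])  # expects an ISO-8601 timestamp
    return f"year={ts.year}/month={ts.month}/day={ts.day}/hour={ts.hour}"


if __name__ == "__main__":
    print(partition_path(SAMPLE_EVENT))  # year=2021/month=3/day=11/hour=14
```

Query engines that understand this layout can skip whole directories when a query filters on year, month, day or hour.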

The source code of our pyspark re-partitioning Job will look like this:
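
A minimal sketch of such a job is shown below. It is our reconstruction of the idea rather than the exact code used for this post: it assumes each event carries an ISO-8601 string in a field named timestamp (a hypothetical name), and that the source and destination locations are passed as two s3a:// arguments. PySpark is imported lazily inside the functions so the argument handling can be checked on a machine without Spark installed.

```python
import sys

PARTITION_COLUMNS = ["year", "month", "day", "hour"]


def repartition_events(spark, source, destination, ts_field="timestamp"):
    """Read JSON events, derive partition columns and write partitioned Parquet."""
    from pyspark.sql import functions as F  # lazy import: only needed on the cluster

    events = spark.read.json(source)
    ts = F.to_timestamp(F.col(ts_field))  # assumes an ISO-8601 string field
    partitioned = (
        events
        .withColumn("year", F.year(ts))
        .withColumn("month", F.month(ts))
        .withColumn("day", F.dayofmonth(ts))
        .withColumn("hour", F.hour(ts))
        # Group rows of the same hour together so each partition gets few, larger files.
        .repartition(*PARTITION_COLUMNS)
    )
    (
        partitioned.write
        .mode("append")
        .partitionBy(*PARTITION_COLUMNS)
        .parquet(destination)
    )


def valid_args(argv):
    """Expect exactly two s3a:// URIs: source and destination."""
    return len(argv) == 3 and all(arg.startswith("s3a://") for arg in argv[1:])


def main(argv):
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("repartition-job").getOrCreate()
    try:
        repartition_events(spark, argv[1], argv[2])
    finally:
        spark.stop()


if __name__ == "__main__":
    if valid_args(sys.argv):
        main(sys.argv)
    else:
        print("usage: repartition-job.py s3a://<source> s3a://<destination>", file=sys.stderr)
```

With the buckets from this post, the arguments would be s3a://taxi-events/ and s3a://taxi-stage/.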

To make this job available to the Spark Operator, we will copy the file to an S3 bucket (for this tutorial we are using a fake one; replace it with your own repository bucket):

aws s3 cp repartition-job.py s3://my-awesome-jobs/

Here’s our Job definition so Spark Operator can run our job:

This manifest is mostly self-explanatory, but here are a few things/tricks to consider:

  • We selected the InstanceProfile credentials provider, so our attached IAM managed policy will be used.
  • Our job will run in cluster mode with 6 executor instances, each with 3 CPUs and 10 GB of memory.
  • There are some tricks around the Ivy cache used by Spark to download the dependencies (packages).
  • This job will use a pre-built image of Spark version 3.0.1, available in our Docker Hub. If you want to generate your own image (recommended), follow this guide.
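
The points above can be sketched as a SparkApplication manifest built from Python. This is an approximation under several assumptions, not the manifest used for this post: the image name, the driver sizing, the service account name, the hadoop-aws package version, the /tmp/ivy cache path and the job arguments are all hypothetical, and the field names follow the sparkoperator.k8s.io/v1beta2 API as we understand it. Because JSON is valid YAML, the output can be saved as repartition-job.yaml.

```python
import json


def build_spark_application():
    """Return a SparkApplication manifest as a plain dict (illustrative values only)."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": "repartition-job", "namespace": "default"},
        "spec": {
            "type": "Python",
            "pythonVersion": "3",
            "mode": "cluster",
            "image": "example/spark-py:3.0.1",  # hypothetical image name
            "sparkVersion": "3.0.1",
            "mainApplicationFile": "s3a://my-awesome-jobs/repartition-job.py",
            # Only needed if the job reads its buckets from the command line (assumption).
            "arguments": ["s3a://taxi-events/", "s3a://taxi-stage/"],
            "hadoopConf": {
                # Use the IAM policy attached to the worker nodes' instance profile.
                "fs.s3a.aws.credentials.provider":
                    "com.amazonaws.auth.InstanceProfileCredentialsProvider",
            },
            "sparkConf": {
                # Assumed dependency and a writable Ivy cache location inside the container.
                "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.2.0",
                "spark.jars.ivy": "/tmp/ivy",
            },
            "restartPolicy": {"type": "Never"},
            "driver": {
                "cores": 1,                  # hypothetical driver sizing
                "memory": "2g",
                "serviceAccount": "spark",   # hypothetical; depends on your Helm release
            },
            "executor": {"instances": 6, "cores": 3, "memory": "10g"},
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_spark_application(), indent=2))
```

Check the service account created by your Spark Operator installation (for example with kubectl get serviceaccounts -n default) before applying a manifest like this one.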

To submit this Job and watch the results, execute the following commands:

kubectl apply -f repartition-job.yaml
kubectl get sparkapplication -w -n default

It’ll take some time for our Job to become available and start running, because the worker nodes have to pull the Spark Docker image and the Cluster Autoscaler might need to scale out the number of worker nodes to meet the demand.

That’s it, guys! In a real-world scenario we would also add monitoring/logging, a good continuous integration pipeline, security hardening and a few other things.

Cleaning Up

Please don’t forget to delete the resources created by this tutorial! Run the following command to delete all resources created by eksctl:

eksctl delete cluster -w -f eks-cluster.yaml

We hope you enjoyed this post as much as we did, and if you have any questions please feel free to reach out to us!

Want to know more? Don’t forget to visit us at https://3bit.com.br/

3Bit Technologies

Cloud Specialists providing professional services with DevOps, BigData, Cloud Native Applications and Security. https://www.3bit.com.br/3bit