Spark on Kubernetes: An end-to-end Streaming Data Pipeline

Introduction

Kubernetes has seen a meteoric rise in recent years, and the cluster manager has gained significant traction since support for it was introduced in Spark 2.3. The following are just a few of the major benefits of running Spark on Kubernetes rather than YARN.

    • K8s allows multiple applications to coexist on a single cluster. Furthermore, namespaces keep workloads from interfering with one another.

    • K8s requires Spark applications and their dependencies to be packaged into containers, which avoids the dependency-conflict issues that commonly affect Spark deployments.

    • K8s delivers effective resource management with more control over the amount of compute or memory consumed by each application.

    • K8s is an open-source project, which means it is a cloud-agnostic deployment that helps organisations avoid vendor lock-in.

    • K8s is offered as a managed service by every major cloud vendor and is increasingly the platform of choice for big data analytics applications.

Overview

This project builds an end-to-end streaming pipeline: a Tweepy application pushes tweets to a TCP socket, a PySpark Structured Streaming job scores their sentiment with VADER and writes the results to PostgreSQL, and a Streamlit dashboard visualises them. Everything is containerised and deployed to a local Minikube cluster, with pgAdmin available for inspecting the database.

Tweepy Application

As previously mentioned, Spark excels at managing continuous data streams. This project will use the Python library Tweepy to retrieve data from the Twitter API. Before continuing, register an account on Twitter’s Developer Portal, create a free project, and place each of your authentication tokens into a file called .env. Copy this file into each application folder.

BEARER_TOKEN=<YOUR_BEARER_TOKEN_HERE>
DB_NAME=postgresdb
DB_USER=ubuntu
DB_PASS=ubuntu
DB_HOST=<YOUR_MINIKUBE_IP_HERE>
DB_PORT=30432
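
The applications can read these values at runtime. The snippet below is a minimal sketch, assuming python-dotenv is used to load the file; the repository’s code may load it differently.

import os
from dotenv import load_dotenv

# Load the key-value pairs from .env into environment variables
load_dotenv()

bearer_token = os.environ["BEARER_TOKEN"]
db_host = os.environ["DB_HOST"]
db_port = int(os.environ["DB_PORT"])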

The Twitter API requires that at least one rule be defined before a stream can be initialised. In other words, general tweets cannot be retrieved; they must match a specific hashtag or phrase. The GitHub repository contains a Jupyter Notebook where rules can easily be added or removed. For instance, a rule can be created to fetch tweets that are in English and original (not a quotation, reply, or retweet).
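
Such a rule might be added with Tweepy’s StreamingClient as sketched below; the keyword is a placeholder and the exact notebook code may differ, but the rule string follows Twitter’s filtered-stream syntax.

import tweepy

# Hypothetical rule: original English tweets about a chosen topic,
# excluding retweets, replies, and quotes
rule = tweepy.StreamRule('"data engineering" lang:en -is:retweet -is:reply -is:quote')

client = tweepy.StreamingClient(bearer_token)
client.add_rules(rule)       # register the rule with the filtered stream
print(client.get_rules())    # list the rules that are currently active

Once the stream is running, each matching tweet is handled by the listener, which extracts the fields of interest and forwards them over a TCP socket to the Spark application: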

# Inside the stream listener's data handler: receive a tweet and select
# the fields of interest (s_conn is the TCP connection accepted from the
# Spark application; see the sketch after this snippet)
print(f"========== Tweet # {self.i} ==========")
full_tweet = json.loads(data)

keys = ['text', 'created_at']
tweet_dict = { k:full_tweet['data'][k] for k in keys }
tweet_string = json.dumps(tweet_dict)

# Sending tweet to the socket
tweet_encoded = (tweet_string + "\n").encode('utf-8')
print(tweet_encoded)
s_conn.send(tweet_encoded)
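
The snippet above sends data through s_conn, the connection accepted on the socket that the Spark application connects to. A minimal sketch of how that socket might be set up is shown below; the bind address is an assumption, but port 9999 matches the option used in the PySpark application.

import socket

# Open a TCP server socket for the Spark application to connect to
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("0.0.0.0", 9999))     # port 9999 matches the Spark readStream options
s.listen(1)
s_conn, addr = s.accept()     # s_conn is the connection used in the snippet above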

PySpark Application

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Creating the SparkSession
spark = SparkSession \
         .builder \
         .appName("SparkStreaming") \
         .getOrCreate()

# Defining the schema of the incoming JSON string
schema = StructType([
         StructField("text", StringType(), True),
         StructField("created_at" , TimestampType(), True)
         ])

# Consuming the data stream from the twitter_app.py socket
tweets_df = spark \
             .readStream \
             .format("socket") \
             .option("host", "twitter-service") \
             .option("port", 9999) \
             .load() \
             .select(F.from_json(F.col("value").cast("string"), schema).alias("tmp")) \
             .select("tmp.*")

# Starting to process the stream in mini batches every ten seconds
q1 = tweets_df \
         .writeStream \
         .outputMode("append") \
         .foreachBatch(batch_hashtags) \
         .option("checkpointLocation", "./check") \
         .trigger(processingTime='10 seconds') \
         .start()

q1.awaitTermination()

Applying VADER to each tweet requires a UDF (user-defined function). UDFs should only be used when PySpark does not already provide the required function, because Spark cannot apply its usual code optimisations to them. That trade-off is acceptable in this example, since Minikube (a local single-node K8s cluster) is being used.
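
The foreachBatch call above hands each mini batch to a function named batch_hashtags, which is where the VADER scoring and the database write take place. The repository’s implementation is not reproduced here; the following is a minimal sketch, assuming the conventional VADER compound thresholds (±0.05), a tweets table matching the queries used by the Streamlit application, and database credentials taken from the .env file.

import os
from dotenv import load_dotenv
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

load_dotenv()
analyser = SentimentIntensityAnalyzer()

# UDF returning VADER's compound polarity score for a tweet
@F.udf(returnType=FloatType())
def compound_score(text):
    return float(analyser.polarity_scores(text)["compound"])

def batch_hashtags(batch_df, batch_id):
    # Score each tweet and bucket it using the conventional VADER thresholds
    scored_df = (batch_df
                 .withColumn("compound", compound_score(F.col("text")))
                 .withColumn("sentiment",
                             F.when(F.col("compound") >= 0.05, "positive")
                              .when(F.col("compound") <= -0.05, "negative")
                              .otherwise("neutral")))

    # Append the mini batch to the 'tweets' table read by the Streamlit app
    (scored_df.write
        .format("jdbc")
        .option("url", f"jdbc:postgresql://{os.environ['DB_HOST']}:{os.environ['DB_PORT']}/{os.environ['DB_NAME']}")
        .option("dbtable", "tweets")
        .option("user", os.environ["DB_USER"])
        .option("password", os.environ["DB_PASS"])
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save())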

Streamlit Application

# Getting the total percentage of each sentiment category
percent_query = '''
WITH t1 as (
SELECT sentiment, COUNT(*) AS n
FROM tweets
GROUP BY sentiment
)

SELECT sentiment, n, ROUND((n)/(SUM(n) OVER ()) * 100, 2)::real AS "percentage"
FROM t1;
'''


# Getting the total count of each sentiment category per minute
count_query = '''
WITH t1 as (
SELECT *, date_trunc('minute', created_at) AS truncated_created_at
FROM tweets
)

SELECT sentiment, COUNT(*) AS n, truncated_created_at
FROM t1
GROUP BY truncated_created_at, sentiment
ORDER BY truncated_created_at, sentiment;
'''


# Getting all of the tweets belonging to each sentiment category
negative_tweets = '''
SELECT text, created_at, sentiment, compound FROM tweets WHERE sentiment = 'negative' ORDER BY created_at DESC;
'''

neutral_tweets = '''
SELECT text, created_at, sentiment, compound FROM tweets WHERE sentiment = 'neutral' ORDER BY created_at DESC;
'''

positive_tweets = '''
SELECT text, created_at, sentiment, compound FROM tweets WHERE sentiment = 'positive' ORDER BY created_at DESC;
'''
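
These queries might be executed and rendered along the following lines. This is a minimal sketch; the exact charts, refresh logic, and connection handling in the repository’s Streamlit application may differ.

import os
import pandas as pd
import psycopg2
import streamlit as st
from dotenv import load_dotenv

load_dotenv()

# Connect to PostgreSQL using the credentials from the .env file
conn = psycopg2.connect(
    dbname=os.environ["DB_NAME"],
    user=os.environ["DB_USER"],
    password=os.environ["DB_PASS"],
    host=os.environ["DB_HOST"],
    port=os.environ["DB_PORT"],
)

st.title("Twitter Sentiment Dashboard")

# Overall percentage of each sentiment category
percent_df = pd.read_sql(percent_query, conn)
st.bar_chart(percent_df.set_index("sentiment")["percentage"])

# Count of each sentiment category per minute
count_df = pd.read_sql(count_query, conn)
st.line_chart(count_df.pivot(index="truncated_created_at",
                             columns="sentiment", values="n"))

# Most recent negative tweets
st.dataframe(pd.read_sql(negative_tweets, conn))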

Enable Role-Based Access Control (RBAC)

K8s uses service accounts and RBAC (Role-Based Access Control) roles to regulate access to the cluster’s resources. The Spark driver pod requires a service account with permission to create and monitor executor pods and services. To accomplish this, a new service account is created within the desired namespace and then bound to the edit ClusterRole using the kubectl create clusterrolebinding command.

Create a new namespace.

kubectl create ns pyspark

Set it as the default namespace.

kubectl config set-context --current --namespace=pyspark

Create a new service account.

kubectl create serviceaccount pyspark-service -n pyspark

Grant the service account access to resources in the namespace.

kubectl create clusterrolebinding pyspark-clusterrole \
    --clusterrole=edit \
    --serviceaccount=pyspark:pyspark-service \
    -n pyspark

Build Docker Images

The Docker images for each of the applications must now be created. Images provide the blueprint for the containers that run inside K8s pods (a pod being a group of one or more containerised workloads).

Point the terminal to the Minikube Docker daemon.

eval $(minikube -p minikube docker-env)

Build PySpark Image

Create the PySpark image by changing the directory to Spark’s home directory and executing the following shell script.

cd $SPARK_HOME
./bin/docker-image-tool.sh \
    -m \
    -t v3.3.1 \
    -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile \
    build

Build Application Images

The next step uses the previous image as the base for the actual PySpark application image. Then build the Tweepy and Streamlit images in their respective directories; each folder contains the necessary Dockerfile. After everything has been built into the Docker registry inside Minikube, verify it with the docker images command.

Inside ‘2. twitter-app’.

docker build -t twitter-app:1.0 .

Inside ‘3. spark-app’.

docker build -t spark-app:1.0 .

Inside ‘4. streamlit-app’.

docker build -t streamlit-app:1.0 .

List the Docker images.

docker images

Deploy to Minikube

Now that the Docker images are prepared, they can be deployed to the Minikube cluster. Run the minikube dashboard command to access the web-based Kubernetes interface, which is useful for troubleshooting, for example opening a shell inside a containerised application or reviewing its logs.

Start Minikube dashboard.

minikube dashboard

PostgreSQL and pgAdmin Deployment

The PostgreSQL database and pgAdmin will be deployed first. Additionally, two NodePort services will be included to allow access to the database and interface from outside the cluster. To access this pgAdmin service at a given port, run the minikube service command.

Inside ‘1. postgresql’, start pgAdmin and PostgreSQL.

kubectl apply -f .

Access pgAdmin from outside the cluster.

minikube service pgadmin-service -n pyspark
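
With the NodePort service exposed, the database itself can also be reached from the host machine. The following is a minimal sketch of a quick connectivity check, assuming psycopg2 and python-dotenv are installed locally and the .env file shown earlier is in the working directory.

import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()

# Connect through the NodePort service using the Minikube IP and port 30432
conn = psycopg2.connect(
    dbname=os.environ["DB_NAME"],
    user=os.environ["DB_USER"],
    password=os.environ["DB_PASS"],
    host=os.environ["DB_HOST"],    # the Minikube IP from `minikube ip`
    port=os.environ["DB_PORT"],
)

with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])

conn.close()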

Tweepy Deployment

Inside ‘2. twitter-app’, start Twitter stream socket.

kubectl apply -f twitter-app.yaml

PySpark Deployment

./bin/spark-submit \
    --master k8s://${K8S_SERVER} \
    --deploy-mode cluster \
    --jars local://${SPARK_HOME}/spark-app/postgresql-42.5.1.jar \
    --name spark-app \
    --conf spark.kubernetes.container.image=spark-app:1.0 \
    --conf spark.kubernetes.context=minikube \
    --conf spark.kubernetes.namespace=pyspark \
    --conf spark.kubernetes.driver.pod.name=pyspark-driver \
    --conf spark.kubernetes.driver.label.app=spark-app \
    --conf spark.executor.instances=1 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=pyspark-service \
    --conf spark.kubernetes.file.upload.path=/tmp \
    --conf spark.kubernetes.submission.waitAppCompletion=false \
    local://${SPARK_HOME}/spark-app/spark_sentiment.py

A NodePort service to view the web user interface from outside the cluster is also included in a separate deployment file. To access this Spark service at a given port, run the minikube service command.

Inside ‘3. spark-app’, start the Spark service.

kubectl apply -f spark-service.yaml

To start the PySpark application, run the following command.

./start-spark-app.sh

To stop the PySpark application, run the following command.

./stop-spark-app.sh

Access the Spark web user interface from outside the cluster.

minikube service spark-service -n pyspark

There are several tabs available in the Spark web user interface for tracking the configuration, resource usage, and status of the Spark application: ‘Jobs’, ‘Stages’, ‘Storage’, ‘Environment’, ‘Executors’, ‘SQL / DataFrame’, and ‘Structured Streaming’. For instance, the processing of each mini batch and the time it took to complete can be seen under the Stages tab (Figure 1). The Structured Streaming tab shows the job’s running time, average processing times, and so on (Figure 2).

 

Figure 1 — Spark Web UI (Stages)

Figure 2 — Spark Web UI (Structured Streaming)

Streamlit Deployment

The Streamlit application is the last thing to be deployed. A NodePort service to view the dashboard from outside the cluster is also included in this deployment file. To access this Streamlit service at a given port, run the minikube service command (Figure 3).

Inside ‘4. streamlit-app’, start the Streamlit dashboard.

kubectl apply -f streamlit-app.yaml

Access the Streamlit dashboard from outside the cluster.

minikube service streamlit-service -n pyspark

Figure 3 — Streamlit Dashboard (GIF)

Figure 4 — Minikube Dashboard (Pods)

Summary

If everything has been deployed correctly and the containers are running without errors (Figure 4), the Streamlit dashboard should update its results in real time (Figure 3). This demonstrates Spark using K8s as its resource manager. While this blog post used Structured Streaming to perform sentiment analysis on a continuous data stream, Spark is capable of much more, including machine learning at very large scales. Minikube (a local single-node K8s cluster) and a single Spark executor were used in this project, but the same concepts apply to production-grade clusters offered by leading cloud providers such as AWS, Azure, DigitalOcean, and GCP.

Use Cases in Agriculture

At Cropway, we are developing innovative technological solutions for the agricultural sector. For example, Spark Structured Streaming can be leveraged to support real-time applications like crop health monitoring, weather pattern forecasting, livestock health and wellbeing monitoring, and supply chain management.

    • Crop health monitoring: IoT devices can continuously produce data on temperature, nutrient levels, and soil moisture. This information can then be used to improve irrigation and fertilisation techniques and identify potential problems in real-time using a streaming data pipeline.

    • Weather pattern forecasting: Real-time data from IoT weather stations and satellite images may be analysed using a streaming data pipeline. This makes it possible to predict future weather patterns, aid farmers in improving crop management techniques, and help them get ready for severe weather.

    • Livestock health and wellbeing monitoring: Animal activity, temperature, and feed intake may all be tracked using data from IoT devices. For instance, by keeping an eye on the health of animals, infections can be avoided, and diseases can be detected early, leading to better treatment procedures.

    • Supply chain management: A streaming data pipeline can be used to track and analyse data continuously from different locations in the agricultural supply chain, such as farm production, shipping, and storage, to enhance procedures and identify bottlenecks.

I hope this article has helped you better understand how to run Spark on Kubernetes. Please share any comments or feedback you may have!

 

Credits

Spark on Kubernetes: An end-to-end Streaming Data Pipeline
Matthew McNulty

Chief Data Scientist

Matthew is Cropway’s Chief Data Scientist, with over three years of experience across a variety of fields and domains and a strong interest in emerging artificial intelligence technologies. He graduated with first-class honours from the MSc in Big Data Analytics at Atlantic Technological University, Ireland. Exascale AI, which he co-founded, was named runner-up in the Ambition pre-accelerator programme by LEO Donegal, in partnership with NDRC. At Cropway, he designed and developed a production-ready blockchain-based supply chain application using Kubernetes and Hyperledger Fabric.
