I’m a big fan of Spark. Nowadays, data is everywhere: databases, APIs, flat files. The possibilities for exploiting it are limitless!
As a Data Engineer, my day-to-day job is to find the best way to build efficient, scalable and fault-tolerant ETLs, and Apache Spark is one of the most convenient tools for that.
Before getting to the core subject (streaming Twitter data with Spark), let’s summarize what Spark is.
What is Apache Spark?
Created in 2009 at U.C. Berkeley, Spark is a distributed data processing engine.
Spark core is the base engine for large-scale parallel and distributed data processing.
Additional libraries that are built on top of the core enable many workloads for streaming, SQL, and machine learning.
Spark Core is responsible for fault recovery, scheduling, memory management, distributing and monitoring jobs across a cluster, and interacting with storage systems.

More than a powerful tool, its uses are varied:
- Spark SQL
- Streaming
- MLlib
- GraphX
It is an efficient way to process large datasets. You can use Spark from many languages:
- Java
- Scala (its native language)
- Python
- R
- SQL
Note: I based this project on Hanee’ Medhat Shousha’s tutorial but developed it further with:
- The Spark Streaming idea
- The Python functions for tweet handling
Twitter API
Now let’s move on to the Twitter part. To get access to the Twitter API, you have to create a developer account here.
1. Create an application and fill in the form.
2. Once your application is created, open it and go to the “Keys and Access Tokens” tab. Your “Consumer API keys” should already be available; this is not the case for the “Access token & access token secret”, so click on “Generate”.
3. Create a JSON file on your computer and paste all the information into it, using the following keys (an example file is shown right after this list):
- API_key
- API_secret_key
- access_token
- access_token_secret
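For example, secrets.json could look like this (placeholder values; the key names are the ones the loading code in step 4 expects):
{
  "API_key": "xxxxxxxxxxxxxxxxxxxxxxxxx",
  "API_secret_key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
  "access_token": "xxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
  "access_token_secret": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}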

4. Below is the code that loads these credentials and builds the OAuth1 object:
import json
import requests_oauthlib

def secure():
    path = '.../secret/'
    # Load Twitter API secrets from a json file
    secrets = json.loads(open(path + 'secrets.json').read())
    api_key = secrets['API_key']
    api_secret_key = secrets['API_secret_key']
    access_token = secrets['access_token']
    access_token_secret = secrets['access_token_secret']
    my_auth = requests_oauthlib.OAuth1(api_key, api_secret_key, access_token, access_token_secret)
    return my_auth
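In the full sender script further down, this file is saved as twitter_secure.py and imported as a module:
import twitter_secure as ts
my_auth = ts.secure()  # OAuth1 object later passed to requests.get(..., auth=my_auth)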
Tweet Structure
Before jumping into Spark, it’s important to understand the Twitter JSON data structure. A tweet is represented as a JSON object (so a dictionary in Python). You will find more exhaustive information here.
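For reference, here is a heavily trimmed illustration of the two fields this project relies on (the text value is invented; a real tweet object carries dozens of additional fields such as user, entities or place):
{
  "created_at": "Wed Oct 10 20:19:24 +0000 2018",
  "text": "Just set up my Spark streaming pipeline!",
  ...
}
Note that created_at[:20] in the sender code keeps only "Wed Oct 10 20:19:24 ", i.e. the weekday, date and time, dropping the timezone offset and year.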
Twitter reader
1. Get_tweets
This function connects to the API. It is one of the most important parts of the sender code, so let’s detail it step by step:
- The official Twitter streaming API URL:
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
- The API request parameters (more parameters here). The locations value is a bounding box given as the south-west longitude/latitude followed by the north-east longitude/latitude:
query_data = [('language', 'en'), ('locations', '-130,-20,100,50')]
- Build the full query URL (here: .../filter.json?language=en&locations=-130,-20,100,50):
query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
- Send the GET request (stream=True keeps the HTTP connection open so tweets keep arriving):
response = requests.get(query_url, auth=my_auth, stream=True)
The final code is below:
def get_tweets():
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'
    query_data = [('language', 'en'), ('locations', '-130,-20,100,50')]
    query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
    response = requests.get(query_url, auth=my_auth, stream=True)
    return response
2. Send_tweets_to_spark
This function sends data to Spark through port 9009 on localhost. For each tweet received:
In the “try” block:
- Load the raw line as JSON:
full_tweet = json.loads(line)
- From the JSON object, select the “created_at” (truncated to its first 20 characters) and “text” elements:
datetime = full_tweet['created_at'][:20]
tweet = full_tweet['text']

- Send the tweet through the TCP connection, with a trailing newline so that Spark’s socketTextStream can split the stream back into individual tweets:
tcp_connection.send(bytes(tweet + '\n', "utf-8"))
In the “except” block:
- Print the system error:
e = sys.exc_info()[0]
print("Error KO000: %s" % e)
The final code is below:
def send_tweets_to_spark(http_resp, tcp_connection):
    for line in http_resp.iter_lines():
        try:
            full_tweet = json.loads(line)
            datetime = full_tweet['created_at'][:20]
            tweet = full_tweet['text']
            print(f"---------------{datetime}--------------------------")
            print(tweet)
            # newline-delimited so that socketTextStream can split tweets into lines
            tcp_connection.send(bytes(tweet + '\n', "utf-8"))
        except:
            e = sys.exc_info()[0]
            print("Error : %s" % e)
3. In the main function
- Define the IP and port:
TCP_IP = "localhost"
TCP_PORT = 9009
- Create a stream socket, bind the IP/port and listen on it. The value passed to listen() specifies the number of queued connections. The script then waits for a connection on the bound IP/port, and the print message tells you it is in this waiting state:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
- Once a client connects to the bound IP/port, the second print message is displayed and data is sent to it through the send_tweets_to_spark function:
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
send_tweets_to_spark(resp, conn)
The final code is below:
def main():
    TCP_IP = "localhost"
    TCP_PORT = 9009
    conn = None
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((TCP_IP, TCP_PORT))
    s.listen(1)
    print("Waiting for TCP connection...")
    conn, addr = s.accept()
    print("Connected... Starting getting tweets.")
    resp = get_tweets()
    send_tweets_to_spark(resp, conn)
The full code:
import json
import requests
import socket
import sys

import twitter_secure as ts

# OAuth1 credentials loaded from the secrets file
my_auth = ts.secure()

# call the Twitter API URL and return the response for a stream of tweets
def get_tweets():
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'
    query_data = [('language', 'en'), ('locations', '-130,-20,100,50')]
    query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
    response = requests.get(query_url, auth=my_auth, stream=True)
    return response

# forward each received tweet to Spark over the TCP connection
def send_tweets_to_spark(http_resp, tcp_connection):
    for line in http_resp.iter_lines():
        try:
            full_tweet = json.loads(line)
            datetime = full_tweet['created_at'][:20]
            tweet = full_tweet['text']
            print(f"---------------{datetime}--------------------------")
            print(tweet)
            # newline-delimited so that socketTextStream can split tweets into lines
            tcp_connection.send(bytes(tweet + '\n', "utf-8"))
        except:
            e = sys.exc_info()[0]
            print("Error KO000: %s" % e)

def main():
    TCP_IP = "localhost"
    TCP_PORT = 9009
    conn = None
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((TCP_IP, TCP_PORT))
    s.listen(1)
    print("Waiting for TCP connection...")
    conn, addr = s.accept()
    print("Connected... Starting getting tweets.")
    resp = get_tweets()
    send_tweets_to_spark(resp, conn)

if __name__ == '__main__':
    main()
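A note on running it: this sender script must be started first, for example with the command below (twitter_app.py is a file name of my choosing). It blocks on s.accept() until the Spark job from the next section connects to localhost:9009, and only then starts requesting and forwarding tweets.
python twitter_app.py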
Spark Streaming vs. Structured Streaming
Spark gives you two ways to process streaming data:
- Spark Streaming
- Structured Streaming
Both allow data to be ingested from many sources, such as:
- Kafka
- API
- Kinesis
- And more
After processing, data can be sent to final destinations such as:
- Filesystems
- Databases
- Live dashboards
- And more
The logic behind both is to process data in micro-batches.
We will look at Spark Streaming and Structured Streaming, and then compare them.
Spark Streaming
Spark Streaming provides a high-level abstraction called DStream (discretized stream), which represents a continuous stream of data. You can think of a DStream as a sequence of RDDs.
A DStream delivers the data in chunks (RDDs) received from the streaming source; each chunk is processed and the result is then sent to the destination.
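To make that concrete, here is a minimal sketch (not part of the project code; the appName is arbitrary) that simply peeks at the first lines of every micro-batch arriving on the same socket:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DStream_Peek_Demo")
ssc = StreamingContext(sc, 2)  # 2-second micro-batches
lines = ssc.socketTextStream("localhost", 9009)
# every micro-batch of a DStream is an ordinary RDD, so you can drop down to the RDD API per batch
lines.foreachRDD(lambda rdd: print(rdd.take(5)))
ssc.start()
ssc.awaitTermination()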

Below, we are going to analyze the code for a “continuous” word count with Spark Streaming:
- This function adds the counts from the new batch to the running state:
def updateCount(newCounts, state):
    if state is None:
        return sum(newCounts)
    else:
        return state + sum(newCounts)
In the main function:
- Create the Spark context:
sc = SparkContext(appName="Pyspark_Streaming_Demo")
- Spark Streaming is pretty verbose, so we constrain logging to warnings only:
sc.setLogLevel("WARN")
- The streaming context will process a micro-batch every 2 seconds:
ssc = StreamingContext(sc, 2)
- Receive the Twitter data from port 9009 on localhost:
lines = ssc.socketTextStream("localhost", 9009)
- Word count per batch:
1. Split each line into individual words:
flatMap(lambda line: line.split(" "))
2. Map each word to a (word, 1) pair:
map(lambda x: (x, 1))
3. Sum the values per key:
reduceByKey(lambda a, b: a + b)
- As a stream can be infinite (an unbounded sequence of DStreams), recovery can be very expensive, so a good practice is checkpointing, which saves intermediate state to disk. Checkpointing must be enabled before any action on the RDDs:
ssc.checkpoint("result/checkpoints") # Save in the "result/checkpoints" folder
- Compute a state DStream based on the previous states, updated with the values from the current batch of word counts:
totalWords = counts.updateStateByKey(lambda newCounts, state: updateCount(newCounts, state))
- Sort the output by descending count:
totalWords = totalWords.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
The final code is below:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# add the counts from the current batch to the running state
def updateCount(newCounts, state):
    if state is None:
        return sum(newCounts)
    else:
        return state + sum(newCounts)

def main():
    sc = SparkContext(appName="Pyspark_Streaming_Demo")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)  # a micro-batch is processed every 2 seconds
    lines = ssc.socketTextStream("localhost", 9009)
    # split each line into words and count them within the batch
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda a, b: a + b)
    # checkpointing is required by updateStateByKey
    ssc.checkpoint("result/checkpoints")
    # update the running total per word with the counts of the current batch
    totalWords = counts.updateStateByKey(lambda newCounts, state: updateCount(newCounts, state))
    # sort each batch result by descending count
    totalWords = totalWords.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
    totalWords.pprint(30)
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
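To run it (a sketch: spark_streaming_wordcount.py is a file name of my choosing, and the Twitter sender from the previous section must already be waiting for a connection on port 9009):
spark-submit spark_streaming_wordcount.py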
The result:

Structured Streaming
Introduced in Spark 2.0 and built on the Spark SQL engine, Structured Streaming allows you to create scalable and fault-tolerant streaming pipelines.
You can create your streaming pipeline the same way as a classic batch ETL.
You can easily use DataFrames to express:
- Streaming aggregations
- Event-time windows
- Stream-to-batch joins
The computation is executed on the same optimized Spark SQL engine.
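For example, event-time windows and late data are handled with just a couple of DataFrame calls. A minimal sketch (not part of this project; it assumes a streaming DataFrame events with an event_time timestamp column and a word column, parsed from whatever source you use):
from pyspark.sql.functions import window

windowed_counts = (events
    .withWatermark("event_time", "10 minutes")        # tolerate up to 10 minutes of lateness
    .groupBy(window(events.event_time, "5 minutes"),  # 5-minute event-time windows
             events.word)
    .count())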

Below is a “continuous” word count with Spark Structured Streaming:
- Create the Spark session, then set the log level and the number of shuffle partitions:
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
spark.sparkContext.setLogLevel('WARN')
spark.conf.set("spark.sql.shuffle.partitions", 2)
- Create a DataFrame representing the stream of input lines coming from the connection to host:port:
lines = spark.readStream.format('socket').option('host', "localhost").option('port', 9009).load()
- Split the lines into words:
words = lines.select(explode(split(lines.value, ' ')).alias('word'))
- Register a temporary view so it can be queried afterwards:
words.createOrReplaceTempView("words_table")
- Generate the running word count with this SQL query (an equivalent DataFrame-API version is sketched right after this walkthrough):
wordCounts = spark.sql("""select word
, count(*) as cnt
from words_table
group by word
order by cnt desc""")
- Start the query that prints the running counts to the console. You can set the number of rows to display with the “numRows” option on writeStream, and the micro-batch interval with the processingTime trigger:
query = wordCounts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .trigger(processingTime='2 seconds') \
    .option("numRows", 30) \
    .start()
- Block until the query is terminated, either manually with stop() or because of an error:
query.awaitTermination()
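For reference, the same running aggregation can be expressed with the DataFrame API instead of SQL (a sketch reusing the words DataFrame from above):
from pyspark.sql.functions import desc

wordCounts = words.groupBy('word') \
    .count() \
    .withColumnRenamed('count', 'cnt') \
    .orderBy(desc('cnt'))
The final code is below: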
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

def main():
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')
    spark.conf.set("spark.sql.shuffle.partitions", 2)
    # Create DataFrame representing the stream of input lines from connection to host:port
    lines = spark.readStream.format('socket').option('host', "localhost").option('port', 9009).load()
    # Split the lines into words
    words = lines.select(explode(split(lines.value, ' ')).alias('word'))
    # Generate running word count
    words.createOrReplaceTempView("words_table")
    wordCounts = spark.sql("""select word
                                   , count(*) as cnt
                              from words_table
                              group by word
                              order by cnt desc""")
    # Start running the query that prints the running counts to the console
    # You can set the number of rows to display with the 'numRows' option on writeStream.
    query = wordCounts \
        .writeStream \
        .outputMode('complete') \
        .format('console') \
        .trigger(processingTime='2 seconds') \
        .option("numRows", 30) \
        .start()
    query.awaitTermination()

if __name__ == "__main__":
    main()
The result:

Conclusion: When to Use Spark Streaming vs. Structured Streaming
The comparison between DStreams (Spark Streaming) and DataFrames (Structured Streaming) mirrors the comparison between RDDs and DataFrames.
Compared to DStreams, DataFrames are better optimized for complex aggregations and come with a much richer set of functions.
Furthermore, Structured Streaming offers many more ways to handle late data (handling late data with watermarking, for instance), which is not the case with Spark Streaming.
So, which is best? DStreams let you perform low-level transformations and processing.
Structured Streaming and DataFrames give you a powerful SQL-like abstraction and many configuration options for your real-time pipeline, as if you were working on a SQL table, which is easier to query for most people.
Structured Streaming is likely the better solution in most situations, allowing much more complex transformations and data management (especially event-time windows and late or out-of-order data).