Processing tweets with Spark: Spark Streaming vs. Structured Streaming

I’m a big fan of Spark. Nowadays data is everywhere, in databases, APIs or flat files, and the ways to exploit it are limitless!

As a Data Engineer, my day-to-day job is to find the best way to build efficient, scalable and fault-tolerant ETLs, and Apache Spark is one of the most convenient tools for that.

Before getting to the core subject (streaming Twitter data with Spark), let’s summarize what Spark is.

What is Apache Spark?

Created in 2009 at U.C. Berkeley, Spark is a distributed data processing engine.

Spark Core is the base engine for large-scale parallel and distributed data processing.

Additional libraries built on top of the core enable many workloads, such as streaming, SQL and machine learning.

Spark Core is responsible for fault recovery, scheduling, memory management, distributing and monitoring jobs on a cluster, and interacting with storage systems.


More than a powerful engine, Spark covers a variety of use cases through its libraries:

  • Spark SQL
  • Streaming
  • MLlib
  • GraphX

It is an efficient way to process large datasets. You can use Spark from several languages:

  • Java
  • Scala (its native language)
  • Python
  • R
  • SQL

Note: I based this project on Hanee’ Medhat Shousha’s tutorial, but developed it further with:

  • The idea for Spark Streaming
  • Python functions about tweet management

Twitter API

Now let’s move on to the Twitter part. To access the Twitter API, you have to create a developer account here.

  1. Create an application and fill in the form
  2. Once your application is created, open it and go to the “Keys and Access Tokens” tab.

Your “Consumer API keys” should already be available. This is not the case for the “Access token & access token secret”, so click on “Generate”.

3. Create a JSON file on your computer and paste all of this information into it:

  • consumerKey
  • consumerSecret
  • accessTokenKey
  • accessTokenSecret

4. Below is the code of a function that loads the Twitter credentials:

import json
import requests_oauthlib

def secure():
    path = '.../secret/'
    # Load the Twitter API secrets from the JSON file created in step 3
    with open(path + 'secrets.json') as secrets_file:
        secrets = json.load(secrets_file)
    # Build the OAuth1 object that signs every request sent to the API
    my_auth = requests_oauthlib.OAuth1(secrets['consumerKey'],
                                       secrets['consumerSecret'],
                                       secrets['accessTokenKey'],
                                       secrets['accessTokenSecret'])
    return my_auth
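
A missing or misspelled entry in the JSON file only shows up later as an authentication error, so it can help to check the file as soon as it is loaded. The sketch below is one possible way to do that, assuming the four key names listed in step 3. The helper name load_secrets and the placeholder values are made up for the illustration, and the demo writes its own temporary file instead of touching your real secrets.

```python
import json
import os
import tempfile

# Key names taken from the list in step 3; adapt them if your file differs.
REQUIRED_KEYS = ("consumerKey", "consumerSecret", "accessTokenKey", "accessTokenSecret")

def load_secrets(path):
    """Load the secrets file and fail early if an entry is missing or empty."""
    with open(path, encoding="utf-8") as handle:
        secrets = json.load(handle)
    missing = [key for key in REQUIRED_KEYS if not secrets.get(key)]
    if missing:
        raise KeyError(f"missing entries in {path}: {missing}")
    return secrets

# Demo on a temporary file filled with placeholder values (not real credentials).
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "secrets.json")
    with open(demo_path, "w", encoding="utf-8") as handle:
        json.dump({key: "placeholder" for key in REQUIRED_KEYS}, handle)
    secrets = load_secrets(demo_path)

print(sorted(secrets))
```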

Tweet Structure

Before jumping into Spark, it’s important to understand the structure of Twitter’s JSON data. A tweet is represented as a JSON object (so a dictionary in Python). You will find more exhaustive information here.
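
To make this concrete, here is a heavily trimmed tweet with only a handful of fields, and the two lookups this project relies on. The values are invented for the illustration (a real tweet carries many more fields), and the strptime format is my reading of the "created_at" layout rather than something taken from the project code.

```python
import json
from datetime import datetime

# A made-up, heavily trimmed tweet as it would arrive on one line of the stream.
raw_line = json.dumps({
    "created_at": "Wed Oct 10 20:19:24 +0000 2018",
    "id_str": "1234567890",
    "text": "Streaming tweets into Spark",
    "lang": "en",
    "user": {"screen_name": "example_user"},
})

full_tweet = json.loads(raw_line)           # JSON object -> Python dict
short_date = full_tweet["created_at"][:20]  # day, month and time, without the year
parsed_date = datetime.strptime(full_tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")

print(short_date)
print(full_tweet["text"])
print(parsed_date.isoformat())
```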

Twitter reader

  1. Get_tweets

This function connects to the API. It is one of the most important parts of the sender code, so I’m detailing it below:

  • Official twitter API URL :
url = 'https://stream.twitter.com/1.1/statuses/filter.json'

  • API request parameters (more parameters here):
query_data = [('language', 'en'), ('locations', '-130,-20,100,50')]
  • Create URL :
query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
  • Send the GET request :
response = requests.get(query_url, auth=my_auth, stream=True)

The final code is below :

def get_tweets():
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'
    query_data = [('language', 'en'), ('locations', '-130,-20,100,50')]
    query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
    # stream=True keeps the connection open so tweets can be read as they arrive
    response = requests.get(query_url, auth=my_auth, stream=True)
    return response
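
To see exactly which URL is requested, the sketch below rebuilds the query string outside of the function, without sending anything. It also shows the standard library's urlencode as a possible alternative. That alternative is my suggestion, not part of the original code; it percent-encodes the commas of the bounding box, which is the usual encoding for query strings.

```python
from urllib.parse import urlencode

url = 'https://stream.twitter.com/1.1/statuses/filter.json'
query_data = [('language', 'en'), ('locations', '-130,-20,100,50')]

# Same construction as in get_tweets(): plain string concatenation.
query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])

# Possible alternative: let the standard library escape the values.
encoded_url = url + '?' + urlencode(query_data)

print(query_url)
print(encoded_url)
```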

2. Send_tweet_to_spark

This function sends the data to Spark through port 9009 on localhost. For each tweet received:

In the “try” section:

  • Load the received line :
full_tweet = json.loads(line)
  • From the JSON, select the “created_at” and “text” elements :
datetime = full_tweet['created_at'][:20]
tweet = full_tweet['text']
  • Send tweet through TCP connection :
tcp_connection.send(bytes(tweet, "utf-8"))

In the “except” section:

  • Print the system error :
e = sys.exc_info()[0] 
print("Error KO000: %s" % e)

The final code is below:

def send_tweets_to_spark(http_resp, tcp_connection):
    for line in http_resp.iter_lines():
        try:
            full_tweet = json.loads(line)
            datetime = full_tweet['created_at'][:20]
            tweet = full_tweet['text']
            print(f"---------------{datetime}--------------------------")
            print(tweet)
            tcp_connection.send(bytes(tweet, "utf-8"))
        except:
            e = sys.exc_info()[0]
            print("Error : %s" % e)
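
One detail worth keeping in mind: Spark's socket source reads the incoming text as newline-delimited lines, so the receiver only sees a record once a newline arrives. The sketch below is a possible variant of the sending step, not the code used in this article. It collapses the line breaks inside a tweet and terminates each tweet with a single newline. The helper frame_tweet is hypothetical, and a local socket pair stands in for the Spark connection.

```python
import socket

def frame_tweet(text):
    """Collapse line breaks inside the tweet and end it with one newline."""
    return (" ".join(text.splitlines()) + "\n").encode("utf-8")

# A connected pair of sockets stands in for the sender and the Spark receiver.
sender, receiver = socket.socketpair()

tweets = ["first tweet", "second tweet\nwith a line break"]
for tweet in tweets:
    sender.sendall(frame_tweet(tweet))
sender.close()  # closing the sender lets the reader reach end-of-stream

# Read the stream back line by line, as a line-oriented receiver would.
with receiver.makefile("r", encoding="utf-8") as stream:
    received = [line.rstrip("\n") for line in stream]
receiver.close()

print(received)
```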

3. In the main function

  • Define IP and port
TCP_IP = "localhost"
TCP_PORT = 9009
  • Create a stream socket, bind it to the IP/port and listen on it. The value in parentheses is the number of queued connections allowed. The tweet sender then waits for a connection on the bound IP/port, and the print message informs you about its state:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
  • Once a client connects to the bound IP/port, the next message is printed and the data is sent through the “send_tweets_to_spark” function:
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
send_tweets_to_spark(resp, conn)

The final code is below :

def main():
    TCP_IP = "localhost"
    TCP_PORT = 9009
    conn = None
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((TCP_IP, TCP_PORT))
    s.listen(1)
    print("Waiting for TCP connection...")
    conn, addr = s.accept()
    print("Connected... Starting getting tweets.")
    resp = get_tweets()
    send_tweets_to_spark(resp, conn)
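
The bind / listen / accept sequence can be tried without Twitter or Spark. In the sketch below, a thread plays the role of the Spark receiver and connects to the listening socket. Everything here is illustrative: the function fake_spark_receiver is made up, the address 127.0.0.1 replaces "localhost", and port 0 (meaning "any free port") is used instead of 9009 so the demo cannot collide with a running sender.

```python
import socket
import threading

# Server side: same bind / listen / accept sequence as in main().
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port for the demo
server.listen(1)
port = server.getsockname()[1]

received = []

def fake_spark_receiver():
    """Stand-in for Spark: connect to the sender and read newline-delimited text."""
    with socket.create_connection(("127.0.0.1", port)) as client:
        with client.makefile("r", encoding="utf-8") as stream:
            received.extend(line.rstrip("\n") for line in stream)

receiver_thread = threading.Thread(target=fake_spark_receiver)
receiver_thread.start()

conn, addr = server.accept()    # blocks until the receiver connects
conn.sendall("hello from the tweet sender\n".encode("utf-8"))
conn.close()                    # end of stream for the receiver
receiver_thread.join()
server.close()

print(received)
```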


The full code :

import json
import requests
import socket
import sys
import twitter_secure as ts

my_auth = ts.secure()

# Call the Twitter API URL and return the response for a stream of tweets
def get_tweets():
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'
    query_data = [('language', 'en'), ('locations', '-130,-20,100,50')]
    query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
    response = requests.get(query_url, auth=my_auth, stream=True)
    return response

# Read the tweets line by line and forward their text over the TCP connection
def send_tweets_to_spark(http_resp, tcp_connection):
    for line in http_resp.iter_lines():
        try:
            full_tweet = json.loads(line)
            datetime = full_tweet['created_at'][:20]
            tweet = full_tweet['text']
            print(f"---------------{datetime}--------------------------")
            print(tweet)
            tcp_connection.send(bytes(tweet, "utf-8"))
        except:
            e = sys.exc_info()[0]
            print("Error KO000: %s" % e)

# Wait for Spark to connect on localhost:9009, then start forwarding tweets
def main():
    TCP_IP = "localhost"
    TCP_PORT = 9009
    conn = None
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((TCP_IP, TCP_PORT))
    s.listen(1)
    print("Waiting for TCP connection...")
    conn, addr = s.accept()
    print("Connected... Starting getting tweets.")
    resp = get_tweets()
    send_tweets_to_spark(resp, conn)

if __name__ == '__main__':
    main()

Spark Streaming vs. Structured Streaming

Spark gives you two ways to process streaming data:

  1. Spark Streaming
  2. Structured Streaming

Both can ingest data from many sources, such as:

  • Kafka
  • API
  • Kinesis
  • And more

After processing, the data can be sent to its final destination:

  • Filesystems
  • Databases
  • Live dashboards
  • And more

The logic behind both is to process the data in micro-batches.
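
As a rough picture of what micro-batching means, the sketch below groups records by arrival time into consecutive intervals of fixed length. It is plain Python, not Spark code, and the function name and sample records are invented for the illustration. Intervals that receive no record are simply skipped here.

```python
from collections import defaultdict

def micro_batches(records, batch_interval):
    """Group (arrival_time, value) pairs into consecutive fixed-length batches."""
    batches = defaultdict(list)
    for arrival_time, value in records:
        batch_index = int(arrival_time // batch_interval)
        batches[batch_index].append(value)
    # Return the batches in chronological order.
    return [batches[index] for index in sorted(batches)]

# Arrival times in seconds, with a 2-second batch interval.
records = [(0.3, "a"), (1.9, "b"), (2.1, "c"), (3.5, "d"), (4.0, "e")]
batches = micro_batches(records, batch_interval=2)

print(batches)
```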

We will look at Spark Streaming and Structured Streaming in turn, and then compare them to each other.

Spark Streaming

Spark Streaming provides a high-level abstraction called DStream (discretized stream), which represents a continuous stream of data. You can see a DStream as a continuous sequence of RDDs.

A DStream delivers the data received from the streaming source divided into chunks (RDDs); each chunk is processed and the result is then sent to the destination.

Below, we are going to analyze the code of a “continuous” word count written with Spark Streaming:

  • This function adds the counts of the current batch to the running total (the state) of a key:
def updateCount(newCounts, state):
    if state is None:
        return sum(newCounts)
    else:
        return state + sum(newCounts)

In the main function :

  • Create the Spark context :
sc = SparkContext(appName="Pyspark_Streaming_Demo")
  • Spark Streaming is pretty verbose, so we restrict the log output to warnings :
sc.setLogLevel("WARN")
  • The stream will be processed every 2 seconds :
ssc = StreamingContext(sc, 2)
  • Receive the Twitter data from port 9009 of your localhost :
lines = ssc.socketTextStream("localhost", 9009)

  • Word count, step by step :
  1. Split each sentence into individual words :
flatMap(lambda line: line.split(" "))
  2. Pair each word with the value 1 :
map(lambda x: (x, 1))
  3. Sum the values per key :
reduceByKey(lambda a, b: a + b)
  • As a stream can be infinite (an endless sequence of RDDs), recovering a lost state by recomputing it from the beginning could be very expensive. Checkpointing periodically saves the data to disk, and it is required for stateful operations such as updateStateByKey. The checkpoint directory must be set before the streaming context is started :
ssc.checkpoint("result/checkpoints") # Save in the "result/checkpoints" folder
  • Compute a state DStream based on the previous state, updated with the values from the current batch of word counts :
totalWords = counts.updateStateByKey(lambda newCounts, state: updateCount(newCounts, state))
  • Sort the output by descending count :
totalWords = totalWords.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
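
The steps above can be followed in plain Python, without a cluster. The sketch below is only a simulation of the logic, under the assumption that each call to process_batch stands for one 2-second batch: a Counter plays the role of flatMap / map / reduceByKey, a dictionary plays the role of the state kept by updateStateByKey, and updateCount is the function from this article. The name process_batch and the sample sentences are made up, and the alphabetical tie-break in the sort is added only to make the output deterministic.

```python
from collections import Counter

def updateCount(newCounts, state):
    if state is None:
        return sum(newCounts)
    return state + sum(newCounts)

def process_batch(lines, state):
    """Simulate one micro-batch: per-batch word count, then state update."""
    # flatMap + map + reduceByKey: count the words of the current batch only.
    batch_counts = Counter(word for line in lines for word in line.split(" "))
    # updateStateByKey: the update function runs for every key that is either
    # in the batch or already in the state (with no new values in that case).
    for word in set(batch_counts) | set(state):
        new_values = [batch_counts[word]] if word in batch_counts else []
        state[word] = updateCount(new_values, state.get(word))
    # transform + sortBy: highest counts first (ties broken alphabetically).
    return sorted(state.items(), key=lambda kv: (-kv[1], kv[0]))

state = {}
process_batch(["spark is fast", "spark streaming"], state)
ranking = process_batch(["structured streaming"], state)

print(ranking)
```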

The final code is below :

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def updateCount(newCounts, state):
    # Add the counts of the current batch to the running total of the key
    if state is None:
        return sum(newCounts)
    else:
        return state + sum(newCounts)

def main():
    sc = SparkContext(appName="Pyspark_Streaming_Demo")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)  # Streaming will execute every 2 seconds
    lines = ssc.socketTextStream("localhost", 9009)
    # Count the occurrences of each word in the current batch
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda a, b: a + b)
    ssc.checkpoint("result/checkpoints")
    totalWords = counts.updateStateByKey(lambda newCounts, state: updateCount(newCounts, state))
    totalWords = totalWords.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
    totalWords.pprint(30)
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

The result :

Structured Streaming

Introduced in Spark 2.0 and production-ready since Spark 2.2, Structured Streaming lets you build a scalable and fault-tolerant streaming pipeline on top of the Spark SQL engine.

You can write your streaming pipeline the same way as a classic batch ETL.

You can easily use DataFrames to express:

  • Streaming aggregations
  • event-time windows
  • stream-to-batch joins

The computation is executed on the same optimized Spark SQL engine.

Below, a “continuous” word count written with Structured Streaming:

  • Create the Spark session, set a specific log level and the number of shuffle partitions :
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
spark.sparkContext.setLogLevel('WARN')
spark.conf.set("spark.sql.shuffle.partitions", 2)
  • Create DataFrame based on the stream of input lines from connection to host:port :
lines = spark.readStream.format('socket').option('host', "localhost").option('port', 9009).load()
  • Split the lines into words :
words = lines.select(explode(split(lines.value, ' ')).alias('word'))
  • Generate a temporary view to query it afterwards :
words.createOrReplaceTempView("words_table")
  • Generate running word count with this SQL Query :
wordCounts = spark.sql("""select word
                          , count(*) as cnt
                          from words_table
                          group by word
                          order by cnt desc""")
  • Start running the query that prints the running counts to the console. You can set the number of rows to display with the ‘numRows’ option on writeStream, and the trigger interval with processingTime :
query = wordCounts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .trigger(processingTime='2 seconds') \
    .option("numRows", 30) \
    .start()
  • Block until the query is terminated, by stop() or by an error :
query.awaitTermination()
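
A way to picture what the 'complete' output mode prints: Structured Streaming treats the stream as an unbounded table that keeps growing, and at every trigger the whole result of the query is written out again. The sketch below mimics that in plain Python. The function on_trigger and the sample lines are invented, and recomputing everything from scratch is a simplification for readability (Spark maintains the aggregation incrementally).

```python
from collections import Counter

input_table = []  # the "unbounded table": one row per line received so far

def on_trigger(new_lines):
    """Append the new rows, then return the full, sorted result table."""
    input_table.extend(new_lines)
    words = [word for line in input_table for word in line.split(" ")]
    counts = Counter(words)
    # Equivalent of "group by word order by cnt desc" (ties broken by word).
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))

first = on_trigger(["spark sql", "spark"])
second = on_trigger(["sql streaming"])

print(first)
print(second)
```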

The final code is below :

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

def main():
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')
    spark.conf.set("spark.sql.shuffle.partitions", 2)
    # Create DataFrame representing the stream of input lines from connection to host:port
    lines = spark.readStream.format('socket').option('host', "localhost").option('port', 9009).load()
    # Split the lines into words
    words = lines.select(explode(split(lines.value, ' ')).alias('word'))
    # Generate running word count
    words.createOrReplaceTempView("words_table")
    wordCounts = spark.sql("""select word
                              , count(*) as cnt
                              from words_table
                              group by word
                              order by cnt desc""")
    # Start running the query that prints the running counts to the console
    # You can set the number of rows to display with the 'numRows' option on writeStream.
    query = wordCounts \
        .writeStream \
        .outputMode('complete') \
        .format('console') \
        .trigger(processingTime='2 seconds') \
        .option("numRows", 30) \
        .start()
    query.awaitTermination()

if __name__ == "__main__":
    main()

The result :

Conclusion: when to use Spark Streaming and (Spark) Structured Streaming

The comparison between DStream (Spark Streaming) and DataFrame (Structured Streaming) is the same as the one between RDD and DataFrame.

Unlike DStreams, DataFrames are better optimized for complex aggregations, and plenty of built-in functions are available.

Furthermore, Structured Streaming offers many more ways to handle late data (watermarking, for instance), which is not the case with Spark Streaming.
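
To give an idea of what watermarking does, here is a simplified simulation in plain Python. Events are counted per event-time window, and an event is dropped when its window already ended before the watermark, taken here as the highest event time seen so far minus an allowed delay. This is a sketch of the principle only: in Spark the feature is exposed through withWatermark on a DataFrame and the watermark is updated per micro-batch rather than per record. The function name, window size, delay and events are all made up.

```python
from collections import Counter

def windowed_counts(events, window_size, max_delay):
    """Count words per event-time window, dropping data older than the watermark."""
    counts = Counter()
    dropped = []
    max_event_time = float("-inf")
    for event_time, word in events:  # events are listed in arrival order
        max_event_time = max(max_event_time, event_time)
        watermark = max_event_time - max_delay
        window_start = (event_time // window_size) * window_size
        if window_start + window_size <= watermark:
            dropped.append((event_time, word))    # window already closed
        else:
            counts[(window_start, word)] += 1     # on time, or late but accepted
    return dict(counts), dropped

# (event time in seconds, word); 13 arrives late but in time, 15 arrives too late.
events = [(12, "spark"), (14, "spark"), (27, "kafka"),
          (13, "spark"), (41, "spark"), (15, "late")]
counts, dropped = windowed_counts(events, window_size=10, max_delay=10)

print(counts)
print(dropped)
```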

So, which one is best? DStreams allow you to do low-level transformations and processing.

Structured Streaming and DataFrames give you a powerful, SQL-like abstraction and many settings for your real-time pipeline, as if you were working on a SQL table, which is easier to query for the majority of people.

Structured Streaming is probably the better solution in most situations, and allows much more complex transformations and data management (especially windows over event time, and late and out-of-order data).
