This is the fourth blog post in which I share sample scripts from my presentation about “Apache Spark with Python“. Spark supports two different ways of streaming: Discretized Streams (DStreams) and Structured Streaming. DStreams is the basic abstraction in Spark Streaming: a continuous sequence of RDDs representing a stream of data. Structured Streaming is the newer way of streaming, built on the Spark SQL engine. In the next blog post, I’ll also share a sample script about Structured Streaming, but today I will demonstrate how we can use DStreams.

When you search for example scripts about DStreams, you mostly find sample code that reads data from TCP sockets. So I decided to write a different one: my sample code will read from files located in a directory. The script will check the directory every second and process the new CSV files it finds. Here’s the code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)

stream_data = ssc.textFileStream("file:///tmp/stream") \
    .map(lambda x: x.split(","))

stream_data.pprint()

ssc.start()

ssc.awaitTermination()

Here is the step-by-step explanation of the code:

Line 1) Each Spark application needs a Spark Context object to access Spark APIs, so we start by importing SparkContext.

Line 2) For DStreams, I import StreamingContext.

Line 4) Then I create a Spark Context object (as “sc”) – if you run this code in the PySpark shell, you should skip importing SparkContext and creating the sc object, because the SparkContext is already defined.

Line 5) I create a Streaming Context object. The second parameter indicates the batch interval (1 second) for processing the streaming data.

Line 7) Using textFileStream, I set the source directory for streaming and create a DStream object.

Line 8) This simple map function parses the CSV data by splitting each line at the commas.

Line 10) This is the action command for the DStream object. The pprint method prints the content of each RDD (see the note right after this walkthrough).

Line 12) Starts the streaming process.

Line 14) Waits until the script is terminated manually.
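
One note about pprint: by default it prints only the first 10 elements of each batch. If you want to see more records, it accepts an optional number – a small variation, not part of the original script:

# print up to 20 parsed records per batch instead of the default 10
stream_data.pprint(20)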

Every second, the script will check the “/tmp/stream” folder; if it finds a new file, it will process the file and write the output. For example, if we put a file containing the following data into the folder:

Fatih,5
Cenk,4
Ahmet,3
Arda,1

The script will print:

-------------------------------------------
Time: 2018-04-18 15:05:15
-------------------------------------------
[u'Fatih', u'5']
[u'Cenk', u'4']
[u'Ahmet', u'3']
[u'Arda', u'1']
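
A small note if you want to test this yourself: textFileStream only picks up files that appear in the directory after the stream has started, and Spark expects complete files, so it’s safer to write the file somewhere else first and then move it into the monitored folder. Here’s a minimal sketch (write_test_file is just a hypothetical helper for testing, not part of the streaming script):

import os
import shutil
import tempfile

def write_test_file(rows, target_dir="/tmp/stream"):
    # write the CSV content to a temporary file first
    fd, tmp_path = tempfile.mkstemp(suffix=".csv")
    with os.fdopen(fd, "w") as f:
        f.write("\n".join(rows) + "\n")
    # moving within the same filesystem is an atomic rename,
    # so the stream never sees a half-written file
    shutil.move(tmp_path, os.path.join(target_dir, os.path.basename(tmp_path)))

write_test_file(["Fatih,5", "Cenk,4", "Ahmet,3", "Arda,1"])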

pprint is a perfect function to debug your code, but you will probably want to store the streaming data in an external target (such as a database or an HDFS location). The DStream object’s foreachRDD method can be used for this. Here’s another script that saves the streaming data to JSON files:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 1)

stream_data = ssc.textFileStream("file:///tmp/stream") \
    .map(lambda x: x.split(","))

def savetheresult(rdd):
    if not rdd.isEmpty():
        rdd.toDF(["name", "score"]) \
            .write.save("points_json", format="json", mode="append")

stream_data.foreachRDD(savetheresult)

ssc.start()

ssc.awaitTermination()

Here is the step-by-step explanation of the code:

Line 1) Each Spark application needs a Spark Context object to access Spark APIs, so we start by importing SparkContext.

Line 2) Because I’ll use DataFrames, I also import SparkSession.

Line 3) For DStreams, I import StreamingContext.

Line 5,6) I create a Spark Context object (as “sc”) and a Spark Session object (based on the Spark Context) – if you run this code in the PySpark shell, you should skip these lines, because sc and spark are already defined.

Line 7) I create a Streaming Context object. The second parameter indicates the batch interval (1 second) for processing the streaming data.

Line 9) Using textFileStream, I set the source directory for streaming and create a DStream object.

Line 10) This simple map function parses the CSV data by splitting each line at the commas.

Line 12) I define a function that accepts an RDD as a parameter.

Line 13) This function will be called every second – even if there’s no streaming data – so I check that the RDD is not empty before processing it.

Line 14) Convert the RDD to a DataFrame with columns “name” and “score”.

Line 15) Write the data to the points_json folder as JSON files (see the sketch after this walkthrough for writing to a different target).

Line 17) Assign the savetheresult function for processing the streaming data.

Line 19) Starts the streaming process.

Line 21) Waits until the script is terminated manually.
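
Because foreachRDD hands you a plain RDD for every micro-batch, the same pattern can write to any target Spark supports. As a rough sketch (the “points_parquet” path and the save_as_parquet name are my own placeholders, not from the original script), the function above could be replaced with one that appends Parquet files instead of JSON:

def save_as_parquet(rdd):
    # called once per batch interval, so skip empty batches
    if not rdd.isEmpty():
        rdd.toDF(["name", "score"]) \
            .write.save("points_parquet", format="parquet", mode="append")

# register it instead of savetheresult
stream_data.foreachRDD(save_as_parquet)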

After storing all this data in JSON format, we can run a simple script to query the data:

from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

spark.sql("select name, sum(score) from json.`points_json` "
          "group by name order by sum(score) desc").show()

sc.stop()
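
By the way, the same aggregation can also be expressed with the DataFrame API instead of SQL. Here’s a quick sketch (the “total_score” alias is just my own naming):

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# same aggregation as the SQL query above, written with the DataFrame API
spark.read.json("points_json") \
    .groupBy("name") \
    .agg(F.sum("score").alias("total_score")) \
    .orderBy(F.desc("total_score")) \
    .show()

sc.stop()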

In my previous blog posts, I already explained similar code, so I won’t explain this one step by step. See you in the next blog post about Spark Structured Streaming.

About the Author

Gokhan Atil

Gokhan Atil has over 15 years of experience in the IT industry and a strong background in database management (Oracle 8i, 9i, 10g, 11g), software development and UNIX systems. He is an Oracle certified professional for EBS R12, Oracle 10g and 11g. Gokhan specializes in high availability solutions, performance tuning and monitoring tools. He is a founding member and current vice president of the Turkish Oracle User Group (TROUG), and a member of the Independent Oracle User Group (IOUG). Gokhan has presented at various conferences, and he is one of the co-authors of the book “Expert Oracle Enterprise Manager 12c”.
