In this blog post, I'll share examples #3 and #4 from my presentation to demonstrate the capabilities of the Spark SQL module. As I explained in my previous blog posts, the Spark SQL module provides DataFrames (and Datasets, but Python doesn't support Datasets because it's a dynamically typed language) to work with structured data.

First, let's start by creating a temporary table from a CSV file and running a query on it. As in my previous blog posts, I use the "u.user" file of the MovieLens 100K dataset (I saved it as users.csv).

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

spark.read.load( "users.csv", format="csv", sep="|" ) \
    .toDF( "id","age","gender","occupation","zip" ) \
    .createOrReplaceTempView( "users" )

spark.sql( "select gender, count(*) from users group by gender" ).show()

sc.stop()

Line 1) Each Spark application needs a Spark Context object to access Spark APIs, so we start by importing SparkContext.

Line 2) Because I'll use DataFrames, I also import SparkSession.

Line 4) I create a Spark Context object (as "sc").

Line 5) I create a Spark Session object (based on the Spark Context). If you run this code in the PySpark shell or in a notebook such as Zeppelin, you should skip these steps (importing SparkContext and SparkSession and creating the sc and spark objects), because they are already defined. You should also ignore the last line, because you don't need to stop the Spark Context.
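
As a side note, newer PySpark code usually creates the session with the builder pattern instead of wrapping a SparkContext manually; a minimal sketch (the application name is only illustrative):

from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session in a shell or notebook,
# or builds a new one otherwise; the context is reachable via spark.sparkContext
spark = SparkSession.builder.appName("spark_sql_examples").getOrCreate()
sc = spark.sparkContext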

Line 7) I use the DataFrameReader object of spark (spark.read) to load the CSV data. As you can see, I don't need to write a mapper to parse the CSV file.

Line 8) If the CSV file had headers, DataFrameReader could use them, but our sample CSV has no headers, so I provide the column names with toDF.
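
For comparison, here is a minimal sketch of the header case; "users_with_header.csv" is a hypothetical file with a header row, unlike our u.user data:

# header=True tells the reader to take column names from the first row,
# inferSchema=True asks it to guess the column types
df = spark.read.load( "users_with_header.csv", format="csv", sep="|", header=True, inferSchema=True )
df.printSchema()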

Line 9) Using the "createOrReplaceTempView" method, I register my data as a temporary view.

Line 11) I run a SQL query against my temporary view using the Spark Session's sql method. The result is a DataFrame, so I can use the show method to print it.
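
Since spark.sql returns an ordinary DataFrame, you can also keep transforming the result before printing it; a small sketch (the alias and ordering are only illustrative):

result = spark.sql( "select gender, count(*) as cnt from users group by gender" )

# sort the aggregated rows before displaying them
result.orderBy( "cnt", ascending=False ).show()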

Line 13) sc.stop() will stop the context; as I said, it's not necessary for the PySpark shell or notebooks such as Zeppelin.

When I check the tables with "show tables", I see that the users table is temporary, so when our session (job) is done, the table will be gone. What if we want to store our users data persistently? If our Spark environment is already configured to connect to Hive, we can use the DataFrameWriter's "saveAsTable" method. We can also save the data as Parquet, CSV, or JSON files.
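
If you prefer checking this from the DataFrame API instead of running "show tables", the catalog reports which entries are temporary; a minimal sketch (run while the session is still active):

# temporary views are listed with isTemporary=True and disappear when the session ends
for t in spark.catalog.listTables():
    print( t.name, t.tableType, t.isTemporary )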

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

df = spark.read.load( "users.csv", format="csv", sep="|" ) \
    .toDF( "id","age","gender","occupation","zip" )

df.write.saveAsTable( "users", mode="overwrite" )

df.write.save("users_json", format="json", mode="overwrite")

df.write.save("users_parquet", format="parquet", mode="overwrite")

df.write.save("users_csv", format="csv", mode="overwrite")

spark.sql("SELECT gender, count(*) FROM json.`users_json` GROUP BY gender").show()

sc.stop()

Lines 1-5 and 20) I already explained these in the previous example.

Line 7) I use the DataFrameReader object of spark (spark.read) to load the CSV data. The result is stored in df (a DataFrame object).

Line 8) If the CSV file had headers, DataFrameReader could use them, but our sample CSV has no headers, so I provide the column names.

Line 10) I use the saveAsTable method of DataFrameWriter (the write property of a DataFrame) to save the data directly to Hive. The "mode" parameter lets me overwrite the table if it already exists.
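
Once the table is saved to Hive, it can be read back like any other table; a short sketch:

# spark.table returns the persistent table as a DataFrame
users_tbl = spark.table( "users" )
users_tbl.show( 5 )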

Line 12) I save the data as JSON files in the "users_json" directory.

Line 14) I save the data as Parquet files in the "users_parquet" directory.

Line 16) I save the data as CSV files in the "users_csv" directory.

Line 18) Spark SQL's direct read capability is impressive: you can run SQL queries directly on supported files (JSON, CSV, Parquet). Because I selected a JSON file for my example, I did not need to name the columns; the column names are taken automatically from the JSON file.
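
The same trick works for the Parquet output written above; a short sketch (the occupation grouping is only illustrative):

spark.sql( "SELECT occupation, count(*) FROM parquet.`users_parquet` GROUP BY occupation" ).show()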

The Spark SQL module also enables you to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data from different data sources. Please read my blog post about joining data from a CSV file and a MySQL table to understand JDBC connectivity with the Spark SQL module.
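
For reference, a minimal JDBC read looks roughly like the sketch below; the URL, table name and credentials are placeholders, and the matching JDBC driver JAR must be available to Spark:

# read a table over JDBC into a DataFrame (connection details are hypothetical)
jdbc_df = spark.read.format( "jdbc" ) \
    .option( "url", "jdbc:mysql://dbserver:3306/movielens" ) \
    .option( "dbtable", "users" ) \
    .option( "user", "myuser" ) \
    .option( "password", "mypassword" ) \
    .load()

jdbc_df.show( 5 )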

If you have an Oracle Cloud account, you can download and import the example notebook to test the scripts.
