Spark SQL Overview


What are Spark SQL and DataFrames?

Spark SQL is a Spark module for processing and querying structured data. It is built around an interface called a Dataset, which offers all the features of an RDD plus extra schema information that Spark uses to optimize execution. When operations are performed on structured data through Spark SQL, the results are returned as Datasets. A DataFrame is a Dataset organized into named columns; in Scala and Java it is represented as a Dataset of Rows. Spark SQL provides both a Dataset API and a DataFrame API for working with these structures.

Spark SQL can interact with many structured data sources through its DataFrame API. Once data has been loaded into a DataFrame, standard SQL queries can be run against it.
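To make the "Dataset of Rows" idea concrete, here is a minimal sketch (not part of the walk-through below) that builds a small DataFrame in memory from Row objects; the column names and values are invented for illustration:

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("dataframe intro sketch").getOrCreate()

#Build a DataFrame directly from Row objects; Spark infers the schema
rows = [Row(id=1, tweet="test tweet A", shares=10),
        Row(id=2, tweet="test tweet B", shares=20)]
df = spark.createDataFrame(rows)

df.printSchema()           #shows the inferred column names and types
df.select("tweet").show()  #standard DataFrame operations work immediately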

Hands-on

Here I will create a basic Python program that demonstrates the Spark SQL module in action. The comments explain each step being taken.

#create spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("spark sql sample") \
        .config("spark.some.config", "value") \
        .getOrCreate()

#read tweets json into a dataframe
df = spark.read.json("tweets.json")

#show df contents
df.show()
# +---+------------+------+
# | id|       tweet|shares|
# +---+------------+------+
# |  1|test tweet A|    10|
# |  2|test tweet B|    20|
# |  3|test tweet C|    30|
# +---+------------+------+

You can save this file as sparksql1.py and run it by passing it to spark-submit like this:

[root@sandbox ~]# /usr/hdp/2.6.0.3-8/spark2/bin/spark-submit sparksql1.py

Other operations can be performed once the DataFrame has been created. For example:

df.select("tweet").show() 
# +-------------+ 
# | tweet       | 
# +-------------+ 
# | test tweet A| 
# | test tweet B| 
# | test tweet C| 
# +-------------+ 
 
df.filter(df['count'] > 10).show() 
# +----+-------+---------+ 
# | id| tweet  | shares  | 
# +----+-------+---------+ 
# | 2 | test tweet B| 20 | 
# | 3 | test tweet C| 30 | 
# +----+-------+---------+
 
df.groupBy("shares").count().show() 
# +----+----------+ 
# | shares| count | 
# +----+----------+ 
# | 10    | 1     | 
# | 20    | 2     | 
# | 30    | 3     | 
# +----+----------+ 
 
#This will register df into a temporary view called test 
df.createOrReplaceTempView("tweetview")

SparkSession provides a function called sql which can be used to run SQL queries on registered tables or views.

sqlDF = spark.sql("SELECT * FROM tweetview")
sqlDF.show()
# +---+------------+------+
# | id|       tweet|shares|
# +---+------------+------+
# |  1|test tweet A|    10|
# |  2|test tweet B|    20|
# |  3|test tweet C|    30|
# +---+------------+------+
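The sql function accepts full Spark SQL statements, not just SELECT *. Here is a hedged sketch of a slightly richer query against the same tweetview view (the threshold of 20 is arbitrary):

#Aggregate query against the registered view
aggDF = spark.sql("SELECT shares, COUNT(*) AS tweet_count FROM tweetview WHERE shares >= 20 GROUP BY shares")
aggDF.show()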

Data Sources

Let’s go through a few examples to see how Spark SQL supports various data sources.

#Parquet is the default data source when loading files; other formats can be specified explicitly
df = spark.read.load("tweets.parquet")
df.select("id", "tweet").write.save("tweetsonly.parquet")

df = spark.read.load("tweets.json", format="json")
df.select("id", "tweet").write.save("tweetsfromjson.parquet", format="parquet")

#Print the inferred schema
df.printSchema()

#Run SQL directly on a file
df = spark.sql("SELECT * FROM parquet.`tweets.parquet`")

#Save as a persistent table (for example into the Hive metastore)
df.write.saveAsTable("Tweets")

#Save the table to a specific location
df.write.option("path", "/usr/tables").saveAsTable("Tweets")
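Other built-in file formats follow the same pattern. For example, CSV can be read the same way; this is a sketch, and the file name tweets.csv (with a header row) is an assumption, not part of the original example:

#Read a CSV file with a header row, letting Spark infer the column types
csvDF = spark.read.load("tweets.csv", format="csv", header="true", inferSchema="true")
csvDF.write.save("tweetsfromcsv.parquet", format="parquet")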

Spark SQL also supports reading from and writing to Hive. To configure this, place hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) in Spark's conf/ directory.

The example below shows how Hive support is enabled on the SparkSession and how it can then be used.

from os.path import abspath
from pyspark.sql import SparkSession

#warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession.builder \
        .appName("Spark SQL With Hive") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

#Create a table and load it with the contents of a file
spark.sql("CREATE TABLE IF NOT EXISTS attributes (id INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'attributes.txt' INTO TABLE attributes")

#Read an existing Hive table
spark.sql("SELECT * FROM attributes LIMIT 10").show()
# +---+------+
# | id| value|
# +---+------+
# | 11| a.b.c|
# | 20|xyz.ab|
# | 30|  test|
# +---+------+
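The section above mentions writing to Hive as well; here is a hedged sketch of that direction using the same Hive-enabled session (the table name hive_tweets is made up for the example):

#Write a DataFrame into the metastore as a managed table
df = spark.read.json("tweets.json")
df.write.mode("overwrite").saveAsTable("hive_tweets")

#The saved table can then be queried with spark.sql
spark.sql("SELECT COUNT(*) FROM hive_tweets").show()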

JDBC Connection

To connect to a JDBC data source, the JDBC driver needs to be on Spark's classpath. It can be passed when launching pyspark like this:

./pyspark --jars ./../jars/mysql-connector-java-5.1.44-bin.jar
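The same --jars flag also works with spark-submit when running a script rather than the interactive shell. Alternatively, the driver jar can be set on the session builder; a hedged sketch assuming the same jar path as above:

from pyspark.sql import SparkSession

#spark.jars adds the MySQL driver to the driver and executor classpaths
spark = SparkSession.builder \
        .appName("jdbc sample") \
        .config("spark.jars", "./../jars/mysql-connector-java-5.1.44-bin.jar") \
        .getOrCreate()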

The example below shows how to connect to MySQL and read from and write back to a table.

#Load data from a JDBC source such as MySQL
testDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://10.0.2.2:3306") \
    .option("dbtable", "schemaabc.tabletest") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

#Persist the loaded data as a table
testDF.write.saveAsTable("tabletest")

#Write data back to the JDBC source
testDF.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://10.0.2.2:3306") \
    .option("dbtable", "schemaabc.tabletest") \
    .option("user", "username") \
    .option("password", "password") \
    .save()
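By default the JDBC write fails if the target table already exists. A save mode can be set to change this behaviour; a minimal sketch using append (overwrite and ignore are other common choices):

#Append rows to the existing MySQL table instead of failing when it already exists
testDF.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://10.0.2.2:3306") \
    .option("dbtable", "schemaabc.tabletest") \
    .option("user", "username") \
    .option("password", "password") \
    .mode("append") \
    .save()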
