Spark Tutorials

By Nadim Bahadoor | Last updated: January 1, 2020 at 17:58

In this section, we will show how to use Apache Spark with the IntelliJ IDE and Scala. The Apache Spark ecosystem is moving at a fast pace, and this tutorial will demonstrate the features of the latest Apache Spark 2 release.

 

If you are not familiar with IntelliJ and Scala, feel free to review our previous tutorials on IntelliJ and Scala.  So let's get started!

 

  • Source Code
  • Project Setup
  • DataFrame SQL Query
  • Spark SQL
  • DataFrame Statistics
  • DataFrame Operations
  • Spark Functions

More examples on the way ... stay tuned!

 

StackOverflow dataset

We will make use of the open-sourced StackOverflow dataset. We've cut each dataset down to just 10,000 rows in order to show how to use Apache Spark DataFrames and Apache Spark SQL.

 

The first dataset is called question_tags_10K.csv and it has the following columns, shown with a few sample rows:


Id,Tag
1,data
4,c#
4,winforms
4,type-conversion
4,decimal
4,opacity
6,html
6,css
6,css3

 

The second dataset is called questions_10K.csv and it has the following columns, shown with a few sample rows:


Id,CreationDate,ClosedDate,DeletionDate,Score,OwnerUserId,AnswerCount
1,2008-07-31T21:26:37Z,NA,2011-03-28T00:53:47Z,1,NA,0
4,2008-07-31T21:42:52Z,NA,NA,472,8,13
6,2008-07-31T22:08:08Z,NA,NA,210,9,5
8,2008-07-31T23:33:19Z,2013-06-03T04:00:25Z,2015-02-11T08:26:40Z,42,NA,8
9,2008-07-31T23:40:59Z,NA,NA,1452,1,58
11,2008-07-31T23:55:37Z,NA,NA,1154,1,33
13,2008-08-01T00:42:38Z,NA,NA,464,9,25
14,2008-08-01T00:59:11Z,NA,NA,296,11,8
16,2008-08-01T04:59:33Z,NA,NA,84,2,5

 

We will put both of these datasets under the resources directory - see GitHub source code.

 

Add Apache Spark 2 SBT dependencies

In our build.sbt file, we need to tell SBT to import the Apache Spark 2 dependencies as shown below. You can learn more about importing SBT dependencies from this tutorial.


name := "learn-spark"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark"  %%  "spark-core"    % "2.2.0"   % "provided",
  "org.apache.spark"  %%  "spark-sql"     % "2.2.0",
  "org.apache.spark"  %%  "spark-mllib"   % "2.2.0"
)

 

Bootstrap a SparkSession

To connect to a Spark cluster, you need to create a SparkSession, and we will encapsulate this behaviour in a simple trait. For more details on traits, refer to the chapter on Scala Traits.


import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

trait Context {

  lazy val sparkConf = new SparkConf()
    .setAppName("Learn Spark")
    .setMaster("local[*]")
    .set("spark.cores.max", "2")

  lazy val sparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()
}

NOTE:

  • A SparkSession takes a SparkConf where we've specified a name for our Spark application, set the Spark master to our local node, and limited the application to use only 2 cores.
  • For additional configuration properties for SparkConf, see the official Apache Spark documentation; a brief illustrative sketch follows below.
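
As a hedged illustration of that last bullet, the snippet below extends the sparkConf value from our Context trait with a couple of extra properties. The property keys are standard Spark configuration keys, but the values shown are only example settings, not tuned recommendations for this tutorial.


  // Illustrative sketch only: additional SparkConf properties (example values, not recommendations)
  lazy val sparkConf = new SparkConf()
    .setAppName("Learn Spark")
    .setMaster("local[*]")
    .set("spark.cores.max", "2")
    .set("spark.executor.memory", "1g")                                    // example: memory per executor
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // example: use Kryo serialization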



DataFrame introduction

The examples in this section will make use of the Context trait which we've created in Bootstrap a SparkSession. By extending the Context trait, we will have access to a SparkSession.


object DataFrame_Tutorial extends App with Context {

}

 

Create a DataFrame from reading a CSV file

To create a DataFrame from reading a CSV file, we will make use of the SparkSession and call its read method. Since the CSV file question_tags_10K.csv has two columns, we also call the toDF() method to name the columns id and tag.

 

To visually inspect some of the data points from our dataframe, we call the method show(10), which will print only the first 10 rows to the console.


  // Create a DataFrame from reading a CSV file
  val dfTags = sparkSession
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("src/main/resources/question_tags_10K.csv")
    .toDF("id", "tag")

  dfTags.show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+-------------------+
| id|                tag|
+---+-------------------+
|  1|               data|
|  4|                 c#|
|  4|           winforms|
|  4|    type-conversion|
|  4|            decimal|
|  4|            opacity|
|  6|               html|
|  6|                css|
|  6|               css3|
|  6|internet-explorer-7|
+---+-------------------+
only showing top 10 rows

 

When creating the dfTags dataframe, we specified the option to infer schema using option("inferSchema", "true"). This essentially instructs Spark to automatically infer the data type of each column when reading the CSV file question_tags_10K.csv.

 

To show the dataframe schema which was inferred by Spark, you can call the method printSchema() on the dataframe dfTags.


  // Print DataFrame schema
  dfTags.printSchema()

You should see the following output when you run your Scala application in IntelliJ:


root
 |-- id: integer (nullable = true)
 |-- tag: string (nullable = true)

 

NOTE:

  • Spark correctly inferred that the id column is of integer datatype and the tag column is of string type.

DataFrame Query: select columns from a dataframe

To select specific columns from a dataframe, you can use the select() method and pass in the columns which you want to select.


  // Query dataframe: select columns from a dataframe
  dfTags.select("id", "tag").show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+-------------------+
| id|                tag|
+---+-------------------+
|  1|               data|
|  4|                 c#|
|  4|           winforms|
|  4|    type-conversion|
|  4|            decimal|
|  4|            opacity|
|  6|               html|
|  6|                css|
|  6|               css3|
|  6|internet-explorer-7|
+---+-------------------+
only showing top 10 rows

 

DataFrame Query: filter by column value of a dataframe

To find all rows matching a specific column value, you can use the filter() method of a dataframe. For example, let's find all rows where the tag column has a value of php.


  // DataFrame Query: filter by column value of a dataframe
  dfTags.filter("tag == 'php'").show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+---+
| id|tag|
+---+---+
| 23|php|
| 42|php|
| 85|php|
|126|php|
|146|php|
|227|php|
|249|php|
|328|php|
|588|php|
|657|php|
+---+---+
only showing top 10 rows

 

DataFrame Query: count rows of a dataframe

To count the number of rows in a dataframe, you can use the count() method. Note also that you can chain DataFrame methods. As an example, let's count the number of php tags in our dataframe dfTags.


  // DataFrame Query: count rows of a dataframe
  println(s"Number of php tags = ${ dfTags.filter("tag == 'php'").count() }")

You should see the following output when you run your Scala application in IntelliJ:


Number of php tags = 133

NOTE:

  • We've chained the filter() and count() methods.

DataFrame Query: SQL like query

We've already seen that you can query a dataframe column and find an exact value match using the filter() method. In addition to finding an exact value, you can also query a dataframe column's value using a familiar SQL LIKE clause.

 

As an example, let us find all tags whose value starts with the letter s.


  // DataFrame Query: SQL like query
  dfTags.filter("tag like 's%'").show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+-------------+
| id|          tag|
+---+-------------+
| 25|      sockets|
| 36|          sql|
| 36|   sql-server|
| 40| structuremap|
| 48|submit-button|
| 79|          svn|
| 79|    subclipse|
| 85|          sql|
| 90|          svn|
|108|          svn|
+---+-------------+
only showing top 10 rows

 

DataFrame Query: Multiple filter chaining

From our previous examples, you should already be aware that Spark allows you to chain multiple dataframe operations. With that in mind, let us expand the previous example and add one more filter() method.

 

Our query below will find all tags whose value starts with the letter s and then only keep rows with id 25 or 108.


  // DataFrame Query: Multiple filter chaining
  dfTags
    .filter("tag like 's%'")
    .filter("id == 25 or id == 108")
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+-------+
| id|    tag|
+---+-------+
| 25|sockets|
|108|    svn|
+---+-------+

 

DataFrame Query: SQL IN clause

In the previous example, we saw how to slice our data using the OR clause to include only id 25 or 108 using .filter("id == 25 or id == 108"). Similarly, we can make use of an SQL IN clause to find all tags whose id is in (25, 108).


  // DataFrame Query: SQL IN clause
  dfTags.filter("id in (25, 108)").show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+---------+
| id|      tag|
+---+---------+
| 25|      c++|
| 25|        c|
| 25|  sockets|
| 25|mainframe|
| 25|      zos|
|108|  windows|
|108|      svn|
|108|    64bit|
+---+---------+

 

DataFrame Query: SQL Group By

We can use the groupBy() method on a dataframe to execute a similar SQL group by query. As an example, let us find out how many rows match each tag in our dataframe dfTags.


  // DataFrame Query: SQL Group By
  println("Group by tag value")
  dfTags.groupBy("tag").count().show(10)

You should see the following output when you run your Scala application in IntelliJ:


+--------------------+-----+
|                 tag|count|
+--------------------+-----+
|         type-safety|    4|
|             jbutton|    1|
|              iframe|    2|
|           svn-hooks|    2|
|           standards|    7|
|knowledge-management|    2|
|            trayicon|    1|
|           arguments|    1|
|                 zfs|    1|
|              import|    3|
+--------------------+-----+
only showing top 10 rows

 

DataFrame Query: SQL Group By with filter

We can further expand the previous group by example and only display tags that have more than 5 matching rows.


  // DataFrame Query: SQL Group By with filter
  dfTags.groupBy("tag").count().filter("count > 5").show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----------------+-----+
|             tag|count|
+----------------+-----+
|       standards|    7|
|        keyboard|    8|
|             rss|   12|
|   documentation|   15|
|         session|    6|
|build-automation|    9|
|            unix|   34|
|          iphone|   16|
|             xss|    6|
| database-design|   12|
+----------------+-----+
only showing top 10 rows

 

DataFrame Query: SQL order by

To complete the previous example which was a group by query along with a count, let us also sort the final results by adding an order by clause.


  // DataFrame Query: SQL order by
  dfTags.groupBy("tag").count().filter("count > 5").orderBy("tag").show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----------------+-----+
|             tag|count|
+----------------+-----+
|            .net|  351|
|        .net-2.0|   14|
|        .net-3.5|   30|
|           64bit|    7|
|  actionscript-3|   22|
|active-directory|   10|
|         ado.net|   11|
|           adobe|    7|
|           agile|    8|
|             air|   11|
+----------------+-----+
only showing top 10 rows

 

DataFrame Query: Cast columns to specific data type

So far, we've been using our dataframe dfTags which we read from StackOverflow's question_tags_10K.csv file. Let us now read the second StackOverflow file questions_10K.csv using a similar approach as we did for reading the tags file.


  // DataFrame Query: Cast columns to specific data type
  val dfQuestionsCSV = sparkSession
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("dateFormat","yyyy-MM-dd HH:mm:ss")
    .csv("src/main/resources/questions_10K.csv")
    .toDF("id", "creation_date", "closed_date", "deletion_date", "score", "owner_userid", "answer_count")

  dfQuestionsCSV.printSchema()

You should see the following output when you run your Scala application in IntelliJ:


root
 |-- id: integer (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- closed_date: string (nullable = true)
 |-- deletion_date: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- owner_userid: string (nullable = true)
 |-- answer_count: string (nullable = true)

NOTE:

  • Although we've passed in the inferSchema option, Spark did not fully infer the data types for some of our columns: closed_date, deletion_date, owner_userid and answer_count were all read as type string.
  • There are a few ways to be explicit about column data types, and for now we will show how to explicitly cast each column to its intended type using the cast() method.

  val dfQuestions = dfQuestionsCSV.select(
    dfQuestionsCSV.col("id").cast("integer"),
    dfQuestionsCSV.col("creation_date").cast("timestamp"),
    dfQuestionsCSV.col("closed_date").cast("timestamp"),
    dfQuestionsCSV.col("deletion_date").cast("date"),
    dfQuestionsCSV.col("score").cast("integer"),
    dfQuestionsCSV.col("owner_userid").cast("integer"),
    dfQuestionsCSV.col("answer_count").cast("integer")
  )

  dfQuestions.printSchema()
  dfQuestions.show(10)

You should see the following output when you run your Scala application in IntelliJ:


root
 |-- id: integer (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- closed_date: timestamp (nullable = true)
 |-- deletion_date: date (nullable = true)
 |-- score: integer (nullable = true)
 |-- owner_userid: integer (nullable = true)
 |-- answer_count: integer (nullable = true)

+---+-------------------+-------------------+-------------+-----+------------+------------+
| id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|
+---+-------------------+-------------------+-------------+-----+------------+------------+
| Id|               null|               null|         null|Score| OwnerUserId| AnswerCount|
|  1|2008-07-31 22:26:37|               null|   2011-03-28|    1|          NA|           0|
|  4|2008-07-31 22:42:52|               null|         null|  472|           8|          13|
|  6|2008-07-31 23:08:08|               null|         null|  210|           9|           5|
|  8|2008-08-01 00:33:19|2013-06-03 05:00:25|   2015-02-11|   42|          NA|           8|
|  9|2008-08-01 00:40:59|               null|         null| 1452|           1|          58|
| 11|2008-08-01 00:55:37|               null|         null| 1154|           1|          33|
| 13|2008-08-01 01:42:38|               null|         null|  464|           9|          25|
| 14|2008-08-01 01:59:11|               null|         null|  296|          11|           8|
| 16|2008-08-01 05:59:33|               null|         null|   84|           2|           5|
+---+-------------------+-------------------+-------------+-----+------------+------------+
only showing top 10 rows

NOTE:

  • All the columns of the questions dataframe now look sensible: columns id, score, owner_userid and answer_count are mapped to integer, columns creation_date and closed_date are of type timestamp, and deletion_date is of type date.

DataFrame Query: Operate on a filtered dataframe

The previous dfQuestions dataframe is great but perhaps we'd like to work with just a subset of the questions data. As an example, let us use our familiar filter() method to slice our dataframe dfQuestions with rows where the score is greater than 400 and less than 410.

 

You should already be familiar with such filtering from the previous examples. However, if you look closely, you would notice that we can in fact assign the filter() operation to a val of type dataframe! We will use the dfQuestionsSubset dataframe to show how to execute join queries by joining it with the dfTags dataframe.


  // DataFrame Query: Operate on a sliced dataframe
  val dfQuestionsSubset = dfQuestions.filter("score > 400 and score < 410").toDF()
  dfQuestionsSubset.show()

You should see the following output when you run your Scala application in IntelliJ:


+-----+-------------------+-------------------+-------------+-----+------------+------------+
|   id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|
+-----+-------------------+-------------------+-------------+-----+------------+------------+
|  888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
| 3881|2008-08-06 20:26:30|2016-09-23 14:34:31|         null|  402|         122|          27|
|16100|2008-08-19 13:51:55|               null|         null|  406|         203|          19|
|28098|2008-08-26 14:56:49|               null|         null|  403|        2680|          23|
|28637|2008-08-26 18:09:45|               null|         null|  401|        2469|          15|
|41479|2008-09-03 12:29:57|               null|         null|  406|        3394|          86|
|50467|2008-09-08 20:21:19|               null|         null|  402|        1967|          34|
|56628|2008-09-11 15:08:11|               null|         null|  403|        5469|          19|
|64860|2008-09-15 18:21:31|               null|         null|  402|        2948|          12|
+-----+-------------------+-------------------+-------------+-----+------------+------------+

 

DataFrame Query: Join

We will now make use of the previous dfQuestionsSubset dataframe. In this example, we will join the dataframe dfQuestionsSubset with the tags dataframe dfTags by the id column.


  // DataFrame Query: Join
  dfQuestionsSubset.join(dfTags, "id").show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----+-------------------+-------------------+-------------+-----+------------+------------+--------------------+
|  id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|                 tag|
+----+-------------------+-------------------+-------------+-----+------------+------------+--------------------+
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|              xdebug|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|            phpstorm|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|           debugging|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|             eclipse|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|                 php|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|                 osx|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|                 ios|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|         objective-c|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|              iphone|
|3881|2008-08-06 20:26:30|2016-09-23 14:34:31|         null|  402|         122|          27|illegalargumentex...|
+----+-------------------+-------------------+-------------+-----+------------+------------+--------------------+
only showing top 10 rows

 

DataFrame Query: Join and select columns

To follow up on the previous example, you can chain the select() method after the join() in order to only display certain columns.


  // DataFrame Query: Join and select columns
  dfQuestionsSubset
    .join(dfTags, "id")
    .select("owner_userid", "tag", "creation_date", "score")
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+------------+--------------------+-------------------+-----+
|owner_userid|                 tag|      creation_date|score|
+------------+--------------------+-------------------+-----+
|         131|              xdebug|2008-08-04 00:18:21|  405|
|         131|            phpstorm|2008-08-04 00:18:21|  405|
|         131|           debugging|2008-08-04 00:18:21|  405|
|         131|             eclipse|2008-08-04 00:18:21|  405|
|         131|                 php|2008-08-04 00:18:21|  405|
|        null|                 osx|2008-08-05 06:39:36|  408|
|        null|                 ios|2008-08-05 06:39:36|  408|
|        null|         objective-c|2008-08-05 06:39:36|  408|
|        null|              iphone|2008-08-05 06:39:36|  408|
|         122|illegalargumentex...|2008-08-06 20:26:30|  402|
+------------+--------------------+-------------------+-----+
only showing top 10 rows

 

DataFrame Query: Join on explicit columns

The join() operation joins two dataframes based on a common column, which in the previous example was the id column from dfTags and dfQuestionsSubset. But what if the columns you want to join on have different names? In such a case, you can explicitly specify the column from each dataframe on which to join.


  // DataFrame Query: Join on explicit columns
  dfQuestionsSubset
    .join(dfTags, dfTags("id") === dfQuestionsSubset("id"))
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----+-------------------+-------------------+-------------+-----+------------+------------+----+--------------------+
|  id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|  id|                 tag|
+----+-------------------+-------------------+-------------+-----+------------+------------+----+--------------------+
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30| 888|              xdebug|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30| 888|            phpstorm|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30| 888|           debugging|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30| 888|             eclipse|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30| 888|                 php|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|1939|                 osx|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|1939|                 ios|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|1939|         objective-c|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|1939|              iphone|
|3881|2008-08-06 20:26:30|2016-09-23 14:34:31|         null|  402|         122|          27|3881|illegalargumentex...|
+----+-------------------+-------------------+-------------+-----+------------+------------+----+--------------------+
only showing top 10 rows

 

DataFrame Query: Inner Join

Spark supports various types of joins namely: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti. These are specified in the official Apache Spark Documentation. In this example, we will show how to use the inner join type.


  // DataFrame Query: Inner Join
  dfQuestionsSubset
    .join(dfTags, Seq("id"), "inner")
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----+-------------------+-------------------+-------------+-----+------------+------------+--------------------+
|  id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|                 tag|
+----+-------------------+-------------------+-------------+-----+------------+------------+--------------------+
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|              xdebug|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|            phpstorm|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|           debugging|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|             eclipse|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|                 php|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|                 osx|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|                 ios|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|         objective-c|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|              iphone|
|3881|2008-08-06 20:26:30|2016-09-23 14:34:31|         null|  402|         122|          27|illegalargumentex...|
+----+-------------------+-------------------+-------------+-----+------------+------------+--------------------+
only showing top 10 rows

 

DataFrame Query: Left Outer Join

Following on from the previous inner join example, the code below shows how to perform a left outer join in Apache Spark.


  // DataFrame Query: Left Outer Join
  dfQuestionsSubset
    .join(dfTags, Seq("id"), "left_outer")
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----+-------------------+-------------------+-------------+-----+------------+------------+--------------------+
|  id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|                 tag|
+----+-------------------+-------------------+-------------+-----+------------+------------+--------------------+
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|              xdebug|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|            phpstorm|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|           debugging|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|             eclipse|
| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|                 php|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|                 osx|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|                 ios|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|         objective-c|
|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|              iphone|
|3881|2008-08-06 20:26:30|2016-09-23 14:34:31|         null|  402|         122|          27|illegalargumentex...|
+----+-------------------+-------------------+-------------+-----+------------+------------+--------------------+
only showing top 10 rows

 

DataFrame Query: Right Outer Join

As mentioned in the previous join examples, Apache Spark supports a number of join types as listed in the official Apache Spark documentation. Let's show one more join type, the right outer join. Note that we've swapped the order of the dataframes for the right outer join by joining dfTags with dfQuestionsSubset.


  // DataFrame Query: Right Outer Join
  dfTags
    .join(dfQuestionsSubset, Seq("id"), "right_outer")
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----+--------------------+-------------------+-------------------+-------------+-----+------------+------------+
|  id|                 tag|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|
+----+--------------------+-------------------+-------------------+-------------+-----+------------+------------+
| 888|              xdebug|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|            phpstorm|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|           debugging|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|             eclipse|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|                 php|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
|1939|                 osx|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|                 ios|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|         objective-c|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|              iphone|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|3881|illegalargumentex...|2008-08-06 20:26:30|2016-09-23 14:34:31|         null|  402|         122|          27|
+----+--------------------+-------------------+-------------------+-------------+-----+------------+------------+
only showing top 10 rows

 

DataFrame Query: Distinct

With the dataframe dfTags in scope, we can find the unique values in the tag column by using the distinct() method.


  // DataFrame Query: Distinct
  dfTags
    .select("tag")
    .distinct()
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+--------------------+
|                 tag|
+--------------------+
|         type-safety|
|             jbutton|
|              iframe|
|           svn-hooks|
|           standards|
|knowledge-management|
|            trayicon|
|           arguments|
|                 zfs|
|              import|
+--------------------+
only showing top 10 rows


Spark SQL Introduction

In this section, we will show how to use Apache Spark SQL, which brings you much closer to an SQL-style query similar to what you would use with a relational database. We will once more reuse the Context trait which we created in Bootstrap a SparkSession so that we have access to a SparkSession.


object SparkSQL_Tutorial extends App with Context {

}

 

Register temp table from dataframe

By now you should be familiar with how to create a dataframe from reading a csv file. The code below will first create a dataframe for the StackOverflow question_tags_10K.csv file which we will name dfTags.  But instead of operating directly on the dataframe dfTags, we will register it as a temporary table in Spark's catalog and name the table so_tags.


  // Register temp table
  val dfTags = sparkSession
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("src/main/resources/question_tags_10K.csv")
    .toDF("id", "tag")

  dfTags.createOrReplaceTempView("so_tags")

 

List all tables in Spark's catalog

To verify that the temporary table so_tags has indeed been registered in Spark's catalog, you can access and call the catalog-related methods on the SparkSession as follows:


  // List all tables in Spark's catalog
  sparkSession.catalog.listTables().show()

You should see the following output when you run your Scala application in IntelliJ:


+-------+--------+-----------+---------+-----------+
|   name|database|description|tableType|isTemporary|
+-------+--------+-----------+---------+-----------+
|so_tags|    null|       null|TEMPORARY|       true|
+-------+--------+-----------+---------+-----------+

 

List catalog tables using Spark SQL

To issue SQL-like queries, you can make use of the sql() method on the SparkSession and pass in a query string. Let's redo the previous example and list all tables in Spark's catalog using a Spark SQL query.


  // List all tables in Spark's catalog using Spark SQL
  sparkSession.sql("show tables").show()

You should see the following output when you run your Scala application in IntelliJ:


+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |  so_tags|       true|
+--------+---------+-----------+

 

Select columns

In the previous example, we showed how to use the sql() method to issue SQL style queries. We will now re-write the dataframe queries using Spark SQL.


  // Select columns
  sparkSession
    .sql("select id, tag from so_tags limit 10")
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+---+-------------------+
| id|                tag|
+---+-------------------+
|  1|               data|
|  4|                 c#|
|  4|           winforms|
|  4|    type-conversion|
|  4|            decimal|
|  4|            opacity|
|  6|               html|
|  6|                css|
|  6|               css3|
|  6|internet-explorer-7|
+---+-------------------+


Filter by column value

In the DataFrame SQL query, we showed how to filter a dataframe by a column value. We can re-write the example using Spark SQL as shown below.


  // Filter by column value
  sparkSession
    .sql("select * from so_tags where tag = 'php'")
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+---+
| id|tag|
+---+---+
| 23|php|
| 42|php|
| 85|php|
|126|php|
|146|php|
|227|php|
|249|php|
|328|php|
|588|php|
|657|php|
+---+---+
only showing top 10 rows

 

Count number of rows

In the DataFrame SQL query, we showed how to count rows of a dataframe. We can re-write the count number of php tags example using Spark SQL as shown below.


  // Count number of rows
  sparkSession
    .sql(
      """select
        |count(*) as php_count
        |from so_tags where tag='php'""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---------+
|php_count|
+---------+
|      133|
+---------+

 

SQL like

In the DataFrame SQL query, we showed how to issue an SQL LIKE query. We can re-write the dataframe LIKE query to find all tags which start with the letter s using Spark SQL as shown below.


  // SQL like
  sparkSession
    .sql(
      """select *
        |from so_tags
        |where tag like 's%'""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+-------------+
| id|          tag|
+---+-------------+
| 25|      sockets|
| 36|          sql|
| 36|   sql-server|
| 40| structuremap|
| 48|submit-button|
| 79|          svn|
| 79|    subclipse|
| 85|          sql|
| 90|          svn|
|108|          svn|
+---+-------------+
only showing top 10 rows

 

SQL where with and clause

In the DataFrame SQL query, we showed how to chain multiple filters on a dataframe. We can re-write the dataframe filter for tags starting with the letter s and whose id is either 25 or 108 using Spark SQL as shown below.


  // SQL where with and clause
  sparkSession
    .sql(
      """select *
        |from so_tags
        |where tag like 's%'
        |and (id = 25 or id = 108)""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+-------+
| id|    tag|
+---+-------+
| 25|sockets|
|108|    svn|
+---+-------+

 

SQL IN clause

In the DataFrame SQL query, we showed how to issue an SQL IN clause on a dataframe. We can re-write the dataframe IN query to find tags whose id is in (25, 108) using Spark SQL as shown below.


  // SQL IN clause
  sparkSession
    .sql(
      """select *
        |from so_tags
        |where id in (25, 108)""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+---------+
| id|      tag|
+---+---------+
| 25|      c++|
| 25|        c|
| 25|  sockets|
| 25|mainframe|
| 25|      zos|
|108|  windows|
|108|      svn|
|108|    64bit|
+---+---------+

 

SQL Group By

In the DataFrame SQL query, we showed how to issue an SQL group by query on a dataframe. We can re-write the dataframe group by tag and count query using Spark SQL as shown below.


  // SQL Group By
  sparkSession
    .sql(
      """select tag, count(*) as count
        |from so_tags group by tag""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+--------------------+-----+
|                 tag|count|
+--------------------+-----+
|         type-safety|    4|
|             jbutton|    1|
|              iframe|    2|
|           svn-hooks|    2|
|           standards|    7|
|knowledge-management|    2|
|            trayicon|    1|
|           arguments|    1|
|                 zfs|    1|
|              import|    3|
+--------------------+-----+
only showing top 10 rows

 

SQL Group By with having clause

In the DataFrame SQL query, we showed how to issue an SQL group by with filter query on a dataframe. We can re-write the dataframe group by tag and count where count is greater than 5 query using Spark SQL as shown below.


  // SQL Group By with having clause
  sparkSession
    .sql(
      """select tag, count(*) as count
        |from so_tags group by tag having count > 5""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----------------+-----+
|             tag|count|
+----------------+-----+
|       standards|    7|
|        keyboard|    8|
|             rss|   12|
|   documentation|   15|
|         session|    6|
|build-automation|    9|
|            unix|   34|
|          iphone|   16|
|             xss|    6|
| database-design|   12|
+----------------+-----+
only showing top 10 rows

 

SQL Order by

In the DataFrame SQL query, we showed how to issue an SQL order by query on a dataframe. We can re-write the dataframe group by, count and order by tag query using Spark SQL as shown below.


  // SQL Order by
  sparkSession
    .sql(
      """select tag, count(*) as count
        |from so_tags group by tag having count > 5 order by tag""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----------------+-----+
|             tag|count|
+----------------+-----+
|            .net|  351|
|        .net-2.0|   14|
|        .net-3.5|   30|
|           64bit|    7|
|  actionscript-3|   22|
|active-directory|   10|
|         ado.net|   11|
|           adobe|    7|
|           agile|    8|
|             air|   11|
+----------------+-----+
only showing top 10 rows

 

Typed columns, filter and create temp table

In the DataFrame SQL query, we showed how to cast columns to specific data types and how to filter a dataframe. We will use those examples to register a temporary table named so_questions for the StackOverflow questions file questions_10K.csv. The so_questions and so_tags tables will later be used to show how to do SQL joins.


  // Typed dataframe, filter and temp table
  val dfQuestionsCSV = sparkSession
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("dateFormat","yyyy-MM-dd HH:mm:ss")
    .csv("src/main/resources/questions_10K.csv")
    .toDF("id", "creation_date", "closed_date", "deletion_date", "score", "owner_userid", "answer_count")


  // cast columns to data types
  val dfQuestions = dfQuestionsCSV.select(
    dfQuestionsCSV.col("id").cast("integer"),
    dfQuestionsCSV.col("creation_date").cast("timestamp"),
    dfQuestionsCSV.col("closed_date").cast("timestamp"),
    dfQuestionsCSV.col("deletion_date").cast("date"),
    dfQuestionsCSV.col("score").cast("integer"),
    dfQuestionsCSV.col("owner_userid").cast("integer"),
    dfQuestionsCSV.col("answer_count").cast("integer")
  )

  // filter dataframe
  val dfQuestionsSubset = dfQuestions.filter("score > 400 and score < 410").toDF()

  // register temp table
  dfQuestionsSubset.createOrReplaceTempView("so_questions")

 

SQL Inner Join

In the DataFrame SQL query, we showed how to issue an SQL inner join on two dataframes. We can re-write the dataframe tags inner join with the dataframe questions using Spark SQL as shown below. Note also that we are using the two temporary tables which we created earlier namely so_tags and so_questions.


  // SQL Inner Join
  sparkSession
    .sql(
    """select t.*, q.*
      |from so_questions q
      |inner join so_tags t
      |on t.id = q.id""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----+--------------------+----+-------------------+-------------------+-------------+-----+------------+------------+
|  id|                 tag|  id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|
+----+--------------------+----+-------------------+-------------------+-------------+-----+------------+------------+
| 888|              xdebug| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|            phpstorm| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|           debugging| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|             eclipse| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|                 php| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
|1939|                 osx|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|                 ios|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|         objective-c|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|              iphone|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|3881|illegalargumentex...|3881|2008-08-06 20:26:30|2016-09-23 14:34:31|         null|  402|         122|          27|
+----+--------------------+----+-------------------+-------------------+-------------+-----+------------+------------+
only showing top 10 rows

 

SQL Left Outer Join

In the DataFrame SQL query, we showed how to issue an SQL left outer join on two dataframes. We can re-write the dataframe tags left outer join with the dataframe questions using Spark SQL as shown below. Note also that we are using the two temporary tables which we created earlier namely so_tags and so_questions.


  // SQL Left Outer Join
  sparkSession
    .sql(
      """select t.*, q.*
        |from so_questions q
        |left outer join so_tags t
        |on t.id = q.id""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----+--------------------+----+-------------------+-------------------+-------------+-----+------------+------------+
|  id|                 tag|  id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|
+----+--------------------+----+-------------------+-------------------+-------------+-----+------------+------------+
| 888|              xdebug| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|            phpstorm| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|           debugging| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|             eclipse| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|                 php| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
|1939|                 osx|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|                 ios|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|         objective-c|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|              iphone|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|3881|illegalargumentex...|3881|2008-08-06 20:26:30|2016-09-23 14:34:31|         null|  402|         122|          27|
+----+--------------------+----+-------------------+-------------------+-------------+-----+------------+------------+
only showing top 10 rows

 

SQL Right Outer Join

In the DataFrame SQL query, we showed how to issue an SQL right outer join on two dataframes. We can re-write the dataframe tags right outer join with the dataframe questions using Spark SQL as shown below. Note also that we are using the two temporary tables which we created earlier namely so_tags and so_questions.


  // SQL Right Outer Join
  sparkSession
    .sql(
      """select t.*, q.*
        |from so_tags t
        |right outer join so_questions q
        |on t.id = q.id""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+----+--------------------+----+-------------------+-------------------+-------------+-----+------------+------------+
|  id|                 tag|  id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|
+----+--------------------+----+-------------------+-------------------+-------------+-----+------------+------------+
| 888|              xdebug| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|            phpstorm| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|           debugging| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|             eclipse| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
| 888|                 php| 888|2008-08-04 00:18:21|2016-08-04 10:22:00|         null|  405|         131|          30|
|1939|                 osx|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|                 ios|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|         objective-c|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|1939|              iphone|1939|2008-08-05 06:39:36|2012-06-05 14:13:38|   2012-12-18|  408|        null|          48|
|3881|illegalargumentex...|3881|2008-08-06 20:26:30|2016-09-23 14:34:31|         null|  402|         122|          27|
+----+--------------------+----+-------------------+-------------------+-------------+-----+------------+------------+
only showing top 10 rows

 

SQL Distinct

In the DataFrame SQL query, we showed how to issue an SQL distinct on dataframe dfTags to find unique values in the tag column. We can re-write the dataframe tags distinct example using Spark SQL as shown below.


  // SQL Distinct
  sparkSession
    .sql("""select distinct tag from so_tags""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+--------------------+
|                 tag|
+--------------------+
|         type-safety|
|             jbutton|
|              iframe|
|           svn-hooks|
|           standards|
|knowledge-management|
|            trayicon|
|           arguments|
|                 zfs|
|              import|
+--------------------+
only showing top 10 rows

 



Register User Defined Function (UDF)

For this example, we will show how Apache Spark allows you to register and use your own functions which are more commonly referred to as User Defined Functions (UDF).

 

We will create a function named prefixStackoverflow() which will prefix the String value so_ to a given String. In turn, we will register this function in our Spark session as a UDF and then use it in our Spark SQL query to augment each tag value with the prefix so_.


  // Function to prefix a String with so_ short for StackOverflow
  def prefixStackoverflow(s: String): String = s"so_$s"

  // Register User Defined Function (UDF)
  sparkSession
    .udf
    .register("prefix_so", prefixStackoverflow _)

  // Use udf prefix_so to augment each tag value with so_
  sparkSession
    .sql("""select id, prefix_so(tag) from so_tags""".stripMargin)
    .show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+--------------------+
| id|  UDF:prefix_so(tag)|
+---+--------------------+
|  1|             so_data|
|  4|               so_c#|
|  4|         so_winforms|
|  4|  so_type-conversion|
|  4|          so_decimal|
|  4|          so_opacity|
|  6|             so_html|
|  6|              so_css|
|  6|             so_css3|
|  6|so_internet-explo...|
+---+--------------------+
only showing top 10 rows

NOTE: 

  • Every tag value is now prefixed with so_ (an optional aliasing sketch follows below).
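
As an optional, hedged variation which is not part of the original example, you could alias the UDF output column directly in the SQL to get a friendlier column name than UDF:prefix_so(tag); the alias so_tag below is just an assumed name.


  // Hypothetical variation: alias the UDF output column (the name so_tag is illustrative)
  sparkSession
    .sql("""select id, prefix_so(tag) as so_tag from so_tags""")
    .show(10)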

DataFrame Statistics Introduction

The examples in this section will make use of the Context trait which we've created in Bootstrap a SparkSession. By extending the Context trait, we will have access to a SparkSession.


object DataFrameStatistics_Tutorial extends App with Context {
 
}

 

Create DataFrame from CSV

To recap the example on creating a dataframe from reading a CSV file, we will once again create two dataframes: one for the StackOverflow tags file and the other for the StackOverflow questions file.


  // Create a dataframe from tags file question_tags_10K.csv
  val dfTags = sparkSession
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("src/main/resources/question_tags_10K.csv")
    .toDF("id", "tag")

  // Create a dataframe from questions file questions_10K.csv
  val dfQuestionsCSV = sparkSession
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("dateFormat","yyyy-MM-dd HH:mm:ss")
    .csv("src/main/resources/questions_10K.csv")
    .toDF("id", "creation_date", "closed_date", "deletion_date", "score", "owner_userid", "answer_count")

  // cast columns to data types
  val dfQuestions = dfQuestionsCSV.select(
    dfQuestionsCSV.col("id").cast("integer"),
    dfQuestionsCSV.col("creation_date").cast("timestamp"),
    dfQuestionsCSV.col("closed_date").cast("timestamp"),
    dfQuestionsCSV.col("deletion_date").cast("date"),
    dfQuestionsCSV.col("score").cast("integer"),
    dfQuestionsCSV.col("owner_userid").cast("integer"),
    dfQuestionsCSV.col("answer_count").cast("integer")
  )

 

Average

With dataframe dfQuestions in scope, we will compute the average of the score column using the code below. Note that you also need to import Spark's built-in functions using: import org.apache.spark.sql.functions._


  // Average
  import org.apache.spark.sql.functions._
  dfQuestions
    .select(avg("score"))
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+-----------------+
|       avg(score)|
+-----------------+
|36.14631463146315|
+-----------------+

 

Maximum

With dataframe dfQuestions in scope, we will compute the maximum of the score column using the code below. Note that you also need to import Spark's built-in functions using: import org.apache.spark.sql.functions._


  // Max
  import org.apache.spark.sql.functions._
  dfQuestions
    .select(max("score"))
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+----------+
|max(score)|
+----------+
|      4443|
+----------+

 

Minimum

With dataframe dfQuestions in scope, we will compute the minimum of the score column using the code below. Note that you also need to import Spark's built-in functions using: import org.apache.spark.sql.functions._


  // Minimum
  import org.apache.spark.sql.functions._
  dfQuestions
    .select(min("score"))
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+----------+
|min(score)|
+----------+
|       -27|
+----------+

 

Mean

With dataframe dfQuestions in scope, we will compute the mean of the score column using the code below. Note that you also need to import Spark's built-in functions using: import org.apache.spark.sql.functions._


  // Mean
  import org.apache.spark.sql.functions._
  dfQuestions
    .select(mean("score"))
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+-----------------+
|       avg(score)|
+-----------------+
|36.14631463146315|
+-----------------+

 

Sum

With dataframe dfQuestions in scope, we will compute the sum of the score column using the code below. Note that you also need to import Spark's built-in functions using: import org.apache.spark.sql.functions._


  // Sum
  import org.apache.spark.sql.functions._
  dfQuestions
    .select(sum("score"))
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+----------+
|sum(score)|
+----------+
|    361427|
+----------+

 

Group by with statistics

With the dataframes dfQuestions and dfTags in scope, we will apply what we've learned on DataFrame Query and DataFrame Statistics. The example below will find all questions where id > 400 and id < 450, filter out any rows where owner_userid is null, join with dfTags on the id column, group by owner_userid, and calculate the average of the score column and the maximum of the answer_count column.


  // Group by with statistics
  import org.apache.spark.sql.functions._
  dfQuestions
    .filter("id > 400 and id < 450")
    .filter("owner_userid is not null")
    .join(dfTags, dfQuestions.col("id").equalTo(dfTags("id")))
    .groupBy(dfQuestions.col("owner_userid"))
    .agg(avg("score"), max("answer_count"))
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+------------+----------+-----------------+
|owner_userid|avg(score)|max(answer_count)|
+------------+----------+-----------------+
|         268|      26.0|                1|
|         136|      57.6|                9|
|         123|      20.0|                3|
+------------+----------+-----------------+

 

DataFrame Statistics using describe() method

In the previous examples, we've shown how to compute statistics on a dataframe. If you are looking for a quick shortcut to compute the count, mean, standard deviation, min and max values of a dataframe, you can use the describe() method as shown below:


  // DataFrame Statistics using describe() method
  val dfQuestionsStatistics = dfQuestions.describe()
  dfQuestionsStatistics.show()

You should see the following output when you run your Scala application in IntelliJ:


+-------+-----------------+------------------+-----------------+------------------+
|summary|               id|             score|     owner_userid|      answer_count|
+-------+-----------------+------------------+-----------------+------------------+
|  count|             9999|              9999|             7388|              9922|
|   mean|33929.17081708171| 36.14631463146315|47389.99472116947|6.6232614392259626|
| stddev|19110.09560532429|160.48316753972045|280943.1070344427| 9.069109116851138|
|    min|                1|               -27|                1|                -5|
|    max|            66037|              4443|          3431280|               316|
+-------+-----------------+------------------+-----------------+------------------+

 

Correlation

For more advanced statistics which you typically add in a data science pipeline, Spark provides a convenient stat function. As an example, we will access the correlation method to find the correlation between column score and answer_count. For additional dataframe stat functions, see the official Spark 2 API documentation.


  // Correlation
  val correlation = dfQuestions.stat.corr("score", "answer_count")
  println(s"correlation between column score and answer_count = $correlation")

You should see the following output when you run your Scala application in IntelliJ:


correlation between column score and answer_count = 0.3699847903294707

 

Covariance

For more advanced statistics which you typically add in a data science pipeline, Spark provides a convenient stat function. As an example, we will access the covariance method to find the covariance between column score and answer_count. For additional dataframe stat functions, see the official Spark 2 API documentation.


  // Covariance
  val covariance = dfQuestions.stat.cov("score", "answer_count")
  println(s"covariance between column score and answer_count = $covariance")

You should see the following output when you run your Scala application in IntelliJ:


covariance between column score and answer_count = 537.513381444165

 

Frequent Items

For more advanced statistics which you typically add in a data science pipeline, Spark provides a convenient stat function. As an example, we will access the freqItems method to find the frequent items in the answer_count dataframe column. For additional dataframe stat functions, see the official Spark 2 API documentation.


  // Frequent Items
  val dfFrequentScore = dfQuestions.stat.freqItems(Seq("answer_count"))
  dfFrequentScore.show()

You should see the following output when you run your Scala application in IntelliJ:


+----------------------+
|answer_count_freqItems|
+----------------------+
|  [23, 131, 77, 86,...|
+----------------------+

 
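
Note that freqItems() also accepts an optional support threshold as its second parameter (it defaults to 1% when omitted). A minimal sketch, assuming the same dfQuestions dataframe, which only reports answer_count values occurring in at least 10% of the rows:


  // Frequent Items with an explicit 10% support threshold
  val dfFrequentScoreWithSupport = dfQuestions
    .stat
    .freqItems(Seq("answer_count"), 0.1)
  dfFrequentScoreWithSupport.show()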

Crosstab

For more advanced statistics which you typically add in a data science pipeline, Spark provides a convenient stat function. As an example, we will use the crosstab() method to display a tabular view of score by owner_userid. For additional dataframe stat functions, see the official Spark 2 API documentation.


  // Crosstab
  val dfScoreByUserid = dfQuestions
    .filter("owner_userid > 0 and owner_userid < 20")
    .stat
    .crosstab("score", "owner_userid")
  dfScoreByUserid.show(10)

You should see the following output when you run your Scala application in IntelliJ:


+------------------+---+---+---+---+---+---+---+---+---+---+
|score_owner_userid|  1| 11| 13| 17|  2|  3|  4|  5|  8|  9|
+------------------+---+---+---+---+---+---+---+---+---+---+
|                56|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|               472|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|
|                14|  0|  0|  0|  1|  0|  0|  0|  1|  0|  0|
|                20|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|
|               179|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|
|                84|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|
|               160|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|
|                21|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|
|                 9|  0|  0|  0|  0|  0|  0|  1|  1|  0|  0|
|                 2|  0|  0|  0|  0|  0|  0|  0|  1|  0|  1|
+------------------+---+---+---+---+---+---+---+---+---+---+
only showing top 10 rows

 

Stratified sampling using sampleBy

For more advanced statistics which you typically add in a data science pipeline, Spark provides a convenient stat function. In this section, we will show how to perform stratified sampling on a dataframe using the sampleBy() method. For additional dataframe stat functions, see the official Spark 2 API documentation.

 

To start with, we will filter the dataframe dfQuestions to only include rows where answer_count is in (5, 10, 20). We then print the number of rows matching each answer_count so that we get an initial visual representation of the new dataframe dfQuestionsByAnswerCount.


  // find all rows where answer_count in (5, 10, 20)
  val dfQuestionsByAnswerCount = dfQuestions
    .filter("owner_userid > 0")
    .filter("answer_count in (5, 10, 20)")

  // count how many rows match answer_count in (5, 10, 20)
  dfQuestionsByAnswerCount
    .groupBy("answer_count")
    .count()
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+------------+-----+
|answer_count|count|
+------------+-----+
|          20|   34|
|           5|  811|
|          10|  272|
+------------+-----+

 

Next, we create a fractions map, which is a Map of keys and values. The keys in our example are the answer_count values 5, 10 and 20, and the values are the fractions of rows which we are interested in sampling. The fractions should be in the range [0, 1]. The fractions map below implies that we are interested in:

  • 50% of the rows that have answer_count = 5
  • 10% of the rows that have answer_count = 10
  • 100% of the rows that have answer_count = 20

With the fractions map defined, we need to pass it as a parameter to the sampleBy() method. Note also that you need to specify a random seed parameter as well.


  // Create a fraction map where we are only interested:
  // - 50% of the rows that have answer_count = 5
  // - 10% of the rows that have answer_count = 10
  // - 100% of the rows that have answer_count = 20
  // Note also that fractions should be in the range [0, 1]
  val fractionKeyMap = Map(5 -> 0.5, 10 -> 0.1, 20 -> 1.0)

  // Stratified sample using the fractionKeyMap.
  dfQuestionsByAnswerCount
    .stat
    .sampleBy("answer_count", fractionKeyMap, 7L)
    .groupBy("answer_count")
    .count()
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+------------+-----+
|answer_count|count|
+------------+-----+
|          20|   34|
|           5|  400|
|          10|   26|
+------------+-----+

 

Note that changing the random seed will modify your sampling outcome. As an example, let's change the random seed to 37.


  // change the random seed to 37 to see a different sampling outcome
  dfQuestionsByAnswerCount
    .stat
    .sampleBy("answer_count", fractionKeyMap, 37L)
    .groupBy("answer_count")
    .count()
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+------------+-----+
|answer_count|count|
+------------+-----+
|          20|   34|
|           5|  388|
|          10|   25|
+------------+-----+

 

Approximate Quantile

For more advanced statistics which you typically add in a data science pipeline, Spark provides a convenient stat function. For instance, when doing data exploration, you sometimes want a summary of the various quantiles in your dataset. Spark comes with a handy approxQuantile() method, and more details about the internal implementation and other stat functions can be found in the official Spark 2 API documentation.

 

The first parameter of the approxQuantile() method is the column of your dataframe on which to run the statistics, the second parameter is an Array of quantile probabilities and the third parameter is a relative error factor: the smaller its value, the more precise (but also more expensive) the computation.

 

In the example below, we will find the minimum, median and maximum from the score column and as such we will pass an Array of probabilities Array(0, 0.5, 1) which represents:

  • 0 = minimum
  • 0.5 = median
  • 1 = maximum

  // Approximate Quantile
  val quantiles = dfQuestions
    .stat
    .approxQuantile("score", Array(0, 0.5, 1), 0.25)
  println(s"Qauntiles segments = ${quantiles.toSeq}")

You should see the following output when you run your Scala application in IntelliJ:


Quantiles segments = WrappedArray(-27.0, 2.0, 4443.0)

 

You can also verify the quantiles statistics above using Spark SQL as follows:


  dfQuestions.createOrReplaceTempView("so_questions")
  sparkSession
    .sql("select min(score), percentile_approx(score, 0.25), max(score) from so_questions")
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+----------+---------------------------------------------------------------------+----------+
|min(score)|percentile_approx(CAST(score AS DOUBLE), CAST(0.25 AS DOUBLE), 10000)|max(score)|
+----------+---------------------------------------------------------------------+----------+
|       -27|                                                                  2.0|      4443|
+----------+---------------------------------------------------------------------+----------+


Bloom Filter

For more advanced statistics which you typically add in a data science pipeline, Spark provides a convenient stat function. For instance, when training large datasets in a Machine Learning pipeline, you can make use of pre-processing steps such as Bloom Filtering to compact storage requirements of intermediate steps and also improve performance of iterative algorithms. Spark comes with a handy bloom filter implementation and it is exposed under the stat function.

 

The first parameter of the bloomFilter() method is the column of your dataframe over which a bloom filter will be created, the second parameter is the expected number of items that will be put into the bloom filter and the third parameter is the acceptable false positive probability.

 

In the example below, we will create a bloom filter over the tag column, sized for 1000 expected items and with a 10% false positive probability. Note that dfTags holds roughly 10,000 tags (see the DataFrame count further below), yet we size the filter for only 1000 items, i.e. about 10% of the total number of tags.


  // Bloom Filter
  val tagsBloomFilter = dfTags.stat.bloomFilter("tag", 1000L, 0.1)

 

Instead of querying the dataframe dfTags directly, we will use the mightContain() method of the Bloom Filter tagsBloomFilter to test whether certain tags exist.


  println(s"bloom filter contains java tag = ${tagsBloomFilter.mightContain("java")}")
  println(s"bloom filter contains some unknown tag = ${tagsBloomFilter.mightContain("unknown tag")}")

You should see the following output when you run your Scala application in IntelliJ:


bloom filter contains java tag = true
bloom filter contains some unknown tag = false

 
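
The returned org.apache.spark.util.sketch.BloomFilter also exposes a few introspection methods. As a minimal sketch, assuming the tagsBloomFilter created above, you can print its expected false positive probability:


  // inspect the expected false positive probability of the bloom filter
  println(s"bloom filter expected false positive probability = ${tagsBloomFilter.expectedFpp()}")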

Count Min Sketch

For more advanced statistics which you typically add in a data science pipeline, Spark provides a convenient stat function. For instance, Spark supports the Count Min Sketch data structure, which is typically used for approximate frequency counting. The countMinSketch() method is exposed under the stat function.

 

As an example, we will create a Count Min Sketch data structure over the tag column of dataframe dfTags and estimate the occurrence for the term java.

 

The first parameter of the countMinSketch() method is the column of your dataframe over which to create the Count Min Sketch data structure, the second parameter is a precision error factor, the third parameter is the confidence level and the fourth parameter is a random seed.

 

In the example below, the various Count Min Sketch parameters are as follows:

  • first parameter = the tag column of dataframe dfTags
  • second parameter = 10% precision error factor
  • third parameter = 90% confidence level
  • fourth parameter = 37 as a random seed

  // Count Min Sketch
  val cmsTag = dfTags.stat.countMinSketch("tag", 0.1, 0.9, 37)
  val estimatedFrequency = cmsTag.estimateCount("java")
  println(s"Estimated frequency for tag java = $estimatedFrequency")

You should see the following output when you run your Scala application in IntelliJ:


Estimated frequency for tag java = 513

 
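
Since the Count Min Sketch only provides an approximation, it can be useful to compare the estimate against an exact count. A minimal sketch, assuming the same dfTags dataframe:


  // compare the Count Min Sketch estimate with an exact count
  val exactFrequency = dfTags.filter("tag = 'java'").count()
  println(s"Exact frequency for tag java = $exactFrequency")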

Sampling With Replacement

In the previous example, we showed how to use Spark for Stratified Sampling using the sampleBy() method. To support additional data analysis such as correlation and covariance in your data science pipeline, Spark also supports sampling in general.

 

In the example below, we will use the sample() method to create a sample of the tags dataframe. The sample() method takes the following parameters:

  • with replacement = true
  • fraction of rows to sample = 0.2 (roughly 20%)
  • a random seed = 37L

  // Sampling With Replacement
  val dfTagsSample = dfTags.sample(true, 0.2, 37L)
  println(s"Number of rows in sample dfTagsSample = ${dfTagsSample.count()}")
  println(s"Number of rows in dfTags = ${dfTags.count()}")

You should see the following output when you run your Scala application in IntelliJ:


Number of rows in sample dfTagsSample = 1948
Number of rows in dfTags = 9999

NOTE:

  • If you need sampling without replacement, you can reuse the same sample() method but you will have to set its first parameter to false, as sketched below.

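As an illustration, here is a minimal sketch of sampling without replacement, assuming the same dfTags dataframe and random seed:


  // Sampling Without Replacement
  val dfTagsSampleNoReplacement = dfTags.sample(false, 0.2, 37L)
  println(s"Number of rows in sample dfTagsSampleNoReplacement = ${dfTagsSampleNoReplacement.count()}")
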
DataFrame Operations Introduction

The examples in this section will make use of the Context trait which we've created in Bootstrap a SparkSession. By extending the Context trait, we will have access to a SparkSession.


object DataFrameOperations extends App with Context {

}

 

Setup DataFrames

By now you should be familiar with how to create a dataframe from reading a CSV file. Similar to the previous examples, we will create two dataframes, one for the StackOverflow tags dataset and the other for the questions dataset.

 

Note also that we will only take a subset of the questions dataset using the filter method and join method so that it is easier to work with the examples in this section.


  val dfTags = sparkSession
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("src/main/resources/question_tags_10K.csv")
    .toDF("id", "tag")

  dfTags.show(10)

  val dfQuestionsCSV = sparkSession
    .read
    .option("header", false)
    .option("inferSchema", true)
    .option("dateFormat","yyyy-MM-dd HH:mm:ss")
    .csv("src/main/resources/questions_10K.csv")
    .toDF("id", "creation_date", "closed_date", "deletion_date", "score", "owner_userid", "answer_count")

  val dfQuestions = dfQuestionsCSV
    .filter("score > 400 and score < 410")
    .join(dfTags, "id")
    .select("owner_userid", "tag", "creation_date", "score")
    .toDF()

  dfQuestions.show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+-------------------+
| id|                tag|
+---+-------------------+
|  1|               data|
|  4|                 c#|
|  4|           winforms|
|  4|    type-conversion|
|  4|            decimal|
|  4|            opacity|
|  6|               html|
|  6|                css|
|  6|               css3|
|  6|internet-explorer-7|
+---+-------------------+
only showing top 10 rows

+------------+--------------------+--------------------+-----+
|owner_userid|                 tag|       creation_date|score|
+------------+--------------------+--------------------+-----+
|         131|              xdebug|2008-08-03T23:18:21Z|  405|
|         131|            phpstorm|2008-08-03T23:18:21Z|  405|
|         131|           debugging|2008-08-03T23:18:21Z|  405|
|         131|             eclipse|2008-08-03T23:18:21Z|  405|
|         131|                 php|2008-08-03T23:18:21Z|  405|
|          NA|                 osx|2008-08-05T05:39:36Z|  408|
|          NA|                 ios|2008-08-05T05:39:36Z|  408|
|          NA|         objective-c|2008-08-05T05:39:36Z|  408|
|          NA|              iphone|2008-08-05T05:39:36Z|  408|
|         122|illegalargumentex...|2008-08-06T19:26:30Z|  402|
+------------+--------------------+--------------------+-----+
only showing top 10 rows


 

Convert DataFrame row to Scala case class

With the DataFrame dfTags in scope from the setup section, let us show how to convert each row of the dataframe to a Scala case class.

 

We first create a case class to represent the tag properties namely id and tag.


  case class Tag(id: Int, tag: String)

The code below shows how to convert each row of the dataframe dfTags into the Scala case class Tag created above.


  import sparkSession.implicits._
  val dfTagsOfTag: Dataset[Tag] = dfTags.as[Tag]
  dfTagsOfTag
    .take(10)
    .foreach(t => println(s"id = ${t.id}, tag = ${t.tag}"))

You should see the following output when you run your Scala application in IntelliJ:


id = 1, tag = data
id = 4, tag = c#
id = 4, tag = winforms
id = 4, tag = type-conversion
id = 4, tag = decimal
id = 4, tag = opacity
id = 6, tag = html
id = 6, tag = css
id = 6, tag = css3
id = 6, tag = internet-explorer-7

NOTE:

  • We've imported sparkSession.implicits._, which provides the encoders needed to map dataframe rows to our case class.
  • To demonstrate that each row of the dataframe was mapped to a Scala case class, we've sliced the dataset using the take() method and then used the foreach() method which gives you access to each row of type Tag created above.

DataFrame row to Scala case class using map()

In the previous example, we showed how to convert DataFrame row to Scala case class using as[]. If you need to manually parse each row, you can also make use of the map() method to convert DataFrame rows to a Scala case class.

 

In the example below, we will parse each row and normalize owner_userid and the creation_date fields. To start with, let us create a Case Class to represent the StackOverflow question dataset.


case class Question(owner_userid: Int, tag: String, creationDate: java.sql.Timestamp, score: Int)

Next, we'll create some functions to map each org.apache.spark.sql.Row into the Question case class above. Note also that we've defined Nested Functions to normalize owner_userid and creation_date fields.


// create a function which will parse each element in the row
  def toQuestion(row: org.apache.spark.sql.Row): Question = {
    // to normalize our owner_userid data
    val IntOf: String => Option[Int] = _ match {
      case s if s == "NA" => None
      case s => Some(s.toInt)
    }

    import java.time._
    val DateOf: String => java.sql.Timestamp = _ match {
      case s => java.sql.Timestamp.valueOf(ZonedDateTime.parse(s).toLocalDateTime)
    }

    Question (
      owner_userid = IntOf(row.getString(0)).getOrElse(-1),
      tag = row.getString(1),
      creationDate = DateOf(row.getString(2)),
      score = row.getString(3).toInt
    )
  }

To convert each row in the DataFrame into the Question case class, we can then call the map() method and pass in the toQuestion() function which we defined above.


  // now let's convert each row into a Question case class
  import sparkSession.implicits._
  val dfOfQuestion: Dataset[Question] = dfQuestions.map(row => toQuestion(row))
  dfOfQuestion
    .take(10)
    .foreach(q => println(s"owner userid = ${q.owner_userid}, tag = ${q.tag}, creation date = ${q.creationDate}, score = ${q.score}"))

You should see the following output when you run your Scala application in IntelliJ:


owner userid = 131, tag = xdebug, creation date = 2008-08-03 23:18:21.0, score = 405
owner userid = 131, tag = phpstorm, creation date = 2008-08-03 23:18:21.0, score = 405
owner userid = 131, tag = debugging, creation date = 2008-08-03 23:18:21.0, score = 405
owner userid = 131, tag = eclipse, creation date = 2008-08-03 23:18:21.0, score = 405
owner userid = 131, tag = php, creation date = 2008-08-03 23:18:21.0, score = 405
owner userid = -1, tag = osx, creation date = 2008-08-05 05:39:36.0, score = 408
owner userid = -1, tag = ios, creation date = 2008-08-05 05:39:36.0, score = 408
owner userid = -1, tag = objective-c, creation date = 2008-08-05 05:39:36.0, score = 408
owner userid = -1, tag = iphone, creation date = 2008-08-05 05:39:36.0, score = 408
owner userid = 122, tag = illegalargumentexception, creation date = 2008-08-06 19:26:30.0, score = 402

 

Create DataFrame from collection

So far we have seen how to create a dataframe by reading a CSV file. In this example, we will show how to create a dataframe from a Scala collection, namely a Seq of tuples.


  val seqTags = Seq(
    1 -> "so_java",
    1 -> "so_jsp",
    2 -> "so_erlang",
    3 -> "so_scala",
    3 -> "so_akka"
  )

  import sparkSession.implicits._
  val dfMoreTags = seqTags.toDF("id", "tag")
  dfMoreTags.show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+---------+
| id|      tag|
+---+---------+
|  1|  so_java|
|  1|   so_jsp|
|  2|so_erlang|
|  3| so_scala|
|  3|  so_akka|
+---+---------+

 

DataFrame Union

To merge two dataframes together, you can make use of the union() method. In this example, we will merge the dataframe dfTags and the dataframe dfMoreTags which we created in the previous section.


  val dfUnionOfTags = dfTags
    .union(dfMoreTags)
    .filter("id in (1,3)")
  dfUnionOfTags.show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+--------+
| id|     tag|
+---+--------+
|  1|    data|
|  1| so_java|
|  1|  so_jsp|
|  3|so_scala|
|  3| so_akka|
+---+--------+

 

DataFrame Intersection

To find the intersection between two dataframes, you can make use of the intersect() method. In this example, we will find the intersection between the dataframe dfMoreTags and the dataframe dfUnionOfTags which we created in the previous section.


  val dfIntersectionTags = dfMoreTags
    .intersect(dfUnionOfTags)
  dfIntersectionTags.show(10)

You should see the following output when you run your Scala application in IntelliJ:


+---+--------+
| id|     tag|
+---+--------+
|  3|so_scala|
|  3| so_akka|
|  1| so_java|
|  1|  so_jsp|
+---+--------+

 

Append column to DataFrame using withColumn()

When running data analysis, it can be quite handy to know how to add columns to a dataframe. Using the withColumn() method, you can easily append columns to a dataframe.

 

In the create dataframe from collection example above, we should have the dataframe dfMoreTags in scope. We will call the withColumn() method along with the org.apache.spark.sql.functions.split() method to split the value of the tag column and create two additional columns named so_prefix and so_tag.

 

Note also that we are showing how to call the drop() method to drop the temporary column tmp.

 
  import org.apache.spark.sql.functions._
  val dfSplitColumn = dfMoreTags
    .withColumn("tmp", split($"tag", "_"))
    .select(
      $"id",
      $"tag",
      $"tmp".getItem(0).as("so_prefix"),
      $"tmp".getItem(1).as("so_tag")
  ).drop("tmp")
  dfSplitColumn.show(10)

You should see the following output when you run your Scala application in IntelliJ:

 
 +---+---------+---------+------+
| id|      tag|so_prefix|so_tag|
+---+---------+---------+------+
|  1|  so_java|       so|  java|
|  1|   so_jsp|       so|   jsp|
|  2|so_erlang|       so|erlang|
|  3| so_scala|       so| scala|
|  3|  so_akka|       so|  akka|
+---+---------+---------+------+

 

Spark Functions

This section covers some of the built-in functions that Spark provides out-of-the-box. They are very handy when working with dataframes and running transformations on columns. For additional information, please refer to the official Spark 2.0 API documentation.

 

Create DataFrame from Tuples

Suppose that you have data represented as a Sequence of Tuples for donut names along with their corresponding prices. To create a Spark DataFrame with two columns (one for donut names, and another for donut prices) from the Tuples, you can make use of the createDataFrame() method.


  val donuts = Seq(("plain donut", 1.50), ("vanilla donut", 2.0), ("glazed donut", 2.50))
  val df = sparkSession
    .createDataFrame(donuts)
    .toDF("Donut Name", "Price")

  df.show()

You should see the following output when you run your Scala application in IntelliJ:


+-------------+-----+
|   Donut Name|Price|
+-------------+-----+
|  plain donut|  1.5|
|vanilla donut|  2.0|
| glazed donut|  2.5|
+-------------+-----+

 

Get DataFrame column names

There may be times, such as for reporting purposes, when you need to get the column names (also referred to as headers or titles) from Spark DataFrames. To this end, you can make use of the columns() method, which is exposed on the dataframe.

For the purpose of this example, we'll reuse the previous code snippet to create a dataframe from tuples. We then call the columns() method on the dataframe, which will return an Array of type String representing the column headers. To iterate through the columnNames array, we make use of the foreach() collection function.


  val donuts = Seq(("plain donut", 1.50), ("vanilla donut", 2.0), ("glazed donut", 2.50))
  val df = sparkSession
    .createDataFrame(donuts)
    .toDF("Donut Name", "Price")

  val columnNames: Array[String] = df.columns
  columnNames.foreach(name => println(s"$name"))

You should see the following output when you run your Scala application in IntelliJ:


Donut Name
Price

 

DataFrame column names and types

Let's extend the two previous Spark functions examples on creating a dataframe from tuples and getting dataframe column names. What if you also need to be aware of the column datatypes? You can use the built-in dtypes method of a dataframe, which returns an Array of Tuples representing column names with their corresponding data types.


val donuts = Seq(("plain donut", 1.50), ("vanilla donut", 2.0), ("glazed donut", 2.50))
val df = sparkSession
    .createDataFrame(donuts)
    .toDF("Donut Name", "Price")

val (columnNames, columnDataTypes) = df.dtypes.unzip
println(s"DataFrame column names = ${columnNames.mkString(", ")}")
println(s"DataFrame column data types = ${columnDataTypes.mkString(", ")}")

NOTE:

  • columnNames is an Array[String] representing the dataframe column names
  • columnDataTypes is an Array[String] representing Spark column DataTypes
  • To learn more about Spark DataFrame data types, you can refer to the official documentation.

You should see the following output when you run your Scala application in IntelliJ:


DataFrame column names = Donut Name, Price
DataFrame column data types = StringType, DoubleType

 

Json into DataFrame using explode()

From the previous examples in our Spark tutorial, we have seen that Spark has built-in support for reading various file formats such as CSV or JSON files into a DataFrame. JSON files can have additional complexities if their content is nested.

 

Let's create a simple nested JSON file below, which has a root element named stackoverflow with an array of tag elements. Each tag element has id, name, author and frameworks elements. The frameworks element is itself an array of elements, each with an id and a name.


{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}

To read the nested Json file into a Spark dataframe, we'll use the multiLine and inferSchema options.


import sparkSession.sqlContext.implicits._
val tagsDF = sparkSession
  .read
  .option("multiLine", true)
  .option("inferSchema", true)
  .json("src/main/resources/tags_sample.json")

We can then use the explode() function to unwrap the root stackoverflow element. Note also that we can alias the exploded column using the as() method.


  val df = tagsDF.select(explode($"stackoverflow") as "stackoverflow_tags")

The exploded schema shows the various structs produced by the explode() method above: dataframe df now has a stackoverflow_tags struct, which contains the tag struct and its remaining child elements.

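To inspect this structure yourself, you can print the schema of the dataframe df created above:


  df.printSchema()

You should see the following output when you run your Scala application in IntelliJ: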

root
 |-- stackoverflow_tags: struct (nullable = true)
 |    |-- tag: struct (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- frameworks: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)

With the above schema and structs in mind, you can easily tabulate your dataframe by using the select() method.


  df.select(
    $"stackoverflow_tags.tag.id" as "id",
    $"stackoverflow_tags.tag.author" as "author",
    $"stackoverflow_tags.tag.name" as "tag_name",
    $"stackoverflow_tags.tag.frameworks.id" as "frameworks_id",
    $"stackoverflow_tags.tag.frameworks.name" as "frameworks_name"
  ).show()

You should see the following output when you run your Scala application in IntelliJ:


+---+--------------+--------+-------------+--------------------+
| id|        author|tag_name|frameworks_id|     frameworks_name|
+---+--------------+--------+-------------+--------------------+
|  1|Martin Odersky|   scala|       [1, 2]|[Play Framework, ...|
|  2| James Gosling|    java|       [1, 2]|[Apache Tomcat, S...|
+---+--------------+--------+-------------+--------------------+

 

Concatenate DataFrames using join()

Under our DataFrame SQL Query and Spark SQL sections, we provided various code snippets for merging DataFrames using the join() method. The question of how to concatenate DataFrames column-wise still comes up, so let's provide another example of concatenating two DataFrames column-wise by making use of the join() method.

 

Firstly, we create a dataframe which represents a donut id, name and price.


  val donuts = Seq(("111","plain donut", 1.50), ("222", "vanilla donut", 2.0), ("333","glazed donut", 2.50))

  val dfDonuts = sparkSession
    .createDataFrame(donuts)
    .toDF("Id","Donut Name", "Price")
  dfDonuts.show()

Secondly, we create another dataframe which represents a donut id and an inventory amount.


  val inventory = Seq(("111", 10), ("222", 20), ("333", 30))
  val dfInventory = sparkSession
      .createDataFrame(inventory)
      .toDF("Id", "Inventory")
  dfInventory.show()

To concatenate column-wise the dataframe dfDonuts with the dataframe dfInventory, we can make use of the join() method, and specify the join column to be the id column.


  val dfDonutsInventory = dfDonuts.join(dfInventory, Seq("Id"), "inner")
  dfDonutsInventory.show()

You should see the following output when you run your Scala application in IntelliJ:


+---+-------------+-----+---------+
| Id|   Donut Name|Price|Inventory|
+---+-------------+-----+---------+
|111|  plain donut|  1.5|       10|
|222|vanilla donut|  2.0|       20|
|333| glazed donut|  2.5|       30|
+---+-------------+-----+---------+

 

Search DataFrame column using array_contains()

In the section on Json into DataFrame using explode(), we showed how to read a nested Json file by using Spark's built-in explode() method to denormalise the JSON content into a dataframe. We will reuse the tags_sample.json JSON file, which, when converted into a DataFrame, produced the dataframe below consisting of the columns id, author, tag_name, frameworks_id and frameworks_name.


+---+--------------+--------+-------------+--------------------+
| id|        author|tag_name|frameworks_id|     frameworks_name|
+---+--------------+--------+-------------+--------------------+
|  1|Martin Odersky|   scala|       [1, 2]|[Play Framework, ...|
|  2| James Gosling|    java|       [1, 2]|[Apache Tomcat, S...|
+---+--------------+--------+-------------+--------------------+

Note, however, that the frameworks_name column is in fact an array of type String. What if you needed to find only the rows in the dataframe which contain the item Play Framework in the column frameworks_name? We will use Spark's DataFrame select() and where() methods, and pair them with the array_contains() method to filter the frameworks_name column for the item Play Framework.


  import sparkSession.sqlContext.implicits._

  val tagsDF = sparkSession
    .read
    .option("multiLine", true)
    .option("inferSchema", true)
    .json("src/main/resources/tags_sample.json")

  val df = tagsDF
    .select(explode($"stackoverflow") as "stackoverflow_tags")
    .select(
      $"stackoverflow_tags.tag.id" as "id",
      $"stackoverflow_tags.tag.author" as "author",
      $"stackoverflow_tags.tag.name" as "tag_name",
      $"stackoverflow_tags.tag.frameworks.id" as "frameworks_id",
      $"stackoverflow_tags.tag.frameworks.name" as "frameworks_name"
    )
  df.show()

  df
    .select("*")
    .where(array_contains($"frameworks_name","Play Framework"))
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+---+--------------+--------+-------------+--------------------+
| id|        author|tag_name|frameworks_id|     frameworks_name|
+---+--------------+--------+-------------+--------------------+
|  1|Martin Odersky|   scala|       [1, 2]|[Play Framework, ...|
+---+--------------+--------+-------------+--------------------+

 

Check DataFrame column exists

When using Spark for Extract, Transform and Load (ETL), and even perhaps for Data Science work from plain data analytics to machine learning, you may be working with dataframes that have been generated by some other process or stage. As a sanity check on the dataframe that you will feed into, say, your model, you may need to verify that certain columns exist in it.

To this end, the columns() method can be used to return an Array of type String, which represents the column names of the dataframe. From that array, you can make use of the contains() method to check if a particular column exists.


  val donuts = Seq(("plain donut", 1.50), ("vanilla donut", 2.0), ("glazed donut", 2.50))
  val df = sparkSession.createDataFrame(donuts).toDF("Donut Name", "Price")

  df.show()

  val priceColumnExists = df.columns.contains("Price")
  println(s"Does price column exist = $priceColumnExists")

You should see the following output when you run your Scala application in IntelliJ:


+-------------+-----+
|   Donut Name|Price|
+-------------+-----+
|  plain donut|  1.5|
|vanilla donut|  2.0|
| glazed donut|  2.5|
+-------------+-----+

Does price column exist = true

 

Split DataFrame Array column

Throughout this Spark 2.0 tutorial series, we've already shown that Spark's dataframe can hold columns of complex types such as an Array of values. In this example, we will show how you can further denormalise an Array column into separate columns. Denormalisation is a fairly natural process which happens in, say, a Machine Learning pipeline, in order to interpret and pre-process data for subsequent stages of the pipeline.

 

Let's start by creating a simple dataframe with two columns. The first column will represent a donut name of type String. The second column will represent our complex Array of prices. We will assume that the array will hold only two items for the purpose of this example. The first price is the Low Price of a donut, and the second price is the High Price of a donut. Using the familiar show() method, we can quickly visualise our dataframe with the two columns: Name and Prices.


  import sparkSession.sqlContext.implicits._

  val targets = Seq(("Plain Donut", Array(1.50, 2.0)), ("Vanilla Donut", Array(2.0, 2.50)), ("Strawberry Donut", Array(2.50, 3.50)))
  val df = sparkSession
    .createDataFrame(targets)
    .toDF("Name", "Prices")

  df.show()

You should see the following output when you run your Scala application in IntelliJ:


+----------------+----------+
|            Name|    Prices|
+----------------+----------+
|     Plain Donut|[1.5, 2.0]|
|   Vanilla Donut|[2.0, 2.5]|
|Strawberry Donut|[2.5, 3.5]|
+----------------+----------+

As a reminder, to better understand the structure of our dataframe, we can make use of the printSchema() method.


df.printSchema()

You should see the following output when you run your Scala application in IntelliJ:


root
 |-- Name: string (nullable = true)
 |-- Prices: array (nullable = true)
 |    |-- element: double (containsNull = false)

To split the Prices column (which is an Array of type double), we can reference the Prices column by the element index as shown below:


  val df2 = df
    .select(
      $"Name",
      $"Prices"(0).as("Low Price"),
      $"Prices"(1).as("High Price")
    )

  df2.show()

You should see the following output when you run your Scala application in IntelliJ:


+----------------+---------+----------+
|            Name|Low Price|High Price|
+----------------+---------+----------+
|     Plain Donut|      1.5|       2.0|
|   Vanilla Donut|      2.0|       2.5|
|Strawberry Donut|      2.5|       3.5|
+----------------+---------+----------+

 

Rename DataFrame column

We continue our Spark 2.0 series on some handy functions that Spark provides out-of-the-box, and in this section, we will show how you can easily rename a dataframe column. So you may wonder why anyone would bother renaming a dataframe column?

 

When running a model through a Machine Learning pipeline, a dataframe typically goes through various stages of transformation. A base price for a donut could, for instance, be enriched with some Correlation or Variance metrics of some other product, in order for the resulting column to be a proxy for the next stage of the Machine Learning pipeline. Throughout these enrichment steps, it is typical to rename dataframe columns to maintain clarity, and to keep our dataframes in-line with the corresponding transformations or models.

 

To rename a dataframe column using Spark, you just have to make use of the withColumnRenamed() method. In the example below, we are simply renaming the Donut Name column to Name.


  val donuts = Seq(("plain donut", 1.50), ("vanilla donut", 2.0), ("glazed donut", 2.50))
  val df = sparkSession.createDataFrame(donuts).toDF("Donut Name", "Price")
  df.show()

  val df2 = df.withColumnRenamed("Donut Name", "Name")
  df2.show()

You should see the following output when you run your Scala application in IntelliJ:


+-------------+-----+
|   Donut Name|Price|
+-------------+-----+
|  plain donut|  1.5|
|vanilla donut|  2.0|
| glazed donut|  2.5|
+-------------+-----+

+-------------+-----+
|         Name|Price|
+-------------+-----+
|  plain donut|  1.5|
|vanilla donut|  2.0|
| glazed donut|  2.5|
+-------------+-----+

 

Create DataFrame constant column

Continuing our Machine Learning discussion from the previous code snippet on Rename DataFrame column, it is also typical to lift or enrich dataframes with constant values. Constant columns may be required as additional parameters in order to fit a dataframe to a corresponding analytical model.

 

To create a constant column in a Spark dataframe, you can make use of the withColumn() method. Note that you need to import org.apache.spark.sql.functions._. In the example below, we will create three constant columns, and show that you can have constant columns of various data types. To start with, we will augment the dataframe with a column named Tasty, and it will hold a Boolean value of true. The second column named Correlation will hold an Int value of 1. Finally, the third column named Stock Min Max will represent an Array for the minimum and maximum stock quantities respectively.


  val donuts = Seq(("plain donut", 1.50), ("vanilla donut", 2.0), ("glazed donut", 2.50))
  val df = sparkSession.createDataFrame(donuts).toDF("Donut Name", "Price")

  import org.apache.spark.sql.functions._
  val df2 = df
    .withColumn("Tasty", lit(true))
    .withColumn("Correlation", lit(1))
    .withColumn("Stock Min Max", typedLit(Seq(100, 500)))

  df2.show()

You should see the following output when you run your Scala application in IntelliJ:


+-------------+-----+-----+-----------+-------------+
|   Donut Name|Price|Tasty|Correlation|Stock Min Max|
+-------------+-----+-----+-----------+-------------+
|  plain donut|  1.5| true|          1|   [100, 500]|
|vanilla donut|  2.0| true|          1|   [100, 500]|
| glazed donut|  2.5| true|          1|   [100, 500]|
+-------------+-----+-----+-----------+-------------+

 

DataFrame new column with User Defined Function (UDF)

In the previous section, we showed how you can augment a Spark DataFrame by adding a constant column. Sometimes, though, in your Machine Learning pipeline, you may have to apply a particular function in order to produce a new dataframe column.

 

With Spark 2.0, you can make use of a User Defined Function (UDF). In the example below, we will use the Donut Name column as input to a UDF named stockMinMax(), and produce a new dataframe column named Stock Min Max. To keep this example simple, the stockMinMax() UDF will return a Sequence of Int to represent the minimum and maximum donut quantities. Note that in addition to importing the familiar org.apache.spark.sql.functions._, you also have to import sparkSession.sqlContext.implicits._. Moreover, before using the stockMinMax() function as part of the withColumn() method, you first need to lift it into a UDF by using the udf() function.


  val donuts = Seq(("plain donut", 1.50), ("vanilla donut", 2.0), ("glazed donut", 2.50))
  val df = sparkSession.createDataFrame(donuts).toDF("Donut Name", "Price")

  import org.apache.spark.sql.functions._
  import sparkSession.sqlContext.implicits._

  val stockMinMax: (String => Seq[Int]) = (donutName: String) => donutName match {
    case "plain donut"    => Seq(100, 500)
    case "vanilla donut"  => Seq(200, 400)
    case "glazed donut"   => Seq(300, 600)
    case _                => Seq(150, 150)
  }

  val udfStockMinMax = udf(stockMinMax)
  val df2 = df.withColumn("Stock Min Max", udfStockMinMax($"Donut Name"))
  df2.show()

You should see the following output when you run your Scala application in IntelliJ:


+-------------+-----+-------------+
|   Donut Name|Price|Stock Min Max|
+-------------+-----+-------------+
|  plain donut|  1.5|   [100, 500]|
|vanilla donut|  2.0|   [200, 400]|
| glazed donut|  2.5|   [300, 600]|
+-------------+-----+-------------+

 
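
If you also need to call the same function from Spark SQL, rather than through withColumn(), you can additionally register it using sparkSession.udf.register(). Below is a minimal sketch, assuming the stockMinMax function and the dataframe df from above; the temporary view name donuts is only an illustration:


  // register the function so that it can also be used from Spark SQL
  sparkSession.udf.register("stockMinMax", stockMinMax)
  df.createOrReplaceTempView("donuts")
  sparkSession
    .sql("select `Donut Name`, stockMinMax(`Donut Name`) as `Stock Min Max` from donuts")
    .show()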

DataFrame First Row

When using Spark for data science projects, data may originate from various sources. Before applying a particular function or model to a dataframe, you may have to inspect its data points in order to become visually familiar with the data. While we've already introduced stratified sampling using Spark's sampleBy() method, in this example we will show how you can make use of the first() method.

 

In addition, if you need to return a particular column from the first row of the dataframe, you can also specify the column index: df.first().get(0). Sometimes, though, you may have to refer to a particular column by name as opposed to a column index. In the code snippet below, we make use of the getAs() method to return the value from the dataframe's first row for the Price column: df.first().getAs[Double]("Price").


  val donuts = Seq(("plain donut", 1.50), ("vanilla donut", 2.0), ("glazed donut", 2.50))
  val df = sparkSession
    .createDataFrame(donuts)
    .toDF("Donut Name", "Price")

  val firstRow = df.first()
  println(s"First row = $firstRow")

  val firstRowColumn1 = df.first().get(0)
  println(s"First row column 1 = $firstRowColumn1")

  val firstRowColumnPrice = df.first().getAs[Double]("Price")
  println(s"First row column Price = $firstRowColumnPrice")

You should see the following output when you run your Scala application in IntelliJ:


First row = [plain donut,1.5]
First row column 1 = plain donut
First row column Price = 1.5

 

Format DataFrame column

Within your data analysis and Machine Learning pipelines, in addition to transforming the data points of a dataframe, you will almost certainly need to format dataframe columns. For instance, you may only need the day, month or year from a date. In the code snippet below, we will show how you can make use of the built-in functions that Spark provides to help you easily format columns.

 

We enrich our dataframe with the following formatted columns:

  • Price Formatted: We use Spark's format_number() function to format price values by 2 decimal places.
  • Name Formatted: We use Spark's format_string() function to prepend the text "awesome" to donut names.
  • Name Uppercase: We use Spark's upper() function to convert the Donut Name values into uppercase.
  • Name lowercase: We use Spark's lower() function to convert the Donut Name value into lowercase.
  • Date Formatted: We use Spark's date_format() function to format the Purchase Date column using the format yyyyMMdd.
  • Day: We use Spark's dayofmonth() function to extract the day of the month from the Purchase Date.
  • Month: We use Spark's month() function to extract the month from the Purchase Date.
  • Year: We use Spark's year() function to extract the year from the Purchase Date.

  val donuts = Seq(("plain donut", 1.50, "2018-04-17"), ("vanilla donut", 2.0, "2018-04-01"), ("glazed donut", 2.50, "2018-04-02"))
  val df = sparkSession.createDataFrame(donuts).toDF("Donut Name", "Price", "Purchase Date")

  import org.apache.spark.sql.functions._
  import sparkSession.sqlContext.implicits._

  df
    .withColumn("Price Formatted", format_number($"Price", 2))
    .withColumn("Name Formatted", format_string("awesome %s", $"Donut Name"))
    .withColumn("Name Uppercase", upper($"Donut Name"))
    .withColumn("Name Lowercase", lower($"Donut Name"))
    .withColumn("Date Formatted", date_format($"Purchase Date", "yyyyMMdd"))
    .withColumn("Day", dayofmonth($"Purchase Date"))
    .withColumn("Month", month($"Purchase Date"))
    .withColumn("Year", year($"Purchase Date"))
    .show()

You should see the following output when you run your Scala application in IntelliJ:


+-------------+-----+-------------+---------------+--------------------+--------------+--------------+--------------+---+-----+----+
|   Donut Name|Price|Purchase Date|Price Formatted|      Name Formatted|Name Uppercase|Name Lowercase|Date Formatted|Day|Month|Year|
+-------------+-----+-------------+---------------+--------------------+--------------+--------------+--------------+---+-----+----+
|  plain donut|  1.5|   2018-04-17|           1.50| awesome plain donut|   PLAIN DONUT|   plain donut|      20180417| 17|    4|2018|
|vanilla donut|  2.0|   2018-04-01|           2.00|awesome vanilla d...| VANILLA DONUT| vanilla donut|      20180401|  1|    4|2018|
| glazed donut|  2.5|   2018-04-02|           2.50|awesome glazed donut|  GLAZED DONUT|  glazed donut|      20180402|  2|    4|2018|
+-------------+-----+-------------+---------------+--------------------+--------------+--------------+--------------+---+-----+----+

 

DataFrame column hashing

In this section, we show some of the built-in hashing functions that Spark provides out-of-the-box. You may wonder, though, in what circumstances you would need to hash column values in a dataframe? In certain Machine Learning pipelines, there are scenarios where you need to detect staleness in data in order to reproduce a result set. In such cases, hashing some of your data points becomes very useful.

 

In the simple examples below, we enrich our dataframe with the following hashing columns:

  • Hash column: This column creates hash values for the Donut Name column. As per the Spark 2.0 API documentation, the hash() function uses the Murmur3 hash.
  • MD5 column: This column creates MD5 hash values for the Donut Name column.
  • SHA-1 column: This column creates SHA-1 hash values for the Donut Name column.
  • SHA-2 column: This column creates SHA-2 hash values for the Donut Name column. For SHA-2, you will have to specify a second parameter for the number of bits. In the example below, I have chosen 256 as the number of bits.

val donuts = Seq(("plain donut", 1.50, "2018-04-17"), ("vanilla donut", 2.0, "2018-04-01"), ("glazed donut", 2.50, "2018-04-02"))
val df = sparkSession.createDataFrame(donuts).toDF("Donut Name", "Price", "Purchase Date")

import org.apache.spark.sql.functions._
import sparkSession.sqlContext.implicits._

df
.withColumn("Hash", hash($"Donut Name")) // murmur3 hash as default.
.withColumn("MD5", md5($"Donut Name"))
.withColumn("SHA1", sha1($"Donut Name"))
.withColumn("SHA2", sha2($"Donut Name", 256)) // 256 is the number of bits
.show()

You should see the following output when you run your Scala application in IntelliJ:


+-------------+-----+-------------+----------+--------------------+--------------------+--------------------+
|   Donut Name|Price|Purchase Date|      Hash|                 MD5|                SHA1|                SHA2|
+-------------+-----+-------------+----------+--------------------+--------------------+--------------------+
|  plain donut|  1.5|   2018-04-17|1594998220|53a70d9f08d8bb249...|7882fd7481cb43452...|4aace471ed4433f1b...|
|vanilla donut|  2.0|   2018-04-01| 673697474|254c8f04be947ec2c...|5dbbc954723a74fe0...|ccda17c5bc47d1671...|
| glazed donut|  2.5|   2018-04-02| 715175419|44199f422534a5736...|aaee30ecdc523fa1e...|6d1568ca8c20ffc0b...|
+-------------+-----+-------------+----------+--------------------+--------------------+--------------------+

 

DataFrame String Functions

We've already seen a few String functions, such as split(), format_string(), upper() and lower(), in the previous examples. In this section, I thought of presenting some of the additional built-in functions that Spark provides when you have to work with textual data points. As a matter of fact, these can be handy if you need to normalise and run feature extraction over textual datasets in a Machine Learning pipeline.

 

We will enrich our dataframe with the following columns:

  • Contains plain: This column uses the instr() function and outputs the index of the given substring in a column.
  • Length: This column uses the length() function and outputs the lengths for the string values in a column.
  • Trim: This column uses the trim() function and removes spaces on both sides of the text in a column.
  • LTrim: This column uses the ltrim() function and removes spaces from the left side of the text in a column.
  • RTrim: This column uses the rtrim() function and removes spaces from the right side of the text in a column.
  • Reverse: This column uses the reverse() function and outputs the text in reverse order in a column.
  • Substring: This column uses the substring() function and outputs the text in a column for the given start position and length.
  • IsNull: This column uses the isnull() function and outputs true or false if the text in a column is null or not.
  • Concat: This column uses the concat_ws() function and outputs a String representation of columns being concatenated. Note also that the concat_ws() function also allows you to provide a given textual separator.
  • InitCap: This column uses the initcap() function and converts the first letter for each word in the column into uppercase.

val donuts = Seq(("plain donut", 1.50, "2018-04-17"), ("vanilla donut", 2.0, "2018-04-01"), ("glazed donut", 2.50, "2018-04-02"))
val df = sparkSession
  .createDataFrame(donuts)
  .toDF("Donut Name", "Price", "Purchase Date")

import org.apache.spark.sql.functions._
import sparkSession.sqlContext.implicits._

df
.withColumn("Contains plain", instr($"Donut Name", "donut"))
.withColumn("Length", length($"Donut Name"))
.withColumn("Trim", trim($"Donut Name"))
.withColumn("LTrim", ltrim($"Donut Name"))
.withColumn("RTrim", rtrim($"Donut Name"))
.withColumn("Reverse", reverse($"Donut Name"))
.withColumn("Substring", substring($"Donut Name", 0, 5))
.withColumn("IsNull", isnull($"Donut Name"))
.withColumn("Concat", concat_ws(" - ", $"Donut Name", $"Price"))
.withColumn("InitCap", initcap($"Donut Name"))
.show()

You should see the following output when you run your Scala application in IntelliJ:


+-------------+-----+-------------+--------------+------+-------------+-------------+-------------+-------------+---------+------+-------------------+-------------+
|   Donut Name|Price|Purchase Date|Contains plain|Length|         Trim|        LTrim|        RTrim|      Reverse|Substring|IsNull|             Concat|      InitCap|
+-------------+-----+-------------+--------------+------+-------------+-------------+-------------+-------------+---------+------+-------------------+-------------+
|  plain donut|  1.5|   2018-04-17|             7|    11|  plain donut|  plain donut|  plain donut|  tunod nialp|    plain| false|  plain donut - 1.5|  Plain Donut|
|vanilla donut|  2.0|   2018-04-01|             9|    13|vanilla donut|vanilla donut|vanilla donut|tunod allinav|    vanil| false|vanilla donut - 2.0|Vanilla Donut|
| glazed donut|  2.5|   2018-04-02|             8|    12| glazed donut| glazed donut| glazed donut| tunod dezalg|    glaze| false| glazed donut - 2.5| Glazed Donut|
+-------------+-----+-------------+--------------+------+-------------+-------------+-------------+-------------+---------+------+-------------------+-------------+

 

DataFrame drop null

This section is a fairly short one, but it showcases yet another useful function from Spark 2.0: dropping null values in a dataframe. One of the very first stages of most Machine Learning pipelines involves a data cleansing or preparation stage. During that stage, it is typical for data scientists to identify and remove unwanted data points in order to fit a certain model or function. It certainly goes without saying that one of the most irritating steps during the data cleansing stage is dropping null values. Fortunately for us, Spark 2.0 comes with the handy na.drop() function to easily remove null values from a dataframe.

 

Let's create a simple dataframe which contains a null value in the Donut Name column.


val donuts = Seq(("plain donut", 1.50), (null.asInstanceOf[String], 2.0), ("glazed donut", 2.50))
val dfWithNull = sparkSession
  .createDataFrame(donuts)
  .toDF("Donut Name", "Price")

dfWithNull.show()

You should see the following output when you run your Scala application in IntelliJ:


+------------+-----+
|  Donut Name|Price|
+------------+-----+
| plain donut|  1.5|
|        null|  2.0|
|glazed donut|  2.5|
+------------+-----+

See below that when you use na.drop(), the row which had a null value in the Donut Name column gets dropped from the dataframe.


val dfWithoutNull = dfWithNull.na.drop()
dfWithoutNull.show()

You should see the following output when you run your Scala application in IntelliJ:


+------------+-----+
|  Donut Name|Price|
+------------+-----+
| plain donut|  1.5|
|glazed donut|  2.5|
+------------+-----+

 
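
Note that na.drop() can also be restricted to specific columns, so that only rows with a null in those columns are removed. A minimal sketch, assuming the dfWithNull dataframe from above:


val dfWithoutNullName = dfWithNull.na.drop(Seq("Donut Name"))
dfWithoutNullName.show()
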

Nadim Bahadoor
Technology and Finance Consultant with over 14 years of hands-on experience building large scale systems in the Financial (Electronic Trading Platforms), Risk, Insurance and Life Science sectors. I am self-driven and passionate about Finance, Distributed Systems, Functional Programming, Big Data, Semantic Data (Graph) and Machine Learning.