How to convert RDD to DataFrame

In this blog post, we will learn how to convert an RDD to a DataFrame using Spark helper methods that are handy in local development and testing.

Converting an RDD to a DataFrame

First, let us create an RDD from a local collection:

val temperatureRecords = Seq(
  ("India", Array(27.0, 26.0, 40.1)),
  ("China", Array(27.0, 26.0, 40.1))
)
val temperatureRDD = spark.sparkContext.parallelize(temperatureRecords)
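The snippets in this post assume a `spark` session already exists, as it does in spark-shell. For local development or tests outside the shell, a minimal sketch (assuming spark-sql is on the classpath; the app name is an arbitrary placeholder) creates a local session first and then builds the same RDD:

```scala
import org.apache.spark.sql.SparkSession

// Local session using all available cores; no cluster is required.
// "rdd-to-dataframe" is an arbitrary, hypothetical app name.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rdd-to-dataframe")
  .getOrCreate()

val temperatureRecords = Seq(
  ("India", Array(27.0, 26.0, 40.1)),
  ("China", Array(27.0, 26.0, 40.1))
)

// parallelize distributes the local collection as an RDD[(String, Array[Double])]
val temperatureRDD = spark.sparkContext.parallelize(temperatureRecords)

println(temperatureRDD.count())
```

Because getOrCreate reuses an existing session, the same code also runs unchanged inside spark-shell.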

There are several ways to convert an RDD to a DataFrame:

  1. By using createDataFrame(rdd) on the SparkSession object
  2. By using createDataFrame(rdd) on the SparkSession object and specifying column names with toDF
  3. By using createDataFrame(rowRDD, schema), providing the schema as a StructType

Method 01 - createDataFrame(rdd)

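We will use the createDataFrame(rdd) method to convert an RDD into a DataFrame. When the RDD holds tuples, Spark infers the column types from the tuple fields by reflection.

Creating an RDD from a collection and converting it into a DataFrame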

val temperatureRecords = List(("Afghanistan", "8,996,351"), ("China", "667,070.00"), ("India", "449,480.61"))
val populationRDD = spark.sparkContext.parallelize(temperatureRecords)
val dfWithoutSchema = spark.createDataFrame(populationRDD)
dfWithoutSchema.show()
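To check what Spark inferred, printSchema lists the column names and types. A self-contained sketch, assuming spark-sql is on the classpath and a local session (the app name is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("method-01").getOrCreate()

val temperatureRecords = List(("Afghanistan", "8,996,351"), ("China", "667,070.00"), ("India", "449,480.61"))

// createDataFrame infers the schema from the tuple type (String, String) by reflection
val dfWithoutSchema = spark.createDataFrame(spark.sparkContext.parallelize(temperatureRecords))

// Without explicit names, the columns take the tuple field names _1 and _2, both strings
dfWithoutSchema.printSchema()
```

Note that the population values stay strings here, because the tuples themselves hold strings.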

Output

+-----------+----------+
|         _1|        _2|
+-----------+----------+
|Afghanistan| 8,996,351|
|      China|667,070.00|
|      India|449,480.61|
+-----------+----------+

Creating an RDD from an external source and converting it into a DataFrame

// textFile returns an RDD[String] with one element per line of the file
val temperatureData = spark.sparkContext.textFile("d:/spark-example/population.csv")

Output

+-----------+----------+
|         _1|        _2|
+-----------+----------+
|Afghanistan| 8,996,351|
|      China|667,070.00|
|   India449|    480.61|
+-----------+----------+
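textFile only yields raw lines, so each line must be turned into a tuple before createDataFrame can infer columns. A minimal sketch, assuming each line has the form country,population and that only the first comma separates the two fields (the population values themselves contain commas). The temporary file and its contents are illustrative stand-ins for the CSV on disk:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("text-file-to-df").getOrCreate()

// Hypothetical input written to a temp file; in practice, point textFile at the real CSV
val path = Files.createTempFile("population", ".csv")
path.toFile.deleteOnExit()
val csv = "Afghanistan,8,996,351\nChina,667,070.00\nIndia,449,480.61\n"
Files.write(path, csv.getBytes(StandardCharsets.UTF_8))

// RDD[String]: one element per line
val lines = spark.sparkContext.textFile(path.toString)

// split(",", 2) stops after the first comma, so "India,449,480.61" becomes ("India", "449,480.61")
val parsed = lines.map { line =>
  val fields = line.split(",", 2)
  (fields(0), fields(1))
}

val dfFromFile = spark.createDataFrame(parsed)
dfFromFile.show()
```

Splitting on every comma instead would scatter a value like 449,480.61 across several fields, so the split limit matters for this kind of data.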

In the outputs above, the columns are named "_1" and "_2" because we did not specify column names, so Spark uses the tuple field names. Let's see how to define column names in the next method.

Method 02 - toDF(colNames)

toDF(colNames*) provides a concise syntax for naming the columns of a DataFrame. Importing the Spark implicits (spark.implicits._) additionally makes toDF available directly on RDDs of tuples.

import spark.implicits._
val dataFrameWithSchema = spark.createDataFrame(spark.sparkContext.parallelize(temperatureRecords)).toDF("Country", "Population in billions")
dataFrameWithSchema.show()
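With the implicits in scope, the createDataFrame call can be skipped and toDF called on the RDD itself. A sketch assuming a local session; the column name "Population" is an illustrative choice:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("method-02").getOrCreate()

// Brings the implicit conversions (including rdd.toDF) into scope; requires a stable `val spark`
import spark.implicits._

val temperatureRecords = List(("Afghanistan", "8,996,351"), ("China", "667,070.00"), ("India", "449,480.61"))

// toDF on the RDD names the columns in one step, no createDataFrame needed
val namedDF = spark.sparkContext.parallelize(temperatureRecords).toDF("Country", "Population")
namedDF.show()
```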

Output

+-----------+----------------------+
|    Country|Population in billions|
+-----------+----------------------+
|Afghanistan|             8,996,351|
|      China|            667,070.00|
|      India|            449,480.61|
+-----------+----------------------+

Method 03 - createDataFrame(rowRDD, schema)

In this method, we provide the schema explicitly with a StructType. createDataFrame then expects an RDD of Row objects whose values match the schema's field types.

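import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Each Row must match the schema: a String id followed by a Long population
val rowsRDD = spark.sparkContext.parallelize(Seq(Row("Afghanistan", 8996351L), Row("China", 667070L)))
val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("population", LongType, true))
val dataFrame = spark.createDataFrame(rowsRDD, schema)
dataFrame.show()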
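In practice the rows usually come from raw text rather than literals. A sketch, assuming lines of the form country,population where the population is written with thousands separators (as in "8,996,351"): it strips the separators and converts the value to Long so each Row matches the LongType field. The in-memory lines stand in for textFile, and a value with a non-zero fractional part such as "449,480.61" would need DoubleType instead, so the sample keeps to whole numbers:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("method-03").getOrCreate()

// Stand-in for spark.sparkContext.textFile(...): raw "country,population" lines
val rawLines = spark.sparkContext.parallelize(Seq("Afghanistan,8,996,351", "China,667,070.00"))

val rowsRDD = rawLines.map { line =>
  // Split on the first comma only; the population keeps its thousands separators
  val fields = line.split(",", 2)
  // Drop the separators and convert to Long; toLongExact throws if the fractional part is non-zero
  val population = BigDecimal(fields(1).replace(",", "")).toLongExact
  Row(fields(0), population)
}

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("population", LongType, true))

val dataFrame = spark.createDataFrame(rowsRDD, schema)
dataFrame.show()
```

Failing loudly on unexpected decimals is a deliberate choice here: silently truncating would hide bad input.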
