In this blog post we will learn how to convert an RDD to a DataFrame using Spark helper methods that are handy in local development or testing.
Converting RDD to DataFrame
First, let us create an RDD from a collection:
val temperatureRecords = Seq(
  ("India", Array(27.0, 26.0, 40.1)),
  ("China", Array(27.0, 26.0, 40.1))
)
// parallelize distributes the local collection as an RDD
val temperatureRDD = spark.sparkContext.parallelize(temperatureRecords)
There are several ways to convert an RDD to a DataFrame:
- By using createDataFrame(RDD obj) from the SparkSession object
- By using createDataFrame(RDD obj) from the SparkSession object and specifying column names with toDF()
- By using createDataFrame(RDD obj, StructType type) and providing the schema as a StructType
Method 01 -
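We will use the createDataFrame(RDD rdd) method to convert an RDD into a DataFrame.
Creating an RDD from a collection and converting it into a DataFrame
val temperatureRecords = List(("Afghanistan", "8,996,351"), ("China", "667,070.00"), ("India", "449,480.61"))
// Turn the local list into an RDD, then convert the RDD into a DataFrame
val recordsRDD = spark.sparkContext.parallelize(temperatureRecords)
val dfWithoutSchema = spark.createDataFrame(recordsRDD)
dfWithoutSchema.show()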
Output
+-----------+----------+
|         _1|        _2|
+-----------+----------+
|Afghanistan| 8,996,351|
|      China|667,070.00|
|      India|449,480.61|
+-----------+----------+
Creating an RDD from an external source and converting it into a DataFrame
// textFile returns an RDD[String] with one element per line of the file
val temperatureData = spark.sparkContext.textFile("d:/spark-example/population.csv")
Output
+-----------+----------+
|         _1|        _2|
+-----------+----------+
|Afghanistan| 8,996,351|
|      China|667,070.00|
|      India|449,480.61|
+-----------+----------+
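The textFile call above only loads the file as an RDD of lines. The sketch below shows one way those lines could be turned into a DataFrame. It assumes a hypothetical layout with one country;population pair per line (a semicolon is used here because the population values themselves contain commas); the real layout of population.csv is not shown in this post, and the names TextFileToDataFrame and linesToDataFrame are made up for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object TextFileToDataFrame {
  // Hypothetical helper: assumes each line looks like "country;population".
  def linesToDataFrame(spark: SparkSession, path: String): DataFrame = {
    val lines = spark.sparkContext.textFile(path)
    val pairs = lines
      .map(_.split(";", -1))
      .filter(_.length == 2)                         // skip malformed lines
      .map(fields => (fields(0).trim, fields(1).trim))
    spark.createDataFrame(pairs)                     // columns default to _1 and _2
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("text-file-to-df")
      .getOrCreate()

    // Write a small sample file so the sketch runs without the original CSV.
    val sample = Files.createTempFile("population", ".txt")
    Files.write(
      sample,
      "Afghanistan;8,996,351\nChina;667,070.00\nIndia;449,480.61\n".getBytes("UTF-8"))

    linesToDataFrame(spark, sample.toUri.toString).show()
    spark.stop()
  }
}
```

Because the RDD holds plain tuples, the resulting columns still carry the default tuple names.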
Notice that the column names in the above output are "_1" and "_2", since we didn't specify any column names. Let's see how to define column names in the next method.
Method 02 -
toDF() provides a concise syntax for naming the columns of a DataFrame. After importing the Spark implicits, it can also be called directly on an RDD or a local collection.
import spark.implicits._
// Create the DataFrame, then rename the default _1 and _2 columns
val dataFrameWithSchema = spark.createDataFrame(temperatureRecords).toDF("Country", "Population in billions")
dataFrameWithSchema.show()
Output
+-----------+----------------------+
|    Country|Population in billions|
+-----------+----------------------+
|Afghanistan|             8,996,351|
|      China|            667,070.00|
|      India|            449,480.61|
+-----------+----------------------+
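As a minimal sketch of the implicits route, toDF() can also be called on the RDD itself, skipping the explicit createDataFrame call. The object and method names (RddToDF, namedColumns) and the shorter column name "Population" are assumptions chosen for illustration, not part of the original example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RddToDF {
  def namedColumns(spark: SparkSession): DataFrame = {
    // The implicits add toDF to RDDs of tuples and case classes.
    import spark.implicits._

    val populationRDD = spark.sparkContext.parallelize(Seq(
      ("Afghanistan", "8,996,351"),
      ("China", "667,070.00"),
      ("India", "449,480.61")
    ))
    // Column names are given positionally, one per tuple field.
    populationRDD.toDF("Country", "Population")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rdd-to-df")
      .getOrCreate()
    namedColumns(spark).show()
    spark.stop()
  }
}
```

The number of names passed to toDF() has to match the number of fields in each tuple, otherwise Spark rejects the call at runtime.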
Method 03 -
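In this method we will see how to provide the schema explicitly by using StructType.
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Each StructField is (column name, data type, nullable)
val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("population", LongType, true))
// rowsRDD must be an RDD[Row] whose values match the schema above
val dataFrame = spark.createDataFrame(rowsRDD, schema)
dataFrame.show()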
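The snippet above does not show how rowsRDD is built, so here is a self-contained sketch under stated assumptions: the sample rows and their numeric values are hypothetical, as are the names RddWithSchema and build. The point is only that every Row must supply a String and a Long, in that order, to line up with the schema.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object RddWithSchema {
  val schema: StructType = new StructType()
    .add(StructField("id", StringType, nullable = true))
    .add(StructField("population", LongType, nullable = true))

  def build(spark: SparkSession): DataFrame = {
    // Hypothetical sample rows; values are illustrative only.
    val rowsRDD = spark.sparkContext.parallelize(Seq(
      Row("Afghanistan", 8996351L),
      Row("China", 667070L),
      Row("India", 449480L)
    ))
    // createDataFrame applies the schema to the RDD[Row] as given.
    spark.createDataFrame(rowsRDD, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rdd-with-schema")
      .getOrCreate()
    val df = build(spark)
    df.printSchema()
    df.show()
    spark.stop()
  }
}
```

Unlike the tuple-based methods, this approach gives control over both the column names and the column types, at the cost of building Row objects by hand.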