In this blog post, I will explain how to handle nulls in Apache Spark.
Introduction
It is a best practice to always use nulls to represent missing or empty data in a DataFrame. The main reason to handle nulls explicitly is that Spark can optimize operations over null values better than it can over placeholder values such as empty strings.
The primary way of interacting with null values on a DataFrame is the .na subpackage, which returns a DataFrameNaFunctions object with methods such as drop, fill, and replace.
All blank values and empty strings are read into a DataFrame as null by the Spark CSV reader.
Let's look at the following file as an example of how Spark treats blank and empty CSV fields as null values.
We will use Scala for all the code examples.
name,company,salary
Anand,Infosys,1500000
Kiran,TCS,2000000
Pawan,Cerner,2100000
"",IBM,700000
Girish,,7979
Kishore,TCS,
,,
val employeeDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("d:/data-set/employee.dat")

employeeDF.show()
+-------+-------+-------+
|   name|company| salary|
+-------+-------+-------+
|  Anand|Infosys|1500000|
|  Kiran|    TCS|2000000|
|  Pawan| Cerner|2100000|
|   null|    IBM| 700000|
| Girish|   null|   7979|
|Kishore|    TCS|   null|
|   null|   null|   null|
+-------+-------+-------+
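Before deciding whether to drop or fill, it often helps to know how many nulls each column actually contains. The following snippet is a minimal sketch of one way to do that (it is not part of the original example; it only assumes the employeeDF defined above):

import org.apache.spark.sql.functions.{col, count, when}

// Count the nulls in every column of employeeDF.
// when(col(c).isNull, c) yields null for non-null cells, and count()
// skips nulls, so each aliased column holds that column's null count.
val nullCounts = employeeDF.select(
  employeeDF.columns.map(c => count(when(col(c).isNull, c)).alias(c)): _*
)
nullCounts.show()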
drop
The simplest function is drop, which removes rows that contain nulls. The default is to drop any row in which any value is null.
val result = employeeDF.na.drop()

or

val result = employeeDF.na.drop("any")
+-----+-------+-------+
| name|company| salary|
+-----+-------+-------+
|Anand|Infosys|1500000|
|Kiran|    TCS|2000000|
|Pawan| Cerner|2100000|
+-----+-------+-------+
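drop also accepts "all", which removes a row only when every value in it is null, as well as an optional list of columns to restrict the check to. A quick sketch, building on the same employeeDF:

// Drop rows only when ALL columns are null (removes just the ",," row)
val allNull = employeeDF.na.drop("all")

// Drop rows in which the salary column is null
val salaryPresent = employeeDF.na.drop(Seq("salary"))

// Drop rows in which name or company is null
val namedRows = employeeDF.na.drop("any", Seq("name", "company"))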
fill
Using the fill() function, we can fill one or more columns with a set of values. This can be done by specifying a value along with the set of columns it should be applied to.
val result = employeeDF.na.fill("NULL IN SOURCE", Seq("name", "company"))
result.show()
+--------------+--------------+-------+
|          name|       company| salary|
+--------------+--------------+-------+
|         Anand|       Infosys|1500000|
|         Kiran|           TCS|2000000|
|         Pawan|        Cerner|2100000|
|NULL IN SOURCE|           IBM| 700000|
|        Girish|NULL IN SOURCE|   7979|
|       Kishore|           TCS|   null|
|NULL IN SOURCE|NULL IN SOURCE|   null|
+--------------+--------------+-------+
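fill can also take a map from column names to values, so different columns get different defaults in one call. A minimal sketch (the default values here are made up for illustration):

// Per-column defaults: missing names and companies become "Unknown",
// missing salaries become 0
val filled = employeeDF.na.fill(Map(
  "name" -> "Unknown",
  "company" -> "Unknown",
  "salary" -> 0L
))
filled.show()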
replace
In addition to replacing null values, the replace function gives you a more flexible option that works on more than just nulls: it substitutes one value for another in the columns you specify.
val result = employeeDF.na.replace("company", Map("TCS" -> "Tata Consultancy Service"))
result.show()
+-------+--------------------+-------+
|   name|             company| salary|
+-------+--------------------+-------+
|  Anand|             Infosys|1500000|
|  Kiran|Tata Consultancy ...|2000000|
|  Pawan|              Cerner|2100000|
|   null|                 IBM| 700000|
| Girish|                null|   7979|
|Kishore|Tata Consultancy ...|   null|
|   null|                null|   null|
+-------+--------------------+-------+
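replace also accepts a list of columns, so the same substitution map can be applied to several columns at once. A short sketch (the expanded IBM name here is just an illustration):

// Apply the same replacement map to both the name and company columns
val expanded = employeeDF.na.replace(
  Seq("name", "company"),
  Map("IBM" -> "International Business Machines")
)
expanded.show(truncate = false)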
nullable columns
When you define a custom schema, the third argument to add() marks whether a column is nullable. In the schema below, company_name is declared non-nullable while the other columns allow nulls.

package com.npntraining.spark_sql

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object DealingNullValues_01 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val sparkSession = SparkSession.builder()
      .appName("Dealing Null Values")
      .master("local")
      .getOrCreate()

    // The third argument to add() is the nullable flag
    val customSchema = new StructType()
      .add("name", StringType, true)
      .add("company_name", StringType, false) // declared non-nullable
      .add("salary", LongType, true)

    val employeeDF = sparkSession.read
      .option("header", "true")
      .schema(customSchema)
      .csv("d:/data-set/employee.dat")

    employeeDF.show()
  }
}
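One caveat worth checking in your own environment: Spark versions have differed on whether a user-supplied nullable = false is enforced for file-based sources or merely treated as a hint, so it is worth printing the schema Spark actually applied:

// Inspect the schema Spark actually applied; depending on the Spark
// version, file-based sources may force every column to nullable = true
// even when the supplied schema says otherwise.
employeeDF.printSchema()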
Also check out my other post, What is the difference between cache vs persist methods in Apache Spark.