Spark UDFs Are Cruel!
Spark UDFs (User Defined Functions) are not the best thing a developer can reach for. They look cool: the syntax to write them is attractive and makes the code look cleaner. The problem with UDFs is performance, and the impact is especially big if you are using Python, because it is a non-JVM language. It is recommended to avoid UDFs and to use the built-in functions instead.
In this article, I will try to explain the problems with UDFs through practical Scala examples.
Performance Impact: UDFs vs Built-In Function
I used a proprietary dataset with more than 10 billion rows and added a new column by concatenating a string with an existing column, in two different ways: once with a built-in function and once with a UDF. Let's see what happens.
Built-In Function:
The following uses the built-in concat function, which does the same thing as the UDF in the next section. This takes around 40 seconds when I perform an action like writing the data afterwards. Check the concat function definition here.
data.withColumn("new_column", concat($"name", lit("junaid")))
UDF:
The following is the UDF where I appended a name to the name column. This takes around 70 seconds when I perform an action like writing the data afterwards.
def addString(str: String) = udf((value: String) => value + str)
data.withColumn("new_column", addString("Junaid")($"name"))
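For reference, here is a rough sketch of how the two variants can be timed from the spark-shell; the output paths are placeholders and the actual measurement setup may differ:
// spark.time prints the wall-clock time taken by the enclosed block.
spark.time {
  data.withColumn("new_column", concat($"name", lit("junaid")))
    .write.mode("overwrite").parquet("/tmp/concat_builtin")
}
spark.time {
  data.withColumn("new_column", addString("Junaid")($"name"))
    .write.mode("overwrite").parquet("/tmp/concat_udf")
}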
UDFs mess up Spark Execution Plan
This is another problem, and a reason for the performance decrease: UDFs change the execution plan, which may then no longer be well optimized. Here I will use a where clause with and without a UDF and look at the difference in the execution plans.
Built-In Function:
data.where($"name" === "Effendi").queryExecution.executedPlan
res29: org.apache.spark.sql.execution.SparkPlan =
*(1) Project [name#13]
+- *(1) Filter (isnotnull(name#13) && (name#13 = Effendi))
+- *(1) FileScan parquet [name#13] Batched: true, Format: Parquet, Location: InMemoryFileIndex[path_to_data..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Effendi)], ReadSchema: struct
UDF:
val isEffendi = udf((pm: String) => pm == "Effendi")
data.where(isEffendi($"name")).queryExecution.executedPlan
res28: org.apache.spark.sql.execution.SparkPlan =
*(1) Filter UDF(name#13)
+- *(1) FileScan parquet [name#13] Batched: true, Format: Parquet, Location: InMemoryFileIndex[path_to_data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
So, we can see that the execution plan is different in the two scenarios (ignore the clutter). The UDF plan hides information: the filter shows up as an opaque UDF(name#13), and the PushedFilters list is empty, so Spark can no longer push the predicate down to the Parquet scan. Spark does so much for us in terms of optimization; if we keep using UDFs, we are not making the most of it. Avoid it!
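Many simple UDFs can be rewritten with built-in column expressions so the logic stays visible to Catalyst. A small illustrative sketch (the case-insensitive match here is a made-up example, not from the dataset above):
import org.apache.spark.sql.functions.{udf, lower}
// Opaque to the optimizer: the plan only shows "UDF(name)".
val isEffendiUdf = udf((n: String) => n != null && n.equalsIgnoreCase("Effendi"))
val viaUdf = data.where(isEffendiUdf($"name"))
// Transparent to the optimizer: the comparison stays a Catalyst expression.
val viaBuiltins = data.where(lower($"name") === "effendi")
viaBuiltins.explain() // compare with viaUdf.explain()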
UDFs can’t Handle Null Values
You might have seen this type of error:
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string) => string)
It is very common, and the most common cause is null values. A UDF does not handle nulls for you: the function works fine on non-null values, but as soon as it hits a null it throws the above error. It is up to the programmer to write a function that can handle such scenarios.
Try running the following to get the above error:
val df = Seq(
  "junaid",
  "effendi",
  "arsalan",
  null
).toDF("name")
val size = udf((s: String) => s.size)
// Fails with the SparkException above as soon as the action reaches the null row.
df.withColumn("new_column", size($"name")).show()
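If a UDF really is unavoidable, guard against null inside the function, for example by going through Option so that None comes back out as a SQL NULL. A minimal null-safe sketch of the same size UDF:
// Null-safe variant: Option(null) is None, which Spark writes back as NULL.
val safeSize = udf((s: String) => Option(s).map(_.size))
df.withColumn("new_column", safeSize($"name")).show()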
The conclusion: always avoid using UDFs if you can; if you cannot, write a function that can handle issues like the null problem, as in the sketch above.