Optimizing Spark Queries
This is the last article in our Spark Optimization Series. Optimizing a Spark query is as challenging as it is interesting; as a Data Engineer, I love to dig into the small things that can be improved to increase the performance of a process, whether it is a transformation or an aggregation.
There are some common techniques that should be applied when writing Spark code.
Avoid Shuffling, it's evil
Yes, you read it right: shuffling is evil. It is the most time-consuming part of any Spark job, yet we have to use it because it sits at the core of any data processing job; common operations that trigger a shuffle are joins, group by and distinct. However, depending on the goal, there are ways to avoid a shuffle. For instance, a job that removes duplicates using distinct can be rewritten with window partitioning. Bucketing can also be used to avoid shuffling, read more.
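As a minimal sketch of the window approach (the DataFrame df and the columns id and updated_at are assumed names, not from a real dataset), duplicates can be dropped by keeping one row per key:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Keep only the latest row per id instead of calling distinct on the whole dataframe
val w = Window.partitionBy("id").orderBy(col("updated_at").desc)
val deduped = df
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("rn")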
Join Smartly
Joining is required most of the time and it is unavoidable; however, there are ways to get a better join plan. A join between a big table and a small table should use a broadcast join, where the smaller table is sent to each executor by broadcasting it. Second, a semi-join sometimes has a better execution plan because of the way it works, so look more into semi joins.
// join_type can be "inner", "left_outer", "left_semi", etc.
dataframe_1.join(dataframe_2, Seq("join_key"), join_type)
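As a rough sketch of both ideas (ordersDf and countriesDf are hypothetical DataFrames, joined on an assumed country_code column), the broadcast hint and a left semi join look like this:

import org.apache.spark.sql.functions.broadcast

// Ship the small lookup table to every executor so the large table is not shuffled
val joined = ordersDf.join(broadcast(countriesDf), Seq("country_code"), "inner")

// A left semi join keeps only the matching rows from the left side without
// pulling any columns from the right, which can give a simpler plan
val matchedOnly = ordersDf.join(countriesDf, Seq("country_code"), "left_semi")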
Cache it to save it
Since Spark gives us the capability to do in-memory data processing, it is easy to optimize your query based on how many times data is reused. If your code refers to a single dataframe more than once, it needs caching: store it in memory and reuse the result rather than recalculating it. If there is not enough cache space available, it might be better to persist the result to secondary storage.
dataframe.cache()
dataframe.persist(StorageLevel.DISK_ONLY)
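A minimal sketch, assuming a hypothetical cleanedDf that is reused by two downstream aggregations:

import org.apache.spark.storage.StorageLevel

// cleanedDf is referenced twice below, so cache it once instead of recomputing it
cleanedDf.cache()
val dailyCounts = cleanedDf.groupBy("event_date").count()
val userCounts = cleanedDf.groupBy("user_id").count()

// If memory is tight, spill to secondary storage instead
// cleanedDf.persist(StorageLevel.DISK_ONLY)

// Release the cache once it is no longer needed
cleanedDf.unpersist()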
Filter before you dive
Filtering the data using filter or where is also a very good way to optimize. Instead of joining and then filtering, filter first and then join so that the shuffle happens on a subset of the data. However, Spark is intelligent and most of the time it already builds a good execution plan.
dataframe.where()
dataframe.filter()
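For example (eventsDf, usersDf and the column names are assumed, not from a real pipeline):

import org.apache.spark.sql.functions.col

// Filter first so the join shuffles only the subset of rows we actually need
val recentEvents = eventsDf.where(col("event_date") >= "2023-01-01")
val enriched = recentEvents.join(usersDf, Seq("user_id"))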
Kick out unnecessary columns
Selecting only the required columns increases performance because it saves memory; imagine having 100 columns and needing only 10 for the output. The first thing to do is always to keep just the necessary columns using a select statement.
dataframe.select()
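For example (the column names here are only illustrative):

// Keep only the columns the downstream steps actually need
val slimDf = dataframe.select("user_id", "event_date", "amount")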
Avoid the UDFs
Yes, UDFs, known as user defined functions, are cruel. They were my favorite for quite a while, until I found out that they are slow compared to the underlying built-in functions. A UDF gives us cleaner code but adds another layer on top of the function, which slows down the process, especially in Python, where the data has to be serialized and moved back and forth between the Python process and the JVM, as mentioned here. Try to use the built-in functions directly.
Read More: How UDFs affect Performance
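A small sketch comparing a UDF with the equivalent built-in function (the column name is hypothetical):

import org.apache.spark.sql.functions.{col, udf, upper}

// Slower: a UDF is opaque to the optimizer, and in PySpark it also forces serialization
val upperUdf = udf((s: String) => if (s == null) null else s.toUpperCase)
val withUdf = dataframe.withColumn("name_upper", upperUdf(col("name")))

// Faster: the built-in upper() stays inside the engine
val withBuiltin = dataframe.withColumn("name_upper", upper(col("name")))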
Avoid data skewness
If you notice that most of the stages have finished but a few executors are still working at the end, and the overall job drags on, then it is most likely a data skew problem. To resolve skewness, a technique known as salting is used to spread the data evenly across the nodes.
Recommended Reading: Solving Data Skewness in Spark
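A minimal salting sketch, assuming a large factDf that is skewed on a country column and a smaller dimDf it joins against; the salt factor of 8 is arbitrary:

import org.apache.spark.sql.functions.{array, col, concat_ws, explode, floor, lit, rand}

val saltBuckets = 8

// Add a random salt to the skewed side so one hot key is split across several partitions
val saltedFact = factDf.withColumn(
  "salted_key",
  concat_ws("_", col("country"), floor(rand() * saltBuckets).cast("string"))
)

// Replicate the other side once per salt value so every salted key finds a match
val saltedDim = dimDf
  .withColumn("salt", explode(array((0 until saltBuckets).map(lit): _*)))
  .withColumn("salted_key", concat_ws("_", col("country"), col("salt").cast("string")))

val joined = saltedFact.join(saltedDim, Seq("salted_key"))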