Optimizing Spark Job (spark-submit/shell)
This article is the second in our series on optimizing the Spark command. We usually use two types of Spark commands, spark-submit and spark-shell. Both take the same parameters and options; however, the second is a REPL that is mainly used for debugging. In this article, we will see which parameters are important and how to set/calculate their values for better performance.
Let’s go!
Memory and Cores
The most important parameters in the command are memory and cores. Both the driver and the executors have them, and it is very important to calculate them correctly for the best utilization.
Driver
The driver has two parameters, driver memory and driver cores:
--driver-memory <value> --driver-cores <value>
The easiest approach is to use ~98% of the memory and cores. For example, using 64GB of memory and 32 virtual cores, to make the most of it we calculate like this:
--driver-memory 62000M --driver-cores 31
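As a quick sanity check, here is the same calculation in a few lines of Scala (the node size and the ~98% rule of thumb come from the example above; the exact rounding of the result is up to you):

```scala
// Rough driver sizing at ~98% utilization, using the node size from the example above.
val nodeMemoryMB = 64 * 1000                      // 64GB expressed in MB (approximate)
val nodeCores    = 32

val driverMemoryMB = (nodeMemoryMB * 0.98).toInt  // 62720, rounded down to 62000M above
val driverCores    = (nodeCores * 0.98).toInt     // 31

println(s"--driver-memory ${driverMemoryMB}M --driver-cores $driverCores")
```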
Executor
Executors have three parameters: the number of executors, memory, and cores:
--executor-memory <value> --executor-cores <value> --num-executors <value>
However, we can ignore num-executors, as it can be calculated from the other two values. For example, using 10 core nodes, each with 64GB of memory and 32 virtual cores, we have a few scenarios:
Fat executor with 100% utilization:
--executor-memory 64000M --executor-cores 32
To calculate the number of executors, divide both the memory and the cores by the same factor (1 in this case), then multiply that factor by the core node count: 1 * 10 = 10 executors.
Thin executor with 100% utilization:
--executor-memory 8000M --executor-cores 4
To calculate the number of executors, divide both the memory and the cores by the same factor (8 in this case), then multiply that factor by the core node count: 8 * 10 = 80 executors.
Unfamiliar with the terms fat and thin executor? Read Here!
For the sake of explanation, I used 100% utilization so we can see how we divide by a factor; the suggested utilization is ~98%. Also, we did not consider the memory overhead, which should be subtracted from the 64000M/8000M. The importance of memory overhead is explained later in this article.
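To put the whole calculation in one place, here is a minimal Scala sketch of the same math; the helper name, its parameters, and the defaults are illustrative assumptions, not anything Spark provides:

```scala
// Illustrative executor sizing: split each node by a "factor" and scale out
// across the core nodes. Names and defaults here are assumptions, not Spark APIs.
case class ExecutorPlan(memoryMB: Int, cores: Int, numExecutors: Int)

def planExecutors(nodeMemoryMB: Int, nodeCores: Int, coreNodes: Int,
                  factor: Int, utilization: Double = 0.98,
                  overheadFraction: Double = 0.10): ExecutorPlan = {
  val usableMemoryMB   = (nodeMemoryMB * utilization).toInt
  val perExecutorTotal = usableMemoryMB / factor                 // memory + overhead
  val memoryMB         = (perExecutorTotal * (1 - overheadFraction)).toInt
  val cores            = (nodeCores * utilization).toInt / factor
  ExecutorPlan(memoryMB, cores, factor * coreNodes)
}

// Fat (factor 1) and thin (factor 8) scenarios from above, at 100% utilization
// and ignoring overhead, as in the text:
println(planExecutors(64000, 32, 10, factor = 1, utilization = 1.0, overheadFraction = 0.0))
// ExecutorPlan(64000,32,10)
println(planExecutors(64000, 32, 10, factor = 8, utilization = 1.0, overheadFraction = 0.0))
// ExecutorPlan(8000,4,80)
```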
Configuration
Spark commands have a --conf parameter, through which several configurations can be set up before submitting the job. These configurations can also be set within the actual Spark code (Scala, Python, etc.). Here I will discuss the most important ones that usually cause issues.
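For instance, a configuration can be set in code while building the session; a minimal Scala sketch, with a placeholder app name:

```scala
import org.apache.spark.sql.SparkSession

// Setting a configuration in code instead of with --conf on the command line.
// Note that some settings (e.g. driver memory) must still be passed to
// spark-submit, because the driver JVM is already running at this point.
val spark = SparkSession.builder()
  .appName("example-app")                         // placeholder name
  .config("spark.sql.shuffle.partitions", "300")  // same key as --conf spark.sql.shuffle.partitions=300
  .getOrCreate()
```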
Garbage Collection
--conf spark.executor.extraJavaOptions=<value>
Garbage collector configuration is done through extraJavaOptions for both the driver and the executor; the default is the Parallel GC. The most common problem is that the job becomes very slow because garbage collection takes longer to free up space. This can happen for several reasons related to memory management, for example when the process keeps persisting data, making it difficult for the garbage collector.
It is recommended to start with the G1 garbage collector (-XX:+UseG1GC), which uses a newer, region-based approach.
Recommended Reading: Tuning Garbage Collection
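A hedged sketch of switching the executor JVMs to G1 from code (in practice this option is usually passed with --conf on the command line, as in the complete example at the end of this article):

```scala
import org.apache.spark.sql.SparkSession

// Ask the executor JVMs to use G1 GC. Executor options can be set here because
// the executors are launched after the session is created; the driver's own
// spark.driver.extraJavaOptions has to go on the spark-submit command line,
// since the driver JVM is already running by the time this code executes.
val spark = SparkSession.builder()
  .appName("g1gc-example")  // placeholder name
  .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  .getOrCreate()
```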
Memory Overhead
--conf spark.executor.memoryOverhead=<value>
Memory overhead is also an important configuration; the total executor memory is defined by both --executor-memory and the memoryOverhead. By default, it is executorMemory * 0.10, with a minimum of 384MB. Physical memory errors are usually caused by a memory overhead that is too low. The value of the memory overhead depends on the job: the more parallelism in the job, the more overhead memory is needed. However, it is good to start with the default value.
Read Spark Docs!
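The default can be reproduced with a small worked example; the 10% factor and the 384MB floor are the defaults quoted above:

```scala
// Default executor memory overhead: max(executorMemory * 0.10, 384MB).
def defaultOverheadMB(executorMemoryMB: Int): Int =
  math.max((executorMemoryMB * 0.10).toInt, 384)

println(defaultOverheadMB(27000))  // 2700 -> the example below raises this to 4000M
println(defaultOverheadMB(2000))   // 384  -> the minimum kicks in
```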
Parallelism and Partition
--conf spark.default.parallelism=<value>
--conf spark.sql.shuffle.partitions=<value>
Setting up parallelism and partitions correctly helps improve job performance. The two are related, since parallelism depends on the number of partitions, and the same value is usually set for both. The basic rule of thumb is that the number of partitions should be equal to, or a multiple of, the total number of cores.
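A minimal Scala sketch of that rule of thumb, using the executor layout from the example below (the multiplier of 1 is just the simplest choice; 2x or 3x the core count is also common):

```scala
// Rule of thumb: partitions = total executor cores, or a small multiple of it.
val coresPerExecutor = 15
val executorsPerNode = 2
val coreNodes        = 10

val totalCores = coresPerExecutor * executorsPerNode * coreNodes  // 300
val partitions = totalCores * 1                                   // or * 2, * 3, ...

// Passed at submit time as:
//   --conf spark.default.parallelism=300 --conf spark.sql.shuffle.partitions=300
println(s"spark.default.parallelism=$partitions, spark.sql.shuffle.partitions=$partitions")
```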
Complete Spark Command Example
Using 10 core nodes, each with 64GB of memory and 32 virtual cores, with ~98% utilization and a configuration balanced between the fat and thin types:
/usr/bin/spark-submit --class class_path --driver-memory 62000M --driver-cores 31 --executor-memory 27000M --executor-cores 15 --num-executors 100 --conf spark.driver.extraJavaOptions=-XX:+UseG1GC --conf spark.executor.extraJavaOptions=-XX:+UseG1GC --conf spark.sql.shuffle.partitions=300 --conf spark.executor.memoryOverhead=4000M --conf spark.default.parallelism=300 jar_path
Explaining the command
The values set here are just for the sake of example; it does not mean these settings should be used for every job.
- The driver setting is pretty straightforward, as explained earlier.
- The number of executors is set to an arbitrary number (100), as the real value is calculated from the memory and cores. The actual number of executors will be 2 * 10 = 20, as explained earlier (factor * number of core nodes).
- Each core node will have two executors with 27000M + 4000M = 31000M (memory + overhead) and 15 VC each (the total memory used by the executors on a node is 31000M * 2 = 62000M, and similarly 30 VC).
- The total virtual cores used are 15 * 2 * 10 = 300 VC (VC used by a single executor * number of executors on a single node * total number of core nodes).
- Partitions and parallelism are both set to 300 because the VC used are 300 (this gives the maximum parallelism).
- The garbage collector -XX:+UseG1GC is used, as suggested above.
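The arithmetic in this breakdown can be reproduced in a few lines of Scala (all numbers are copied from the example command above):

```scala
// Reproducing the example's numbers: 10 core nodes, 2 executors per node.
val coreNodes        = 10
val executorsPerNode = 2                                                    // the "factor"
val executorMemoryMB = 27000
val overheadMB       = 4000
val executorCores    = 15

val actualExecutors  = executorsPerNode * coreNodes                         // 20
val memoryPerNodeMB  = (executorMemoryMB + overheadMB) * executorsPerNode   // 62000
val totalCoresUsed   = executorCores * executorsPerNode * coreNodes         // 300
val partitions       = totalCoresUsed                                       // 300

println(s"executors=$actualExecutors, memory/node=${memoryPerNodeMB}M, " +
        s"cores used=$totalCoresUsed, partitions=$partitions")
```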
Want more about the Spark command? Read the Spark Docs!