Performance Tuning of Spark ML Job

Abstract

Spark Advanced Tuning course is designed for attendees of online course of Big Data University. This article discuss some common way to tuning an job of spark ML.

Setup

Clone spark-ml-pipeline project

clone git@github.com:big-data-university/spark-ml-pipeline.git
git checkout -b advanced

Tuning

1. Change parallelism (default partition number of RDD)

1.1 Default core number

val conf = new SparkConf().setAppName("CrossValidation Pipeline").setMaster("local[4]") //Using 2 core running on local

core number

1.2 Default parallelism
1
2
3
4
5
/*
*Increase parallelism, local default = number of core. e.g. 8 on mac
*/
conf.set("spark.default.parallelism", "4")
1.3 Repartition RDD to change parallelism on the fly
1
2
3
train.repartition(1)
train.repartition(10)

2. Efficient Serialize

  • Enable Kryoserilizer for efficient persistent/cache and memory usage

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  • Enable library of kryo in build.sbt

libraryDependencies += "com.esotericsoftware" %% "kryo" % "4.0.0"

  • RDD persistent/cache

RDD.persist(StorageLevel.MEMORY_ONLY_SER)

3. Reduce times of full GC

3.1 Enable GC verbose output for analysis

Add JVM parameters to enable verbose and GC detail -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops

/**
 * Enable GC trace for executor
 */
conf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops")
3.2 Determine number of full GC during whole execution using standard output.
cat <your_log_file>|grep "Full GC"|wc -l
3.3 Evaluate size of RDD partition
  1. dfs.block.size - The default value in Hadoop 2.0 is 128MB. In the local mode the corresponding parameter is fs.local.block.size (Default value 32MB). It defines the default partition size.
  1. numPartitions - The default value is 0 which effectively defaults to 1. This is a parameter you pass to the sc.textFile(inputPath,numPartitions) method. It is used to decrease the partition size (increase the number of partitions) from the default value defined by dfs.block.size

  2. mapreduce.input.fileinputformat.split.minsize - The default value is 1 byte. It is used to increase the partition size (decrease the number of partitions) from the default value defined by dfs.block.size

println("fs.local.block.size: " + sc.getConf.get("fs.local.block.size"))
println("mapreduce.input.fileinputformat.split.minsize: " + sc.getConf.get("mapreduce.input.fileinputformat.split.minsize"))
println("size of trainRDD: " + SizeEstimator.estimate(train) + " Byte")
println("size of testRDD: " + SizeEstimator.estimate(test) + " Byte")
3.4 Adjust parallelism to reduce number of full GC
  1. Reduce parallelism when size of RDD < block size
  2. Increase parallelism when of RDD > block size
3.5 Increase java heap for both driver JVM and executor to reduce GC
conf.set("spark.driver.memory", "2G") // Increase java heap for driver, -Xmx is illegal setting

conf.set("spark.executor.memory", "1G") //Increase to 2G per executor from default value 1G. -Xmx1G is illegal setting for spark executor.
3.6 validate number of full GC and conclusion

Conclusion: It is significantly to reduce the execution time after reducing number of full GC.

4. Increase instance number of executor in the cluster env

Check the size and partition of RDD

When size of RDD >> block size and number partition >> executor * core, it is significant improve performance to increase instance number of executor allowed in the cluster.

/**
 * Increase executor numbers to increase parallelism, only works for cluster env. standalone, yarn, mesos
 */
conf.set("spark.executor.instances", "4")

5. Reserve as much as possible memory for executor, but does make executor overload

conf.set("spark.executor.memory", "1G") 

All in one