Abstract
The Spark Advanced Tuning course is designed for attendees of the Big Data University online course. This article discusses some common ways to tune a Spark ML job.
Setup
Clone the spark-ml-pipeline project
git clone git@github.com:big-data-university/spark-ml-pipeline.git
git checkout -b advanced
Tuning
1. Change parallelism (default number of RDD partitions)
1.1 Default core number
val conf = new SparkConf().setAppName("CrossValidation Pipeline").setMaster("local[4]") // Run locally using 4 cores
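The core count in the master URL drives the default parallelism in local mode. A quick way to verify it, assuming the SparkContext is created from this conf and named sc:
import org.apache.spark.SparkContext
val sc = new SparkContext(conf)
println("default parallelism: " + sc.defaultParallelism) // prints 4 when the master is local[4]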
1.2 Default parallelism
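A minimal sketch of setting the default parallelism explicitly; the value 8 is an example only (a common rule of thumb is roughly 2-3 tasks per CPU core):
/**
 * spark.default.parallelism controls the number of partitions used by shuffles
 * and by sc.parallelize() when no partition count is given.
 */
conf.set("spark.default.parallelism", "8") // example value; tune to your core count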
1.3 Repartition RDD to change parallelism on the fly
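A minimal sketch, assuming an RDD named train as in the course code; the partition counts are example values:
println("partitions before: " + train.getNumPartitions)
val moreParallel = train.repartition(16) // full shuffle; can increase or decrease the partition count
val lessParallel = train.coalesce(4)     // avoids a shuffle when only decreasing the partition count
println("partitions after repartition: " + moreParallel.getNumPartitions)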
2. Efficient Serialization
- Enable KryoSerializer for efficient persistence/caching and memory usage
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- Add the Kryo library to build.sbt
libraryDependencies += "com.esotericsoftware" % "kryo" % "4.0.0"
- Persist/cache RDDs in serialized form
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
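A sketch tying these pieces together; the registered class is only a placeholder for whatever types your pipeline actually caches, and train is the RDD assumed in the course code:
import org.apache.spark.storage.StorageLevel

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Registering the cached classes keeps Kryo from writing full class names with every object
conf.registerKryoClasses(Array(classOf[org.apache.spark.mllib.linalg.DenseVector]))
train.persist(StorageLevel.MEMORY_ONLY_SER) // stores serialized bytes: smaller footprint, more CPU on access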
3. Reduce the number of full GCs
3.1 Enable GC verbose output for analysis
Add JVM parameters to enable verbose GC logging: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops
/**
* Enable GC trace for executor
*/
conf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops")
3.2 Determine the number of full GCs during the whole execution from the standard output.
cat <your_log_file>|grep "Full GC"|wc -l
3.3 Evaluate the size of RDD partitions
- dfs.block.size - The default value in Hadoop 2.0 is 128 MB. In local mode the corresponding parameter is fs.local.block.size (default value 32 MB). It defines the default partition size.
- numPartitions - The default value is 0, which effectively defaults to 1. This is the parameter you pass to the sc.textFile(inputPath, numPartitions) method. It is used to decrease the partition size (increase the number of partitions) from the default defined by dfs.block.size.
- mapreduce.input.fileinputformat.split.minsize - The default value is 1 byte. It is used to increase the partition size (decrease the number of partitions) from the default defined by dfs.block.size. A sketch follows this list.
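As an illustration of how these parameters interact, a sketch using a hypothetical input path; the partition count and split size are example values only:
// Increase the number of partitions beyond the block-size default
val moreParts = sc.textFile("data/train.csv", 16)
// Or increase the split size (fewer, larger partitions) via the Hadoop configuration
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", (256L * 1024 * 1024).toString)
val fewerParts = sc.textFile("data/train.csv")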
println("fs.local.block.size: " + sc.getConf.get("fs.local.block.size"))
println("mapreduce.input.fileinputformat.split.minsize: " + sc.getConf.get("mapreduce.input.fileinputformat.split.minsize"))
println("size of trainRDD: " + SizeEstimator.estimate(train) + " Byte")
println("size of testRDD: " + SizeEstimator.estimate(test) + " Byte")
3.4 Adjust parallelism to reduce the number of full GCs
- Reduce parallelism when the size of the RDD < block size
- Increase parallelism when the size of the RDD > block size (see the sketch below)
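A sketch of this heuristic, reusing SizeEstimator from the snippet above and assuming a 128 MB block size; the target is a rule of thumb, not a fixed rule:
val blockSize = 128L * 1024 * 1024 // dfs.block.size default in Hadoop 2.0
val targetPartitions = math.max(1L, SizeEstimator.estimate(train) / blockSize).toInt
val rebalanced =
  if (targetPartitions < train.getNumPartitions) train.coalesce(targetPartitions) // RDD smaller than expected: reduce parallelism
  else train.repartition(targetPartitions)                                        // RDD larger than block size: increase parallelism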
3.5 Increase the Java heap for both the driver and the executors to reduce GC
conf.set("spark.driver.memory", "2G") // Increase java heap for driver, -Xmx is illegal setting
conf.set("spark.executor.memory", "1G") //Increase to 2G per executor from default value 1G. -Xmx1G is illegal setting for spark executor.
3.6 Validate the number of full GCs and conclude
Conclusion: Execution time drops significantly after the number of full GCs is reduced.
4. Increase the number of executor instances in a cluster environment
Check the size and number of partitions of the RDD.
When the size of the RDD >> block size and the number of partitions >> executors * cores, increasing the number of executor instances allowed in the cluster significantly improves performance (see the sketch after the configuration below).
/**
 * Increase executor numbers to increase parallelism; this only works in cluster environments (standalone, YARN, Mesos)
*/
conf.set("spark.executor.instances", "4")
5. Reserve as much memory as possible for the executors, but do not overload the executor nodes
conf.set("spark.executor.memory", "1G")