Understanding Spark parameters – A step-by-step guide to tuning your Spark job
After using Spark for a few months we thought we had a pretty good grip on how to use it. The Spark documentation appeared pretty decent and we had acceptable performance on most jobs. On one job, however, we kept hitting limits that were much lower than those of that job’s predecessor (Storm). When we did some research we found out we didn’t understand Spark as well as we thought.
My colleague Jethro pointed me to an article by Gerard Maas and I found another great article by Michael Noll. Combining these with the Spark docs and some Googlin’, I wrote this article to help you tune your Spark job. We improved our throughput by 600% (and then the Elasticsearch cluster became the new bottleneck).
This article was written after the release of Spark 1.2.1.
A problem with Spark is that there are quite a few parameters and the documentation isn’t accurate enough (but still much better than that of many open source projects!). Another ‘problem’ is that our test cluster has quite impressive specifications, so it sometimes takes a long time for problems to surface.
We needed some direction and a starting point to steer us around the bumps.
Our problematic job read records from a Kafka topic and eventually saved them to Elasticsearch. This is just to illustrate our case; this article should be applicable to all Spark jobs with any input/output source.
Limit the Receiver and set the blockInterval/partition size
To start the tuning you have to establish some safe limits, from where you can slowly increase the pressure on the system.
The first system you have to configure is the Receiver. A Receiver should be viewed as a separate entity in Spark. There’s the Receiver and the processing part of Spark. The processing part processes blocks/partitions. The size of a partition can be controlled with the Receiver’s maximum rate (yes, its default value is infinite, but that’s a bad idea with Kafka and probably other input sources) and the blockInterval (how often a block is emitted).
partitionSize = maxRate / (1000 / blockInterval)
with maxRate in records per second and blockInterval in milliseconds.
You probably have a good gut feeling about how fast your job can process data. Take about half of that. Let’s say we’re pretty confident we can process about 1000 records/second, so we set the spark.streaming.receiver.maxRate parameter to 500.
Once this parameter is set we can set the spark.streaming.blockInterval parameter. This parameter implicitly controls the partition size. Setting it to 200ms means that the Receiver produces a block/partition every 200ms, which is 5 blocks per second. With a rate of 500 records/second a partition contains 100 records.
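The arithmetic in this example can be sketched in a few lines of Python. The function name partition_size is made up for illustration and is not part of Spark; the sketch only reproduces the calculation from the text.

```python
def partition_size(max_rate: int, block_interval_ms: int) -> float:
    """Records per block for a Receiver capped at max_rate records/second.

    Hypothetical helper, not a Spark API: it only mirrors the arithmetic
    behind spark.streaming.receiver.maxRate and spark.streaming.blockInterval.
    """
    if block_interval_ms <= 0:
        raise ValueError("block_interval_ms must be positive")
    blocks_per_second = 1000 / block_interval_ms
    return max_rate / blocks_per_second


# The example from the text: 500 records/second, one block every 200 ms.
print(partition_size(500, 200))  # prints 100.0
```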
After some experimenting you will get a good feeling for what the right partition size is. The partition size is one of the first parameters I lock. This means that when you increase the maxRate the blockInterval should be lowered proportionally.
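Locking the partition size means the blockInterval follows from whatever maxRate you pick. A minimal sketch of that relation, assuming a locked partition size of 100 records as in the example above (block_interval_for is a hypothetical helper name):

```python
def block_interval_for(target_partition_size: int, max_rate: int) -> float:
    """Block interval in ms that keeps partitions at target_partition_size records.

    Hypothetical helper: inverts the partition size calculation so the
    partition size stays fixed while max_rate (records/second) changes.
    """
    if max_rate <= 0:
        raise ValueError("max_rate must be positive")
    return target_partition_size * 1000 / max_rate


# Doubling maxRate halves the blockInterval for the same partition size.
for rate in (500, 1000, 2000):
    print(rate, block_interval_for(100, rate))
```

With a partition size of 100 records this gives 200 ms at 500 records/second, 100 ms at 1000 and 50 ms at 2000.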
Number of cores
When you submit a Spark job you can provide a number of cores. With YARN you have to provide the number of executors and the number of cores per executor. Multiply these two values and you have the total cores (--num-executors 3 --executor-cores 2 if you want to use 6 cores, for instance).
Spark standalone and Mesos use the total cores parameter directly (--total-executor-cores 6, for instance).
You have to subtract the number of Receivers to get the effective number of usable cores, because each Receiver permanently occupies one core.
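As a small sketch of this bookkeeping: the helper names below are hypothetical, and the single Receiver in the usage example is an assumption, since the text doesn’t say how many Receivers the 6-core example runs.

```python
def yarn_total_cores(num_executors: int, executor_cores: int) -> int:
    """Total cores requested via --num-executors and --executor-cores."""
    return num_executors * executor_cores


def usable_cores(total_cores: int, receivers: int) -> int:
    """Cores left for processing after subtracting the Receivers."""
    usable = total_cores - receivers
    if usable < 1:
        raise ValueError("no cores left for processing; request more cores")
    return usable


total = yarn_total_cores(num_executors=3, executor_cores=2)  # 6 cores
print(usable_cores(total, receivers=1))  # assumed single Receiver; prints 5
```

The check for a non-positive result is there because a streaming job whose cores are all taken by Receivers has nothing left to process the blocks with.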
Spark advises setting the number of partitions to 2-3 times the number of (usable) cores. With 31 usable cores, for example, this comes to 31×2 to 31×3, or 62-93 partitions. Let’s pick a number somewhere in between. 75 sounds nice. The number isn’t really important now. It just gives us a baseline from which you can start fine-tuning the number of partitions.
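The 2-3x rule of thumb is simple enough to compute by hand, but a sketch makes the range explicit (partition_range is a hypothetical name used only here):

```python
from typing import Tuple


def partition_range(cores: int) -> Tuple[int, int]:
    """Lower and upper bound for the partition count: 2x and 3x the usable cores."""
    if cores < 1:
        raise ValueError("cores must be at least 1")
    return 2 * cores, 3 * cores


low, high = partition_range(31)
print(low, high)  # prints 62 93
```

Any value inside that range, such as the 75 picked in the text, is only a baseline for later tuning.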
Now that we know the number of partitions and the blockInterval we can calculate the batch interval: blockInterval * #partitions. In our case this will be 200ms x 75 = 15000ms. The Spark documentation talks about a ‘conservative batch interval’ of 5-10 seconds. In my experience you can go a bit higher, but run some tests because you might have a completely different use case. Our output processing has a relatively high latency, so that might explain the larger batch interval.
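The batch interval calculation, sketched with the numbers from the text (batch_interval_ms is a hypothetical helper, not a Spark setting):

```python
def batch_interval_ms(block_interval_ms: int, partitions: int) -> int:
    """Batch interval in ms so that one batch holds `partitions` blocks."""
    if block_interval_ms <= 0 or partitions <= 0:
        raise ValueError("block_interval_ms and partitions must be positive")
    return block_interval_ms * partitions


# 75 partitions of 200 ms each.
print(batch_interval_ms(200, 75))  # prints 15000
```

The resulting value, converted to whatever unit your code uses, is what you would pass as the batch duration when creating the streaming context.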
Verify your changes
Now we’ve changed enough parameters to start testing. I use the Spark application UI to do the tuning (usually located at http://localhost:4040 in standalone mode). The first thing you do is a quick visual inspection of the duration of your stages. This should be way lower than the batch interval. In the screenshot I have a batch interval of 60 seconds, so there clearly is something wrong here. After about 5 completed stages you get a good idea of whether you have to kill your job and fiddle with the parameters.
When most durations fit in your batch interval you can go to the ‘Streaming’ tab. My guideline is that the median processing time should be lower than 85% of your batch interval and the 75th percentile processing time lower than 95%. Why not 100%? You will always experience hiccups and it’s pretty hard to recover from a big delay.
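This guideline can be written down as a small check. The sketch below assumes you read the median and 75th percentile processing times off the ‘Streaming’ tab by hand; is_sustainable and its parameters are hypothetical names, and the 85%/95% thresholds are the rule of thumb from this article, not anything Spark enforces.

```python
def is_sustainable(
    median_ms: float,
    p75_ms: float,
    batch_interval_ms: float,
    median_limit: float = 0.85,
    p75_limit: float = 0.95,
) -> bool:
    """True when both processing times leave headroom within the batch interval."""
    return (
        median_ms < median_limit * batch_interval_ms
        and p75_ms < p75_limit * batch_interval_ms
    )


# With a 15000 ms batch interval the limits are 12750 ms and 14250 ms.
print(is_sustainable(12000, 14000, 15000))  # prints True
print(is_sustainable(13000, 14000, 15000))  # prints False (median too high)
```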
Test with some long-running jobs. Our cluster is so big that one job only showed problems after 30 hours. This also means you should have a large data set to test with. When you don’t have much data (yet), just generate it.
Make sure your pace really is sustainable. Check the ‘Streaming’ tab in the Spark application UI regularly. The total delay shouldn’t be larger than a few minutes and the median processing time should be way below your calculated batch interval. In my opinion 85% is a sustainable rate.
Don’t jump to conclusions when you think you understand everything. Just read articles about Spark slowly and thoroughly until you fully understand them. I finally understood it when I had some quiet time (without my laptop). This article is just a starting point. When you’re going to do serious things with Spark, it’s a small investment to read the articles by Gerard Maas and Michael Noll.