Spark Streaming Backpressure – finding the optimal rate is now done automatically
One of my complaints about Spark was that it wasn’t possible to set a dynamic maximum rate. This is a problem in many jobs, since the maximum throughput isn’t always linear with the output rate. Another issue is local testing: you have to set the rate to extremely low values and experiment a lot to make a Spark job usable on a local machine.
But all these problems are in the past with the introduction of backpressure (I believe it’s spelled as back pressure, but I’ll stick to the Spark notation).
We have a couple of Spark jobs that connect to Kafka topics, but this article applies to any input that has a rate: RabbitMQ, files, Ceph, Elasticsearch, sockets, etc. It’s important that you don’t set the maximum rate (spark.streaming.receiver.maxRate) too high, because if too many batches get queued your system will eventually come to a grinding halt. Setting the rate too low will waste CPU cycles.
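To make that concrete, here is a minimal sketch of the pre-backpressure approach: a fixed receiver cap. The app name and the 1000 records/sec value are made-up placeholders, not values from our jobs.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical job: cap each receiver at a fixed rate found by experimentation.
val conf = new SparkConf()
  .setAppName("MyStreamingJob") // placeholder name
  .set("spark.streaming.receiver.maxRate", "1000") // records/sec, per receiver

val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batches
```

Note that the cap applies per receiver, so with multiple receivers the total ingest rate is the sum of the caps.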
Just running some tests to find the maximum throughput sounds like a good solution, but…
Maximum throughput isn’t linear with the output rate
This might sound a bit mathematical, so let me explain it with an example. Say we have an input stream of Twitter messages and we only want Dutch tweets. Our Twitter data supplier doesn’t know the difference between German and Dutch, so we get a lot of German input that we can discard. Usually the proportion of discarded tweets stays the same and doesn’t affect the maximum throughput. But then something happens with the railroads in the Netherlands and the whole country wants to tell everybody how bad our public transport is. The proportion of Dutch tweets is now a lot higher. If you’ve set the maximum rate to the absolute maximum, your Spark job probably won’t survive a railroad outage.
In our environments this means we perform some small tests to establish a theoretical maximum rate. The spark.streaming.receiver.maxRate is then set to about 85-90% of this value so we can survive fluctuations. I called this the sustainable rate in a previous article.
On good days this leads to 5-10% waste of precious metal.
When testing locally you don’t want a huge truckload of data, but a matchbox of data is way too little.
If only there were some setting to guess the maximum rate. After all, Spark knows how many batches are queued (the Active Batches in the Spark UI) and how big the gap is between the batch duration and the actual average processing time.
On the Spark Jira there already was an issue about this: [SPARK-6691]. I was so excited I forgot to vote for it. Luckily the Spark community is a very active one, and they gave us backpressure (a fancy -reactive- term for a dynamic maximum rate 😉).
The Spark issue also contains a nifty design document that goes into much detail: [SPARK-7398]
Okay, I think there are enough reasons why you want backpressure. Let’s add it to your project!
The only thing you have to do is set the property spark.streaming.backpressure.enabled to true:

val conf = new SparkConf().setAppName("EsDraJob")
conf.set("spark.streaming.backpressure.enabled", "true")
spark.streaming.receiver.maxRate still acts as the hard upper bound: with backpressure the rate will never exceed this maximum. It’s a good idea to set a maximum because the backpressure algorithm isn’t instant (which would be impossible). We ran into trouble with a job with Kafka input that could handle about 1,000 events/sec when Kafka decided to give us 50,000 records/sec in the first few seconds. Set the maximum to about 150-200% of your theoretical maximum throughput and you should be safe.
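Putting the two properties together, here is a hedged sketch for a job like the Kafka one above. The 1750 value is simply ~175% of an assumed 1,000 events/sec theoretical maximum, following the 150-200% rule of thumb:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("EsDraJob")
  // Let Spark find the optimal rate dynamically…
  .set("spark.streaming.backpressure.enabled", "true")
  // …but keep a safety ceiling at ~150-200% of the theoretical maximum,
  // so a sudden burst from Kafka can't flood the first few batches.
  .set("spark.streaming.receiver.maxRate", "1750")
```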
I ran some tests with and without backpressure. Our example system can handle about 120 events/sec with a window size of 5 seconds. The input rate is changed every 100 seconds to mimic some random behaviour.
The blue line is with backpressure disabled. As you can see the scheduling delay of 3 minutes is way too long for a window size of 5 seconds and the line doesn’t respect the 120 events/sec limit. The red line is with backpressure enabled. As you can see there is almost no scheduling delay and the input rate won’t exceed 120.
How to make your rate drop below 100
In my test project I came across a little bump in the road: the rate wouldn’t go any lower than 100. After some digging in the source (RateEstimator.scala#L63) I found the property spark.streaming.backpressure.pid.minRate, and guess what its default is? Yes, 100.
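If your job really does need to go slower than 100 records/sec, you can lower that floor explicitly. A sketch (the value 10 is arbitrary, pick one that fits your job):

```scala
// Allow the PID rate estimator to drop below its default floor of 100 records/sec.
conf.set("spark.streaming.backpressure.pid.minRate", "10")
```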
But 100 things/sec is very slow, I hear you think. Yes, but think of large input zip files that each contain hundreds of records and it isn’t so slow anymore (yes, we have a Spark job with that kind of file).
This feature isn’t documented (yet), but you can find more information about this and other properties in the Javadoc of the rate estimator classes in the Spark source.
The only question now is: why 100? I don’t know, so if you know the answer, please let me know and I’ll share it with everybody.
The introduction of backpressure will save us a lot of time finding an (unreliable) maximum rate and will save some CPU cycles. There isn’t a lot of documentation about it, but it’s a simple concept; you just have to know it’s out there.