
Spark Streaming Backpressure – finding the optimal rate is now done automatically

One of my complaints about Spark was that it wasn’t possible to set a dynamic maximum rate. This is a problem in many jobs since the maximum throughput isn’t always linear with the output rate. Another issue is with local testing. You have to set the rate to extremely low values and experiment a lot to make a Spark job usable on a local machine.
But all these problems are in the past with the introduction of backpressure (I believe it’s spelled as back pressure, but I’ll stick to the Spark notation).

We have a couple of Spark jobs that connect to Kafka topics, but this article applies to every input that has a rate (RabbitMQ, files, Ceph, Elasticsearch, sockets, etc.). It's important that you don't set the maximum rate (spark.streaming.receiver.maxRate) too high, because if too many batches are queued your system will eventually come to a grinding halt. Setting the rate too low will waste CPU cycles.
Just running some tests to find the maximum throughput sounds like a good solution, but…

Maximum throughput isn’t linear with the output rate

This might sound a bit mathematical, so let me explain it with an example. Say we have an input stream of Twitter messages and we only want Dutch tweets. Our Twitter data supplier doesn't know the difference between German and Dutch, so we get a lot of German input we can discard. Usually the proportion of discarded tweets stays the same and doesn't affect the maximum throughput. But then something happens with the railroads in the Netherlands and the whole country wants to tell everybody how bad our public transport is. The proportion of Dutch tweets is now a lot higher. When you've set the maximum rate to the absolute max, your Spark job probably won't survive a railroad outage.
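In Spark terms such a job is little more than a language filter. A minimal sketch (the Tweet case class and the lang field are made up for illustration); the point is that every incoming tweet has to be processed, whether we keep it or not:

import org.apache.spark.streaming.dstream.DStream

case class Tweet(lang: String, text: String)

// Every tweet is evaluated; the German ones are only discarded afterwards.
def keepDutch(tweets: DStream[Tweet]): DStream[Tweet] =
  tweets.filter(_.lang == "nl")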

In our environments this means we perform some small tests to set a theoretical maximum rate. The spark.streaming.receiver.maxRate is set to about 85-90% of this value so we can survive fluctuations. I called this the sustainable rate in a previous article.
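To make that concrete, a small sketch of how we would configure such a sustainable rate (the 1000 events/sec and the 85% are made-up numbers for illustration):

import org.apache.spark.SparkConf

// Assumed: a measured maximum of 1000 events/sec, capped at ~85%
// so the job survives fluctuations (the sustainable rate).
val measuredMax = 1000
val sustainableRate = (measuredMax * 0.85).toInt

val conf = new SparkConf()
conf.set("spark.streaming.receiver.maxRate", sustainableRate.toString)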

On good days this still leads to a 5-10% waste of precious metal (idle CPU cycles).

Local testing

When testing locally you don't want a huge truckload of data, but a matchbox of data is way too little.
If only there were some setting to guess the maximum rate. After all, Spark knows how many batches are queued (the Active Batches in the Spark UI) and how much the actual average processing time differs from the batch duration.
On the Spark Jira there already was an issue about this: [SPARK-6691]. I was so excited I forgot to vote for it. Luckily the Spark community is a very active one and they gave us backpressure (a fancy 'reactive' term for a dynamic maximum rate 😉).

The Spark issue also contains a nifty design document that goes into much detail: [SPARK-7398]

Gimme backpressure!

Okay, I think there are enough reasons why you want backpressure. Let’s add it to your project!

The only thing you have to do is set the property spark.streaming.backpressure.enabled to true.

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("EsDraJob")
conf.set("spark.streaming.backpressure.enabled", "true")

Note that spark.streaming.receiver.maxRate is still the absolute maximum: with backpressure enabled the rate will never exceed it. It's a good idea to set this maximum because the backpressure algorithm isn't instant (which would be impossible). We ran into trouble with a job with Kafka input that could handle about 1000 events/sec when Kafka decided to give us 50,000 records/sec in the first few seconds. Set the maximum to about 150-200% of your theoretical maximum throughput and you should be safe.
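A small sketch of that safety net on the same conf (1500 is simply 150% of the 1000 events/sec from the example above, not a recommendation):

// Set this before creating the StreamingContext.
// ~150% of the theoretical maximum of 1000 events/sec (assumed figure).
conf.set("spark.streaming.receiver.maxRate", "1500")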

I ran some tests with and without backpressure. Our example system can handle about 120 events/sec with a window size of 5 seconds. The input rate is changed every 100 seconds to mimic some random behaviour.
The blue line is with backpressure disabled. As you can see the scheduling delay of 3 minutes is way too long for a window size of 5 seconds and the line doesn’t respect the 120 events/sec limit. The red line is with backpressure enabled. As you can see there is almost no scheduling delay and the input rate won’t exceed 120.

[Figure: spark-delay-backpressure (scheduling delay, with and without backpressure)]

[Figure: spark-rate-backpressure (input rate, with and without backpressure)]

How to make your rate drop below 100

In my test project I came across a little bump in the road: the rate wouldn't drop below 100. After some digging in RateEstimator.scala#L63 I found the property spark.streaming.backpressure.pid.minRate, and guess what its default is? Yes, 100.
But 100 things/sec is very slow, I hear you think. Yes, but think of large input zip files that each contain hundreds of records and it isn't so slow anymore (yes, we have a Spark job with that kind of file).
This feature isn’t documented (yet), but you can find more information about this and other properties in the Javadoc of the PIDRateEstimator.scala
The only question now is “why 100?”. I don’t know, so if you know the answer please let me know and I can share it with everybody.
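If you do need to go below 100, a minimal sketch of overriding that floor (the value 10 is an arbitrary example):

// Lower the PID estimator's floor so the job can throttle below 100 records/sec.
conf.set("spark.streaming.backpressure.pid.minRate", "10")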

Conclusion

The introduction of backpressure will save us a lot of time finding an (unreliable) maximum rate and save some CPU cycles. There isn't a lot of documentation about it yet, but it's a simple concept; you just have to know it's out there.

Sources

Spark Streaming Configuration
RateEstimator.scala
[SPARK-7398]

  1. 26 October 2015 at 15:33

    Hi Jeroen,

    One of the implementors of the back-pressure feature here,

    Just to make sure this is clear: the back-pressure feedback loop indeed does not depend on the number of output elements, it depends on how many elements you manage to clear out of Spark's memory. So, if you filter for Dutch tweets, it depends on the number of tweets you have treated (evaluated as being Dutch or not), not on the percentage of Dutch tweets you end up finding.

    The minRate is in elements per second; it was introduced to alleviate a fear that the PID loop orchestrating backpressure would get into a deadlock. See the PR here:
    https://github.com/apache/spark/pull/8199

    all the best,

    FG

  2. Jeroen van Wilgenburg
    27 November 2015 at 07:15

    Sorry for the ultra-late reply, I saw your name in a presentation by Gerard Maas yesterday and suddenly remembered I didn’t reply🙂

    Thanks for the clarification! So when I say “won’t affect the maximum throughput” it’s better to say “have less impact on the load of the Spark cluster than a Dutch tweet”? The ultimate effect is the same, but it’s good to have the internals correct.
