
Getting started with Akka Stream Kafka – Using Kafka the reactive streams way

A few days ago my eyes fell on a new release of Akka Stream Kafka. Since I’m currently doing a lot with Kafka and really wanted to get my hands dirty with Akka, this sounded very good. It was also a good opportunity to see whether an upgrade to Kafka 0.10.0.1 (from 0.8.2.2) is worthwhile (older versions of Kafka are not supported by Akka Stream Kafka 0.11).

update 10-dec-16 @ 13:21Z: This article was originally written for version 0.11. The code samples have been updated and verified for version 0.13.

I created a project on GitHub with all the code you need to run my examples.
I decided to create the project with good old Maven since we’re currently using that at my client. Akka Stream Kafka comes in two flavours: Java and Scala. I went for Scala, which means that when you have to choose between two imports in your IDE you have to pick the ones from the package akka.kafka.scaladsl.

Configuration (loading a property/JSON file) is handled by Typesafe Config, in my case an application.conf in the src/main/resources directory. I copied the config from the Consumer sample at akka-stream-conf. This configuration will serve as the base (we will add some things to it later).
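To give an idea, here is a minimal sketch of what that base configuration looks like (the key names follow the reference configuration of version 0.13; the exact keys and defaults may differ per version, so check the reference.conf that ships with the version you use):

    akka.kafka.consumer {
      # Polling behaviour of the underlying KafkaConsumer
      poll-interval = 50ms
      poll-timeout = 50ms
      # How long to wait for pending work when the stage stops or closes
      stop-timeout = 30s
      close-timeout = 20s
      # How long a commit may take before it is considered failed
      commit-timeout = 15s
      # Properties defined here are passed directly to the KafkaConsumer
      kafka-clients {
      }
    }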

Now create a Main.scala with the following code in the main method:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import com.typesafe.config.ConfigFactory

    val config = ConfigFactory.load()
    implicit val system = ActorSystem.create("akka-stream-kafka-getting-started", config)
    implicit val mat = ActorMaterializer()

This will load the config and start the ActorSystem. The implicit val system is picked up when creating mat, and the implicit mat is used by the runWith method (later in this article).

Creating a consumer

I decided to create a consumer since we can incorporate that in an upcoming project pretty soon.

    import akka.kafka.scaladsl.Consumer
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

    val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("jeroen-akka-stream-kafka-test")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.committableSource(consumerSettings, Subscriptions.topics("YadaYadaTopic"))
      .map(msg => {
        println(msg) // our "processing": just print every message
      })
      .runWith(Sink.ignore) // consume the stream without using the result

The first part is configuration; after that comes the processing of the messages. In our case we’re just printing each message. After processing, the messages are directed to Sink.ignore. To read more about the map/runWith construction, read the documentation about Akka Streams.

My consumer uses a committable source. That means we have at-least-once delivery (when you don’t commit, a message will eventually be delivered again). We could commit after processing every message, but that produces a lot of commit messages; an alternative solution is to use auto commit (with a commit interval).
To use auto commit, add the following to application.conf under akka.kafka.consumer:

  kafka-clients {
    enable.auto.commit = true
    auto.commit.interval.ms = 10000
  }

The commit interval is in milliseconds, so offsets are committed every 10 seconds. Suppose your application crashes 6 seconds after a commit: when you restart it, you will receive all data since the last commit again (given that your retention time is long enough). Reprocessing a few messages is a small price to pay for much better performance.

update 19-sept-16 @ 17:16Z
Patrik Nordwall pointed me to a better solution for commits: committing in batches. This is a more verbose solution, but also reliable (which auto-commit isn’t). So auto-commit is okay for proof-of-concepts, but go for batch commit when you’re getting serious.
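A minimal sketch of such a batch commit, following the batching pattern from the Akka Stream Kafka documentation (the batch size of 20 and the commit parallelism of 3 are arbitrary choices for this example):

    import akka.kafka.ConsumerMessage.CommittableOffsetBatch

    Consumer.committableSource(consumerSettings, Subscriptions.topics("YadaYadaTopic"))
      .map { msg =>
        println(msg) // process the message here
        msg.committableOffset // pass only the offset downstream
      }
      // Group up to 20 offsets into one batch (fewer when downstream keeps up)
      .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
        (batch, elem) => batch.updated(elem)
      }
      // Commit each batch, with at most 3 commits in flight at once
      .mapAsync(3)(_.commitScaladsl())
      .runWith(Sink.ignore)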

Setting up and running Zookeeper and Kafka

To run the examples you need a running Zookeeper and Kafka server. I downloaded Kafka from Apache (which includes Zookeeper). You don’t need to change any configuration; just start both servers.

To start Zookeeper:

cd $KAFKA_HOME/bin
./zookeeper-server-start.sh ../config/zookeeper.properties

To start Kafka:

cd $KAFKA_HOME/bin
./kafka-server-start.sh ../config/server.properties

update 26-sept-16 @ 05:48Z: When you’re using macOS Sierra with Homebrew to install Kafka you have to update Xcode first (thanks for the tip, Corey O’Meara).

Publish some messages

To add some test data to Kafka run the following commands in a new terminal window:

cd $KAFKA_HOME/bin
./kafka-topics.sh --zookeeper localhost:2181 --create --topic YadaYadaTopic --partitions 3 --replication-factor 1
./kafka-console-producer.sh --broker-list localhost:9092 --topic YadaYadaTopic

When you type something and hit enter it’s published to Kafka. Keep this terminal open and type some stuff when you’re ready to test.

When you publish a message to Kafka it will appear in the System.out logging. The nice thing is that the message’s toString is pretty elaborate and gives you a lot of useful information.
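Publishing doesn’t have to happen from the console, by the way: Akka Stream Kafka also has a producer side. A minimal sketch that publishes a few test messages to the same topic (the message contents are made up for this example):

    import akka.kafka.ProducerSettings
    import akka.kafka.scaladsl.Producer
    import akka.stream.scaladsl.Source
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

    val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

    // Publish ten test messages and complete the stream
    Source(1 to 10)
      .map(n => new ProducerRecord[Array[Byte], String]("YadaYadaTopic", s"message $n"))
      .runWith(Producer.plainSink(producerSettings))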

Since you’re using Akka Streams you get back-pressure out of the box.
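You can see this for yourself by slowing the stream down artificially: the Kafka source will only be polled as fast as the slowest stage allows. A sketch (the rate of one message per second is an arbitrary value to make the effect visible):

    import scala.concurrent.duration._
    import akka.stream.ThrottleMode

    Consumer.committableSource(consumerSettings, Subscriptions.topics("YadaYadaTopic"))
      // Simulate a slow consumer: at most one message per second
      .throttle(1, 1.second, 1, ThrottleMode.Shaping)
      .map(msg => println(msg))
      .runWith(Sink.ignore)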

Resetting offsets in Kafka

While playing around with stuff it’s nice if you can reset things and start over again. A really annoying thing about Kafka used to be resetting the offsets. Starting from version 0.10.0.1 there is a command called kafka-streams-application-reset.sh. This will reset a specific groupId for a topic. It says Kafka Streams, but it also works for other applications.
If you want to reset the offset for the YadaYadaTopic on groupId jeroen-akka-stream-kafka-test you just execute:

cd $KAFKA_HOME/bin
./kafka-streams-application-reset.sh --zookeeper localhost:2181 --application-id jeroen-akka-stream-kafka-test --input-topics YadaYadaTopic

It’s a real time saver!

It’s probably wise to stop any running producers/consumers first, to prevent them from messing things up.

WakeupException

Currently there are some problems when quickly restarting your ActorSystem, probably related to cleaning up resources. The good news is that the creators are well aware of this problem and are working on a solution. For now, adding a shutdown hook solves the problem:

    import java.time.Instant
    import scala.concurrent.Await
    import scala.concurrent.duration._

    scala.sys.addShutdownHook {
      println("Terminating... - " + Instant.now)
      system.terminate()
      Await.result(system.whenTerminated, 30 seconds)
      println("Terminated... Bye - " + Instant.now)
    }

Some things that are nice to know

Since Akka Stream Kafka is under development there are some things you have to be aware of:

Conclusion

At Scala Days Berlin someone mentioned Gitter as a medium to reach out to for help. I gave it a shot and got an answer within minutes! It’s great for short questions that are not worth a post on StackOverflow.

The addition of kafka-streams-application-reset alone is worth a client upgrade to me. A Kafka server upgrade is mandatory to use Akka Stream Kafka, but to make a useful statement about whether an upgrade from 0.8.2.2 in production is worthwhile I need to do more research.

For people already using Akka Streams it will be a seamless step to Akka Stream Kafka, and for newcomers it’ll still be easy because of the clear API. The documentation is okay for now; it really helps if you take the time to read some things about Akka Streams first.

Akka Stream Kafka is a promising framework with an active community, so give it a shot!

Sources

Announcement of 0.11 with a good introduction and background info
Akka Streams Kafka – A behind the scenes Interview
Graceful shutdown of Akka System
Michal Sitko @note on back-pressure
Eric Biggs @ebiggs on back-pressure
Clelio De Souza @cleliofs on details of back-pressure (also read the two subsequent messages)
Konrad Malawski @ktoso on advanced back-pressure
Konrad Malawski @ktoso on renaming the project
Konrad Malawski @ktoso on versions supported
Akka Streams Docs explaining back-pressure
Kafka Stream offset reset to zero for consumer group – Stack Overflow

GitHub repo (might be renamed to akka-stream-kafka soon)
Source code for this article

Comments

Christopher Roberts (2 December 2016 at 00:27):

    I needed the following code changes to compile this:
    val topics = Set(("YadaYadaTopic"))
    val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer, topics)
      .withBootstrapServers("localhost:9092")
      .withGroupId("jeroen-akka-stream-kafka-test")
      .withClientId("1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.committableSource(consumerSettings)

    and this sbt file:
    name := "akkatest"
    scalaVersion := "2.11.0"
    libraryDependencies += "com.softwaremill.reactivekafka" % "reactive-kafka-core_2.11" % "0.8.8"
    libraryDependencies += "com.typesafe.akka" % "akka-actor_2.11" % "2.4.14"
    libraryDependencies += "com.typesafe.akka" % "akka-stream-kafka_2.11" % "0.11-M1"
    libraryDependencies += "org.scala-lang" % "scala-library" % "2.11.0"

    Dunno if I was depending on a slightly different version of some akka library which changed the api.

Jeroen van Wilgenburg (2 December 2016 at 06:40):

The reactive-kafka-core dependency shouldn’t be necessary, and you are using a milestone (M1) version of akka-stream-kafka. When I wrote the article I used 0.11; maybe that explains the differences. Meanwhile version 0.13 is already available. I updated the article for that version and verified the code samples on GitHub. Thanks for providing the code, that’ll make it easier to explain the differences.

