Scaling Data Ingestion with Akka Streams and Kafka

Scaling Data Ingestion with Akka Streams and Kafka

Iterable helps companies use data about their users and events to deliver more relevant and effective marketing campaigns. Iterable’s customers span many different industries and sizes, and collectively they send Iterable hundreds of millions of user updates and billions of events every day.

This article describes some of the ways Iterable’s architecture has evolved over time to better handle several key scenarios, using Kafka and Akka Streams.

The Problem

Over the past few years, Iterable’s customer base has been growing, and so has the load on the data ingestion service. This often resulted in processing delays, which significantly hurt the customer experience.

Iterable ingests users and events via its API and stores them in Elasticsearch to enable fast filtering and aggregation. When user data is ingested, customer-defined actions may need to be triggered. For example, a “signed up” event might trigger a welcome email or an in-app message.

Iterable’s data infrastructure team defined several requirements for the ingestion service, some of which the original architecture didn’t fully meet:

  • When a customer sends an event or an update via Iterable’s API, it must be processed as soon as possible.
  • The service should be able to handle hundreds of thousands of events per second without backing up queues.
  • Events and updates for the same user must be processed in order.
  • The service should be able to react to downstream delays without falling over.
  • Processing speed should be limited by Elasticsearch indexing speed, not by other components.

Original Architecture

The original ingestion architecture used RabbitMQ queues to enqueue these updates, and the ingestion service used Akka Actors to receive messages from RabbitMQ and process them. This architecture had a number of ramifications:

  • It had both a bulk and an individual endpoint on the API. For the bulk endpoint, it would enqueue in batches, breaking them down to 100 messages per batch. For individual updates, it would enqueue the updates one at a time. This allowed the ingestion service to process in batches only for the bulk endpoint, and led to a significantly variable amount of work on the ingestion service.
  • The ingestion of events can trigger other actions. For example, a customer might want a “signed up” event to trigger a welcome email campaign. To handle that, the ingestion service needed to publish to another queue.
  • Under high load, RabbitMQ can have issues with flow control, meaning RabbitMQ blocks publishers to slow down the rate at which messages are being published. This would happen both at the API (causing request timeouts) and within the ingestion service when publishing to other queues.
  • Ingestion messages weren’t necessarily processed in order, which was a problem. As is typical with RabbitMQ, there was a consumer prefetch count: the number of messages RabbitMQ will leave unacknowledged on the queue. So, if the prefetch count was set to 20, the ingestion service would initially get 20 messages to be processed in parallel. A new message would be delivered each time an outstanding one was acknowledged. This meant processing could sometimes happen out of order.
  • When encountering recoverable errors, the ingestion service would reject the message, which re-enqueues it. This made ordering even worse.

Ordering Issues

Here’s a simple example to illustrate the ordering issues. Imagine the ingestion service gets a request with the following data fields:
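The request payloads aren’t shown here, so the two examples below are hypothetical reconstructions; the field names and values are purely illustrative:

```json
{
  "email": "user@example.com",
  "dataFields": {
    "locale": "en-US",
    "subscribed": true
  }
}
```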

Then, shortly after, this request comes in:
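Again, a hypothetical payload for the same user:

```json
{
  "email": "user@example.com",
  "dataFields": {
    "subscribed": false
  }
}
```

If the second update were applied before the first, the user would end up recorded as subscribed even though the most recent request set subscribed to false.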

It should be clear why these events shouldn’t be processed out of order. It’s relatively rare for this to happen, but it can be a serious problem when it does.

Another related problem was that the old architecture separated users and events into different queues. So, if a customer sent an update and then an event for the same user to Iterable’s API, the event might be processed before the update. Such an event might trigger an email campaign, for example, which would then end up using outdated user profile data.

Throughput

The original ingestion service sent updates to Elasticsearch one document at a time, except when they were initially sent in bulk. Elasticsearch batch updates provide much higher throughput, but the existing service was not fully benefiting from this feature.

This naive batching strategy resulted in much lower throughput than was theoretically possible with batched updates. A major goal of the new architecture was to batch updates as efficiently as possible.

New Architecture

In the new architecture, Iterable’s team decided to take advantage of some specific tools and technologies:

  • Apache Kafka: A distributed pub-sub system for processing streams of data. In Kafka, topics are partitioned into ordered streams of data that are meant to be processed in order. It also supports higher availability and throughput than could reasonably be achieved on Iterable’s RabbitMQ infrastructure.
  • Akka Streams: Provides high-level APIs for composing, slicing, and dicing data streams. Though there’s a bit of a learning curve, understanding Akka Streams can greatly simplify stream processing. It also makes it easy to batch data by size or time, and it propagates back pressure to prevent overloading downstream processing stages.
  • Alpakka Kafka connector: Provides tools for interfacing with Kafka using Akka Streams. For example, it makes it possible to request a message source from Kafka that abstracts away the details of requesting new messages and buffering them for processing.
  • Elasticsearch: A distributed search engine built on top of Lucene. This was pretty much a given, because Iterable already uses it to support fast searching, filtering, and segmentation of customer data.

Implementation

Iterable’s new ingestion architecture consists of a few parts:

  • The API unbatches any already-batched messages and publishes them as individual messages. This allows the ingestion service to decide on the optimal batch size after consuming. When actually transferring the messages to the Kafka broker, the Kafka client will batch them efficiently.
  • The API publishes to Kafka (rather than RabbitMQ). There is a single topic for both users and events, partitioned by user ID. Partitioning by user ID achieves parallelism across many partitions while still maintaining ordering for updates to the same user. Merging user updates and events into a single topic prevents problems with inconsistent ordering between user updates and events.
  • The ingestion service consumes the user updates and events from Kafka and implements a processing flow using Akka Streams. Here it groups all the incoming messages into batches that are sent as bulk requests to Elasticsearch.
  • Redis is used for deduplication. Messages may be duplicated if the service has to restart consuming from a partition after a failure, or if a message ends up being republished on the API side. This is expected because Kafka provides an “at least once” guarantee. To account for this, message IDs are stored in Redis so previously seen messages can be discarded.

At a high level, the new ingestion service is implemented with the following processing flow:
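The full code listing isn’t reproduced here, so below is a condensed sketch of what such a flow might look like, based on the stages described next. It assumes the Alpakka Kafka 1.x Scala API; the bootstrap servers, group ID, topic name, and the stage functions (validateMessage, deduplicateAndIndex, markProcessed, triggerActions, commitOffsets, sketched further below) are illustrative placeholders rather than Iterable’s actual code:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("ingestion")
implicit val mat: ActorMaterializer = ActorMaterializer()
import system.dispatcher

// Placeholder configuration; real topic names, group IDs, and sizes are Iterable-internal.
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("ingestion-service")
val maxPartitions = 32
val batchSize     = 5000
val batchTimeout  = 250.millis

val control: DrainingControl[Done] =
  Consumer
    .committablePartitionedSource(consumerSettings, Subscriptions.topics("ingestion"))
    .mapAsyncUnordered(maxPartitions) { case (_, partitionSource) =>
      partitionSource
        .mapAsync(parallelism = 4)(validateMessage)      // validation; may hit cache/Postgres
        .collect { case Right(valid) => valid }          // keep only messages that validated
        .groupedWithin(batchSize, batchTimeout)          // batch by size or time
        .mapAsync(parallelism = 1)(deduplicateAndIndex)  // Redis dedup + Elasticsearch bulk index
        .mapAsync(parallelism = 1)(markProcessed)        // record processed message IDs in Redis
        .mapAsync(parallelism = 1)(triggerActions)       // kick off workflows (e.g. welcome email)
        .mapAsync(parallelism = 1)(commitOffsets)        // commit the batch's offsets to Kafka
        .runWith(Sink.ignore)                            // Done when this partition's stream ends
    }
    .toMat(Sink.ignore)(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()
```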

Breaking this down:

First, the service creates a source using the Alpakka connector. This helper, committablePartitionedSource, produces a source of sources: the outer source emits a Source of CommittableMessages each time the consumer is assigned a new partition to read, and that inner source is completed when the consumer is unassigned from the partition. Consuming each inner Source reads messages from the corresponding partition.
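Taken on its own, and assuming string keys and values, that source of sources has roughly this shape (reusing the consumerSettings placeholder from the sketch above):

```scala
import akka.NotUsed
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Source
import org.apache.kafka.common.TopicPartition

// Outer source: emits one (partition, inner source) pair each time a partition is assigned.
// Inner source: emits that partition's committable messages until the partition is revoked.
val partitionedSource: Source[
  (TopicPartition, Source[CommittableMessage[String, String], NotUsed]),
  Consumer.Control
] = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("ingestion"))
```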

Next, mapAsyncUnordered processes each partition source. The max parallelism is equal to the maximum number of partitions that this ingestion node will process. Inside the function it eventually calls runWith(Sink.ignore), which produces a Done when the inner stream completes. That will come in handy later.
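As an excerpt of the sketch above, the partition-handling stage looks something like this (processingFlow is a hypothetical name for the inner pipeline described next):

```scala
// At most maxPartitions inner streams run at once, one per assigned partition.
// runWith(Sink.ignore) materializes a Future[Done] that completes when the inner
// stream (and therefore consumption from that partition) finishes.
partitionedSource.mapAsyncUnordered(maxPartitions) { case (_, partitionSource) =>
  partitionSource
    .via(processingFlow)
    .runWith(Sink.ignore)
}
```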

Now there’s an inner processing pipeline for the individual messages.

First, mapAsync is used to run some validation logic. This checks that the organization actually exists and that there are no obvious data format issues; if validation fails, the error is reported. validateMessage actually returns an Either, and the next stage keeps only the Right values.

This is asynchronous because a cache or Postgres database access may be needed, which involves I/O. Since these APIs are asynchronous and return Future instances, mapAsync is required. parallelism > 1 can be used here since ordering doesn’t matter, unlike in some other parts of the stream.
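A sketch of this validation stage, as an excerpt of the inner pipeline; validateMessage and its ValidationError/ValidMessage types are hypothetical, and error reporting is assumed to happen inside validateMessage:

```scala
// Validation may need a cache or Postgres lookup, so it returns a Future.
// Ordering doesn't matter yet, so parallelism > 1 is safe here.
.mapAsync(parallelism = 4)(validateMessage)   // Future[Either[ValidationError, ValidMessage]]
.collect { case Right(valid) => valid }       // the next stage keeps only the Right values
```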

The next stage batches all the messages that arrive within a given time window, up to a given batch size:
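This is Akka Streams’ built-in groupedWithin stage:

```scala
// Emits a batch as soon as batchSize messages have accumulated, or when batchTimeout
// has elapsed, whichever comes first.
.groupedWithin(batchSize, batchTimeout)
```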

batchSize = 5000 and batchTimeout = 250ms seem to produce good results in Iterable’s particular use case, but results can vary based on the typical size of messages.

After that, the messages are deduplicated and indexed to Elasticsearch:
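A sketch of that stage as the deduplicateAndIndex placeholder from the skeleton above; ValidMessage, duplicateTracker, mergeUpdatesByUser, and elasticClient are hypothetical stand-ins for Iterable’s internal types and clients, and an implicit ExecutionContext is assumed to be in scope:

```scala
import scala.concurrent.Future

def deduplicateAndIndex(batch: Seq[ValidMessage]): Future[Seq[ValidMessage]] =
  for {
    // Ask the Redis-backed duplicate tracker which message IDs were already processed.
    seen  <- duplicateTracker.alreadyProcessed(batch.map(_.messageId))
    fresh  = batch.filterNot(msg => seen.contains(msg.messageId))
    // Collapse multiple updates for the same user into one document, then bulk index.
    merged = mergeUpdatesByUser(fresh)
    _     <- elasticClient.bulkIndex(merged)
  } yield fresh
```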

This step checks the Redis duplicate tracker to see if the message exists. If it does, that means the message was already processed, perhaps because the stream died before the offset could be committed. In that case the message is skipped.

This step also merges updates for the same user within the batch into a single update. This gives significantly better performance because Elasticsearch no longer needs to force a refresh to get the current state of the user for the next update.

The next stage marks the message as processed in Redis:
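Roughly, using the same hypothetical duplicateTracker wrapper:

```scala
// Record the IDs of the just-indexed messages so that a replayed message is
// skipped by the deduplication check above.
def markProcessed(batch: Seq[ValidMessage]): Future[Seq[ValidMessage]] =
  duplicateTracker.markProcessed(batch.map(_.messageId)).map(_ => batch)
```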

Finally, the results pass through a stage that triggers any follow-up actions that should be performed.

This can send the user through a workflow that defines a series of customer-defined actions. This is the stage that can trigger the welcome email mentioned above.
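Schematically this is just another one-batch-at-a-time stage (workflowService is a hypothetical stand-in for Iterable’s workflow logic):

```scala
// Kick off any customer-defined follow-up actions, e.g. entering a welcome-email workflow.
def triggerActions(batch: Seq[ValidMessage]): Future[Seq[ValidMessage]] =
  workflowService.processEvents(batch).map(_ => batch)
```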

The last stage in the inner stream commits the offset to Kafka.

This is just the commitScaladsl method call wrapped in a retry helper. The retry is needed to handle errors due to intermittent broker outages. These outages are relatively uncommon and usually last only a few seconds, so retrying for a few seconds is usually enough.
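A sketch of what the commit stage and a hand-rolled backoff helper might look like; the retry parameters and the retryWithBackoff helper are illustrative, not Iterable’s actual implementation, and ValidMessage is assumed to carry its original committableOffset. It relies on the implicit ExecutionContext and system from the skeleton above:

```scala
import akka.Done
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.pattern.after
import scala.concurrent.Future
import scala.concurrent.duration._

// Retry a Future-producing operation with exponential backoff between attempts.
def retryWithBackoff[T](retries: Int, delay: FiniteDuration)(op: () => Future[T]): Future[T] =
  op().recoverWith {
    case _ if retries > 0 =>
      after(delay, system.scheduler)(retryWithBackoff(retries - 1, delay * 2)(op))
  }

// Fold the batch's offsets into one CommittableOffsetBatch and commit it once,
// retrying for a few seconds to ride out an intermittent broker outage.
def commitOffsets(batch: Seq[ValidMessage]): Future[Done] = {
  val offsets = batch.foldLeft(CommittableOffsetBatch.empty)(_ updated _.committableOffset)
  retryWithBackoff(retries = 5, delay = 500.millis)(() => offsets.commitScaladsl())
}
```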

Note that the above stages are run with .mapAsync(parallelism = 1), meaning they process a single batch at a time. This is important for maintaining ordering.

The whole inner source is then run using Sink.ignore. This produces a Done downstream on the outer source when consuming from that partition finishes.

The remainder of the outer graph is then wrapped in a DrainingControl:
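That wiring, as in the sketch above, looks roughly like this (partitionedStream stands for the outer source after mapAsyncUnordered, and the timeout value is illustrative):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

val control: DrainingControl[Done] =
  partitionedStream
    .toMat(Sink.ignore)(Keep.both)               // keep (Consumer.Control, Future[Done])
    .mapMaterializedValue(DrainingControl.apply)
    .run()

// During application shutdown: stop consuming, let in-flight batches finish, then stop.
sys.addShutdownHook {
  Await.result(control.drainAndShutdown(), 30.seconds)
}
```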

This takes both the Done emitted by Sink.ignore when consumption finishes and the Control from the Kafka consumer, and provides a method, DrainingControl#drainAndShutdown(), that both triggers a shutdown and returns a Future that completes when the graph has shut down. This can be triggered as part of the application shutdown sequence to make sure the stream shuts down cleanly.

Handling Errors

There are a few different types of errors that might occur:

  • Unrecoverable errors: These are errors with a known cause but no obvious recovery plan: for example, users who are specifically banned from ingestion, or errors due to invalid data that wasn’t caught at the API. The ingestion service simply reports the error and continues rather than attempting to handle it.
  • Recoverable errors: These are errors that can be retried. In the case of a 429 (Too Many Requests) or 409 (Conflict) from Elasticsearch, the failed updates can be retried with an exponential backoff. The backoff is useful in case ES is too busy to handle the volume of incoming load. Another example of a recoverable error is a Kafka commit failure due to a broker outage.
  • Unknown types of errors: For unexpected exceptions, the service simply allows the stream to stop, which is the default action Akka Streams takes when it encounters an exception.

The Iterable app also has a custom materializer to catch these unexpected exceptions:
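A minimal version of such a materializer, using a supervision decider that logs the failure and stops the stream; the logger and its name are illustrative, and ActorMaterializer was the current API at the time (it is deprecated in newer Akka versions):

```scala
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import org.slf4j.LoggerFactory

val log = LoggerFactory.getLogger("ingestion-stream")

// Log the unexpected exception before stopping the stream (Stop is the Akka Streams
// default), so engineers can see exactly what caused the failure.
val loggingDecider: Supervision.Decider = { e =>
  log.error("Ingestion stream failed with an unexpected exception", e)
  Supervision.Stop
}

implicit val materializer: ActorMaterializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(loggingDecider)
)
```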

This allows Iterable engineers to easily see what caused an error if the stream stops unexpectedly. When this happens, the Iterable team investigates the error and decides what to do.

Results

This new architecture resulted in a significant increase in ingestion processing speed and reliability, while achieving all of the original goals. In practice, the new service achieved around a 10x performance improvement under peak ingestion load, and significant processing delays have been virtually nonexistent.

Kafka suits Iterable’s needs much better than RabbitMQ, since it was designed to support in-order processing across many partitions. Akka Streams also suits Iterable’s needs better than (untyped) actors, because it provides a type-safe way of modeling the stream processing stages and takes care of all the complexity of batching efficiently.

So far the Iterable team has been very happy with the investment in Akka Streams and is continually using it to improve other parts of the infrastructure.
