Hi,

I am using Akka Kafka's Consumer.committablePartitionedSource to stream 
messages from Kafka, group them by a group key, and batch them with 
groupedWithin. Each batch of grouped records should be written to the 
database, and only after that should the offset be committed.

The code skeleton is as follows:

val consumerSettings =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(kafkaServers)
    .withGroupId("testclientId")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// One sub-source per assigned partition, merged back into a single stream.
val kafkaSource: Source[CommittableMessage[Array[Byte], Array[Byte]], Control] =
  Consumer
    .committablePartitionedSource(consumerSettings, Subscriptions.topics(topicName))
    .flatMapMerge(maxPartitions, _._2)

// Split by key, transform each element, merge the substreams back,
// then batch by size or every 10 seconds, whichever comes first.
val flow = Flow[In]
  .groupBy[K](maximumGroupSize, groupKeyFn)
  .map(e => groupKeyFn(e) -> map(e))
  .mergeSubstreams
  .groupedWithin(maximumGroupSize, FiniteDuration(10, TimeUnit.SECONDS))

// sinkToDBSaveOffSet writes each batch to the database and then saves the offset.
kafkaSource.via(flow).to(sinkToDBSaveOffSet).run


The problem we are facing: what if sinkToDB succeeds but SaveOffSet fails, 
or the system crashes before SaveOffSet is called? In that case the same 
messages are read again after a restart, so we end up with duplicate records 
in the database. We don't want to run a query before every insert just to 
detect duplicates.
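
To make the failure window concrete, here is a minimal sketch that uses no 
Akka or Kafka at all. The names DuplicateWindowDemo, Record, table, 
committedOffset, topic and runOnce are all made up for illustration; the 
in-memory buffer only stands in for our database and the variable for the 
committed Kafka offset.

```scala
// Hypothetical in-memory stand-ins; none of these names come from Akka or Kafka.
object DuplicateWindowDemo {
  final case class Record(offset: Long, value: String)

  // Stand-in for the database table: plain appends, no uniqueness check.
  val table = scala.collection.mutable.ArrayBuffer.empty[Record]

  // Stand-in for the committed offset: the next offset to read after a restart.
  var committedOffset: Long = 0L

  // Stand-in for one topic partition holding six messages.
  val topic: Vector[Record] =
    Vector.tabulate(6)(i => Record(i.toLong, s"msg-$i"))

  // Processes one batch. With crashBeforeCommit = true the "process" dies
  // after the database write but before the offset is saved.
  def runOnce(batchSize: Int, crashBeforeCommit: Boolean): Unit = {
    val batch = topic.drop(committedOffset.toInt).take(batchSize)
    table ++= batch                              // step 1: sinkToDB succeeds
    if (!crashBeforeCommit && batch.nonEmpty)
      committedOffset = batch.last.offset + 1    // step 2: SaveOffSet
  }

  def main(args: Array[String]): Unit = {
    runOnce(batchSize = 3, crashBeforeCommit = true)  // crash inside the window
    runOnce(batchSize = 3, crashBeforeCommit = false) // restart re-reads 0..2
    println(table.map(_.offset).mkString(","))        // prints 0,1,2,0,1,2
  }
}
```

Because the two steps are separate, the second run starts again from 
offset 0 and the first three records are inserted twice.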


Is there a fault-tolerance strategy that avoids such duplicate messages, or 
a way to make the database write and saveOffset a single transaction?
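
What I mean by "one transaction" can be sketched roughly as below, again 
without Akka or Kafka. This is only an assumption about how it could look, 
not something I know the library supports: AtomicOffsetDemo, Snapshot, Store, 
saveBatch and runOnce are hypothetical names, and replacing the in-memory 
snapshot stands in for a single database transaction that inserts the batch 
and updates an offsets table together.

```scala
// Hypothetical sketch: rows and offset are stored together, so they change together.
object AtomicOffsetDemo {
  final case class Record(offset: Long, value: String)

  // The saved rows and the next offset to read live in one immutable snapshot.
  final case class Snapshot(rows: Vector[Record], nextOffset: Long)

  final class Store {
    private var snapshot = Snapshot(Vector.empty, 0L)

    def current: Snapshot = synchronized(snapshot)

    // Either both the rows and the offset are stored, or neither is.
    def saveBatch(batch: Vector[Record], failBeforeCommit: Boolean): Unit =
      synchronized {
        if (batch.nonEmpty) {
          val updated = Snapshot(snapshot.rows ++ batch, batch.last.offset + 1)
          // A failure before the commit point leaves the old snapshot untouched.
          if (!failBeforeCommit) snapshot = updated
        }
      }
  }

  // Stand-in for one topic partition holding six messages.
  val topic: Vector[Record] =
    Vector.tabulate(6)(i => Record(i.toLong, s"msg-$i"))

  // On every (re)start the reader resumes from the offset kept in the store.
  def runOnce(store: Store, batchSize: Int, failBeforeCommit: Boolean): Unit = {
    val batch = topic.drop(store.current.nextOffset.toInt).take(batchSize)
    store.saveBatch(batch, failBeforeCommit)
  }

  def main(args: Array[String]): Unit = {
    val store = new Store
    runOnce(store, 3, failBeforeCommit = true)  // failed attempt, nothing stored
    runOnce(store, 3, failBeforeCommit = false) // stores offsets 0..2
    runOnce(store, 3, failBeforeCommit = false) // stores offsets 3..5
    println(store.current.rows.map(_.offset).mkString(",")) // prints 0,1,2,3,4,5
  }
}
```

Here the failed attempt leaves neither rows nor an offset behind, so the 
retry starts from offset 0 and every record is stored exactly once.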



Thanks & Regards,

Arun   
