Hi,

I am using Akka Kafka's Consumer.committablePartitionedSource to stream messages from Kafka and group them by a group key, batching the results with groupedWithin. Each batch of grouped records should be written to the database, and only then should the offset be committed.

The code skeleton is as follows:

    val consumerSettings =
      ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
        .withBootstrapServers(kafkaServers)
        .withGroupId("testclientId")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // Merge the per-partition sources into a single stream of committable messages
    val kafkaSource: Source[CommittableMessage[Array[Byte], Array[Byte]], Control] =
      Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topicName))
        .flatMapMerge(maxPartitions, _._2)

    // Key each element by group, then batch by size or by a 10-second window
    val flow = Flow[In]
      .groupBy[K](maximumGroupSize, groupKeyFn)
      .map(e => groupKeyFn(e) -> map(e))
      .mergeSubstreams
      .groupedWithin(maximumGroupSize, FiniteDuration(10, TimeUnit.SECONDS))

    kafkaSource.via(flow).to(sinkToDBSaveOffSet).run

The problem we are facing: if sinkToDB succeeds but SaveOffSet fails, or if the system crashes before SaveOffSet is called, the same messages will be consumed again after a restart and we will end up with duplicate records in the database. We don't want to query the database before each insert to detect duplicates.

Is there a fault-tolerance strategy that avoids such duplicate messages, or a way to make the database insert and saveOffset a single transaction?

Thanks & Regards,
Arun