Hey Stefan, I only see a commit in the failure case. Were you planning to use auto-commits otherwise? You'd probably want to handle all commits directly or you'd always be left guessing. But even if you did, I think the main problem is that your process could fail before a needed commit is sent to the broker. After it resumes, it wouldn't know that a commit had been pending and might reprocess some messages.
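
To make that failure mode concrete, here is a rough sketch of it. It is a toy in-memory simulation, not the real consumer API: LOG, run, and crashAfter are made-up names, and the "broker" is just a local variable holding the last committed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CrashBeforeCommitSketch {
    // Hypothetical stand-in for the records of a single partition.
    static final List<String> LOG = Arrays.asList("m0", "m1", "m2", "m3", "m4");

    /**
     * Simulates a consumer that processes `crashAfter` records and then dies
     * before its commit reaches the broker, followed by a restart that
     * resumes from the last committed offset. Returns every record that was
     * processed, in order, across both runs.
     */
    static List<String> run(int crashAfter) {
        List<String> processed = new ArrayList<>();
        int committed = 0; // last offset the simulated broker knows about

        // First run: records are processed, but the commit is never sent.
        for (int offset = committed; offset < crashAfter; offset++) {
            processed.add(LOG.get(offset));
        }
        // Simulated crash here: `committed` is still 0.

        // Restart: the only thing the consumer knows is the committed offset,
        // so it starts over from there and repeats the earlier work.
        for (int offset = committed; offset < LOG.size(); offset++) {
            processed.add(LOG.get(offset));
        }
        return processed;
    }

    public static void main(String[] args) {
        // prints [m0, m1, m2, m0, m1, m2, m3, m4]
        System.out.println(run(3));
    }
}
```

The first three records show up twice because nothing on restart records that they were already handled. Committing more often shrinks that window but doesn't close it.
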

I think some Kafka devs have been thinking about exactly once semantics, but I don't think there's anything solid yet.

-Jason

On Thu, Jul 16, 2015 at 4:07 AM, Stefan Miklosovic <[email protected]> wrote:

> Hi,
>
> In the old consumer, I have got just a simple stream of messages, one
> by one and if I detected something was wrong, I would destroy my
> consumer immediately without commit so once I restart consumer, I will
> get the same messages once again because they will be delivered to me
> from the last offset committed (if I understand that correctly).
>
> While this can work, I have at-least-once delivery guarantee and that
> is not good in my case. I need to have exactly-once guarantee.
>
> While looking into new consumer, I noticed that there is the
> possiblity to kind of "rewind" in a partition.
>
> My new algorithm is something like this:
>
> Partition myPartition;
>
> consumer.subscribe(myTopic);
>
> ConsumerRecords = consumer.poll(0);
>
> for (Record record: ConsumerRecords) {
>
>     processRecord(record);
>
>     processedMessages++;
>
>     if (failure) {
>         int offsetOfLastProcessedRecord = record.offset();
>
>         // this will effectively rewind me back so I get messages
>         // which are not processed yet
>
>         consumer.seek(myPartition, offsetOfLastProcessedRecord - processedMessages);
>
>         // here i commit the position of the lastly processed record
>         // so on the next poll
>         // i should get messages which were polled before but stayed
>         // unprocessed because of the
>         // error
>         consumer.commit(map<parition, offsetOfLastProcessedRecord>, CommitType.SYNC);
>     }
> }
>
> Does this approach make sense?
>
> --
> Stefan Miklosovic
