Hi,
In the old consumer, I got a simple stream of messages, one by one, and
if I detected that something was wrong, I would destroy my consumer
immediately without committing. Once I restarted the consumer, I would
get the same messages again, because they are delivered from the last
committed offset (if I understand that correctly).
While this works, it only gives me an at-least-once delivery guarantee,
and that is not good enough in my case: I need an exactly-once guarantee.
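To make the old behaviour concrete, here is a minimal in-memory model. It is not Kafka code: AtLeastOnceModel, runConsumer and failAt are hypothetical names. It assumes a consumer that only commits after a whole batch, is destroyed without committing when it hits a failure, and resumes from the last committed offset after a restart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model (not the Kafka API): one partition log plus
// a committed offset, used only to show why "destroy before commit" redelivers.
class AtLeastOnceModel {
    static final List<String> LOG = List.of("m0", "m1", "m2", "m3");
    // Offset of the next record to read after a restart.
    static long committed = 0;

    // Reads from the committed offset. When it reaches failAt it returns
    // early without committing (the "destroyed consumer"). Returns every
    // record it processed during this run.
    static List<String> runConsumer(long failAt) {
        List<String> handled = new ArrayList<>();
        for (long offset = committed; offset < LOG.size(); offset++) {
            if (offset == failAt) {
                return handled; // destroyed without commit
            }
            handled.add(LOG.get((int) offset));
        }
        committed = LOG.size(); // commit only after the whole batch
        return handled;
    }

    public static void main(String[] args) {
        List<String> first = runConsumer(2);   // processes m0, m1, then fails
        List<String> second = runConsumer(-1); // restart: starts again at offset 0
        System.out.println(first + " " + second);
    }
}
```

Running it prints `[m0, m1] [m0, m1, m2, m3]`: m0 and m1 are processed twice, which is the duplication that at-least-once delivery allows.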
While looking into the new consumer, I noticed that it is possible to
kind of "rewind" within a partition.
My new algorithm is something like this:
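import java.util.Collections;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Collections.singletonList(myTopic));
ConsumerRecords<String, String> records = consumer.poll(0);
int processedMessages = 0;
for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
    processedMessages++;
    if (failure) {
        TopicPartition myPartition =
                new TopicPartition(record.topic(), record.partition());
        long offsetOfLastProcessedRecord = record.offset();
        // this will effectively rewind me back so I get the messages
        // which are not processed yet
        consumer.seek(myPartition,
                offsetOfLastProcessedRecord - processedMessages);
        // here I commit the position of the last processed record, so on
        // the next poll I should get the messages which were polled before
        // but stayed unprocessed because of the error
        consumer.commitSync(Collections.singletonMap(myPartition,
                new OffsetAndMetadata(offsetOfLastProcessedRecord)));
        // stop iterating over the rest of this already-fetched batch
        break;
    }
}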
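To check the offset arithmetic of the rewind, here is a minimal in-memory model of one partition. Again it is not the Kafka API: RewindModel, pollOnce and failAt are hypothetical names. It assumes the semantics described in the KafkaConsumer javadoc, where both the consumer's position (what seek() overrides) and the committed offset name the next record to be read, i.e. last processed offset + 1. It also assumes the record at the failing offset was not processed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition (not the Kafka API).
// "position" mimics the fetch position that seek() overrides;
// "committed" mimics the committed offset. Both name the NEXT offset to read.
class RewindModel {
    static final List<String> LOG = List.of("m0", "m1", "m2", "m3", "m4");
    long position = 0;
    long committed = 0;
    final List<String> processed = new ArrayList<>();

    // One "poll": process records starting at position; the record at
    // failAt fails and is left unprocessed.
    void pollOnce(long failAt) {
        for (long offset = position; offset < LOG.size(); offset++) {
            if (offset == failAt) {
                // Rewinding to the failing record's own offset redelivers
                // exactly the unprocessed tail on the next poll.
                position = offset;   // like seek(partition, offset)
                committed = offset;  // last processed offset + 1 == offset
                return;              // stop iterating the stale batch
            }
            processed.add(LOG.get((int) offset));
        }
        position = LOG.size();
        committed = LOG.size();
    }

    public static void main(String[] args) {
        RewindModel m = new RewindModel();
        m.pollOnce(3);   // processes m0..m2, fails on m3
        m.pollOnce(-1);  // next poll resumes at m3
        System.out.println(m.processed + " committed=" + m.committed);
    }
}
```

Running it prints `[m0, m1, m2, m3, m4] committed=5`, with every record processed once. In this model, seeking to the failing record's own offset is enough; stepping back further would hand already processed records out again.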
Does this approach make sense?
--
Stefan Miklosovic