Hi,
I'm working through a production-level High Level Consumer app and have a
couple of error/shutdown questions, mainly to understand how offset storage
is handled.
Test case: simulate an error writing to the destination application (for
example, a database), so that the offset is 'lost'.
Scenario:
- Write 500 messages to each topic/partition.
- Run the example High Level Consumer code I wrote for the Wiki.
- Change the code so that after ten reads from the ConsumerIterator
  ('hasNext()'/'next()') the thread breaks out of the loop and returns,
  simulating a hard error. I write the offset to System.out to see what
  was provided.
- Start up again and check which offset is emitted first for each
  partition.
Issue: Kafka treats the offset of the message that caused me to break out
of the loop as processed (as expected), but my processing actually failed.
How do I tell Kafka that I didn't really consume that offset?
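For what it's worth, my current guess is that the answer involves turning
off auto-commit and committing manually only after a successful write to
the destination. A sketch of what I mean, assuming the 0.8 high-level
consumer API (the ZooKeeper address and group id here are placeholders):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

// ...
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // placeholder
props.put("group.id", "testgroup");               // placeholder
// Stop the consumer from committing offsets on its own timer:
props.put("auto.commit.enable", "false");

ConsumerConnector consumer =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

// ... consume and process messages in the stream threads ...

// Commit only after the database write succeeds, so a failed write
// leaves the offset uncommitted and the message is re-delivered on
// restart:
consumer.commitOffsets();

If that's the intended pattern, my remaining worry is that commitOffsets()
commits the position of every stream at once, not a single partition's
offset — is that right?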
Here is the example code in the 'business logic':

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    int counter = 0;
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> msg = it.next();
        if (counter == 10) {
            System.out.println("Stopping Thread " + m_threadNumber
                + ": Partition: " + msg.partition()
                + ": Offset: " + msg.offset()
                + " :" + new String(msg.message()));
            break;
        }
        System.out.println("Thread " + m_threadNumber
            + ": Partition: " + msg.partition()
            + ": Offset: " + msg.offset()
            + " :" + new String(msg.message()));
        counter++;
    }
    System.out.println("Shutting down Thread: " + m_threadNumber);
}
I understand that 'hard' failures like JVM crashes, kill -9, etc. may leave
the offsets in ZooKeeper incorrect, but I'm trying to understand what
happens in a clean shutdown where Kafka and the Consumer are behaving
correctly but I can't process what I read.
This also feels like I'm blurring SimpleConsumer concepts into this, but
except for the exception/shutdown case the High Level Consumer does
everything I want.
Thanks,
Chris