Hi, I'm working through a production-level High Level Consumer app and have a couple of error/shutdown questions about how offset storage is handled.
Test case: simulate an error writing to the destination application (for example a database), so that an offset is 'lost'.

Scenario:
- Write 500 messages to each topic/partition.
- Use the example High Level Consumer code I wrote for the Wiki.
- Change the code so that the 10th read from the ConsumerIterator breaks out of the loop and returns from the thread, simulating a hard error. I write the offset to System.out to see what was provided.
- Start up again and look to see what offset is first emitted for a partition.

Issue: Kafka treats the offset of the message that caused me to break out of the loop as processed (as expected), but I actually failed to process it. How do I tell Kafka that I didn't really consume that offset?

Here is the example code in the 'business logic':

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        int counter = 0;
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            // Simulate a hard error on the 10th message: report its offset and bail out.
            if (counter == 9) {
                System.out.println("Stopping Thread " + m_threadNumber
                        + ": Partition: " + msg.partition()
                        + ": Offset: " + msg.offset()
                        + " :" + new String(msg.message()));
                break;
            }
            System.out.println("Thread " + m_threadNumber
                    + ": Partition: " + msg.partition()
                    + ": Offset: " + msg.offset()
                    + " :" + new String(msg.message()));
            counter++;
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }

I understand that 'hard' errors like JVM crashes, kill -9, etc. may leave the offsets in ZooKeeper incorrect, but I'm trying to understand what happens in a clean shutdown where Kafka and the Consumer are behaving correctly yet I can't process what I read. This also feels like I'm blurring SimpleConsumer concerns into the High Level Consumer, but except for this exception/shutdown case the High Level Consumer does everything I want.

Thanks,
Chris
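One idea I've been looking at (I'm not sure it's the intended pattern, so treat this as a sketch): disable the consumer's automatic offset commit and only call ConsumerConnector.commitOffsets() after the destination write succeeds. The property name below assumes the 0.8.x high-level consumer; the helper name and values are mine, not from the Wiki example:

    import java.util.Properties;

    public class ManualCommitConfig {
        // Build consumer properties with auto-commit disabled, so offsets are
        // only recorded when the application explicitly calls commitOffsets()
        // after its business logic succeeds. Property names assume Kafka 0.8.x.
        static Properties consumerProps(String zookeeper, String groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", zookeeper); // hypothetical connection string
            props.put("group.id", groupId);
            // Turn off the periodic background commit; without this, the
            // consumer commits offsets on its own timer regardless of
            // whether our database write succeeded.
            props.put("auto.commit.enable", "false");
            return props;
        }

        public static void main(String[] args) {
            Properties p = consumerProps("localhost:2181", "my_group");
            System.out.println(p.getProperty("auto.commit.enable")); // prints "false"
        }
    }

With auto-commit off, the run() loop would call commitOffsets() on the connector only after each successful write, and skip the commit on failure so the message is redelivered after restart. My understanding is that commitOffsets() commits the latest fetched offsets for all partitions owned by the connector, not a single message's offset, so this still only gives at-least-once behavior per batch; corrections welcome.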