Hi,

I'm working through a production-level High Level Consumer app and have a
couple of error/shutdown questions to understand how the offset storage is
handled.

Test case: simulate an error writing to the destination application (for
example, a database), in which case the offset is 'lost'.

Scenario
- write 500 messages for each topic/partition
- use the example High Level Consumer code I wrote for the Wiki
- Change the code so that, after 10 messages have been read, the next
message returned by the ConsumerIterator makes the thread break out of the
loop and return, simulating a hard error. I write the offset to System.out
to see what was provided
- startup again and look to see what offset was first emitted for a
partition

Issue: Kafka treats the offset for the message read that caused me to break
out of the loop as processed (as expected), but I really failed. How do I
tell Kafka that I didn't really consume that offset?

Here is the example code in the 'business logic':

public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        int counter = 0;
        while (it.hasNext())   {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            if (counter == 10) {
                System.out.println("Stopping Thread " + m_threadNumber + ": Partition: " + msg.partition() +
                        ": Offset: " + msg.offset() + " :" + new String(msg.message()));
                break;
            }
            System.out.println("Thread " + m_threadNumber + ": Partition: " + msg.partition() +
                    ": Offset: " + msg.offset() + " :" + new String(msg.message()));
            counter++;
        }

        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
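
To make the question concrete, here is a toy model of what I think I am
seeing. It does not use the Kafka API at all: the class name, the method
name and the two "commit modes" are made up for illustration, and the
assumption that handing a message to my code is what marks its offset as
consumed is only my reading of the behaviour described above.

```java
// Toy model only: no Kafka classes are involved, and every name here is hypothetical.
class OffsetModel {

    /**
     * Simulates reading offsets 0..499 from one partition and failing on the
     * message at failAtOffset. Returns the offset a restarted consumer would
     * see first, under one of two assumed commit policies.
     */
    static long firstOffsetAfterRestart(long failAtOffset, boolean commitOnlyAfterSuccess) {
        long consumed = 0;   // advanced as soon as a message is handed to my code
        long processed = 0;  // advanced only after my code handled the message
        for (long offset = 0; offset < 500; offset++) {
            consumed = offset + 1;          // assumed: reading marks the offset consumed
            if (offset == failAtOffset) {
                break;                      // simulated failure writing to the database
            }
            processed = offset + 1;         // the message was really handled
        }
        // The stored position is where the next run starts reading.
        return commitOnlyAfterSuccess ? processed : consumed;
    }

    public static void main(String[] args) {
        // What I observe: the failed message at offset 10 is skipped on restart.
        System.out.println(firstOffsetAfterRestart(10, false)); // prints 11
        // What I would like: the failed message is delivered again.
        System.out.println(firstOffsetAfterRestart(10, true));  // prints 10
    }
}
```

The second mode is the behaviour I am hoping to get from the High Level
Consumer when my own processing fails.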

I understand that handling 'hard' errors like JVM crashes, kill -9 etc. may
leave the offsets in ZooKeeper incorrect, but I'm trying to understand what
happens in a clean shutdown where Kafka and the Consumer are behaving
correctly but I can't process what I read.

This also feels like I'm blurring SimpleConsumer theory into this, but
except for the exception/shutdown case High Level Consumer does everything
I want.


Thanks,

Chris
