OK. It sounds like you're requesting functionality that the high-level consumer simply doesn't have. As I am sure you know, there is no API call that supports "handing back a message".
I might be missing something, but if you need this kind of control, I think you need to code your application differently. You could try creating a ConsumerConnector per partition (your clients will then need to know the number of partitions out there). That way commitOffsets() will only apply to that partition, and auto-commit works the same way. It might give you the level of control you need.

Philip

On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin <curtin.ch...@gmail.com> wrote:
> Hi Philip,
>
> Correct, I don't want to explicitly control the offset committing. The
> ConsumerConnector handles that well enough, except for when I want to
> shut down and NOT have Kafka think I consumed that last message for a
> stream. This isn't the crash case; it is a case where the logic consuming
> the message detects an error and wants to cleanly exit until that issue
> can be resolved, without losing the message it was trying to process.
>
> My understanding is that the commitOffsets() call applies across all
> threads, not just the stream my thread is reading from. So knowing it is
> okay to call it requires coordination across all my threads, which makes
> a High Level Consumer a lot harder to write correctly.
>
> What I'd like to happen is: my code hands the message back to the
> KafkaStream (or whatever level knows about the consumed offsets) and says:
> - set the next start offset for this topic/partition to this message in
>   ZooKeeper
> - cleanly shut down the stream from the broker(s)
> - don't force a rebalance on the consumer, since something is wrong with
>   the processing of the data in the message, not with the message itself
> - if I try to use the stream again I should get an exception
> - I don't think I would want this to cause a complete shutdown of the
>   ConsumerConnector, in case other threads are still processing. If all
>   threads have the same issue they will all fail soon enough and do the
>   same logic.
> But if only one thread fails, our Operations team will need to resolve
> the issue and then do a clean restart to recover.
>
> I think this logic would only happen when the downstream system was
> having issues, since the iterator would be drained correctly when the
> 'shutdown' call to ConsumerConnector is made.
>
> Thanks,
>
> Chris
>
>
> On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole <phi...@loggly.com> wrote:
>
> > It seems like you're not explicitly controlling the offsets. Is that
> > correct?
> >
> > If so, the moment you pull a message from the stream, the client
> > framework considers it processed. So if your app subsequently crashes
> > before the message is fully processed, and "auto-commit" updates the
> > offsets in ZooKeeper, you will drop that message.
> >
> > The solution to this is to call commitOffsets() explicitly.
> >
> > Philip
> >
> > On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <curtin.ch...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'm working through a production-level High Level Consumer app and
> > > have a couple of error/shutdown questions to understand how the
> > > offset storage is handled.
> > >
> > > Test case - simulate an error writing to the destination application
> > > (for example a database), so the offset is 'lost'.
> > >
> > > Scenario:
> > > - write 500 messages for each topic/partition
> > > - use the example High Level Consumer code I wrote for the Wiki
> > > - change the code so that every 10th read from the 'hasNext()'
> > >   ConsumerIterator breaks out of the loop and returns from the
> > >   thread, simulating a hard error. I write the offset to System.out
> > >   to see what was provided
> > > - start up again and look to see what offset was first emitted for a
> > >   partition
> > >
> > > Issue: Kafka treats the offset for the message read that caused me to
> > > break out of the loop as processed (as expected), but I really failed.
> > > How do I tell Kafka that I didn't really consume that offset?
> > >
> > > Here is the example code in the 'business logic':
> > >
> > > public void run() {
> > >     ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> > >     int counter = 0;
> > >     while (it.hasNext()) {
> > >         MessageAndMetadata<byte[], byte[]> msg = it.next();
> > >         if (counter == 10) {
> > >             System.out.println("Stopping Thread " + m_threadNumber +
> > >                     ": Partition: " + msg.partition() +
> > >                     ": Offset: " + msg.offset() + " :" +
> > >                     new String(msg.message()));
> > >             break;
> > >         }
> > >         System.out.println("Thread " + m_threadNumber +
> > >                 ": Partition: " + msg.partition() +
> > >                 ": Offset: " + msg.offset() + " :" +
> > >                 new String(msg.message()));
> > >         counter++;
> > >     }
> > >
> > >     System.out.println("Shutting down Thread: " + m_threadNumber);
> > > }
> > >
> > > I understand that handling 'hard' errors like JVM crashes, kill -9,
> > > etc. may leave the offsets in ZooKeeper incorrect, but I'm trying to
> > > understand what happens in a clean shutdown where Kafka and the
> > > Consumer are behaving correctly but I can't process what I read.
> > >
> > > This also feels like I'm blurring SimpleConsumer theory into this,
> > > but except for the exception/shutdown case the High Level Consumer
> > > does everything I want.
> > >
> > > Thanks,
> > >
> > > Chris
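[Editor's note] The pattern the thread converges on (disable auto-commit, then call commitOffsets() only after a message has been fully processed) can be sketched roughly as below against the 0.8-era high-level consumer API. This is a minimal, hedged sketch, not a drop-in solution: it needs a running Kafka 0.8 broker and ZooKeeper, the topic name, group id, and the process() method are placeholder assumptions, and commitOffsets() still commits the consumed position for ALL streams owned by this connector, which is exactly the coordination problem Chris raises. Philip's suggestion of one ConsumerConnector per partition narrows that commit scope at the cost of the clients knowing the partition count.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumption: local ZK
        props.put("group.id", "example-group");           // placeholder group id
        // Turn OFF auto-commit so a pulled-but-unprocessed message is not
        // silently marked as consumed in ZooKeeper.
        props.put("auto.commit.enable", "false");

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("my-topic", 1); // placeholder topic, single stream

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(topicCountMap);
        ConsumerIterator<byte[], byte[]> it =
                streams.get("my-topic").get(0).iterator();

        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            try {
                process(msg); // hypothetical downstream write (e.g. database)
                // Commit only after success. Note: this commits the consumed
                // position for every stream on this connector, not just ours.
                consumer.commitOffsets();
            } catch (Exception e) {
                // Do NOT commit. Shutting down without committing means the
                // uncommitted message should be redelivered to the group on
                // restart, which approximates "handing the message back".
                consumer.shutdown();
                return;
            }
        }
        consumer.shutdown();
    }

    // Placeholder for the real business logic; throws to simulate failure.
    private static void process(MessageAndMetadata<byte[], byte[]> msg)
            throws Exception {
        System.out.println("Partition: " + msg.partition()
                + " Offset: " + msg.offset()
                + " : " + new String(msg.message()));
    }
}
```

One caveat worth repeating from Philip's reply: after it.next() the in-memory consumed position has already advanced, so any later commitOffsets() call from another thread would still commit past the failed message; per-connector (or per-partition-connector) isolation is what makes the skip-the-commit trick safe.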