Enhancement submitted: https://issues.apache.org/jira/browse/KAFKA-966
On Tue, Jul 9, 2013 at 3:53 PM, Chris Curtin <curtin.ch...@gmail.com> wrote:

> Thanks. I know I can write a SimpleConsumer to do this, but it feels like the High Level Consumer is _so_ close to being robust enough to handle what I'd think people want to do in most applications. I'm going to submit an enhancement request.
>
> I'm trying to understand the level of data loss in this situation, so I looked deeper into the KafkaStream logic: it looks like a KafkaStream includes a BlockingQueue for transferring the messages from Kafka to my code. If I call shutdown() when I detect the problem, are the messages already in the BlockingQueue considered 'read' by Kafka, or does the shutdown peek into the queue to see what is still there before updating ZooKeeper?
>
> My concern is that if that queue is not empty, I'll be losing more than the one message that led to the failure.
>
> I'm also curious how others are handling this situation. Do you assume the message that is causing problems is lost, or somehow know to go get it later? I'd think others would have this problem too.
>
> Thanks,
>
> Chris
>
> On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole <phi...@loggly.com> wrote:
>
>> OK.
>>
>> It sounds like you're requesting functionality that the high-level consumer simply doesn't have. As I am sure you know, there is no API call that supports "handing back a message".
>>
>> I might be missing something, but if you need this kind of control, I think you need to code your application differently. You could try creating a ConsumerConnector per partition (your clients will then need to know the number of partitions out there). That way commitOffsets() will only apply to that partition. Auto-commit is scoped the same way. It might give you the level of control you need.
>>
>> Philip
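A minimal sketch of the per-partition layout Philip describes, written against the 0.8-era high level consumer API. The topic name, group id, ZooKeeper address, and partition count are illustrative, and the rebalance is not guaranteed to hand each connector exactly one partition — with as many single-stream connectors as partitions it generally works out that way.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class PerPartitionConnectors {
        public static void main(String[] args) {
            int numPartitions = 4; // the client has to know this up front
            ConsumerConnector[] connectors = new ConsumerConnector[numPartitions];

            for (int i = 0; i < numPartitions; i++) {
                Properties props = new Properties();
                props.put("zookeeper.connect", "localhost:2181");
                props.put("group.id", "example-group");
                props.put("auto.commit.enable", "false"); // commit manually, per connector

                connectors[i] = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

                // One stream per connector: with as many connectors as partitions,
                // the group rebalance should leave each connector owning one partition.
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put("example-topic", 1);
                Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                        connectors[i].createMessageStreams(topicCountMap);
                // hand streams.get("example-topic").get(0) to a worker thread here
            }

            // Later, when a worker has fully processed a message from connector i:
            // connectors[i].commitOffsets(); // scoped to that connector's partition(s)
        }
    }

The trade-off is the one Philip names: the clients must know the partition count up front, and each connector maintains its own ZooKeeper connection and fetcher threads.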
>> On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin <curtin.ch...@gmail.com> wrote:
>>
>>> Hi Philip,
>>>
>>> Correct, I don't want to explicitly control the offset committing. The ConsumerConnector handles that well enough, except for when I want to shut down and NOT have Kafka think I consumed that last message for a stream. This isn't the crash case; it is a case where the logic consuming the message detects an error and wants to exit cleanly until that issue can be resolved, but not lose the message it was trying to process when the problem is resolved.
>>>
>>> My understanding is that the commitOffsets() call applies across all threads, not just to the stream my thread is reading from. So knowing it is okay to call it requires coordination across all my threads, which makes a High Level Consumer a lot harder to write correctly.
>>>
>>> What I'd like to happen is this: my code hands the message back to the KafkaStream (or whatever level knows about the consumed offsets) and says:
>>> - set the next start offset for this topic/partition to this message in ZooKeeper
>>> - cleanly shut down the stream from the broker(s)
>>> - don't force a rebalance on the consumer, since something is wrong with the processing of the data in the message, not with the message itself
>>> - if I try to use the stream again I should get an exception
>>> - don't cause a complete shutdown of the ConsumerConnector, in case other threads are still processing. If all threads have the same issue they will all fail soon enough and follow the same logic. But if only one thread fails, our Operations team will need to resolve the issue and then do a clean restart to recover.
>>>
>>> I think this logic would only come into play when the downstream system was having issues, since the iterator would be drained correctly when the 'shutdown' call to ConsumerConnector is made.
>>>
>>> Thanks,
>>>
>>> Chris
>>>
>>> On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole <phi...@loggly.com> wrote:
>>>
>>>> It seems like you're not explicitly controlling the offsets. Is that correct?
>>>>
>>>> If so, the moment you pull a message from the stream, the client framework considers it processed. So if your app subsequently crashes before the message is fully processed, and "auto-commit" updates the offsets in ZooKeeper, you will drop that message.
>>>>
>>>> The solution to this is to call commitOffsets() explicitly.
>>>>
>>>> Philip
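A minimal sketch of the explicit-commit pattern Philip describes, again against the 0.8-era high level consumer API (the topic, group id, and process() helper are illustrative): disable auto-commit, and commit only after a message has been handled successfully, so a failure leaves the last committed offset pointing at the unprocessed message.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class ExplicitCommitExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "example-group");
            props.put("auto.commit.enable", "false"); // no background commits

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("example-topic", 1);
            KafkaStream<byte[], byte[]> stream =
                    connector.createMessageStreams(topicCountMap).get("example-topic").get(0);

            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> msg = it.next();
                try {
                    process(msg);              // the downstream write (database, etc.)
                    connector.commitOffsets(); // commit only after success
                } catch (Exception e) {
                    // Don't commit: on restart, consumption resumes from the last
                    // committed offset, so this message is re-delivered, not dropped.
                    connector.shutdown();
                    return;
                }
            }
        }

        private static void process(MessageAndMetadata<byte[], byte[]> msg) {
            System.out.println(new String(msg.message())); // placeholder for real work
        }
    }

Chris's caveat still applies: commitOffsets() covers every stream owned by the connector, so this is only precise when the connector owns a single stream — which is exactly what the one-connector-per-partition layout buys you.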
>>>> On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <curtin.ch...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm working through a production-level High Level Consumer app and have a couple of error/shutdown questions, to understand how the offset storage is handled.
>>>>>
>>>>> Test case - simulate an error writing to the destination application, for example a database; the offset is 'lost'.
>>>>>
>>>>> Scenario:
>>>>> - write 500 messages to each topic/partition
>>>>> - use the example High Level Consumer code I wrote for the Wiki
>>>>> - change the code so that after ten successful reads, the next read from the ConsumerIterator breaks out of the loop and returns from the thread, simulating a hard error; I write the offset to System.out to see what was provided
>>>>> - start up again and look to see what offset was first emitted for a partition
>>>>>
>>>>> Issue: Kafka treats the offset of the message whose read caused me to break out of the loop as processed (as expected), but in fact I failed. How do I tell Kafka that I didn't really consume that offset?
>>>>>
>>>>> Here is the example code in the 'business logic':
>>>>>
>>>>> public void run() {
>>>>>     ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>>>>>     int counter = 0;
>>>>>     while (it.hasNext()) {
>>>>>         MessageAndMetadata<byte[], byte[]> msg = it.next();
>>>>>         if (counter == 10) {
>>>>>             System.out.println("Stopping Thread " + m_threadNumber
>>>>>                     + ": Partition: " + msg.partition()
>>>>>                     + ": Offset: " + msg.offset() + " :" + new String(msg.message()));
>>>>>             break;
>>>>>         }
>>>>>         System.out.println("Thread " + m_threadNumber
>>>>>                 + ": Partition: " + msg.partition()
>>>>>                 + ": Offset: " + msg.offset() + " :" + new String(msg.message()));
>>>>>         counter++;
>>>>>     }
>>>>>
>>>>>     System.out.println("Shutting down Thread: " + m_threadNumber);
>>>>> }
>>>>>
>>>>> I understand that handling 'hard' errors like JVM crashes, kill -9, etc. may leave the offsets in ZooKeeper incorrect, but I'm trying to understand what happens in a clean shutdown where Kafka and the Consumer are behaving correctly but I can't process what I read.
>>>>>
>>>>> This also feels like I'm blurring SimpleConsumer theory into this, but except for the exception/shutdown case, the High Level Consumer does everything I want.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Chris
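Pulling the thread together: a hedged rework of the test loop above under the assumptions already established — auto-commit disabled and one stream per connector. The m_connector field is hypothetical (it is not part of the original Wiki example); it would be the ConsumerConnector that created m_stream, passed into the worker.

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        int counter = 0;
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            if (counter == 10) {
                // Simulated hard error: stop WITHOUT committing. The last
                // committed offset still points at this message, so on
                // restart it is re-delivered rather than silently dropped.
                System.out.println("Stopping Thread " + m_threadNumber
                        + ": Partition: " + msg.partition()
                        + ": Offset: " + msg.offset() + " :" + new String(msg.message()));
                break;
            }
            System.out.println("Thread " + m_threadNumber
                    + ": Partition: " + msg.partition()
                    + ": Offset: " + msg.offset() + " :" + new String(msg.message()));
            m_connector.commitOffsets(); // mark consumed only after successful processing
            counter++;
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }

Committing to ZooKeeper per message is expensive; a production version would batch commits and accept re-delivery of the uncommitted tail on restart — the at-least-once trade-off this whole thread circles around.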