Enhancement submitted: https://issues.apache.org/jira/browse/KAFKA-966



On Tue, Jul 9, 2013 at 3:53 PM, Chris Curtin <curtin.ch...@gmail.com> wrote:

> Thanks. I know I can write a SimpleConsumer to do this, but it feels like
> the High Level consumer is _so_ close to being robust enough to    handle
> what I'd think people want to do in most applications. I'm going to submit
> an enhancement request.
>
> I'm trying to understand the level of data loss in this situation, so I
> looked deeper into the KafkaStream logic: it looks like a KafkaStream
> includes a BlockingQueue for transferring the messages to my code from
> Kafka. If I call shutdown() when I detect the problem, are the messages
> already in the BlockingQueue considered 'read' by Kafka, or does the
> shutdown peek into the Queue to see what is still there before updating
> ZooKeeper?
>
> My concern is if that queue is not empty I'll be losing more than the one
> message that led to the failure.
>
> I'm also curious how others are handling this situation. Do you assume the
> message that is causing problems is lost or somehow know to go get it
> later? I'd think others would have this problem too.
>
> Thanks,
>
> Chris
>
>
>
> On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole <phi...@loggly.com> wrote:
>
>> OK.
>>
>> It sounds like you're requesting functionality that the high-level
>> consumer
>> simply doesn't have. As I am sure you know, there is no API call that
>> supports "handing back a message".
>>
>> I might be missing something, but if you need this kind of control, I
>> think
>> you need to code your application differently. You could try creating a
>> ConsumerConnection per partition (your clients will then need to know the
>> number of partitions out there). That way commitOffsets() will actually
>> only apply to that partition. Auto-commit the same way. It might give you
>> the level of control you need.
>>
>> Philip
>>
>> On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin <curtin.ch...@gmail.com>
>> wrote:
>>
>> > Hi Philip,
>> >
>> > Correct, I don't want to explicitly control the offset committing. The
>> > ConsumerConnector handles that well enough except for when I want to
>> > shutdown and NOT have Kafka think I consumed that last message for a
>> > stream. This isn't the crash case, it is a case where the logic
>> consuming
>> > the message detects and error and wants to cleanly exit until that issue
>> > can be resolved, but not lose the message it was trying to process when
>> the
>> > problem is resolved.
>> >
>> > My understanding is that the commitOffsets() call is across all threads,
>> > not just for the stream my thread is reading from. So knowing it is
>> okay to
>> > call this requires coordination across all my threads, which makes a
>> High
>> > Level Consumer a lot harder to write correctly.
>> >
>> > Thinking about what I'd like to happen is: my code hands the message
>> back
>> > to the KafkaStream (or whatever level knows about the consumed offsets)
>> and
>> > says
>> > - set the next start offset for this topic/partition to this message in
>> > ZooKeeper
>> > - cleanly shutdown the stream from the broker(s)
>> > - don't force a rebalance on the consumer since something is wrong with
>> > processing of the data in the message, not the message.
>> > - If I try to use the stream again I should get an exception
>> > - I don't think I would want this to cause a complete shutdown of the
>> > ConsumerConnector, in case other threads are still processing. If all
>> > threads have the same issue they will all fail soon enough and do the
>> same
>> > logic. But if only one thread fails, our Operations teams will need to
>> > resolve the issue then do a clean restart to recover.
>> >
>> > I think this logic would only happen when the down stream system was
>> having
>> > issues since the iterator would be drained correctly when the 'shutdown'
>> > call to ConsumerConnector is made.
>> >
>> > Thanks,
>> >
>> > Chris
>> >
>> >
>> >
>> > On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole <phi...@loggly.com>
>> wrote:
>> >
>> > > It seems like you're not explicitly controlling the offsets. Is that
>> > > correct?
>> > >
>> > > If so, the moment you pull a message from the stream, the client
>> > framework
>> > > considers it processed. So if your app subsequently crashes before the
>> > > message is fully processed, and "auto-commit" updates the offsets in
>> > > Zookeeper, you will drop that message.
>> > >
>> > > The solution to this to call commitOffsets() explicitly.
>> > >
>> > > Philip
>> > >
>> > > On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <curtin.ch...@gmail.com
>> > > >wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I'm working through a production-level High Level Consumer app and
>> > have a
>> > > > couple of error/shutdown questions to understand how the offset
>> storage
>> > > is
>> > > > handled.
>> > > >
>> > > > Test case - simulate an error writing to destination application,
>> for
>> > > > example a database, offset is 'lost'
>> > > >
>> > > > Scenario
>> > > > - write 500 messages for each topic/partition
>> > > > - use the example High Level Consumer code I wrote for the Wiki
>> > > > - Change the code so that every 10th read from the 'hasNext()'
>> > > > ConsumerIterator breaks out of the loop and returns from the thread,
>> > > > simulating a hard error. I write the offset to System.out to see
>> what
>> > was
>> > > > provided
>> > > > - startup again and look to see what offset was first emitted for a
>> > > > partition
>> > > >
>> > > > Issue: Kafka treats the offset for the message read that caused me
>> to
>> > > break
>> > > > out of the loop as processed (as expected), but I really failed. How
>> > do I
>> > > > tell Kafka that I didn't really consume that offset?
>> > > >
>> > > > Here is the example code in the 'business logic':
>> > > >
>> > > > public void run() {
>> > > >         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>> > > >         int counter = 0;
>> > > >         while (it.hasNext())   {
>> > > >             MessageAndMetadata<byte[], byte[]> msg = it.next();
>> > > >             if (counter == 10) {
>> > > >                 System.out.println("Stopping Thread " +
>> m_threadNumber
>> > +
>> > > ":
>> > > > Partition: " + msg.partition() +
>> > > >                         ": Offset: " + msg.offset() + " :" + new
>> > > > String(msg.message()));
>> > > >                 break;
>> > > >             }
>> > > >             System.out.println("Thread " + m_threadNumber + ":
>> > > Partition: "
>> > > > + msg.partition() +
>> > > >                     ": Offset: " + msg.offset() + " :" + new
>> > > > String(msg.message()));
>> > > >             counter++;
>> > > >         }
>> > > >
>> > > >         System.out.println("Shutting down Thread: " +
>> m_threadNumber);
>> > > >     }
>> > > >
>> > > > I understand that handling 'hard' errors like JVM crashes, kill -9
>> etc.
>> > > may
>> > > > leave the offsets in ZooKeeper incorrect, but I'm trying to
>> understand
>> > > what
>> > > > happens in a clean shutdown where Kafka and the Consumer are
>> behaving
>> > > > correctly but I can't process what I read.
>> > > >
>> > > > This also feels like I'm blurring SimpleConsumer theory into this,
>> but
>> > > > except for the exception/shutdown case High Level Consumer does
>> > > everything
>> > > > I want.
>> > > >
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Chris
>> > > >
>> > >
>> >
>>
>
>

Reply via email to