OK.

It sounds like you're requesting functionality that the high-level consumer
simply doesn't have. As I am sure you know, there is no API call that
supports "handing back a message".

I might be missing something, but if you need this kind of control, I think
you need to code your application differently. You could try creating a
ConsumerConnection per partition (your clients will then need to know the
number of partitions out there). That way commitOffsets() will actually
only apply to that partition. Auto-commit the same way. It might give you
the level of control you need.

Philip

On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin <curtin.ch...@gmail.com> wrote:

> Hi Philip,
>
> Correct, I don't want to explicitly control the offset committing. The
> ConsumerConnector handles that well enough except for when I want to
> shutdown and NOT have Kafka think I consumed that last message for a
> stream. This isn't the crash case, it is a case where the logic consuming
> the message detects and error and wants to cleanly exit until that issue
> can be resolved, but not lose the message it was trying to process when the
> problem is resolved.
>
> My understanding is that the commitOffsets() call is across all threads,
> not just for the stream my thread is reading from. So knowing it is okay to
> call this requires coordination across all my threads, which makes a High
> Level Consumer a lot harder to write correctly.
>
> Thinking about what I'd like to happen is: my code hands the message back
> to the KafkaStream (or whatever level knows about the consumed offsets) and
> says
> - set the next start offset for this topic/partition to this message in
> ZooKeeper
> - cleanly shutdown the stream from the broker(s)
> - don't force a rebalance on the consumer since something is wrong with
> processing of the data in the message, not the message.
> - If I try to use the stream again I should get an exception
> - I don't think I would want this to cause a complete shutdown of the
> ConsumerConnector, in case other threads are still processing. If all
> threads have the same issue they will all fail soon enough and do the same
> logic. But if only one thread fails, our Operations teams will need to
> resolve the issue then do a clean restart to recover.
>
> I think this logic would only happen when the down stream system was having
> issues since the iterator would be drained correctly when the 'shutdown'
> call to ConsumerConnector is made.
>
> Thanks,
>
> Chris
>
>
>
> On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole <phi...@loggly.com> wrote:
>
> > It seems like you're not explicitly controlling the offsets. Is that
> > correct?
> >
> > If so, the moment you pull a message from the stream, the client
> framework
> > considers it processed. So if your app subsequently crashes before the
> > message is fully processed, and "auto-commit" updates the offsets in
> > Zookeeper, you will drop that message.
> >
> > The solution to this to call commitOffsets() explicitly.
> >
> > Philip
> >
> > On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <curtin.ch...@gmail.com
> > >wrote:
> >
> > > Hi,
> > >
> > > I'm working through a production-level High Level Consumer app and
> have a
> > > couple of error/shutdown questions to understand how the offset storage
> > is
> > > handled.
> > >
> > > Test case - simulate an error writing to destination application, for
> > > example a database, offset is 'lost'
> > >
> > > Scenario
> > > - write 500 messages for each topic/partition
> > > - use the example High Level Consumer code I wrote for the Wiki
> > > - Change the code so that every 10th read from the 'hasNext()'
> > > ConsumerIterator breaks out of the loop and returns from the thread,
> > > simulating a hard error. I write the offset to System.out to see what
> was
> > > provided
> > > - startup again and look to see what offset was first emitted for a
> > > partition
> > >
> > > Issue: Kafka treats the offset for the message read that caused me to
> > break
> > > out of the loop as processed (as expected), but I really failed. How
> do I
> > > tell Kafka that I didn't really consume that offset?
> > >
> > > Here is the example code in the 'business logic':
> > >
> > > public void run() {
> > >         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> > >         int counter = 0;
> > >         while (it.hasNext())   {
> > >             MessageAndMetadata<byte[], byte[]> msg = it.next();
> > >             if (counter == 10) {
> > >                 System.out.println("Stopping Thread " + m_threadNumber
> +
> > ":
> > > Partition: " + msg.partition() +
> > >                         ": Offset: " + msg.offset() + " :" + new
> > > String(msg.message()));
> > >                 break;
> > >             }
> > >             System.out.println("Thread " + m_threadNumber + ":
> > Partition: "
> > > + msg.partition() +
> > >                     ": Offset: " + msg.offset() + " :" + new
> > > String(msg.message()));
> > >             counter++;
> > >         }
> > >
> > >         System.out.println("Shutting down Thread: " + m_threadNumber);
> > >     }
> > >
> > > I understand that handling 'hard' errors like JVM crashes, kill -9 etc.
> > may
> > > leave the offsets in ZooKeeper incorrect, but I'm trying to understand
> > what
> > > happens in a clean shutdown where Kafka and the Consumer are behaving
> > > correctly but I can't process what I read.
> > >
> > > This also feels like I'm blurring SimpleConsumer theory into this, but
> > > except for the exception/shutdown case High Level Consumer does
> > everything
> > > I want.
> > >
> > >
> > > Thanks,
> > >
> > > Chris
> > >
> >
>

Reply via email to