Hello Kyle,

For your first question, the first option would be preferable: it may use a
little more memory and generate more ZK writes. In 0.9, though, offsets will
be stored on the Kafka brokers instead of in ZK, so you will no longer
bombard ZK.
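
A minimal sketch of that per-thread pattern against the 0.8 high-level API,
in case it helps. The connect string, group id, topic name, and thread count
below are all illustrative, not from this thread:

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class PerThreadConsumers {
    public static void main(String[] args) {
        int numThreads = 3;  // illustrative
        for (int i = 0; i < numThreads; i++) {
            new Thread(new Runnable() {
                public void run() {
                    Properties props = new Properties();
                    props.put("zookeeper.connect", "localhost:2181");  // illustrative
                    props.put("group.id", "my-group");                 // illustrative
                    props.put("auto.commit.enable", "false");
                    // Each thread owns a whole connector, so commits are
                    // isolated per thread, at the cost of extra memory and
                    // extra ZK traffic.
                    ConsumerConnector connector =
                        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                    List<KafkaStream<byte[], byte[]>> streams = connector
                        .createMessageStreams(Collections.singletonMap("my-topic", 1))
                        .get("my-topic");
                    ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
                    while (it.hasNext()) {
                        byte[] payload = it.next().message();
                        // ... publish payload downstream, then commit ...
                        connector.commitOffsets();
                    }
                }
            }).start();
        }
    }
}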

For the third question, the intended usage pattern for manual commits is:

message = iter.next();
process(message);
consumer.commit();

Thus, if the process crashes between process(message) and consumer.commit(),
you do incur duplicates, but you will not lose any data. If you can tolerate
data loss better than duplicates, you can instead do:

message = iter.next();
consumer.commit();
process(message);
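
To make the trade-off concrete, here is a minimal Java sketch of both
orderings against the 0.8 high-level consumer API; process() is a
hypothetical stand-in for your publish-to-the-other-system step:

import kafka.consumer.ConsumerIterator;
import kafka.javaapi.consumer.ConsumerConnector;

public class CommitPatterns {
    // At-least-once: commit only after processing succeeds. A crash
    // between process() and commitOffsets() replays the message on
    // restart, so you may see duplicates but never lose data.
    static void atLeastOnce(ConsumerConnector connector,
                            ConsumerIterator<byte[], byte[]> it) {
        while (it.hasNext()) {
            byte[] payload = it.next().message();
            process(payload);
            connector.commitOffsets();
        }
    }

    // At-most-once: commit before processing. A crash between
    // commitOffsets() and process() skips the message, so you may
    // lose data but never see duplicates.
    static void atMostOnce(ConsumerConnector connector,
                           ConsumerIterator<byte[], byte[]> it) {
        while (it.hasNext()) {
            byte[] payload = it.next().message();
            connector.commitOffsets();
            process(payload);
        }
    }

    // Hypothetical stand-in for publishing to the downstream system.
    static void process(byte[] payload) {
    }
}

One caveat: commitOffsets() commits the current position of every stream
owned by the connector, which is why the single-connector, multi-threaded
setup needs a barrier before committing.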


Guozhang


On Wed, Jun 18, 2014 at 9:43 PM, Jagbir <jsho...@hotmail.com> wrote:

> Hi Kyle,
>
> Thanks for the update. Wondering if you found an answer to your N-1 commit
> question? If auto-commit happens only at iterator.next() and only for the
> N-1 message, then client code can be much simpler and more reliable, as
> you mentioned. I'm also looking forward to any post in this regard.
>
> Jagbir
>
> On June 18, 2014 3:17:25 PM PDT, Kyle Banker <kyleban...@gmail.com> wrote:
> >I think I've discovered the answer to my second question: according to
> >the code in ZookeeperConsumerConnector.scala, a rebalance derives its
> >offsets from what's already in Zookeeper. Therefore, uncommitted but
> >consumed messages from a given partition will be replayed when the
> >partition is reassigned.
> >
> >
> >On Fri, Jun 13, 2014 at 3:01 PM, Kyle Banker <kyleban...@gmail.com> wrote:
> >
> >> I'm using Kafka 0.8.1.1.
> >>
> >> I have a simple goal: use the high-level consumer to consume a message
> >> from Kafka, publish the message to a different system, and then commit
> >> the message in Kafka. Based on my reading of the docs and the mailing
> >> list, it seems like this isn't so easy to achieve. Here is my current
> >> understanding:
> >>
> >> First, I have to disable auto-commit. If the consumer automatically
> >> commits, then I may lose messages if, for example, my process dies
> >> after consuming but before publishing my message.
> >>
> >> Next, if my app is multi-threaded, I need to either
> >>
> >> a) use a separate consumer per thread (memory-intensive, hard on
> >> Zookeeper), or
> >> b) use a single consumer and assign a KafkaStream to each thread. Then,
> >> when I want to commit, first synchronize all threads using a barrier.
> >>
> >> First question: is this correct so far?
> >>
> >>
> >> Still, it appears that rebalancing may be a problem. In particular,
> >> consider this sequence of events:
> >>
> >> 1. I'm consuming from a stream tied to two partitions, A and B.
> >> 2. I consume a message, M, from partition A.
> >> 3. Partition A gets assigned to a different consumer.
> >> 4. I choose not to commit M or my process fails.
> >>
> >> Second question: When the partition is reassigned, will the message
> >> that I consumed be automatically committed? If so, then there's no way
> >> to get the reliability I want.
> >>
> >>
> >> Third question: How do the folks at LinkedIn handle this overall use
> >> case? What about other users?
> >>
> >> It seems to me that a lot of the complexity here could be easily
> >> addressed by changing the way in which a partition's message pointer
> >> is advanced. That is, when I consume message M, advance the pointer
> >> to message (M - 1) rather than to M. In other words, calling
> >> iterator.next() would imply that the previously consumed message may
> >> be safely committed. If this were the case, I could simply enable
> >> auto-commit and be happy.
> >>
>
> --
> Sent from my Android phone with K-9 Mail. Please excuse my brevity.




-- 
-- Guozhang
