Jun, I read that FAQ entry you linked, but I am not seeing any Zookeeper 
connection loss in the logs. It's rebalancing multiple times per minute, 
though. Any idea what else could cause this? We're running kafka 0.7.2 on 
approx 400 consumers against a topic with 400 partitions * 3 brokers.   

--  
Ian Friedman


On Thursday, August 15, 2013 at 11:52 AM, Jun Rao wrote:

> Yes, during rebalances, messages could be re-delievered since the new owner
> of a partition starts fetching from the last checkpointed offset in ZK.
>  
> For reasons on why rebalances happen a lot, see
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyaretheremanyrebalancesinmyconsumerlog%3F
>  
> Thanks,
>  
> Jun
>  
>  
> On Thu, Aug 15, 2013 at 8:36 AM, Ian Friedman <i...@flurry.com 
> (mailto:i...@flurry.com)> wrote:
>  
> > It's a simple enough patch, but wouldn't this mean that messages still in
> > process when a rebalance happens could get delivered to another consumer if
> > we end up losing the partition? Rebalances seem to happen very frequently
> > with a lot of consumers for some reason… And it doesn't seem like a
> > consumer is guaranteed or likely to retain ownership of a partition it's in
> > the middle of consuming after a rebalance.
> >  
> > --
> > Ian Friedman
> >  
> >  
> > On Thursday, August 15, 2013 at 10:53 AM, Jun Rao wrote:
> >  
> > > We are only patching blocker issues in 0.7. 0.8 beta1 has been released
> > and
> > > most dev effort will be on 0.8 and beyond. That said. This particular
> >  
> > case
> > > is easy to fix. If you can port the patch in
> > > https://issues.apache.org/jira/browse/KAFKA-919 o the 0.7 branch , we
> > >  
> >  
> > can
> > > commit that to the 0.7 branch.
> > >  
> > > Thanks,
> > >  
> > > Jun
> > >  
> > >  
> > > On Wed, Aug 14, 2013 at 9:30 PM, Ian Friedman <i...@flurry.com 
> > > (mailto:i...@flurry.com) (mailto:
> > i...@flurry.com (mailto:i...@flurry.com))> wrote:
> > >  
> > > > Ugh.
> > > >  
> > > > Is there any way to make this work in 0.7, or is transitioning to 0.8
> > the
> > > > only way? My operations engineers spent a lot of effort in configuring
> > >  
> >  
> > and
> > > > hardening our 0.7 production install, and 0.8 isn't released yet. Not
> > >  
> >  
> > to
> > > > mention having to integrate the new client side code.
> > > >  
> > > > Either way, thanks for all your help Jun.
> > > >  
> > > > --
> > > > Ian Friedman
> > > >  
> > > >  
> > > > On Thursday, August 15, 2013 at 12:21 AM, Jun Rao wrote:
> > > >  
> > > > > Yes, this is an issue and has been fixed in 0.8.
> > > > >  
> > > > > Thanks,
> > > > >  
> > > > > Jun
> > > > >  
> > > > >  
> > > > > On Wed, Aug 14, 2013 at 5:21 PM, Ian Friedman <i...@flurry.com 
> > > > > (mailto:i...@flurry.com)(mailto:
> > i...@flurry.com (mailto:i...@flurry.com)) (mailto:
> > > > i...@flurry.com (mailto:i...@flurry.com))> wrote:
> > > > >  
> > > > > > Hey guys,
> > > > > >  
> > > > > > I designed my consumer app (running on 0.7) to run with autocommit
> > off
> > > > and
> > > > > > commit manually once it was done processing a record. The intent
> > > > >  
> > > >  
> > > >  
> > >  
> >  
> > was so
> > > > > > that if a consumer died while processing a message, the offset
> > > > >  
> > > >  
> > >  
> >  
> > would
> > > > > >  
> > > > >  
> > > >  
> > > >  
> > > > not be
> > > > > > committed, and another box would pick up the partition and
> > > > >  
> > > >  
> > > >  
> > >  
> >  
> > reprocess
> > > > >  
> > > >  
> > > >  
> > > > the
> > > > > > message. This seemed to work fine with small numbers of consumers
> > > > >  
> > > >  
> > > >  
> > > > (~10).
> > > > > > But now that I'm scaling it out, I'm running into a problem where
> > > > >  
> > > >  
> > > >  
> > >  
> >  
> > it
> > > > >  
> > > >  
> > > >  
> > > > looks
> > > > > > like messages that consumers picked up and then errored on are not
> > > > >  
> > > >  
> > > >  
> > > > getting
> > > > > > processed on another machine.
> > > > > >  
> > > > > > After investigating the logs and the partition offsets in
> > zookeeper, I
> > > > > > found that in ZookeeperConsumerConnector.scala
> > > > >  
> > > >  
> > >  
> >  
> > closeFetchersForQueues,
> > > > > > called during the rebalance process, will commit the offset
> > > > >  
> > > >  
> > >  
> >  
> > regardless
> > > > > >  
> > > > >  
> > > >  
> > > >  
> > > > of
> > > > > > the autocommit status. So it looks like even if my consumer is in
> > > > >  
> > > >  
> > > >  
> > >  
> >  
> > the
> > > > > > middle of processing a message, the offset will be committed, and
> > > > >  
> > > >  
> > >  
> >  
> > even
> > > > > >  
> > > > >  
> > > >  
> > > >  
> > > > if
> > > > > > the processing fails, it will never be picked up again. Now that I
> > > > >  
> > > >  
> > > >  
> > > > have a
> > > > > > lot of consumer nodes, the rebalancer is going off a lot more often
> > > > >  
> > > >  
> > > >  
> > > > and I'm
> > > > > > running into this constantly.
> > > > > >  
> > > > > > Were my assumptions faulty? Did I design this wrong? After reading
> > the
> > > > > > comment in the code I understand that if it didn't commit the
> > > > >  
> > > >  
> > >  
> >  
> > offset
> > > > > >  
> > > > >  
> > > >  
> > > >  
> > > > there,
> > > > > > the message would just get immediately consumed by whoever ended up
> > > > >  
> > > >  
> > > >  
> > > > owning
> > > > > > the partition, even if we were in the middle of consuming it
> > > > >  
> > > >  
> > > >  
> > > > elsewhere, and
> > > > > > we'd get unintentional duplicate delivery. How can I make it work
> > > > >  
> > > >  
> > > >  
> > >  
> >  
> > the
> > > > >  
> > > >  
> > > >  
> > > > way
> > > > > > I've described? Is there any way?
> > > > > >  
> > > > > > Thanks in advance,
> > > > > >  
> > > > > > --
> > > > > > Ian Friedman
> > > > > >  
> > > > >  
> > > >  
> > > >  
> > >  
> >  
> >  
>  
>  
>  


Reply via email to