Ok Jun, thanks very much. I'm working on building that now and will come back with a patch once I have it running in our production environment.
--
Ian Friedman

On Thursday, August 15, 2013 at 10:53 AM, Jun Rao wrote:

> We are only patching blocker issues in 0.7. 0.8 beta1 has been released and
> most dev effort will be on 0.8 and beyond. That said, this particular case
> is easy to fix. If you can port the patch in
> https://issues.apache.org/jira/browse/KAFKA-919 to the 0.7 branch, we can
> commit that to the 0.7 branch.
>
> Thanks,
>
> Jun
>
> On Wed, Aug 14, 2013 at 9:30 PM, Ian Friedman <i...@flurry.com> wrote:
>
> > Ugh.
> >
> > Is there any way to make this work in 0.7, or is transitioning to 0.8 the
> > only way? My operations engineers spent a lot of effort configuring and
> > hardening our 0.7 production install, and 0.8 isn't released yet. Not to
> > mention having to integrate the new client-side code.
> >
> > Either way, thanks for all your help Jun.
> >
> > --
> > Ian Friedman
> >
> > On Thursday, August 15, 2013 at 12:21 AM, Jun Rao wrote:
> >
> > > Yes, this is an issue and has been fixed in 0.8.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Aug 14, 2013 at 5:21 PM, Ian Friedman <i...@flurry.com> wrote:
> > >
> > > > Hey guys,
> > > >
> > > > I designed my consumer app (running on 0.7) to run with autocommit off
> > > > and commit manually once it was done processing a record. The intent
> > > > was that if a consumer died while processing a message, the offset
> > > > would not be committed, and another box would pick up the partition
> > > > and reprocess the message. This seemed to work fine with small numbers
> > > > of consumers (~10). But now that I'm scaling it out, I'm running into
> > > > a problem where it looks like messages that consumers picked up and
> > > > then errored on are not getting processed on another machine.
> > > >
> > > > After investigating the logs and the partition offsets in ZooKeeper,
> > > > I found that closeFetchersForQueues in
> > > > ZookeeperConsumerConnector.scala, called during the rebalance
> > > > process, will commit the offset regardless of the autocommit status.
> > > > So it looks like even if my consumer is in the middle of processing a
> > > > message, the offset will be committed, and even if the processing
> > > > fails, the message will never be picked up again. Now that I have a
> > > > lot of consumer nodes, the rebalancer is going off a lot more often
> > > > and I'm running into this constantly.
> > > >
> > > > Were my assumptions faulty? Did I design this wrong? After reading
> > > > the comment in the code, I understand that if it didn't commit the
> > > > offset there, the message would just get immediately consumed by
> > > > whoever ended up owning the partition, even if we were in the middle
> > > > of consuming it elsewhere, and we'd get unintentional duplicate
> > > > delivery. How can I make it work the way I've described? Is there any
> > > > way?
> > > >
> > > > Thanks in advance,
> > > >
> > > > --
> > > > Ian Friedman
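[Editor's note: the failure mode discussed in this thread can be sketched with a small plain-Python simulation. This is not Kafka 0.7 client code; `consume`, the message list, and the "poison" sentinel are hypothetical stand-ins for a partition, its ZooKeeper offset, and a message whose processing fails. It only illustrates why a forced offset commit during rebalance defeats commit-after-processing semantics.]

```python
def consume(messages, start, commit_on_failure):
    """Consume messages from offset `start`; fail on a "poison" message.

    Returns the committed offset. With commit-after-processing
    (commit_on_failure=False), a failure leaves the offset pointing at the
    failed message, so the partition's next owner reprocesses it.
    commit_on_failure=True models the 0.7 rebalance behavior described
    above: the offset is committed past the in-flight message anyway.
    """
    committed = start
    for offset in range(start, len(messages)):
        if messages[offset] == "poison":
            if commit_on_failure:
                # Rebalance commits regardless of the autocommit setting,
                # so the failed message is skipped forever.
                committed = offset + 1
            return committed
        # Manual commit only after the message is fully processed.
        committed = offset + 1
    return committed

messages = ["a", "b", "poison", "d"]

# Intended at-least-once behavior: the failed message stays uncommitted.
redelivery_point = consume(messages, 0, commit_on_failure=False)
assert messages[redelivery_point] == "poison"  # next owner retries it

# Behavior with a forced commit on rebalance: the message is lost.
lost_point = consume(messages, 0, commit_on_failure=True)
assert messages[lost_point] == "d"  # "poison" is never reprocessed
```

The flip side, noted in the thread, is that *not* committing during rebalance means the new partition owner starts from the older offset and redelivers any message still being processed elsewhere: the trade-off is duplicate delivery versus message loss.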