Apologies for the late arrival to this thread. There was a bug in the
0.9.0.0 release of Kafka which could cause the consumer to stop fetching
from a partition after a rebalance. If you're seeing this, please checkout
the 0.9.0 branch of Kafka and see if you can reproduce this problem. If you
can, then it would be really helpful if you file a JIRA with the steps to
reproduce.

>From Han's initial example, it kind of looks like the problem might be in
the usage. The consumer lag as shown by the kafka-consumer-groups script
relies on the last committed position to determine lag. To update progress,
you need to commit offsets regularly. In the gist, offsets are only
committed on shutdown or when a rebalance occurs. When the group is stable,
no progress will be seen because there are no commits to update the
position.

Thanks,
Jason

On Mon, Jan 25, 2016 at 9:09 AM, Ismael Juma <ism...@juma.me.uk> wrote:

> Thanks!
>
> Ismael
>
> On Mon, Jan 25, 2016 at 4:03 PM, Han JU <ju.han.fe...@gmail.com> wrote:
>
> > Issue created: https://issues.apache.org/jira/browse/KAFKA-3146
> >
> > 2016-01-25 16:07 GMT+01:00 Han JU <ju.han.fe...@gmail.com>:
> >
> > > Hi Bruno,
> > >
> > > Can you tell me a little bit more about that? A seek() in the
> > > `onPartitionAssigned`?
> > >
> > > Thanks.
> > >
> > > 2016-01-25 10:51 GMT+01:00 Han JU <ju.han.fe...@gmail.com>:
> > >
> > >> Ok I'll create a JIRA issue on this.
> > >>
> > >> Thanks!
> > >>
> > >> 2016-01-23 21:47 GMT+01:00 Bruno Rassaerts <
> bruno.rassae...@novazone.be
> > >:
> > >>
> > >>> +1 here
> > >>>
> > >>> As a workaround we seek to the current offset which resets the
> current
> > >>> clients internal states and everything continues.
> > >>>
> > >>> Regards,
> > >>> Bruno Rassaerts | Freelance Java Developer
> > >>>
> > >>> Novazone, Edingsesteenweg 302, B-1755 Gooik, Belgium
> > >>> T: +32(0)54/26.02.03 - M:+32(0)477/39.01.15
> > >>> bruno.rassae...@novazone.be -www.novazone.be
> > >>>
> > >>> > On 23 Jan 2016, at 17:52, Ismael Juma <ism...@juma.me.uk> wrote:
> > >>> >
> > >>> > Hi,
> > >>> >
> > >>> > Can you please file an issue in JIRA so that we make sure this is
> > >>> > investigated?
> > >>> >
> > >>> > Ismael
> > >>> >
> > >>> >> On Fri, Jan 22, 2016 at 3:13 PM, Han JU <ju.han.fe...@gmail.com>
> > >>> wrote:
> > >>> >>
> > >>> >> Hi,
> > >>> >>
> > >>> >> I'm prototyping with the new consumer API of kafka 0.9 and I'm
> > >>> particularly
> > >>> >> interested in the `ConsumerRebalanceListener`.
> > >>> >>
> > >>> >> My test setup is like the following:
> > >>> >>  - 5M messages pre-loaded in one node kafka 0.9
> > >>> >>  - 12 partitions, auto offset commit set to false
> > >>> >>  - in `onPartitionsRevoked`, commit offset and flush the local
> state
> > >>> >>
> > >>> >> The test run is like the following:
> > >>> >>  - launch one process with 2 consumers and let it consume for a
> > while
> > >>> >>  - launch another process with 2 consumers, this triggers a
> > >>> rebalancing,
> > >>> >> and let these 2 processes run until messages are all consumed
> > >>> >>
> > >>> >> The code is here:
> > https://gist.github.com/darkjh/fe1e5a5387bf13b4d4dd
> > >>> >>
> > >>> >> So at first, the 2 consumers of the first process each got 6
> > >>> partitions.
> > >>> >> And after the rebalancing, each consumer got 3 partitions. It's
> > >>> confirmed
> > >>> >> by logging inside the `onPartitionAssigned` callback.
> > >>> >>
> > >>> >> But after the rebalancing, one of the 2 consumers of the first
> > >>> process stop
> > >>> >> receiving messages, even if it has partitions assigned to:
> > >>> >>
> > >>> >> balance-1 pulled 7237 msgs ...
> > >>> >> balance-0 pulled 7263 msgs ...
> > >>> >> 2016-01-22 15:50:37,533 [INFO] [pool-1-thread-2]
> > >>> >> o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed
> since
> > >>> the
> > >>> >> group is rebalancing, try to re-join group.
> > >>> >> balance-1 flush @ 536637
> > >>> >> balance-1 committed offset for List(balance-11, balance-10,
> > balance-9,
> > >>> >> balance-8, balance-7, balance-6)
> > >>> >> 2016-01-22 15:50:37,575 [INFO] [pool-1-thread-1]
> > >>> >> o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed
> since
> > >>> the
> > >>> >> group is rebalancing, try to re-join group.
> > >>> >> balance-0 flush @ 543845
> > >>> >> balance-0 committed offset for List(balance-5, balance-4,
> balance-3,
> > >>> >> balance-2, balance-1, balance-0)
> > >>> >> balance-0 got assigned List(balance-5, balance-4, balance-3)
> > >>> >> balance-1 got assigned List(balance-11, balance-10, balance-9)
> > >>> >> balance-1 pulled 3625 msgs ...
> > >>> >> balance-0 pulled 3621 msgs ...
> > >>> >> balance-0 pulled 3631 msgs ...
> > >>> >> balance-0 pulled 3631 msgs ...
> > >>> >> balance-1 pulled 0 msgs ...
> > >>> >> balance-0 pulled 3643 msgs ...
> > >>> >> balance-0 pulled 3643 msgs ...
> > >>> >> balance-1 pulled 0 msgs ...
> > >>> >> balance-0 pulled 3622 msgs ...
> > >>> >> balance-0 pulled 3632 msgs ...
> > >>> >> balance-1 pulled 0 msgs ...
> > >>> >> balance-0 pulled 3637 msgs ...
> > >>> >> balance-0 pulled 3641 msgs ...
> > >>> >> balance-0 pulled 3640 msgs ...
> > >>> >> balance-1 pulled 0 msgs ...
> > >>> >> balance-0 pulled 3632 msgs ...
> > >>> >> balance-0 pulled 3630 msgs ...
> > >>> >> balance-1 pulled 0 msgs ...
> > >>> >> ......
> > >>> >>
> > >>> >> `balance-0` and `balance-1` are the names of the consumer thread.
> So
> > >>> after
> > >>> >> the rebalancing, thread `balance-1` continues to poll but no
> message
> > >>> >> arrive, given that it has got 3 partitions assigned to after the
> > >>> >> rebalancing.
> > >>> >>
> > >>> >> Finally other 3 consumers pulls all their partitions' message, the
> > >>> >> situation is like
> > >>> >>
> > >>> >> GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG,
> OWNER
> > >>> >> balance-test, balance, 9, 417467, 417467, 0, consumer-2_/
> 127.0.0.1
> > >>> >> balance-test, balance, 10, 417467, 417467, 0, consumer-2_/
> 127.0.0.1
> > >>> >> balance-test, balance, 11, 417467, 417467, 0, consumer-2_/
> 127.0.0.1
> > >>> >> balance-test, balance, 6, 180269, 417467, 237198, consumer-2_/
> > >>> 127.0.0.1
> > >>> >> balance-test, balance, 7, 180036, 417468, 237432, consumer-2_/
> > >>> 127.0.0.1
> > >>> >> balance-test, balance, 8, 180197, 417467, 237270, consumer-2_/
> > >>> 127.0.0.1
> > >>> >> balance-test, balance, 3, 417467, 417467, 0, consumer-1_/
> 127.0.0.1
> > >>> >> balance-test, balance, 4, 417468, 417468, 0, consumer-1_/
> 127.0.0.1
> > >>> >> balance-test, balance, 5, 417468, 417468, 0, consumer-1_/
> 127.0.0.1
> > >>> >> balance-test, balance, 0, 417467, 417467, 0, consumer-1_/
> 127.0.0.1
> > >>> >> balance-test, balance, 1, 417467, 417467, 0, consumer-1_/
> 127.0.0.1
> > >>> >> balance-test, balance, 2, 417467, 417467, 0, consumer-1_/
> 127.0.0.1
> > >>> >>
> > >>> >> So you can see, partition [6, 7, 8] still has messages, but the
> > >>> consumer
> > >>> >> can't pull them after the rebalancing.
> > >>> >>
> > >>> >> I've tried 0.9.0.0 release, trunk and 0.9.0 branch, for both
> > >>> server/broker
> > >>> >> and client.
> > >>> >>
> > >>> >> I hope the code is clear enough to illustrate/reproduce the
> problem.
> > >>> It's
> > >>> >> quite a surprise for me because this is the main feature of the
> new
> > >>> >> consumer API, but it does not seem to work properly.
> > >>> >> Feel free to talk to me for any details.
> > >>> >> --
> > >>> >> *JU Han*
> > >>> >>
> > >>> >> Software Engineer @ Teads.tv
> > >>> >>
> > >>> >> +33 0619608888
> > >>> >>
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> *JU Han*
> > >>
> > >> Software Engineer @ Teads.tv
> > >>
> > >> +33 0619608888
> > >>
> > >
> > >
> > >
> > > --
> > > *JU Han*
> > >
> > > Software Engineer @ Teads.tv
> > >
> > > +33 0619608888
> > >
> >
> >
> >
> > --
> > *JU Han*
> >
> > Software Engineer @ Teads.tv
> >
> > +33 0619608888
> >
>

Reply via email to