Hi Sachin,

The CommitFailedException are thrown because the group is rebalancing. You
can see log messages like below happening before the commit failed
exception:

Attempt to heartbeat failed for group new-part-advice since it is
rebalancing.

It isn't clear from the logs why the rebalancing is happening. The broker
logs would have been helpful. Also, timestamps on the kafka streams logs
would also be useful.

w.r.t to continuing after a CommitFailedException during
onPartitionsRevoked - will have to think through the scenarios.

On Fri, 10 Feb 2017 at 04:35 Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I could manage the streams client log, the server logs were deleted since
> time had elapsed and it got rolled over.
> See if you can figure out something from these. These are not best of logs
> generated.
>
>
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log
> The above log pay attention to StreamThread-1 which fails due to
> CommitFailedException
> This mostly seem to be caused by OffsetCommitResponseHandler due to unknown
> member id.
>
>
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> The above log pay attention to StreamThread-4 which fails due to
> CommitFailedException during sendOffsetCommitRequest.
> This mostly seem to be caused by Attempt to heartbeat failed for group
> new-part-advice since member id is not valid.
>
> Also please let us know why we shutdown the thread if we get
> CommitFailedException, ideally we catch this exception in
> onPartitionsRevokedand
> then thread should continue processing the new partitions assigned.
>
> Please let me know if you need any more information from me.
>
> Thanks
> Sachin
>
>
> On Thu, Feb 9, 2017 at 2:14 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Might be easiest to just send all the logs if possible.
> >
> > On Thu, 9 Feb 2017 at 08:10 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > I would try to get the logs soon.
> > > One quick question, I have three brokers which run in cluster with
> > default
> > > logging.
> > >
> > > Which log4j logs would be of interest at broker side and which broker
> or
> > do
> > > I need to send logs from all three.
> > >
> > > My topic is partitioned and replicated on all three so kafka-logs dir
> > > contains same topic logs.
> > >
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy <damian....@gmail.com>
> wrote:
> > >
> > > > Sachin,
> > > >
> > > > Can you provide the full logs from the broker and the streams app? It
> > is
> > > > hard to understand what is going on with only snippets of
> information.
> > It
> > > > seems like the rebalance is taking too long, but i can't tell from
> > this.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Thu, 9 Feb 2017 at 07:53 Sachin Mittal <sjmit...@gmail.com>
> wrote:
> > > >
> > > > > Hi,
> > > > > In continuation of the CommitFailedException what we observe is
> that
> > > when
> > > > > this happens first time
> > > > >
> > > > > ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
> > > > > This calls suspendTasksAndState() which again tries to commit
> offset
> > > and
> > > > > then again the same exception is thrown.
> > > > > This gets handled at ConsumerCoordinator and it logs it and then re
> > > > assigns
> > > > > new partition.
> > > > >
> > > > > But stream thread before rethrowing the exception calls
> > > > > rebalanceException = t;
> > > > >
> > > > > https://github.com/apache/kafka/blob/0.10.2/streams/src/
> > > > main/java/org/apache/kafka/streams/processor/internals/
> > > > StreamThread.java#L261
> > > > >
> > > > > So now when runLoop executes it gets this exception and stream
> thread
> > > > > exits.
> > > > >
> > > > > Here are the logs
> > > > >
> > > > >  -----   Exception - happend
> > > > >
> > > > > Unsubscribed all topics or patterns and assigned partitions
> > > > > stream-thread [StreamThread-1] Updating suspended tasks to contain
> > > active
> > > > > tasks [[0_32, 0_3, 0_20, 0_8]]
> > > > > stream-thread [StreamThread-1] Removing all active tasks [[0_32,
> 0_3,
> > > > 0_20,
> > > > > 0_8]]
> > > > > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > > > > User provided listener
> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
> > group
> > > > > new-part-advice failed on partition revocation
> > > > >  -----   Revocation failed and exception handled
> > > > >
> > > > > (Re-)joining group new-part-advice
> > > > > stream-thread [StreamThread-1] found [advice-stream] topics
> possibly
> > > > > matching regex
> > > > > stream-thread [StreamThread-1] updating builder with
> > > > > SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]}
> > topic(s)
> > > > > with possible matching regex subscription(s)
> > > > > Sending JoinGroup ((type: JoinGroupRequest,
> groupId=new-part-advice,
> > > > > sessionTimeout=10000, rebalanceTimeout=300000, memberId=,
> > > > > protocolType=consumer,
> > > > >
> > > > > groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$
> > > > ProtocolMetadata@300c8a1f
> > > > > ))
> > > > > to coordinator 192.168.73.198:9092 (id: 2147483643
> <(214)%20748-3643>
> > <(214)%20748-3643>
> > > <(214)%20748-3643>
> > > > > rack: null)
> > > > >
> > > > > stream-thread [StreamThread-1] New partitions [[advice-stream-8,
> > > > > advice-stream-32, advice-stream-3, advice-stream-20]] assigned at
> the
> > > end
> > > > > of consumer rebalance.
> > > > >
> > > > >  -----   Same new partitions assigned on reblance
> > > > >
> > > > > stream-thread [StreamThread-1] recycling old task 0_32
> > > > > stream-thread [StreamThread-1] recycling old task 0_3
> > > > > stream-thread [StreamThread-1] recycling old task 0_20
> > > > > stream-thread [StreamThread-1] recycling old task 0_8
> > > > >
> > > > >  -----   It then shuts down
> > > > >
> > > > > stream-thread [StreamThread-1] Shutting down
> > > > > stream-thread [StreamThread-1] shutdownTasksAndState: shutting down
> > all
> > > > > active tasks [[0_32, 0_3, 0_20, 0_8]] and standby tasks [[]]
> > > > >
> > > > > Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
> > > > > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > > > [StreamThread-1] Failed to rebalance
> > > > >     at
> > > > >
> > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > > StreamThread.java:612)
> > > > > ~[kafka-streams-0.10.2.0.jar:na]
> > > > >     at
> > > > >
> > > > > org.apache.kafka.streams.processor.internals.
> > > > StreamThread.run(StreamThread.java:368)
> > > > > ~[kafka-streams-0.10.2.0.jar:na]
> > > > > Caused by: org.apache.kafka.streams.errors.StreamsException:
> > > > stream-thread
> > > > > [StreamThread-1] failed to suspend stream tasks
> > > > >
> > > > >
> > > > > So is there a way we can restart that stream thread again. Also in
> > this
> > > > > case should we assign it rebalanceException, because this is
> > > > > CommitFailedException and new partitions are already assigned and
> > same
> > > > > thread can continue processing new partitions.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Wed, Feb 8, 2017 at 2:19 PM, Sachin Mittal <sjmit...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > > I am trying out the 0.10.2.0 rc.
> > > > > > We have a source stream of 40 partitions.
> > > > > >
> > > > > > We start one instance with 4 threads.
> > > > > > After that we start second instance with same config on a
> different
> > > > > > machine and then same way third instance.
> > > > > >
> > > > > > After application reaches steady state we start getting
> > > > > > CommitFailedException.
> > > > > > Firstly I feel is that message for CommitFailedException should
> > > change.
> > > > > It
> > > > > > is thrown from number of places and sometimes is just not related
> > to
> > > > > > subsequent calls to poll() was longer than the configured
> > > > > > max.poll.interval.ms.
> > > > > >
> > > > > > We have verified this and this is never the case on our case when
> > we
> > > > get
> > > > > > the exception. So perhaps introduce another Exception class, but
> > this
> > > > > > message is very misleading and we end up investigation wrong
> areas.
> > > > > >
> > > > > > Anyway on the places where this exception gets thrown in our
> cases
> > > > > > 1. Exception trace is this
> > > > > > stream-thread [StreamThread-4] Failed while executing StreamTask
> > 0_5
> > > > due
> > > > > > to commit consumer offsets:
> > > > > > org.apache.kafka.clients.consumer.CommitFailedException: Commit
> > > > cannot be
> > > > > > completed since the group has already rebalanced and assigned the
> > > > > > partitions to another member. This means that the time between
> > > > subsequent
> > > > > > calls to poll() was longer than the configured
> > max.poll.interval.ms,
> > > > > > which typically implies that the poll loop is spending too much
> > time
> > > > > > message processing. You can address this either by increasing the
> > > > session
> > > > > > timeout or by reducing the maximum size of batches returned in
> > poll()
> > > > > with
> > > > > > max.poll.records.
> > > > > >     at org.apache.kafka.clients.consumer.internals.
> > > > ConsumerCoordinator.
> > > > > > sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> > > > > > [kafka-clients-0.10.2.0.jar:na]
> > > > > >     at org.apache.kafka.clients.consumer.internals.
> > > > ConsumerCoordinator.
> > > > > > commitOffsetsSync(ConsumerCoordinator.java:577)
> > > > > > [kafka-clients-0.10.2.0.jar:na]
> > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > > > commitSync(KafkaConsumer.java:1125) [kafka-clients-0.10.2.0.jar:
> > na]
> > > > > >
> > > > > > in the code that line is
> > > > > > // if the generation is null, we are not part of an active group
> > (and
> > > > we
> > > > > > expect to be).
> > > > > > // the only thing we can do is fail the commit and let the user
> > > rejoin
> > > > > > the group in poll()
> > > > > > if (generation == null)
> > > > > > return RequestFuture.failure(new CommitFailedException());
> > > > > >
> > > > > > After we investigate what caused the generation to be null we
> found
> > > > that
> > > > > > sometime before that following was thrown
> > > > > >
> > > > > > Attempt to heartbeat failed for group new-part-advice since
> member
> > id
> > > > is
> > > > > > not valid.
> > > > > >
> > > > > > We check the code and we see this
> > > > > > if (error == Errors.UNKNOWN_MEMBER_ID) {
> > > > > > log.debug("Attempt to heartbeat failed for group {} since member
> id
> > > is
> > > > > > not valid.", groupId);
> > > > > > resetGeneration();
> > > > > > future.raise(Errors.UNKNOWN_MEMBER_ID);
> > > > > > }
> > > > > >
> > > > > > We check the code resetGeneration and find that it does
> > > > > > this.state = MemberState.UNJOINED;
> > > > > >
> > > > > > And then in method generation we do something like this
> > > > > > if (this.state != MemberState.STABLE)
> > > > > >             return null;
> > > > > >
> > > > > > So why are we not returning Generation.NO_GENERATION which was
> > being
> > > > set
> > > > > > by resetGenerationearlier.
> > > > > >
> > > > > > Would this line better?
> > > > > > if (this.state != MemberState.STABLE && !this.rejoinNeeded)
> > > > > >             return null;
> > > > > >
> > > > > > Also why are we getting UNKNOWN_MEMBER_ID exception in first
> place.
> > > > > >
> > > > > > 2. This is the second case:
> > > > > > org.apache.kafka.clients.consumer.CommitFailedException: Commit
> > > > cannot be
> > > > > > completed since the group has already rebalanced and assigned the
> > > > > > partitions to another member. This means that the time between
> > > > subsequent
> > > > > > calls to poll() was longer than the configured
> > max.poll.interval.ms,
> > > > > > which typically implies that the poll loop is spending too much
> > time
> > > > > > message processing. You can address this either by increasing the
> > > > session
> > > > > > timeout or by reducing the maximum size of batches returned in
> > poll()
> > > > > with
> > > > > > max.poll.records.
> > > > > >     at org.apache.kafka.clients.consumer.internals.
> > > > ConsumerCoordinator$
> > > > > > OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:766)
> > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > >     at org.apache.kafka.clients.consumer.internals.
> > > > ConsumerCoordinator$
> > > > > > OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:712)
> > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > >
> > > > > > Here again when we check the code we see that it failed inside
> > > > > > OffsetCommitResponseHandler
> > > > > >
> > > > > > if (error == Errors.UNKNOWN_MEMBER_ID
> > > > > > || error == Errors.ILLEGAL_GENERATION
> > > > > > || error == Errors.REBALANCE_IN_PROGRESS) {
> > > > > > // need to re-join group
> > > > > > log.debug("Offset commit for group {} failed: {}", groupId,
> error.
> > > > > > message());
> > > > > > resetGeneration();
> > > > > > future.raise(new CommitFailedException());
> > > > > > return;
> > > > > > }
> > > > > >
> > > > > > And we check check the log message before this we find
> > > > > > Offset commit for group new-part-advice failed: The coordinator
> is
> > > not
> > > > > > aware of this member.
> > > > > >
> > > > > > This error is again due to UNKNOWN_MEMBER_ID error.
> > > > > > The handling in this case is similar to one we do for heartbeat
> > fail.
> > > > > >
> > > > > > So again why are we getting this UNKNOWN_MEMBER_ID error and
> should
> > > not
> > > > > we
> > > > > > handle it with a different exception as these errors are not
> > related
> > > to
> > > > > > poll time at all.
> > > > > >
> > > > > > So please let us know what could be the issue here and how can we
> > fix
> > > > > this.
> > > > > >
> > > > > > Also not that we are running an identical single thread
> application
> > > > which
> > > > > > source from identical single partitioned source topic, and that
> > > > > application
> > > > > > never fails.
> > > > > > So it is hard to understand if we are doing something wrong in
> our
> > > > logic.
> > > > > >
> > > > > > Also from logs and JMX metrics collection we see that poll is
> > called
> > > > well
> > > > > > withing default 30 seconds, usually it is around 5 seconds.
> > > > > >
> > > > > > Please share any ideas.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to