On 10 February 2017 at 11:18, Sachin Mittal <sjmit...@gmail.com> wrote:

> The heartbeat exception while rebalancing is OK. However I had some
> different scenario which I wanted to understand.
>
> Please check line 42428 of https://dl.dropboxusercontent.com/u/46450177/
> TestKafkaAdvice.StreamThread-1.log


If you look at line 42006, you will see that the group is rebalancing:

Attempt to heartbeat failed for group new-part-advice since it is
rebalancing.

This is the likely cause of the message at line 42428 because that member
has been kicked out of the group.
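
To make the sequence concrete, here is a minimal sketch of the path from a
failed heartbeat to a failed commit. It is a toy model, not the real
coordinator code: the class MemberStateSketch and the methods onJoinComplete,
onHeartbeatUnknownMemberId and tryCommit are hypothetical names. Only the
state transitions come from the snippets quoted further down in this thread
(resetGeneration() sets the state to UNJOINED, and generation() returns null
unless the state is STABLE).

```java
// Toy model of the member state handling discussed in this thread.
// All names except resetGeneration(), generation(), MemberState and
// NO_GENERATION are hypothetical and exist only for illustration.
final class MemberStateSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    static final class Generation {
        final int generationId;
        final String memberId;

        Generation(int generationId, String memberId) {
            this.generationId = generationId;
            this.memberId = memberId;
        }
    }

    static final Generation NO_GENERATION = new Generation(-1, "");

    private MemberState state = MemberState.UNJOINED;
    private Generation generation = NO_GENERATION;

    // Hypothetical hook: the member has joined and the group is stable.
    void onJoinComplete(int generationId, String memberId) {
        this.generation = new Generation(generationId, memberId);
        this.state = MemberState.STABLE;
    }

    // Models the heartbeat handler branch for UNKNOWN_MEMBER_ID.
    void onHeartbeatUnknownMemberId() {
        resetGeneration();
    }

    void resetGeneration() {
        this.generation = NO_GENERATION;
        this.state = MemberState.UNJOINED;
    }

    // Returns null whenever the member is not in a stable group.
    Generation generation() {
        if (this.state != MemberState.STABLE)
            return null;
        return this.generation;
    }

    // false stands in for "the commit fails with CommitFailedException".
    boolean tryCommit() {
        return generation() != null;
    }

    public static void main(String[] args) {
        MemberStateSketch member = new MemberStateSketch();
        member.onJoinComplete(7, "member-a");
        System.out.println("commit while stable: " + member.tryCommit());
        member.onHeartbeatUnknownMemberId();
        System.out.println("commit after kick-out: " + member.tryCommit());
    }
}
```

In this model the commit can only succeed again after the member rejoins,
which in the real client happens on the next poll().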

>
>
> Attempt to heartbeat failed for group new-part-advice since member id is
> not valid.
>
> Why do we get this exception?
>
> Also on line 87485 of https://dl.dropboxusercontent.com/u/46450177/
> TestKafkaAdvice.StreamThread-4.log
> We get
> Offset commit for group new-part-advice failed: The coordinator is not
> aware of this member.
>
> Both the errors look same, and seems to be failing due to member id
> unknown.
>
> What could be the reason for the same?
>

Again, if you look a bit before that, at line 87248, you'll see that the
group has rebalanced.
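
The commit path ends up in the same place. As a rough sketch (again a toy
model: CommitErrorSketch, GroupError, Outcome and classify are hypothetical
names, and the OTHER bucket is a simplification), the three error codes from
the OffsetCommitResponseHandler snippet quoted below all lead to one outcome,
which is why a rebalance and an unknown member id both surface as the same
CommitFailedException:

```java
// Toy classification of offset-commit errors, based only on the
// handler snippet quoted in this thread. Not the real client code.
final class CommitErrorSketch {
    // OTHER is a hypothetical catch-all for errors not discussed here.
    enum GroupError { NONE, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS, OTHER }

    enum Outcome { COMMITTED, COMMIT_FAILED_REJOIN, OTHER_FAILURE }

    static Outcome classify(GroupError error) {
        switch (error) {
            case NONE:
                return Outcome.COMMITTED;
            case UNKNOWN_MEMBER_ID:
            case ILLEGAL_GENERATION:
            case REBALANCE_IN_PROGRESS:
                // In the quoted handler: resetGeneration(), then
                // future.raise(new CommitFailedException()).
                return Outcome.COMMIT_FAILED_REJOIN;
            default:
                return Outcome.OTHER_FAILURE;
        }
    }

    public static void main(String[] args) {
        for (GroupError error : GroupError.values()) {
            System.out.println(error + " -> " + classify(error));
        }
    }
}
```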


>
> Regarding
> "w.r.t. continuing after a CommitFailedException during
> onPartitionsRevoked - will have to think through the scenarios":
>
> Please look at the log: https://dl.dropboxusercontent.com/u/46450177/
> TestKafkaAdvice.StreamThread-4.log
> Line 88714
> stream-thread [StreamThread-1] Updating suspended tasks to contain active
> tasks [[0_17, 0_19, 0_24, 0_31]]
> stream-thread [StreamThread-1] Removing all active tasks [[0_17, 0_19,
> 0_24, 0_31]]
> stream-thread [StreamThread-1] Removing all standby tasks [[]]
> User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> new-part-advice failed on partition revocation
> This gets handled and logged and stream thread resumes with new partitions
>
> Check line 90005
> stream-thread [StreamThread-1] New partitions [[advice-stream-12,
> advice-stream-15, advice-stream-1, advice-stream-21]] assigned at the end
> of consumer rebalance.
>
> Now at this stage stream thread should continue.
> However since we assign
> rebalanceException = t;
> in https://github.com/apache/kafka/blob/0.10.2/streams/src/
> main/java/org/apache/kafka/streams/processor/internals/
> StreamThread.java#L261
>
> It shuts down when it again tries to execute runLoop
>
> Please check line 91074
> stream-thread [StreamThread-1] Shutting down
>
> Please let me know if this case is valid and whether we should avoid
> shutting down the thread itself.
>
> I know this log is a mess and we have made due corrections.
>
> We ran the streams application again, now with 12 threads (3 machines) and
> 12 partitions. It ran well for a few hours, but then 2 of the machines and 2
> threads of the third machine died.
> This was a much better result than the scenario with 40 partitions and the
> same 12 threads, where the application used to go into a perpetual rebalance
> state within a couple of hours.
> We are in the process of analysing those logs and will present our findings.
>
> Thanks
> Sachin
>
>
> On Fri, Feb 10, 2017 at 3:42 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > The CommitFailedExceptions are thrown because the group is rebalancing.
> > You can see log messages like the one below before the commit failed
> > exception:
> >
> > Attempt to heartbeat failed for group new-part-advice since it is
> > rebalancing.
> >
> > It isn't clear from the logs why the rebalancing is happening. The broker
> > logs would have been helpful. Also, timestamps on the kafka streams logs
> > would also be useful.
> >
> > W.r.t. continuing after a CommitFailedException during
> > onPartitionsRevoked - will have to think through the scenarios.
> >
> > On Fri, 10 Feb 2017 at 04:35 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > I could manage to get the streams client logs; the server logs were
> > > deleted since time had elapsed and they got rolled over.
> > > See if you can figure out something from these. These are not the best
> > > of logs.
> > >
> > >
> > > https://dl.dropboxusercontent.com/u/46450177/
> > TestKafkaAdvice.StreamThread-1.log
> > > In the above log, pay attention to StreamThread-1, which fails due to
> > > CommitFailedException.
> > > This mostly seems to be caused by OffsetCommitResponseHandler due to an
> > > unknown member id.
> > >
> > >
> > > https://dl.dropboxusercontent.com/u/46450177/
> > TestKafkaAdvice.StreamThread-4.log
> > > In the above log, pay attention to StreamThread-4, which fails due to
> > > CommitFailedException during sendOffsetCommitRequest.
> > > This mostly seems to be caused by: Attempt to heartbeat failed for group
> > > new-part-advice since member id is not valid.
> > >
> > > Also please let us know why we shut down the thread if we get a
> > > CommitFailedException. Ideally we catch this exception in
> > > onPartitionsRevoked and then the thread should continue processing the
> > > newly assigned partitions.
> > >
> > > Please let me know if you need any more information from me.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Feb 9, 2017 at 2:14 PM, Damian Guy <damian....@gmail.com>
> wrote:
> > >
> > > > Might be easiest to just send all the logs if possible.
> > > >
> > > > On Thu, 9 Feb 2017 at 08:10 Sachin Mittal <sjmit...@gmail.com>
> wrote:
> > > >
> > > > > I would try to get the logs soon.
> > > > > One quick question, I have three brokers which run in cluster with
> > > > default
> > > > > logging.
> > > > >
> > > > > Which log4j logs would be of interest at broker side and which
> broker
> > > or
> > > > do
> > > > > I need to send logs from all three.
> > > > >
> > > > > My topic is partitioned and replicated on all three so kafka-logs
> dir
> > > > > contains same topic logs.
> > > > >
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy <damian....@gmail.com>
> > > wrote:
> > > > >
> > > > > > Sachin,
> > > > > >
> > > > > > Can you provide the full logs from the broker and the streams
> app?
> > It
> > > > is
> > > > > > hard to understand what is going on with only snippets of
> > > information.
> > > > It
> > > > > > seems like the rebalance is taking too long, but i can't tell
> from
> > > > this.
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Thu, 9 Feb 2017 at 07:53 Sachin Mittal <sjmit...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > In continuation of the CommitFailedException what we observe is
> > > that
> > > > > when
> > > > > > > this happens first time
> > > > > > >
> > > > > > > ConsumerCoordinator invokes onPartitionsRevoked on
> StreamThread.
> > > > > > > This calls suspendTasksAndState() which again tries to commit
> > > offset
> > > > > and
> > > > > > > then again the same exception is thrown.
> > > > > > > This gets handled at ConsumerCoordinator and it logs it and
> then
> > re
> > > > > > assigns
> > > > > > > new partition.
> > > > > > >
> > > > > > > But stream thread before rethrowing the exception calls
> > > > > > > rebalanceException = t;
> > > > > > >
> > > > > > > https://github.com/apache/kafka/blob/0.10.2/streams/src/
> > > > > > main/java/org/apache/kafka/streams/processor/internals/
> > > > > > StreamThread.java#L261
> > > > > > >
> > > > > > > So now when runLoop executes it gets this exception and stream
> > > thread
> > > > > > > exits.
> > > > > > >
> > > > > > > Here are the logs
> > > > > > >
> > > > > > > >  -----   Exception - happened
> > > > > > >
> > > > > > > Unsubscribed all topics or patterns and assigned partitions
> > > > > > > stream-thread [StreamThread-1] Updating suspended tasks to
> > contain
> > > > > active
> > > > > > > tasks [[0_32, 0_3, 0_20, 0_8]]
> > > > > > > stream-thread [StreamThread-1] Removing all active tasks
> [[0_32,
> > > 0_3,
> > > > > > 0_20,
> > > > > > > 0_8]]
> > > > > > > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > > > > > > User provided listener
> > > > > > > org.apache.kafka.streams.processor.internals.StreamThread$1
> for
> > > > group
> > > > > > > new-part-advice failed on partition revocation
> > > > > > >  -----   Revocation failed and exception handled
> > > > > > >
> > > > > > > (Re-)joining group new-part-advice
> > > > > > > stream-thread [StreamThread-1] found [advice-stream] topics
> > > possibly
> > > > > > > matching regex
> > > > > > > stream-thread [StreamThread-1] updating builder with
> > > > > > > SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]}
> > > > topic(s)
> > > > > > > with possible matching regex subscription(s)
> > > > > > > Sending JoinGroup ((type: JoinGroupRequest,
> > > groupId=new-part-advice,
> > > > > > > sessionTimeout=10000, rebalanceTimeout=300000, memberId=,
> > > > > > > protocolType=consumer,
> > > > > > >
> > > > > > > groupProtocols=org.apache.kafka.common.requests.
> > JoinGroupRequest$
> > > > > > ProtocolMetadata@300c8a1f
> > > > > > > ))
> > > > > > > > to coordinator 192.168.73.198:9092 (id: 2147483643
> > > > > > > > rack: null)
> > > > > > >
> > > > > > > stream-thread [StreamThread-1] New partitions
> [[advice-stream-8,
> > > > > > > advice-stream-32, advice-stream-3, advice-stream-20]] assigned
> at
> > > the
> > > > > end
> > > > > > > of consumer rebalance.
> > > > > > >
> > > > > > > >  -----   Same new partitions assigned on rebalance
> > > > > > >
> > > > > > > stream-thread [StreamThread-1] recycling old task 0_32
> > > > > > > stream-thread [StreamThread-1] recycling old task 0_3
> > > > > > > stream-thread [StreamThread-1] recycling old task 0_20
> > > > > > > stream-thread [StreamThread-1] recycling old task 0_8
> > > > > > >
> > > > > > >  -----   It then shuts down
> > > > > > >
> > > > > > > stream-thread [StreamThread-1] Shutting down
> > > > > > > stream-thread [StreamThread-1] shutdownTasksAndState: shutting
> > down
> > > > all
> > > > > > > active tasks [[0_32, 0_3, 0_20, 0_8]] and standby tasks [[]]
> > > > > > >
> > > > > > > Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
> > > > > > > org.apache.kafka.streams.errors.StreamsException:
> stream-thread
> > > > > > > [StreamThread-1] Failed to rebalance
> > > > > > >     at
> > > > > > >
> > > > > > > org.apache.kafka.streams.processor.internals.
> > StreamThread.runLoop(
> > > > > > StreamThread.java:612)
> > > > > > > ~[kafka-streams-0.10.2.0.jar:na]
> > > > > > >     at
> > > > > > >
> > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > StreamThread.run(StreamThread.java:368)
> > > > > > > ~[kafka-streams-0.10.2.0.jar:na]
> > > > > > > Caused by: org.apache.kafka.streams.errors.StreamsException:
> > > > > > stream-thread
> > > > > > > [StreamThread-1] failed to suspend stream tasks
> > > > > > >
> > > > > > >
> > > > > > > So is there a way we can restart that stream thread again. Also
> > in
> > > > this
> > > > > > > case should we assign it rebalanceException, because this is
> > > > > > > CommitFailedException and new partitions are already assigned
> and
> > > > same
> > > > > > > thread can continue processing new partitions.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Feb 8, 2017 at 2:19 PM, Sachin Mittal <
> > sjmit...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > > I am trying out the 0.10.2.0 rc.
> > > > > > > > We have a source stream of 40 partitions.
> > > > > > > >
> > > > > > > > We start one instance with 4 threads.
> > > > > > > > After that we start second instance with same config on a
> > > different
> > > > > > > > machine and then same way third instance.
> > > > > > > >
> > > > > > > > After application reaches steady state we start getting
> > > > > > > > CommitFailedException.
> > > > > > > > > Firstly, I feel that the message for CommitFailedException
> > > > > > > > > should change. It is thrown from a number of places, and
> > > > > > > > > sometimes the cause is not related to the time between
> > > > > > > > > subsequent calls to poll() being longer than the configured
> > > > > > > > > max.poll.interval.ms.
> > > > > > > >
> > > > > > > > > We have verified that this is never the case when we get
> > > > > > > > > the exception. So perhaps introduce another exception class;
> > > > > > > > > this message is very misleading and we end up investigating
> > > > > > > > > the wrong areas.
> > > > > > > >
> > > > > > > > Anyway on the places where this exception gets thrown in our
> > > cases
> > > > > > > > 1. Exception trace is this
> > > > > > > > stream-thread [StreamThread-4] Failed while executing
> > StreamTask
> > > > 0_5
> > > > > > due
> > > > > > > > to commit consumer offsets:
> > > > > > > > org.apache.kafka.clients.consumer.CommitFailedException:
> > Commit
> > > > > > cannot be
> > > > > > > > completed since the group has already rebalanced and assigned
> > the
> > > > > > > > partitions to another member. This means that the time
> between
> > > > > > subsequent
> > > > > > > > calls to poll() was longer than the configured
> > > > max.poll.interval.ms,
> > > > > > > > which typically implies that the poll loop is spending too
> much
> > > > time
> > > > > > > > message processing. You can address this either by increasing
> > the
> > > > > > session
> > > > > > > > timeout or by reducing the maximum size of batches returned
> in
> > > > poll()
> > > > > > > with
> > > > > > > > max.poll.records.
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.
> > > > > > ConsumerCoordinator.
> > > > > > > > sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> > > > > > > > [kafka-clients-0.10.2.0.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.
> > > > > > ConsumerCoordinator.
> > > > > > > > commitOffsetsSync(ConsumerCoordinator.java:577)
> > > > > > > > [kafka-clients-0.10.2.0.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > > > > > commitSync(KafkaConsumer.java:1125)
> > [kafka-clients-0.10.2.0.jar:
> > > > na]
> > > > > > > >
> > > > > > > > in the code that line is
> > > > > > > > // if the generation is null, we are not part of an active
> > group
> > > > (and
> > > > > > we
> > > > > > > > expect to be).
> > > > > > > > // the only thing we can do is fail the commit and let the
> user
> > > > > rejoin
> > > > > > > > the group in poll()
> > > > > > > > if (generation == null)
> > > > > > > > return RequestFuture.failure(new CommitFailedException());
> > > > > > > >
> > > > > > > > After we investigate what caused the generation to be null we
> > > found
> > > > > > that
> > > > > > > > sometime before that following was thrown
> > > > > > > >
> > > > > > > > Attempt to heartbeat failed for group new-part-advice since
> > > member
> > > > id
> > > > > > is
> > > > > > > > not valid.
> > > > > > > >
> > > > > > > > We check the code and we see this
> > > > > > > > if (error == Errors.UNKNOWN_MEMBER_ID) {
> > > > > > > > log.debug("Attempt to heartbeat failed for group {} since
> > member
> > > id
> > > > > is
> > > > > > > > not valid.", groupId);
> > > > > > > > resetGeneration();
> > > > > > > > future.raise(Errors.UNKNOWN_MEMBER_ID);
> > > > > > > > }
> > > > > > > >
> > > > > > > > We check the code resetGeneration and find that it does
> > > > > > > > this.state = MemberState.UNJOINED;
> > > > > > > >
> > > > > > > > And then in method generation we do something like this
> > > > > > > > if (this.state != MemberState.STABLE)
> > > > > > > >             return null;
> > > > > > > >
> > > > > > > > > So why are we not returning Generation.NO_GENERATION, which
> > > > > > > > > was set by resetGeneration earlier?
> > > > > > > > >
> > > > > > > > > Would this line be better?
> > > > > > > > if (this.state != MemberState.STABLE && !this.rejoinNeeded)
> > > > > > > >             return null;
> > > > > > > >
> > > > > > > > > Also, why are we getting the UNKNOWN_MEMBER_ID error in the
> > > > > > > > > first place?
> > > > > > > >
> > > > > > > > 2. This is the second case:
> > > > > > > > org.apache.kafka.clients.consumer.CommitFailedException:
> > Commit
> > > > > > cannot be
> > > > > > > > completed since the group has already rebalanced and assigned
> > the
> > > > > > > > partitions to another member. This means that the time
> between
> > > > > > subsequent
> > > > > > > > calls to poll() was longer than the configured
> > > > max.poll.interval.ms,
> > > > > > > > which typically implies that the poll loop is spending too
> much
> > > > time
> > > > > > > > message processing. You can address this either by increasing
> > the
> > > > > > session
> > > > > > > > timeout or by reducing the maximum size of batches returned
> in
> > > > poll()
> > > > > > > with
> > > > > > > > max.poll.records.
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.
> > > > > > ConsumerCoordinator$
> > > > > > > > OffsetCommitResponseHandler.handle(ConsumerCoordinator.
> > java:766)
> > > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.
> > > > > > ConsumerCoordinator$
> > > > > > > > OffsetCommitResponseHandler.handle(ConsumerCoordinator.
> > java:712)
> > > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > >
> > > > > > > > Here again when we check the code we see that it failed
> inside
> > > > > > > > OffsetCommitResponseHandler
> > > > > > > >
> > > > > > > > if (error == Errors.UNKNOWN_MEMBER_ID
> > > > > > > > || error == Errors.ILLEGAL_GENERATION
> > > > > > > > || error == Errors.REBALANCE_IN_PROGRESS) {
> > > > > > > > // need to re-join group
> > > > > > > > log.debug("Offset commit for group {} failed: {}", groupId,
> > > error.
> > > > > > > > message());
> > > > > > > > resetGeneration();
> > > > > > > > future.raise(new CommitFailedException());
> > > > > > > > return;
> > > > > > > > }
> > > > > > > >
> > > > > > > > > And when we check the log message before this, we find
> > > > > > > > Offset commit for group new-part-advice failed: The
> coordinator
> > > is
> > > > > not
> > > > > > > > aware of this member.
> > > > > > > >
> > > > > > > > This error is again due to UNKNOWN_MEMBER_ID error.
> > > > > > > > The handling in this case is similar to one we do for
> heartbeat
> > > > fail.
> > > > > > > >
> > > > > > > > So again why are we getting this UNKNOWN_MEMBER_ID error and
> > > should
> > > > > not
> > > > > > > we
> > > > > > > > handle it with a different exception as these errors are not
> > > > related
> > > > > to
> > > > > > > > poll time at all.
> > > > > > > >
> > > > > > > > So please let us know what could be the issue here and how
> can
> > we
> > > > fix
> > > > > > > this.
> > > > > > > >
> > > > > > > > > Also note that we are running an identical single-thread
> > > > > > > > > application which sources from an identical
> > > > > > > > > single-partition source topic, and that application never
> > > > > > > > > fails.
> > > > > > > > > So it is hard to understand if we are doing something wrong
> > > > > > > > > in our logic.
> > > > > > > >
> > > > > > > > > Also, from logs and JMX metrics collection we see that poll
> > > > > > > > > is called well within the default 30 seconds; usually it is
> > > > > > > > > around 5 seconds.
> > > > > > > >
> > > > > > > > Please share any ideas.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Sachin
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
