Might be easiest to just send all the logs if possible.

On Thu, 9 Feb 2017 at 08:10 Sachin Mittal <sjmit...@gmail.com> wrote:

> I will try to get the logs soon.
> One quick question: I have three brokers running as a cluster with default
> logging.
>
> Which log4j logs would be of interest on the broker side, and from which
> broker? Or do I need to send logs from all three?
>
> My topic is partitioned and replicated across all three, so each kafka-logs
> dir contains the same topic logs.
>
>
> Thanks
> Sachin
>
>
> On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Sachin,
> >
> > Can you provide the full logs from the broker and the streams app? It is
> > hard to understand what is going on with only snippets of information. It
> > seems like the rebalance is taking too long, but I can't tell from this.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 9 Feb 2017 at 07:53 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > Following up on the CommitFailedException: what we observe is that the
> > > first time this happens,
> > >
> > > ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
> > > This calls suspendTasksAndState(), which tries to commit offsets again,
> > > and the same exception is thrown again.
> > > ConsumerCoordinator handles this by logging it and then assigning the
> > > new partitions.
> > >
> > > But before rethrowing the exception, the stream thread sets
> > > rebalanceException = t;
> > >
> > > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> > >
> > > So when runLoop next executes, it sees this exception and the stream
> > > thread exits.
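
To make the flow above concrete, here is a minimal sketch of it in plain Java. It is a simplified model, not the Kafka Streams code: WorkerThread, Coordinator, and runLoopIteration are hypothetical stand-ins, and only the field name rebalanceException is taken from the description. It shows how a failure that the coordinator logs and swallows can still stop the thread later, because the listener stored it before rethrowing.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only; class and method names are hypothetical.
class RebalanceFailureSketch {

    /** Stand-in for the stream thread and its rebalance listener. */
    static final class WorkerThread {
        private Throwable rebalanceException;
        final List<String> log = new ArrayList<>();

        /** Tries to commit on revocation; remembers any failure, then rethrows it. */
        void onPartitionsRevoked(Runnable commit) {
            try {
                commit.run();
            } catch (RuntimeException t) {
                rebalanceException = t; // kept for the run loop
                throw t;                // also surfaced to the coordinator
            }
        }

        /** One pass of the run loop: fails if a rebalance error was recorded. */
        void runLoopIteration() {
            if (rebalanceException != null) {
                throw new IllegalStateException("Failed to rebalance", rebalanceException);
            }
            log.add("processed records");
        }
    }

    /** Stand-in for the consumer coordinator: logs listener failures and carries on. */
    static final class Coordinator {
        void rebalance(WorkerThread worker, Runnable commit) {
            try {
                worker.onPartitionsRevoked(commit);
            } catch (RuntimeException e) {
                worker.log.add("listener failed on partition revocation: " + e.getMessage());
            }
            worker.log.add("new partitions assigned");
        }
    }

    public static void main(String[] args) {
        WorkerThread worker = new WorkerThread();
        Runnable failingCommit = () -> {
            throw new IllegalStateException("commit failed");
        };
        new Coordinator().rebalance(worker, failingCommit);
        worker.log.forEach(System.out::println);
        try {
            worker.runLoopIteration();
        } catch (IllegalStateException e) {
            System.out.println("thread exits: " + e.getMessage());
        }
    }
}
```

In this model the partitions are reassigned successfully, yet the next loop iteration still throws, which matches the sequence in the logs that follow.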
> > >
> > > Here are the logs
> > >
> > >  -----   Exception - happened
> > >
> > > Unsubscribed all topics or patterns and assigned partitions
> > > stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [[0_32, 0_3, 0_20, 0_8]]
> > > stream-thread [StreamThread-1] Removing all active tasks [[0_32, 0_3, 0_20, 0_8]]
> > > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > > User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group new-part-advice failed on partition revocation
> > >  -----   Revocation failed and exception handled
> > >
> > > (Re-)joining group new-part-advice
> > > stream-thread [StreamThread-1] found [advice-stream] topics possibly matching regex
> > > stream-thread [StreamThread-1] updating builder with SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]} topic(s) with possible matching regex subscription(s)
> > > Sending JoinGroup ((type: JoinGroupRequest, groupId=new-part-advice, sessionTimeout=10000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@300c8a1f)) to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)
> > >
> > > stream-thread [StreamThread-1] New partitions [[advice-stream-8, advice-stream-32, advice-stream-3, advice-stream-20]] assigned at the end of consumer rebalance.
> > >
> > >  -----   Same new partitions assigned on rebalance
> > >
> > > stream-thread [StreamThread-1] recycling old task 0_32
> > > stream-thread [StreamThread-1] recycling old task 0_3
> > > stream-thread [StreamThread-1] recycling old task 0_20
> > > stream-thread [StreamThread-1] recycling old task 0_8
> > >
> > >  -----   It then shuts down
> > >
> > > stream-thread [StreamThread-1] Shutting down
> > > stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all
> > > active tasks [[0_32, 0_3, 0_20, 0_8]] and standby tasks [[]]
> > >
> > > Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
> > > org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612) ~[kafka-streams-0.10.2.0.jar:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) ~[kafka-streams-0.10.2.0.jar:na]
> > > Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] failed to suspend stream tasks
> > >
> > >
> > > So is there a way to restart that stream thread? Also, in this case
> > > should rebalanceException be assigned at all? This is a
> > > CommitFailedException, the new partitions are already assigned, and the
> > > same thread could continue processing them.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Wed, Feb 8, 2017 at 2:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > > Hi All,
> > > > I am trying out the 0.10.2.0 rc.
> > > > We have a source stream of 40 partitions.
> > > >
> > > > We start one instance with 4 threads.
> > > > After that we start a second instance with the same config on a
> > > > different machine, and then a third instance the same way.
> > > >
> > > > After the application reaches steady state we start getting
> > > > CommitFailedException.
> > > > First, I feel the message for CommitFailedException should change. It
> > > > is thrown from a number of places, and sometimes the cause has nothing
> > > > to do with the time between subsequent calls to poll() being longer
> > > > than the configured max.poll.interval.ms.
> > > >
> > > > We have verified that this is never the case when we get the
> > > > exception. So perhaps introduce another exception class; as it stands
> > > > the message is very misleading and we end up investigating the wrong
> > > > areas.
> > > >
> > > > Anyway, here are the places where this exception gets thrown in our
> > > > case.
> > > > 1. The exception trace is this:
> > > > stream-thread [StreamThread-4] Failed while executing StreamTask 0_5 due
> > > > to commit consumer offsets:
> > > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > > > completed since the group has already rebalanced and assigned the
> > > > partitions to another member. This means that the time between subsequent
> > > > calls to poll() was longer than the configured max.poll.interval.ms,
> > > > which typically implies that the poll loop is spending too much time
> > > > message processing. You can address this either by increasing the session
> > > > timeout or by reducing the maximum size of batches returned in poll() with
> > > > max.poll.records.
> > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) [kafka-clients-0.10.2.0.jar:na]
> > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) [kafka-clients-0.10.2.0.jar:na]
> > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125) [kafka-clients-0.10.2.0.jar:na]
> > > >
> > > > In the code, that line is:
> > > > // if the generation is null, we are not part of an active group (and we
> > > > // expect to be).
> > > > // the only thing we can do is fail the commit and let the user rejoin
> > > > // the group in poll()
> > > > if (generation == null)
> > > >     return RequestFuture.failure(new CommitFailedException());
> > > >
> > > > When we investigated what caused the generation to be null, we found that
> > > > some time before that the following was logged:
> > > >
> > > > Attempt to heartbeat failed for group new-part-advice since member id is
> > > > not valid.
> > > >
> > > > We checked the code and we see this:
> > > > if (error == Errors.UNKNOWN_MEMBER_ID) {
> > > >     log.debug("Attempt to heartbeat failed for group {} since member id is not valid.", groupId);
> > > >     resetGeneration();
> > > >     future.raise(Errors.UNKNOWN_MEMBER_ID);
> > > > }
> > > >
> > > > We checked resetGeneration and found that it does
> > > > this.state = MemberState.UNJOINED;
> > > >
> > > > And then in the generation() method we do something like this:
> > > > if (this.state != MemberState.STABLE)
> > > >             return null;
> > > >
> > > > So why are we not returning Generation.NO_GENERATION, which was set
> > > > earlier by resetGeneration?
> > > >
> > > > Would this line be better?
> > > > if (this.state != MemberState.STABLE && !this.rejoinNeeded)
> > > >             return null;
> > > >
> > > > Also, why are we getting the UNKNOWN_MEMBER_ID error in the first place?
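
The interaction described above can be reproduced with a small stand-alone model. This is a hedged sketch, not the Kafka client code: the Coordinator class, joinCompleted, commit, and the use of -1 for NO_GENERATION are assumptions made for illustration, while MemberState, resetGeneration, rejoinNeeded, and the "not STABLE means null" check mirror the snippets quoted above.

```java
// Hypothetical, simplified model of the state handling described above.
// It is NOT the Kafka client implementation.
class GenerationSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    // Assumed sentinel value for "no generation".
    static final int NO_GENERATION = -1;

    static final class Coordinator {
        private MemberState state = MemberState.UNJOINED;
        private int generationId = NO_GENERATION;
        private boolean rejoinNeeded = true;

        /** Called once a join round has finished successfully. */
        void joinCompleted(int newGenerationId) {
            generationId = newGenerationId;
            rejoinNeeded = false;
            state = MemberState.STABLE;
        }

        /** Mirrors the reset performed when UNKNOWN_MEMBER_ID is received. */
        void resetGeneration() {
            generationId = NO_GENERATION;
            rejoinNeeded = true;
            state = MemberState.UNJOINED;
        }

        /** Returns null whenever the member is not STABLE. */
        Integer generation() {
            if (state != MemberState.STABLE) {
                return null;
            }
            return generationId;
        }

        /** A commit is refused when there is no current generation. */
        String commit() {
            return generation() == null ? "CommitFailedException" : "committed";
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        coordinator.joinCompleted(7);
        System.out.println(coordinator.commit());
        coordinator.resetGeneration(); // e.g. after a failed heartbeat
        System.out.println(coordinator.commit());
    }
}
```

Under these assumptions, any commit issued between resetGeneration() and the next successful join fails regardless of how recently poll() was called, which is the mismatch with the exception message raised above.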
> > > >
> > > > 2. This is the second case:
> > > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > > > completed since the group has already rebalanced and assigned the
> > > > partitions to another member. This means that the time between subsequent
> > > > calls to poll() was longer than the configured max.poll.interval.ms,
> > > > which typically implies that the poll loop is spending too much time
> > > > message processing. You can address this either by increasing the session
> > > > timeout or by reducing the maximum size of batches returned in poll() with
> > > > max.poll.records.
> > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:766) ~[kafka-clients-0.10.2.0.jar:na]
> > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:712) ~[kafka-clients-0.10.2.0.jar:na]
> > > >
> > > > Here again, when we check the code we see that it failed inside
> > > > OffsetCommitResponseHandler:
> > > >
> > > > if (error == Errors.UNKNOWN_MEMBER_ID
> > > >         || error == Errors.ILLEGAL_GENERATION
> > > >         || error == Errors.REBALANCE_IN_PROGRESS) {
> > > >     // need to re-join group
> > > >     log.debug("Offset commit for group {} failed: {}", groupId, error.message());
> > > >     resetGeneration();
> > > >     future.raise(new CommitFailedException());
> > > >     return;
> > > > }
> > > >
> > > > And when we check the log message before this, we find:
> > > > Offset commit for group new-part-advice failed: The coordinator is not
> > > > aware of this member.
> > > >
> > > > This is again due to the UNKNOWN_MEMBER_ID error.
> > > > The handling in this case is similar to what we do for a heartbeat
> > > > failure.
> > > >
> > > > So again, why are we getting this UNKNOWN_MEMBER_ID error, and should we
> > > > not handle it with a different exception, since these errors are not
> > > > related to poll time at all?
> > > >
> > > > So please let us know what the issue could be here and how we can fix
> > > > this.
> > > >
> > > > Also note that we are running an identical single-threaded application
> > > > that reads from an identical single-partition source topic, and that
> > > > application never fails.
> > > > So it is hard to tell whether we are doing something wrong in our logic.
> > > >
> > > > Also, from logs and JMX metrics collection we see that poll is called well
> > > > within the default 30 seconds; usually it is around 5 seconds.
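
For reference, the settings named in the exception message can be written out as plain consumer properties. This is only a sketch: the session timeout and poll interval values are copied from the JoinGroup request quoted elsewhere in this thread (sessionTimeout=10000, rebalanceTimeout=300000), on the assumption that the consumer sends max.poll.interval.ms as its rebalance timeout; the max.poll.records value is purely illustrative and does not come from the thread.

```java
import java.util.Properties;

// Sketch of the consumer settings the exception message refers to.
class ConsumerTimeoutSketch {

    static Properties consumerOverrides() {
        Properties props = new Properties();
        // Values as they appear in the JoinGroup request in this thread.
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("max.poll.interval.ms", "300000");
        // Illustrative value only (assumption, not taken from the thread).
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        consumerOverrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If those values are what the application actually runs with, a poll interval of around 5 seconds is far below the limit, which supports the point that the exception here is not caused by slow polling.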
> > > >
> > > > Please share any ideas.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > >
> >
>
