I will try to get the logs soon.
One quick question: I have three brokers running in a cluster with default
logging.

Which log4j logs would be of interest on the broker side, and from which
broker? Or do I need to send logs from all three?

My topic is partitioned and replicated on all three, so the kafka-logs dir
contains the same topic logs.


Thanks
Sachin


On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy <damian....@gmail.com> wrote:

> Sachin,
>
> Can you provide the full logs from the broker and the streams app? It is
> hard to understand what is going on with only snippets of information. It
> seems like the rebalance is taking too long, but I can't tell from this.
>
> Thanks,
> Damian
>
> On Thu, 9 Feb 2017 at 07:53 Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > Following up on the CommitFailedException: what we observe is that the
> > first time this happens,
> >
> > ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
> > This calls suspendTasksAndState(), which tries to commit offsets again, and
> > the same exception is thrown again.
> > This gets handled at ConsumerCoordinator, which logs it and then assigns
> > new partitions.
> >
> > But before rethrowing the exception, the stream thread sets
> > rebalanceException = t;
> >
> > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> >
> > So the next time runLoop executes, it sees this exception and the stream
> > thread exits.
> >
> > Here are the logs
> >
> >  -----   Exception - happened
> >
> > Unsubscribed all topics or patterns and assigned partitions
> > stream-thread [StreamThread-1] Updating suspended tasks to contain active
> > tasks [[0_32, 0_3, 0_20, 0_8]]
> > stream-thread [StreamThread-1] Removing all active tasks [[0_32, 0_3, 0_20,
> > 0_8]]
> > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > User provided listener
> > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > new-part-advice failed on partition revocation
> >  -----   Revocation failed and exception handled
> >
> > (Re-)joining group new-part-advice
> > stream-thread [StreamThread-1] found [advice-stream] topics possibly
> > matching regex
> > stream-thread [StreamThread-1] updating builder with
> > SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]} topic(s)
> > with possible matching regex subscription(s)
> > Sending JoinGroup ((type: JoinGroupRequest, groupId=new-part-advice,
> > sessionTimeout=10000, rebalanceTimeout=300000, memberId=,
> > protocolType=consumer,
> > groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@300c8a1f))
> > to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)
> >
> > stream-thread [StreamThread-1] New partitions [[advice-stream-8,
> > advice-stream-32, advice-stream-3, advice-stream-20]] assigned at the end
> > of consumer rebalance.
> >
> >  -----   Same new partitions assigned on rebalance
> >
> > stream-thread [StreamThread-1] recycling old task 0_32
> > stream-thread [StreamThread-1] recycling old task 0_3
> > stream-thread [StreamThread-1] recycling old task 0_20
> > stream-thread [StreamThread-1] recycling old task 0_8
> >
> >  -----   It then shuts down
> >
> > stream-thread [StreamThread-1] Shutting down
> > stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all
> > active tasks [[0_32, 0_3, 0_20, 0_8]] and standby tasks [[]]
> >
> > Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
> > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [StreamThread-1] Failed to rebalance
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
> > ~[kafka-streams-0.10.2.0.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > ~[kafka-streams-0.10.2.0.jar:na]
> > Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [StreamThread-1] failed to suspend stream tasks
> >
> >
> > So is there a way we can restart that stream thread? Also, in this case
> > should rebalanceException be assigned at all? This is a
> > CommitFailedException, new partitions are already assigned, and the same
> > thread could continue processing the new partitions.
> >
> > Thanks
> > Sachin
> >
> >
> > On Wed, Feb 8, 2017 at 2:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi All,
> > > I am trying out the 0.10.2.0 rc.
> > > We have a source stream of 40 partitions.
> > >
> > > We start one instance with 4 threads.
> > > After that we start a second instance with the same config on a different
> > > machine, and then a third instance the same way.
> > >
> > > After the application reaches steady state we start getting
> > > CommitFailedException.
> > > First, I feel the message for CommitFailedException should change. It is
> > > thrown from a number of places, and sometimes the cause has nothing to do
> > > with the time between subsequent calls to poll() being longer than the
> > > configured max.poll.interval.ms.
> > >
> > > We have verified that this is never the cause in our case when we get the
> > > exception. So perhaps introduce another exception class; the current
> > > message is very misleading and we end up investigating the wrong areas.
> > >
> > > Anyway, here are the places where this exception gets thrown in our case.
> > > 1. The exception trace is this:
> > > stream-thread [StreamThread-4] Failed while executing StreamTask 0_5 due
> > > to commit consumer offsets:
> > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > > completed since the group has already rebalanced and assigned the
> > > partitions to another member. This means that the time between subsequent
> > > calls to poll() was longer than the configured max.poll.interval.ms,
> > > which typically implies that the poll loop is spending too much time
> > > message processing. You can address this either by increasing the session
> > > timeout or by reducing the maximum size of batches returned in poll() with
> > > max.poll.records.
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> > > [kafka-clients-0.10.2.0.jar:na]
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
> > > [kafka-clients-0.10.2.0.jar:na]
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> > > [kafka-clients-0.10.2.0.jar:na]
> > >
> > > In the code, that line is:
> > > // if the generation is null, we are not part of an active group (and we
> > > // expect to be).
> > > // the only thing we can do is fail the commit and let the user rejoin
> > > // the group in poll()
> > > if (generation == null)
> > >     return RequestFuture.failure(new CommitFailedException());
> > >
> > > When we investigated what caused the generation to be null, we found that
> > > some time before that the following was logged:
> > >
> > > Attempt to heartbeat failed for group new-part-advice since member id is
> > > not valid.
> > >
> > > We checked the code and we see this:
> > > if (error == Errors.UNKNOWN_MEMBER_ID) {
> > >     log.debug("Attempt to heartbeat failed for group {} since member id is not valid.", groupId);
> > >     resetGeneration();
> > >     future.raise(Errors.UNKNOWN_MEMBER_ID);
> > > }
> > >
> > > We checked resetGeneration and found that it does:
> > > this.state = MemberState.UNJOINED;
> > >
> > > And then the generation method does something like this:
> > > if (this.state != MemberState.STABLE)
> > >             return null;
> > >
> > > So why are we not returning Generation.NO_GENERATION, which was set
> > > earlier by resetGeneration?
> > >
> > > Would this line be better?
> > > if (this.state != MemberState.STABLE && !this.rejoinNeeded)
> > >             return null;
> > >
> > > Also, why are we getting the UNKNOWN_MEMBER_ID error in the first place?
> > >
> > > 2. This is the second case:
> > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > > completed since the group has already rebalanced and assigned the
> > > partitions to another member. This means that the time between subsequent
> > > calls to poll() was longer than the configured max.poll.interval.ms,
> > > which typically implies that the poll loop is spending too much time
> > > message processing. You can address this either by increasing the session
> > > timeout or by reducing the maximum size of batches returned in poll() with
> > > max.poll.records.
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:766)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:712)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >
> > > Here again when we check the code we see that it failed inside
> > > OffsetCommitResponseHandler
> > >
> > > if (error == Errors.UNKNOWN_MEMBER_ID
> > >         || error == Errors.ILLEGAL_GENERATION
> > >         || error == Errors.REBALANCE_IN_PROGRESS) {
> > >     // need to re-join group
> > >     log.debug("Offset commit for group {} failed: {}", groupId, error.message());
> > >     resetGeneration();
> > >     future.raise(new CommitFailedException());
> > >     return;
> > > }
> > >
> > > When we check the log message just before this, we find:
> > > Offset commit for group new-part-advice failed: The coordinator is not
> > > aware of this member.
> > >
> > > This failure is again due to an UNKNOWN_MEMBER_ID error.
> > > The handling in this case is similar to what is done for a failed heartbeat.
> > >
> > > So again, why are we getting this UNKNOWN_MEMBER_ID error, and should we
> > > not handle it with a different exception, since these errors are not
> > > related to poll time at all?
> > >
> > > So please let us know what the issue could be here and how we can fix
> > > this.
> > >
> > > Also note that we are running an identical single-threaded application
> > > which reads from an identical single-partition source topic, and that
> > > application never fails.
> > > So it is hard to tell whether we are doing something wrong in our logic.
> > >
> > > Also, from logs and JMX metrics collection we see that poll is called well
> > > within the default 30 seconds; usually it is around 5 seconds.
> > >
> > > Please share any ideas.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> >
>
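
The sequence described in the oldest message above (a heartbeat or commit
response carrying UNKNOWN_MEMBER_ID calls resetGeneration, the state becomes
UNJOINED, generation then yields null, and the next commit fails) can be
sketched as a small toy model. This is only an illustration under stated
assumptions, not the Kafka client code: ToyCoordinator, joinCompleted, commit
and the IllegalStateException standing in for CommitFailedException are all
hypothetical names, and the generation is reduced to a plain integer.

```java
public class GenerationSketch {
    // Hypothetical member states, named after the ones quoted in the thread.
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    // Toy stand-in for the coordinator logic discussed above; not Kafka code.
    static class ToyCoordinator {
        private MemberState state = MemberState.UNJOINED;
        private int generationId = -1;

        // Hypothetical helper: a join/sync round finished successfully.
        void joinCompleted(int newGenerationId) {
            generationId = newGenerationId;
            state = MemberState.STABLE;
        }

        // Mirrors the quoted behaviour: the member falls back to UNJOINED.
        void resetGeneration() {
            generationId = -1;
            state = MemberState.UNJOINED;
        }

        // Mirrors the quoted check: no generation unless the member is STABLE.
        Integer generation() {
            if (state != MemberState.STABLE)
                return null;
            return generationId;
        }

        // Stand-in for the commit path: fails when there is no generation,
        // regardless of how long ago poll() was last called.
        void commit() {
            if (generation() == null)
                throw new IllegalStateException("commit failed: not part of an active group");
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.joinCompleted(7);
        coordinator.commit();              // succeeds while STABLE

        coordinator.resetGeneration();     // e.g. after an UNKNOWN_MEMBER_ID response
        try {
            coordinator.commit();
            System.out.println("commit ok");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the failed commit depends only on the member state, which is the
point made in the thread: the failure can occur without any long gap between
poll() calls.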
