I am getting the logs, but could you please look at the line rebalanceException = t;
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
Why are we setting rebalanceException in the case of a CommitFailedException on partition revocation? What is happening is that when runLoop resumes it encounters this exception and the thread shuts down. However, the thread had just been assigned new partitions and should have continued.

Thanks
Sachin

On 9 Feb 2017 14:14, "Damian Guy" <damian....@gmail.com> wrote:

Might be easiest to just send all the logs if possible.

On Thu, 9 Feb 2017 at 08:10 Sachin Mittal <sjmit...@gmail.com> wrote:

> I will try to get the logs soon.
> One quick question: I have three brokers which run in a cluster with
> default logging.
>
> Which log4j logs would be of interest on the broker side, and from which
> broker, or do I need to send logs from all three?
>
> My topic is partitioned and replicated across all three, so the kafka-logs
> dir contains the same topic logs.
>
> Thanks
> Sachin
>
> On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Sachin,
> >
> > Can you provide the full logs from the broker and the streams app? It is
> > hard to understand what is going on with only snippets of information. It
> > seems like the rebalance is taking too long, but I can't tell from this.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 9 Feb 2017 at 07:53 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > In continuation of the CommitFailedException discussion, what we
> > > observe is that when this happens the first time:
> > >
> > > ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
> > > This calls suspendTasksAndState(), which again tries to commit offsets,
> > > and then the same exception is thrown again.
> > > This gets handled at ConsumerCoordinator, which logs it and then
> > > reassigns new partitions.
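The flow described above (the commit fails again during revocation, the coordinator logs and swallows the rethrow, but the stored exception later kills the run loop) can be condensed into a small runnable sketch. The class and method names below are hypothetical simplifications, not the real Kafka code:

```java
// Hypothetical, heavily simplified sketch of the pattern in question:
// an exception thrown while revoking partitions is stored and rethrown
// from the run loop, so the thread dies even though the rebalance that
// followed assigned it new partitions.
public class RebalanceExceptionSketch {
    private Throwable rebalanceException; // stands in for StreamThread#rebalanceException

    private void commitOffsets() {
        // stands in for the offset commit that fails with CommitFailedException
        throw new RuntimeException("CommitFailedException");
    }

    private void onPartitionsRevoked() {
        try {
            commitOffsets();
        } catch (RuntimeException t) {
            rebalanceException = t; // remembered here ...
            throw t;                // ... and also rethrown to the coordinator
        }
    }

    public String runLoop() {
        try {
            onPartitionsRevoked();
        } catch (RuntimeException handled) {
            // the coordinator logs the listener failure and carries on;
            // a new assignment then arrives and processing could continue
        }
        if (rebalanceException != null) // ... but the stored exception still kills the loop
            return "Failed to rebalance";
        return "running";
    }

    public static void main(String[] args) {
        System.out.println(new RebalanceExceptionSketch().runLoop()); // Failed to rebalance
    }
}
```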
> > >
> > > But the stream thread, before rethrowing the exception, calls
> > > rebalanceException = t;
> > >
> > > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> > >
> > > So now when runLoop executes it gets this exception and the stream
> > > thread exits.
> > >
> > > Here are the logs:
> > >
> > > ----- Exception happened
> > >
> > > Unsubscribed all topics or patterns and assigned partitions
> > > stream-thread [StreamThread-1] Updating suspended tasks to contain
> > > active tasks [[0_32, 0_3, 0_20, 0_8]]
> > > stream-thread [StreamThread-1] Removing all active tasks [[0_32, 0_3,
> > > 0_20, 0_8]]
> > > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > > User provided listener
> > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > > new-part-advice failed on partition revocation
> > >
> > > ----- Revocation failed and exception handled
> > >
> > > (Re-)joining group new-part-advice
> > > stream-thread [StreamThread-1] found [advice-stream] topics possibly
> > > matching regex
> > > stream-thread [StreamThread-1] updating builder with
> > > SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]} topic(s)
> > > with possible matching regex subscription(s)
> > > Sending JoinGroup ((type: JoinGroupRequest, groupId=new-part-advice,
> > > sessionTimeout=10000, rebalanceTimeout=300000, memberId=,
> > > protocolType=consumer,
> > > groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@300c8a1f))
> > > to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)
> > >
> > > stream-thread [StreamThread-1] New partitions [[advice-stream-8,
> > > advice-stream-32, advice-stream-3, advice-stream-20]] assigned at the
> > > end of consumer rebalance.
> > >
> > > ----- Same new partitions assigned on rebalance
> > >
> > > stream-thread [StreamThread-1] recycling old task 0_32
> > > stream-thread [StreamThread-1] recycling old task 0_3
> > > stream-thread [StreamThread-1] recycling old task 0_20
> > > stream-thread [StreamThread-1] recycling old task 0_8
> > >
> > > ----- It then shuts down
> > >
> > > stream-thread [StreamThread-1] Shutting down
> > > stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all
> > > active tasks [[0_32, 0_3, 0_20, 0_8]] and standby tasks [[]]
> > >
> > > Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
> > > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > [StreamThread-1] Failed to rebalance
> > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
> > > ~[kafka-streams-0.10.2.0.jar:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > > ~[kafka-streams-0.10.2.0.jar:na]
> > > Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > [StreamThread-1] failed to suspend stream tasks
> > >
> > > So is there a way we can restart that stream thread again? Also, should
> > > we assign it to rebalanceException in this case at all? This is a
> > > CommitFailedException, new partitions are already assigned, and the
> > > same thread could have continued processing the new partitions.
> > >
> > > Thanks
> > > Sachin
> > >
> > > On Wed, Feb 8, 2017 at 2:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > > Hi All,
> > > > I am trying out the 0.10.2.0 RC.
> > > > We have a source stream of 40 partitions.
> > > >
> > > > We start one instance with 4 threads.
> > > > After that we start a second instance with the same config on a
> > > > different machine, and then a third instance in the same way.
> > > >
> > > > After the application reaches steady state we start getting
> > > > CommitFailedException.
> > > > Firstly, I feel the message for CommitFailedException should change.
> > > > It is thrown from a number of places and sometimes is just not
> > > > related to subsequent calls to poll() being further apart than the
> > > > configured max.poll.interval.ms.
> > > >
> > > > We have verified this, and it is never the case for us when we get
> > > > the exception. So perhaps introduce another exception class; this
> > > > message is very misleading and we end up investigating the wrong
> > > > areas.
> > > >
> > > > Anyway, these are the places where this exception gets thrown in our
> > > > case:
> > > >
> > > > 1. The exception trace is this:
> > > > stream-thread [StreamThread-4] Failed while executing StreamTask 0_5
> > > > due to commit consumer offsets:
> > > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> > > > be completed since the group has already rebalanced and assigned the
> > > > partitions to another member. This means that the time between
> > > > subsequent calls to poll() was longer than the configured
> > > > max.poll.interval.ms, which typically implies that the poll loop is
> > > > spending too much time message processing. You can address this either
> > > > by increasing the session timeout or by reducing the maximum size of
> > > > batches returned in poll() with max.poll.records.
> > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> > > > [kafka-clients-0.10.2.0.jar:na]
> > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
> > > > [kafka-clients-0.10.2.0.jar:na]
> > > > at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> > > > [kafka-clients-0.10.2.0.jar:na]
> > > >
> > > > In the code, that line is:
> > > > // if the generation is null, we are not part of an active group (and
> > > > // we expect to be).
> > > > // the only thing we can do is fail the commit and let the user
> > > > // rejoin the group in poll()
> > > > if (generation == null)
> > > >     return RequestFuture.failure(new CommitFailedException());
> > > >
> > > > After investigating what caused the generation to be null, we found
> > > > that sometime before that the following was logged:
> > > >
> > > > Attempt to heartbeat failed for group new-part-advice since member id
> > > > is not valid.
> > > >
> > > > We checked the code and we see this:
> > > > if (error == Errors.UNKNOWN_MEMBER_ID) {
> > > >     log.debug("Attempt to heartbeat failed for group {} since member
> > > >         id is not valid.", groupId);
> > > >     resetGeneration();
> > > >     future.raise(Errors.UNKNOWN_MEMBER_ID);
> > > > }
> > > >
> > > > We checked the code of resetGeneration and found that it does:
> > > > this.state = MemberState.UNJOINED;
> > > >
> > > > And then the generation method does something like this:
> > > > if (this.state != MemberState.STABLE)
> > > >     return null;
> > > >
> > > > So why are we not returning Generation.NO_GENERATION, which was set
> > > > by resetGeneration earlier?
> > > >
> > > > Would this line be better?
> > > > if (this.state != MemberState.STABLE && !this.rejoinNeeded)
> > > >     return null;
> > > >
> > > > Also, why are we getting the UNKNOWN_MEMBER_ID error in the first
> > > > place?
> > > >
> > > > 2. This is the second case:
> > > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> > > > be completed since the group has already rebalanced and assigned the
> > > > partitions to another member. This means that the time between
> > > > subsequent calls to poll() was longer than the configured
> > > > max.poll.interval.ms, which typically implies that the poll loop is
> > > > spending too much time message processing. You can address this either
> > > > by increasing the session timeout or by reducing the maximum size of
> > > > batches returned in poll() with max.poll.records.
> > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:766)
> > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:712)
> > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >
> > > > Here again when we check the code we see that it failed inside
> > > > OffsetCommitResponseHandler:
> > > >
> > > > if (error == Errors.UNKNOWN_MEMBER_ID
> > > >         || error == Errors.ILLEGAL_GENERATION
> > > >         || error == Errors.REBALANCE_IN_PROGRESS) {
> > > >     // need to re-join group
> > > >     log.debug("Offset commit for group {} failed: {}", groupId,
> > > >         error.message());
> > > >     resetGeneration();
> > > >     future.raise(new CommitFailedException());
> > > >     return;
> > > > }
> > > >
> > > > And when we check the log message before this, we find:
> > > > Offset commit for group new-part-advice failed: The coordinator is
> > > > not aware of this member.
> > > >
> > > > This error is again due to the UNKNOWN_MEMBER_ID error.
> > > > The handling in this case is similar to the one for the heartbeat
> > > > failure.
> > > >
> > > > So again, why are we getting this UNKNOWN_MEMBER_ID error, and should
> > > > we not handle it with a different exception, since these errors are
> > > > not related to poll time at all?
> > > >
> > > > So please let us know what the issue could be here and how we can fix
> > > > it.
> > > >
> > > > Also note that we are running an identical single-thread application
> > > > which sources from an identical single-partition source topic, and
> > > > that application never fails.
> > > > So it is hard to understand if we are doing something wrong in our
> > > > logic.
> > > >
> > > > Also, from logs and JMX metrics collection we see that poll is called
> > > > well within the default 30 seconds; usually it is around 5 seconds.
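To make the state question above concrete, here is a tiny runnable stand-in for the quoted coordinator logic. MemberState and the method names mirror the snippets above; everything else is a simplified assumption, not the real AbstractCoordinator:

```java
// Simplified stand-in (not the real AbstractCoordinator) for the quoted
// coordinator logic: after an UNKNOWN_MEMBER_ID error, resetGeneration()
// leaves the member state UNJOINED, so generation() returns null even
// though a NO_GENERATION sentinel exists, and the commit path can only
// raise the generic CommitFailedException.
public class GenerationSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    MemberState state = MemberState.STABLE;
    boolean rejoinNeeded = false;
    Integer generationId = 5;  // stands in for the current Generation

    void onUnknownMemberId() { // what the heartbeat error handler does
        resetGeneration();
    }

    void resetGeneration() {
        state = MemberState.UNJOINED;
        rejoinNeeded = true;
        generationId = -1;     // stands in for Generation.NO_GENERATION
    }

    Integer generation() {
        // the sentinel is never returned: the state check wins, the caller
        // sees null, and null becomes CommitFailedException
        if (state != MemberState.STABLE)
            return null;
        return generationId;
    }

    public static void main(String[] args) {
        GenerationSketch coordinator = new GenerationSketch();
        coordinator.onUnknownMemberId();
        System.out.println(coordinator.generation()); // null
    }
}
```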
> > > >
> > > > Please share any ideas.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > >
> >
>
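On the open question of restarting a dead stream thread: Kafka Streams does not restart individual threads, but KafkaStreams#setUncaughtExceptionHandler can at least observe the death, and the application can then close and rebuild the whole KafkaStreams instance. Below is a generic, runnable analogue of that restart-on-crash pattern using plain Java threads; no Kafka is involved and all names are illustrative:

```java
// Runnable analogue (not Kafka code) of the restart pattern discussed in
// this thread: watch for the worker thread's uncaught exception and start
// a replacement instead of silently losing the thread.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class RestartOnCrashSketch {
    private final AtomicInteger attempts = new AtomicInteger();

    public int runWithRestart() {
        CountDownLatch done = new CountDownLatch(1);
        startWorker(done);
        try {
            done.await(); // wait until some attempt finishes successfully
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return attempts.get();
    }

    private void startWorker(CountDownLatch done) {
        Thread worker = new Thread(() -> {
            if (attempts.incrementAndGet() == 1)
                throw new RuntimeException("Failed to rebalance"); // first run dies
            done.countDown(); // the replacement run succeeds
        });
        // analogue of KafkaStreams#setUncaughtExceptionHandler: on death,
        // start a fresh worker rather than letting the failure go unnoticed
        worker.setUncaughtExceptionHandler((t, e) -> startWorker(done));
        worker.start();
    }

    public static void main(String[] args) {
        System.out.println(new RestartOnCrashSketch().runWithRestart()); // 2
    }
}
```

With KafkaStreams itself the handler cannot resurrect the single StreamThread; a common workaround is to call close() on the old instance from a separate thread and construct a new KafkaStreams from the same topology and configuration.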