On 10 February 2017 at 11:18, Sachin Mittal <sjmit...@gmail.com> wrote:
> The heartbeat exception while rebalancing is OK. However, I had a
> different scenario which I wanted to understand.
>
> Please check line 42428 of
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log

If you look at line 42006 you will see that the group is rebalancing:
"Attempt to heartbeat failed for group new-part-advice since it is
rebalancing." This is the likely cause of the message at line 42428,
because that member has been kicked out of the group.

>
> Attempt to heartbeat failed for group new-part-advice since member id is
> not valid.
>
> Why do we get this exception?
>
> Also on line 87485 of
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> we get:
>
> Offset commit for group new-part-advice failed: The coordinator is not
> aware of this member.
>
> Both errors look the same, and both seem to be failing due to an unknown
> member id.
>
> What could be the reason for this?

Again, if you look a bit before that, at line 87248, you'll see that the
group has rebalanced.

> Regarding:
>
> "w.r.t. continuing after a CommitFailedException during
> onPartitionsRevoked - will have to think through the scenarios."
>
> Please look at the log
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> at line 88714:
>
> stream-thread [StreamThread-1] Updating suspended tasks to contain active
> tasks [[0_17, 0_19, 0_24, 0_31]]
> stream-thread [StreamThread-1] Removing all active tasks [[0_17, 0_19,
> 0_24, 0_31]]
> stream-thread [StreamThread-1] Removing all standby tasks [[]]
> User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> new-part-advice failed on partition revocation
>
> This gets handled and logged, and the stream thread resumes with new
> partitions.
>
> Check line 90005:
>
> stream-thread [StreamThread-1] New partitions [[advice-stream-12,
> advice-stream-15, advice-stream-1, advice-stream-21]] assigned at the end
> of consumer rebalance.
>
> Now at this stage the stream thread should continue. However, since we
> assign
>
> rebalanceException = t;
>
> in
> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> it shuts down when it next tries to execute runLoop.
>
> Please check line 91074:
>
> stream-thread [StreamThread-1] Shutting down
>
> Please let me know if this case is valid and whether we should not shut
> down the thread itself.
>
> I know this log is a mess; we have made the due corrections.
>
> We ran the streams application again, now with 12 threads (3 machines)
> and 12 partitions. It ran well for a few hours, but then 2 of the
> machines and 2 threads on the third machine died.
> This was a much better result than the 40-partitions/12-threads scenario,
> where the application used to go into a perpetual rebalance state within
> a couple of hours.
> We are in the process of analysing those logs and will present our
> findings.
>
> Thanks
> Sachin
>
>
> On Fri, Feb 10, 2017 at 3:42 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > The CommitFailedExceptions are thrown because the group is rebalancing.
> > You can see log messages like the one below happening before the commit
> > failed exception:
> >
> > Attempt to heartbeat failed for group new-part-advice since it is
> > rebalancing.
> >
> > It isn't clear from the logs why the rebalancing is happening. The
> > broker logs would have been helpful. Also, timestamps on the Kafka
> > Streams logs would be useful.
> >
> > w.r.t. continuing after a CommitFailedException during
> > onPartitionsRevoked - will have to think through the scenarios.
> >
> > On Fri, 10 Feb 2017 at 04:35 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > I could manage the streams client logs, but the server logs were
> > > deleted, since time had elapsed and they got rolled over.
> > > See if you can figure out something from these. These are not the
> > > best of logs.
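[For readers following along: the shutdown happens because the rebalance
listener runs inside poll() on the same thread, so the only way it can
surface a failure is to stash it for the run loop to rethrow. A minimal
standalone sketch of that pattern - the names WorkerLoop, onRevocationFailure
and runOnce are invented for illustration; only the rebalanceException field
mirrors the real StreamThread code linked above.]

```java
// Sketch of the "stash the listener exception, rethrow in the run loop"
// pattern discussed in the thread. Illustrative names, not Kafka's code.
public class WorkerLoop {
    private volatile Throwable rebalanceException;

    // Called from the rebalance listener; it cannot propagate an
    // exception directly, so it records the failure for the main loop.
    void onRevocationFailure(Throwable t) {
        rebalanceException = t;
    }

    // The run loop checks the stashed exception on every iteration and
    // shuts the thread down by rethrowing it - which is why the thread
    // dies even though new partitions were already assigned.
    void runOnce() {
        if (rebalanceException != null) {
            throw new RuntimeException("Failed to rebalance", rebalanceException);
        }
        // ... normal record processing would go here ...
    }
}
```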
> > >
> > > https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log
> > > In the above log, pay attention to StreamThread-1, which fails due to
> > > a CommitFailedException.
> > > This mostly seems to be caused by OffsetCommitResponseHandler, due to
> > > an unknown member id.
> > >
> > > https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> > > In the above log, pay attention to StreamThread-4, which fails due to
> > > a CommitFailedException during sendOffsetCommitRequest.
> > > This mostly seems to be caused by "Attempt to heartbeat failed for
> > > group new-part-advice since member id is not valid."
> > >
> > > Also, please let us know why we shut down the thread if we get a
> > > CommitFailedException; ideally we catch this exception in
> > > onPartitionsRevoked and the thread should then continue processing
> > > the newly assigned partitions.
> > >
> > > Please let me know if you need any more information from me.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Feb 9, 2017 at 2:14 PM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > > > Might be easiest to just send all the logs if possible.
> > > >
> > > > On Thu, 9 Feb 2017 at 08:10 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > >
> > > > > I will try to get the logs soon.
> > > > > One quick question: I have three brokers which run in a cluster
> > > > > with default logging.
> > > > >
> > > > > Which log4j logs would be of interest on the broker side, and from
> > > > > which broker - or do I need to send logs from all three?
> > > > >
> > > > > My topic is partitioned and replicated on all three, so each
> > > > > kafka-logs dir contains the same topic logs.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > >
> > > > > > Sachin,
> > > > > >
> > > > > > Can you provide the full logs from the broker and the streams app?
> > > > > > It is hard to understand what is going on with only snippets of
> > > > > > information. It seems like the rebalance is taking too long, but
> > > > > > I can't tell from this.
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Thu, 9 Feb 2017 at 07:53 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > In continuation of the CommitFailedException thread, what we
> > > > > > > observe is that when this happens the first time:
> > > > > > >
> > > > > > > ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
> > > > > > > This calls suspendTasksAndState(), which again tries to commit
> > > > > > > offsets, and then the same exception is thrown again.
> > > > > > > This gets handled at ConsumerCoordinator, which logs it and
> > > > > > > then assigns new partitions.
> > > > > > >
> > > > > > > But the stream thread, before rethrowing the exception, calls
> > > > > > >
> > > > > > > rebalanceException = t;
> > > > > > >
> > > > > > > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> > > > > > >
> > > > > > > So now when runLoop executes, it gets this exception and the
> > > > > > > stream thread exits.
> > > > > > >
> > > > > > > Here are the logs:
> > > > > > >
> > > > > > > ----- Exception happened
> > > > > > >
> > > > > > > Unsubscribed all topics or patterns and assigned partitions
> > > > > > > stream-thread [StreamThread-1] Updating suspended tasks to
> > > > > > > contain active tasks [[0_32, 0_3, 0_20, 0_8]]
> > > > > > > stream-thread [StreamThread-1] Removing all active tasks
> > > > > > > [[0_32, 0_3, 0_20, 0_8]]
> > > > > > > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > > > > > > User provided listener
> > > > > > > org.apache.kafka.streams.processor.internals.StreamThread$1
> > > > > > > for group new-part-advice failed on partition revocation
> > > > > > >
> > > > > > > ----- Revocation failed and exception handled
> > > > > > >
> > > > > > > (Re-)joining group new-part-advice
> > > > > > > stream-thread [StreamThread-1] found [advice-stream] topics
> > > > > > > possibly matching regex
> > > > > > > stream-thread [StreamThread-1] updating builder with
> > > > > > > SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]}
> > > > > > > topic(s) with possible matching regex subscription(s)
> > > > > > > Sending JoinGroup ((type: JoinGroupRequest,
> > > > > > > groupId=new-part-advice, sessionTimeout=10000,
> > > > > > > rebalanceTimeout=300000, memberId=, protocolType=consumer,
> > > > > > > groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@300c8a1f))
> > > > > > > to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)
> > > > > > >
> > > > > > > stream-thread [StreamThread-1] New partitions
> > > > > > > [[advice-stream-8, advice-stream-32, advice-stream-3,
> > > > > > > advice-stream-20]] assigned at the end of consumer rebalance.
> > > > > > >
> > > > > > > ----- Same new partitions assigned on rebalance
> > > > > > >
> > > > > > > stream-thread [StreamThread-1] recycling old task 0_32
> > > > > > > stream-thread [StreamThread-1] recycling old task 0_3
> > > > > > > stream-thread [StreamThread-1] recycling old task 0_20
> > > > > > > stream-thread [StreamThread-1] recycling old task 0_8
> > > > > > >
> > > > > > > ----- It then shuts down
> > > > > > >
> > > > > > > stream-thread [StreamThread-1] Shutting down
> > > > > > > stream-thread [StreamThread-1] shutdownTasksAndState: shutting
> > > > > > > down all active tasks [[0_32, 0_3, 0_20, 0_8]] and standby
> > > > > > > tasks [[]]
> > > > > > >
> > > > > > > Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
> > > > > > > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > > > > > [StreamThread-1] Failed to rebalance
> > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
> > > > > > >     ~[kafka-streams-0.10.2.0.jar:na]
> > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > > > > > >     ~[kafka-streams-0.10.2.0.jar:na]
> > > > > > > Caused by: org.apache.kafka.streams.errors.StreamsException:
> > > > > > > stream-thread [StreamThread-1] failed to suspend stream tasks
> > > > > > >
> > > > > > > So is there a way we can restart that stream thread again?
> > > > > > > Also, in this case should we assign rebalanceException at all,
> > > > > > > because this is a CommitFailedException, new partitions are
> > > > > > > already assigned, and the same thread could continue
> > > > > > > processing the new partitions.
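[For readers asking the same question: in 0.10.2 a died StreamThread is not
replaced automatically. One common workaround is to register a handler via
KafkaStreams#setUncaughtExceptionHandler and rebuild the instance when a
thread dies. Below is a standalone sketch of that detect-and-restart pattern
using a plain Thread so it runs without Kafka on the classpath; the class
and method names are invented, and a real application would close and
rebuild the KafkaStreams instance inside the handler instead.]

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Detect a worker thread dying via Thread.UncaughtExceptionHandler (the
// same hook KafkaStreams#setUncaughtExceptionHandler wires up) and start
// a replacement. Illustrative sketch, not Kafka code.
public class RestartOnFailure {
    static final AtomicInteger starts = new AtomicInteger();

    static void startWorker(Runnable work, int restartsLeft, CountDownLatch done) {
        starts.incrementAndGet();
        Thread t = new Thread(work);
        t.setUncaughtExceptionHandler((thread, error) -> {
            // In a real app: log the error, then close() and rebuild the
            // KafkaStreams instance (a single thread cannot be revived).
            if (restartsLeft > 0) {
                startWorker(work, restartsLeft - 1, done);
            } else {
                done.countDown();
            }
        });
        t.start();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        // This worker always fails, mimicking "Failed to rebalance".
        startWorker(() -> { throw new IllegalStateException("Failed to rebalance"); }, 1, done);
        done.await();
        System.out.println("worker was started " + starts.get() + " times");
    }
}
```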
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Feb 8, 2017 at 2:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > > I am trying out the 0.10.2.0 RC.
> > > > > > > > We have a source stream of 40 partitions.
> > > > > > > >
> > > > > > > > We start one instance with 4 threads.
> > > > > > > > After that we start a second instance with the same config
> > > > > > > > on a different machine, and then in the same way a third
> > > > > > > > instance.
> > > > > > > >
> > > > > > > > After the application reaches a steady state we start
> > > > > > > > getting CommitFailedException.
> > > > > > > > Firstly, I feel the message for CommitFailedException should
> > > > > > > > change. It is thrown from a number of places, and sometimes
> > > > > > > > is just not related to subsequent calls to poll() taking
> > > > > > > > longer than the configured max.poll.interval.ms.
> > > > > > > >
> > > > > > > > We have verified this, and it is never the case for us when
> > > > > > > > we get the exception. So perhaps introduce another exception
> > > > > > > > class; as it stands the message is very misleading and we
> > > > > > > > end up investigating the wrong areas.
> > > > > > > >
> > > > > > > > Anyway, here are the places where this exception gets thrown
> > > > > > > > in our case:
> > > > > > > >
> > > > > > > > 1. The exception trace is this:
> > > > > > > >
> > > > > > > > stream-thread [StreamThread-4] Failed while executing
> > > > > > > > StreamTask 0_5 due to commit consumer offsets:
> > > > > > > > org.apache.kafka.clients.consumer.CommitFailedException:
> > > > > > > > Commit cannot be completed since the group has already
> > > > > > > > rebalanced and assigned the partitions to another member.
> > > > > > > > This means that the time between subsequent calls to poll()
> > > > > > > > was longer than the configured max.poll.interval.ms, which
> > > > > > > > typically implies that the poll loop is spending too much
> > > > > > > > time message processing. You can address this either by
> > > > > > > > increasing the session timeout or by reducing the maximum
> > > > > > > > size of batches returned in poll() with max.poll.records.
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> > > > > > > >     [kafka-clients-0.10.2.0.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
> > > > > > > >     [kafka-clients-0.10.2.0.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> > > > > > > >     [kafka-clients-0.10.2.0.jar:na]
> > > > > > > >
> > > > > > > > In the code, that line is:
> > > > > > > >
> > > > > > > > // if the generation is null, we are not part of an active group (and we expect to be).
> > > > > > > > // the only thing we can do is fail the commit and let the user rejoin the group in poll()
> > > > > > > > if (generation == null)
> > > > > > > >     return RequestFuture.failure(new CommitFailedException());
> > > > > > > >
> > > > > > > > After investigating what caused the generation to be null,
> > > > > > > > we found that some time before that the following was logged:
> > > > > > > >
> > > > > > > > Attempt to heartbeat failed for group new-part-advice since
> > > > > > > > member id is not valid.
> > > > > > > >
> > > > > > > > We checked the code and we see this:
> > > > > > > >
> > > > > > > > if (error == Errors.UNKNOWN_MEMBER_ID) {
> > > > > > > >     log.debug("Attempt to heartbeat failed for group {} since member id is not valid.", groupId);
> > > > > > > >     resetGeneration();
> > > > > > > >     future.raise(Errors.UNKNOWN_MEMBER_ID);
> > > > > > > > }
> > > > > > > >
> > > > > > > > We checked resetGeneration and found that it does:
> > > > > > > >
> > > > > > > > this.state = MemberState.UNJOINED;
> > > > > > > >
> > > > > > > > And then in the generation method we do something like this:
> > > > > > > >
> > > > > > > > if (this.state != MemberState.STABLE)
> > > > > > > >     return null;
> > > > > > > >
> > > > > > > > So why are we not returning Generation.NO_GENERATION, which
> > > > > > > > was set by resetGeneration earlier?
> > > > > > > >
> > > > > > > > Would this line be better?
> > > > > > > >
> > > > > > > > if (this.state != MemberState.STABLE && !this.rejoinNeeded)
> > > > > > > >     return null;
> > > > > > > >
> > > > > > > > Also, why are we getting the UNKNOWN_MEMBER_ID error in the
> > > > > > > > first place?
> > > > > > > >
> > > > > > > > 2. This is the second case:
> > > > > > > >
> > > > > > > > org.apache.kafka.clients.consumer.CommitFailedException:
> > > > > > > > Commit cannot be completed since the group has already
> > > > > > > > rebalanced and assigned the partitions to another member.
> > > > > > > > This means that the time between subsequent calls to poll()
> > > > > > > > was longer than the configured max.poll.interval.ms, which
> > > > > > > > typically implies that the poll loop is spending too much
> > > > > > > > time message processing. You can address this either by
> > > > > > > > increasing the session timeout or by reducing the maximum
> > > > > > > > size of batches returned in poll() with max.poll.records.
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.
> > > > > > > > ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:766)
> > > > > > > >     ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:712)
> > > > > > > >     ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > >
> > > > > > > > Here again, when we check the code, we see that it failed
> > > > > > > > inside OffsetCommitResponseHandler:
> > > > > > > >
> > > > > > > > if (error == Errors.UNKNOWN_MEMBER_ID
> > > > > > > >         || error == Errors.ILLEGAL_GENERATION
> > > > > > > >         || error == Errors.REBALANCE_IN_PROGRESS) {
> > > > > > > >     // need to re-join group
> > > > > > > >     log.debug("Offset commit for group {} failed: {}", groupId, error.message());
> > > > > > > >     resetGeneration();
> > > > > > > >     future.raise(new CommitFailedException());
> > > > > > > >     return;
> > > > > > > > }
> > > > > > > >
> > > > > > > > And when we check the log message before this, we find:
> > > > > > > >
> > > > > > > > Offset commit for group new-part-advice failed: The
> > > > > > > > coordinator is not aware of this member.
> > > > > > > >
> > > > > > > > This error is again due to the UNKNOWN_MEMBER_ID error.
> > > > > > > > The handling in this case is similar to what we do for a
> > > > > > > > heartbeat failure.
> > > > > > > >
> > > > > > > > So again, why are we getting this UNKNOWN_MEMBER_ID error,
> > > > > > > > and should we not handle it with a different exception, as
> > > > > > > > these errors are not related to poll time at all?
> > > > > > > >
> > > > > > > > So please let us know what could be the issue here and how
> > > > > > > > we can fix this.
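[To make the generation() question from case 1 concrete, here is a tiny
standalone model of the two checks being compared. The enum, fields and
method names mirror the coordinator code quoted above, but this is an
illustration of the proposed condition, not Kafka's actual
AbstractCoordinator; the "gen-5" value is invented for the demo.]

```java
// Standalone model of the generation() check discussed in the thread.
public class GenerationCheck {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    static final String NO_GENERATION = "NO_GENERATION";

    MemberState state = MemberState.STABLE;
    boolean rejoinNeeded = false;
    String generation = "gen-5";   // illustrative current generation

    // What resetGeneration() does after UNKNOWN_MEMBER_ID: clear the
    // generation and require a rejoin.
    void resetGeneration() {
        state = MemberState.UNJOINED;
        rejoinNeeded = true;
        generation = NO_GENERATION;
    }

    // Current behaviour: any non-STABLE state yields null, which is what
    // turns the commit into a CommitFailedException.
    String generationCurrent() {
        if (state != MemberState.STABLE)
            return null;
        return generation;
    }

    // Sachin's proposed condition: when a rejoin is pending, return
    // NO_GENERATION instead of null.
    String generationProposed() {
        if (state != MemberState.STABLE && !rejoinNeeded)
            return null;
        return generation;
    }
}
```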
> > > > > > > >
> > > > > > > > Also note that we are running an identical single-threaded
> > > > > > > > application which sources from an identical single-partition
> > > > > > > > source topic, and that application never fails.
> > > > > > > > So it is hard to tell whether we are doing something wrong
> > > > > > > > in our logic.
> > > > > > > >
> > > > > > > > Also, from logs and JMX metrics collection we see that poll
> > > > > > > > is called well within the default 30 seconds - usually it is
> > > > > > > > around 5 seconds.
> > > > > > > >
> > > > > > > > Please share any ideas.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Sachin
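[For anyone cross-checking the knobs that the CommitFailedException message
refers to, the relevant consumer settings look roughly like this. The
values shown are my understanding of the 0.10.1+/0.10.2 defaults, not
recommendations - verify against your broker/client version before tuning.]

```properties
# Consumer settings referenced by the CommitFailedException message.
max.poll.interval.ms=300000   # max time between poll() calls before the member is evicted
max.poll.records=500          # cap on records returned per poll(), to shorten each loop iteration
session.timeout.ms=10000      # heartbeat-based liveness timeout
heartbeat.interval.ms=3000    # how often the background heartbeat is sent
```

Note that the thread's own JoinGroup log line (sessionTimeout=10000,
rebalanceTimeout=300000) is consistent with these defaults.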