[ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616185#comment-15616185
 ] 

Guozhang Wang commented on KAFKA-4355:
--------------------------------------

[~mihbor] Thanks for reporting this. A first note is that in {{Metadata}}, the 
{{topics}} set and the {{cluster}} object are used in different ways: {{topics}} 
holds all the topics that this client is interested in (for fetching or 
producing), while the {{cluster}} object contains the metadata information 
obtained from the brokers. So it is possible that topic {{scheduler}} is in the 
{{topics}} set, indicating that the embedded consumer of the streams client is 
interested in fetching from this topic, whereas {{cluster}} does not have this 
topic in its map, indicating that the broker does not know about this topic, 
probably because the brokers hosting it were not available during that 
period of time.
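
The mismatch described above can be illustrated with plain Java collections. 
This is only a sketch, not the real {{Metadata}} or {{Cluster}} classes: the 
class name, the method name and the second topic {{other}} are hypothetical, 
and the map of partition counts merely stands in for the broker metadata.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class MetadataSketch {

    /**
     * Returns the topics the client is interested in that are absent from
     * the metadata last obtained from the brokers.
     *
     * interested             stands in for the "topics" set
     * clusterPartitionCounts stands in for the "cluster" object
     *                        (topic name -> number of partitions)
     */
    static Set<String> missingTopics(Set<String> interested,
                                     Map<String, Integer> clusterPartitionCounts) {
        // TreeSet only to get a stable, sorted order in the result.
        Set<String> missing = new TreeSet<>(interested);
        missing.removeAll(clusterPartitionCounts.keySet());
        return missing;
    }

    public static void main(String[] args) {
        Set<String> interested = new HashSet<>(Arrays.asList("scheduler", "other"));

        // Assumed snapshot taken while the brokers hosting "scheduler" are down.
        Map<String, Integer> cluster = new HashMap<>();
        cluster.put("other", 4);

        System.out.println(missingTopics(interested, cluster)); // prints [scheduler]
    }
}
```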

As for Streams, I think it should not throw an exception and fail in this 
situation, since it is likely to be transient. Instead, it could just move 
forward without assigning this topic, and expect another rebalance to be 
triggered once the topic becomes available again.

Will fix this logic in {{DefaultPartitionGrouper}} for this JIRA.
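
A minimal sketch of the proposed behaviour, assuming the partition counts are 
available as a simple map. This is not the actual {{DefaultPartitionGrouper}} 
code or the eventual patch: the class name and the method signature are 
hypothetical, and only the method name {{maxNumPartitions}} is taken from the 
stack trace.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TolerantGrouperSketch {

    /**
     * Returns the largest partition count among the given topics.
     * Topics missing from the metadata snapshot are skipped instead of
     * causing an exception; returns 0 if none of the topics is known.
     */
    static int maxNumPartitions(Map<String, Integer> partitionCounts, Set<String> topics) {
        int max = 0;
        for (String topic : topics) {
            Integer count = partitionCounts.get(topic);
            if (count == null) {
                // Topic unknown right now, likely a transient condition:
                // leave it unassigned and rely on a later rebalance.
                continue;
            }
            max = Math.max(max, count);
        }
        return max;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put("other", 4); // "scheduler" is missing from the snapshot

        Set<String> topics = new HashSet<>(Arrays.asList("scheduler", "other"));
        System.out.println(maxNumPartitions(partitionCounts, topics)); // prints 4
    }
}
```

With this approach a topic group whose topics are all missing yields zero 
partitions, so no tasks are created for it until a later rebalance sees the 
topic again.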

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4355
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4355
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0, 0.10.0.0
>         Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>            Reporter: Michal Borowiecki
>            Assignee: Guozhang Wang
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens roughly between one in 5 and one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>       at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>       at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>       at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> {noformat}
> Our app has 2 streams in it, consuming from 2 different topics.
> Sometimes the exception happens on both stream threads. Sometimes only on one 
> of the stream threads.
> The exception is preceded by:
> {noformat}
> [2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
> pool-scheduler 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator 
> lp02485.openbet:19373 (id: 2147483647 rack: null) dead for group 
> pool-scheduler 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,342] INFO [StreamThread-2] Discovered coordinator 
> lp02485.openbet:19373 (id: 2147483647 rack: null) for group pool-scheduler. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,342] INFO [StreamThread-2] (Re-)joining group 
> pool-scheduler 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread 
> [StreamThread-2] Completed validating internal topics in partition assignor 
> (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor)
> [2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread 
> [StreamThread-2] Shutting down 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,357] INFO [StreamThread-2] stream-thread 
> [StreamThread-2] Shutting down 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,358] INFO [StreamThread-2] Closing the Kafka producer 
> with timeoutMillis = 9223372036854775807 ms. 
> (org.apache.kafka.clients.producer.KafkaProducer)
> [2016-10-28 16:17:55,364] INFO [StreamThread-2] stream-thread 
> [StreamThread-2] Removing all active tasks [[]] 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,364] INFO [StreamThread-2] stream-thread 
> [StreamThread-2] Removing all active tasks [[]] 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,365] INFO [StreamThread-2] stream-thread 
> [StreamThread-2] Removing all standby tasks [[]] 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,365] INFO [StreamThread-2] stream-thread 
> [StreamThread-2] Removing all standby tasks [[]] 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,365] INFO [StreamThread-2] stream-thread 
> [StreamThread-2] Stream thread shutdown complete 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2016-10-28 16:17:55,365] INFO [StreamThread-2] stream-thread 
> [StreamThread-2] Stream thread shutdown complete 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
> {noformat}
> This happens regardless of whether we use kafka streams and broker versions 
> 0.10.0.0 or 0.10.1.0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)