[ https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237778#comment-17237778 ]

A. Sophie Blee-Goldman commented on KAFKA-10758:
------------------------------------------------

Hey [~davideicardi], thanks for submitting this ticket. Looks like there's a 
bug in the topic update logic. I've opened a PR which we should be able to get 
into the 2.6.1 and 2.7.0 releases.

Just to answer some of your other questions: yes, this error wouldn't appear 
right after the topic was created but only once the Streams app refreshed its 
topic metadata (5 min by default, as you said). Yes, it should be safe to just 
restart the application after hitting this error as a workaround. And no, this 
was not by design :) 
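
For reference, that 5-minute interval comes from the consumer's metadata.max.age.ms (default 300000 ms). If you want newly created topics to be picked up sooner, lowering it should work; a minimal fragment, assuming props is the Properties you pass to KafkaStreams and the usual ConsumerConfig/StreamsConfig imports:

{code:java}
// metadata.max.age.ms (default 300000 ms = 5 min) controls how often the
// embedded consumers refresh topic metadata, and therefore how quickly a
// newly created topic that matches the pattern is noticed.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "60000");
{code}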

Sorry for the trouble. Obviously I'd recommend upgrading to 2.6.1 to get the 
fix once it's been released, but for now you should be ok to just ignore it and 
start up the application again. You'll only hit this error once, since after 
the restart there's no need to update anything.
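
If you'd rather not restart by hand until the fix is out, one option is to watch for the ERROR state with a state listener and recreate the KafkaStreams instance from a supervising loop. A rough sketch (the topology, topic names, and config below are illustrative placeholders, not taken from your app):

{code:java}
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class RestartOnError {

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // Illustrative topology and config; substitute your own.
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "datalake");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            KafkaStreams streams = new KafkaStreams(builder.build(), props);

            // Signal the supervising loop rather than calling close() from
            // inside the listener, which runs on a stream thread.
            CountDownLatch died = new CountDownLatch(1);
            streams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.ERROR) {
                    died.countDown();
                }
            });

            streams.start();
            died.await();    // block until all stream threads have died
            streams.close(); // tear down, then loop around and recreate
        }
    }
}
{code}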

> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10758
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Davide Icardi
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>
> I have a simple Kafka Streams app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern, the Kafka Streams state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN.
> If I restart the app, it correctly starts reading again without problems.
> Is this by design? Should I handle this and simply restart the app?
>  
> The Kafka Streams version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002]
> sourceTopicsByName = [KSTREAM-SOURCE-0000000000, KSTREAM-SOURCE-0000000014, KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an error
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> Caused by: java.lang.IllegalStateException: Tried to update source topics but source nodes did not match
>   at org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
>   at org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
>   at org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
>   at org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
>   at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
>   ... 10 common frames omitted
> KafkaStream state is ERROR
> 17:28:53.200 [datalake-StreamThread-1] ERROR o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream threads have died. The instance will be in error state and should be closed.
> ============> User rebalance callback throws an error
> KafkaStream state is PENDING_SHUTDOWN
> {code}
>  
>  
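
For reference, a minimal sketch of the kind of Pattern-based subscription described in the quoted report above; the regex, topic names, and String serdes are illustrative assumptions, not taken from the original app:

{code:java}
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;

public class PatternStreamApp {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Subscribe to every existing topic matching the regex; topics created
        // later that also match are picked up on the next metadata refresh.
        builder.stream(Pattern.compile("input-.*"),
                       Consumed.with(Serdes.String(), Serdes.String()))
               .to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "datalake");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
{code}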



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
