[ https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237407#comment-17237407 ]
Davide Icardi commented on KAFKA-10758: --------------------------------------- [~cadonna] here the details requested: I have this code: {code:java} private val inputCommandsStream = streamsBuilder.stream[Key, Envelop[RawDataCommand]](Pattern.compile("^ingestion\\.datalake\\..+\\..+\\.commands$")) private val inputEventStream = streamsBuilder.stream[Key, Envelop[RawDataEvent]](Pattern.compile("^ingestion\\.datalake\\..+\\..+\\.events")) {code} And here the topology: {code:java} Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [ingestion.datasources.events]) --> KSTREAM-PROCESSOR-0000000001 Processor: KSTREAM-PROCESSOR-0000000001 (stores: []) --> none <-- KSTREAM-SOURCE-0000000000 Sub-topology: 1 Source: KSTREAM-SOURCE-0000000002 (topics: ^ingestion\.datalake\..+\..+\.commands$) --> KSTREAM-LEFTJOIN-0000000006 Processor: KSTREAM-LEFTJOIN-0000000006 (stores: [ingestion.datalake.store.snapshots]) --> KSTREAM-MAP-0000000010, KSTREAM-SINK-0000000007 <-- KSTREAM-SOURCE-0000000002 Source: KSTREAM-SOURCE-0000000003 (topics: ^ingestion\.datalake\..+\..+\.events) --> KSTREAM-FILTER-0000000004 Processor: KSTREAM-FILTER-0000000004 (stores: []) --> KSTREAM-AGGREGATE-0000000005 <-- KSTREAM-SOURCE-0000000003 Processor: KSTREAM-AGGREGATE-0000000005 (stores: [ingestion.datalake.store.snapshots]) --> KTABLE-TOSTREAM-0000000008 <-- KSTREAM-FILTER-0000000004 Processor: KSTREAM-MAP-0000000010 (stores: []) --> KSTREAM-FILTER-0000000013 <-- KSTREAM-LEFTJOIN-0000000006 Processor: KSTREAM-FILTER-0000000013 (stores: []) --> KSTREAM-SINK-0000000012 <-- KSTREAM-MAP-0000000010 Processor: KTABLE-TOSTREAM-0000000008 (stores: []) --> KSTREAM-SINK-0000000009 <-- KSTREAM-AGGREGATE-0000000005 Sink: KSTREAM-SINK-0000000007 (extractor class: service.streaming.EventStreamTopicNameExtractor@20801cbb) <-- KSTREAM-LEFTJOIN-0000000006 Sink: KSTREAM-SINK-0000000009 (extractor class: service.streaming.SnapshotStreamTopicNameExtractor@1c240cf2) <-- KTABLE-TOSTREAM-0000000008 Sink: KSTREAM-SINK-0000000012 (topic: KSTREAM-TOTABLE-0000000011-repartition) <-- KSTREAM-FILTER-0000000013 Sub-topology: 2 Source: KSTREAM-SOURCE-0000000014 (topics: [KSTREAM-TOTABLE-0000000011-repartition]) --> KSTREAM-TOTABLE-0000000011 Processor: KSTREAM-TOTABLE-0000000011 (stores: [ingestion.datalake.store.eventsByMsgId]) --> none <-- KSTREAM-SOURCE-0000000014 {code} It is a work in progress, for sure to be optimized, but I don't understand the reason for the error. thanks! > Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a > new topic > --------------------------------------------------------------------------------------- > > Key: KAFKA-10758 > URL: https://issues.apache.org/jira/browse/KAFKA-10758 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.6.0 > Reporter: Davide Icardi > Priority: Major > > I have a simple Kafka Stream app that consumes from multiple input topics > using the _stream_ function that accepts a Pattern > ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]). > > Whenever I add a new topic that matches the pattern the kafka stream state > goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN . > If I restart the app it correctly starts reading again without problems. > It is by design? Should I handle this and simply restart the app? > > Kafka Stream version is 2.6.0. > The error is the following: > {code:java} > ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match: > sourceNodesByName = [KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002] > sourceTopicsByName = [KSTREAM-SOURCE-0000000000, KSTREAM-SOURCE-0000000014, > KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002] > org.apache.kafka.common.KafkaException: User rebalance callback throws an > error > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > Caused by: java.lang.IllegalStateException: Tried to update source topics > but source nodes did not match > at > org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151) > at > org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109) > at > org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514) > at > org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421) > ... 10 common frames omitted > KafkaStream state is ERROR > 17:28:53.200 [datalake-StreamThread-1] ERROR > o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream > threads have died. The instance will be in error state and should be closed. > ============> User rebalance callback throws an error > KafkaStream state is PENDING_SHUTDOWN > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)