[ 
https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237407#comment-17237407
 ] 

Davide Icardi commented on KAFKA-10758:
---------------------------------------

[~cadonna] here are the details you requested:

I have this code:

{code:java}
private val inputCommandsStream =
    streamsBuilder.stream[Key, Envelop[RawDataCommand]](Pattern.compile("^ingestion\\.datalake\\..+\\..+\\.commands$"))
private val inputEventStream =
    streamsBuilder.stream[Key, Envelop[RawDataEvent]](Pattern.compile("^ingestion\\.datalake\\..+\\..+\\.events"))
{code}
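For reference, a minimal sketch of which topic names these two patterns accept. It assumes the whole topic name must match (Matcher.matches()), which is my understanding of how pattern subscriptions are checked but is not verified here. The class name, the helper and every sample topic except ingestion.datasources.events are made up for illustration.

```java
import java.util.regex.Pattern;

class TopicPatternCheck {
    // Same regular expressions as in the snippet above.
    static final Pattern COMMANDS =
        Pattern.compile("^ingestion\\.datalake\\..+\\..+\\.commands$");
    static final Pattern EVENTS =
        Pattern.compile("^ingestion\\.datalake\\..+\\..+\\.events");

    // Whole-name match (assumption: the subscription check works this way).
    static boolean matches(Pattern pattern, String topic) {
        return pattern.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Hypothetical topic names, except the fixed one from sub-topology 0.
        String[] topics = {
            "ingestion.datalake.tenant1.sensors.commands",
            "ingestion.datalake.tenant1.sensors.events",
            "ingestion.datalake.tenant1.sensors.events.v2",
            "ingestion.datasources.events"
        };
        for (String topic : topics) {
            System.out.println(topic
                + " commands=" + matches(COMMANDS, topic)
                + " events=" + matches(EVENTS, topic));
        }
    }
}
```

Under that assumption the missing `$` in the second pattern makes no difference, and the fixed topic ingestion.datasources.events is not picked up by either pattern.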
And here is the topology:

{code:java}
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [ingestion.datasources.events])
      --> KSTREAM-PROCESSOR-0000000001
    Processor: KSTREAM-PROCESSOR-0000000001 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000000

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000002 (topics: ^ingestion\.datalake\..+\..+\.commands$)
      --> KSTREAM-LEFTJOIN-0000000006
    Processor: KSTREAM-LEFTJOIN-0000000006 (stores: [ingestion.datalake.store.snapshots])
      --> KSTREAM-MAP-0000000010, KSTREAM-SINK-0000000007
      <-- KSTREAM-SOURCE-0000000002
    Source: KSTREAM-SOURCE-0000000003 (topics: ^ingestion\.datalake\..+\..+\.events)
      --> KSTREAM-FILTER-0000000004
    Processor: KSTREAM-FILTER-0000000004 (stores: [])
      --> KSTREAM-AGGREGATE-0000000005
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [ingestion.datalake.store.snapshots])
      --> KTABLE-TOSTREAM-0000000008
      <-- KSTREAM-FILTER-0000000004
    Processor: KSTREAM-MAP-0000000010 (stores: [])
      --> KSTREAM-FILTER-0000000013
      <-- KSTREAM-LEFTJOIN-0000000006
    Processor: KSTREAM-FILTER-0000000013 (stores: [])
      --> KSTREAM-SINK-0000000012
      <-- KSTREAM-MAP-0000000010
    Processor: KTABLE-TOSTREAM-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-AGGREGATE-0000000005
    Sink: KSTREAM-SINK-0000000007 (extractor class: service.streaming.EventStreamTopicNameExtractor@20801cbb)
      <-- KSTREAM-LEFTJOIN-0000000006
    Sink: KSTREAM-SINK-0000000009 (extractor class: service.streaming.SnapshotStreamTopicNameExtractor@1c240cf2)
      <-- KTABLE-TOSTREAM-0000000008
    Sink: KSTREAM-SINK-0000000012 (topic: KSTREAM-TOTABLE-0000000011-repartition)
      <-- KSTREAM-FILTER-0000000013

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000014 (topics: [KSTREAM-TOTABLE-0000000011-repartition])
      --> KSTREAM-TOTABLE-0000000011
    Processor: KSTREAM-TOTABLE-0000000011 (stores: [ingestion.datalake.store.eventsByMsgId])
      --> none
      <-- KSTREAM-SOURCE-0000000014

{code}
 

It is a work in progress and certainly needs optimizing, but I don't 
understand the reason for the error.

Thanks!

> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10758
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Davide Icardi
>            Priority: Major
>
> I have a simple Kafka Streams app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern, the Kafka Streams state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN.
> If I restart the app, it correctly starts reading again without problems.
> Is this by design? Should I handle this and simply restart the app?
>  
> The Kafka Streams version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002]
> sourceTopicsByName = [KSTREAM-SOURCE-0000000000, KSTREAM-SOURCE-0000000014, KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an error
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  Caused by: java.lang.IllegalStateException: Tried to update source topics but source nodes did not match
>   at org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
>   at org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
>   at org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
>   at org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
>   at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
>   ... 10 common frames omitted
>  KafkaStream state is ERROR
>  17:28:53.200 [datalake-StreamThread-1] ERROR o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream threads have died. The instance will be in error state and should be closed.
>  ============> User rebalance callback throws an error
>  KafkaStream state is PENDING_SHUTDOWN
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
