[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786564#comment-17786564
 ] 

Matthias J. Sax commented on KAFKA-15834:
-----------------------------------------

Thanks for reporting this, and for such a detailed analysis, [~gharris1727]!

> Subscribing to non-existent topic blocks StreamThread from stopping
> -------------------------------------------------------------------
>
>                 Key: KAFKA-15834
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15834
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.6.0
>            Reporter: Greg Harris
>            Priority: Major
>
> In 
> NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics
> a topology is created that references an input topic that does not exist. 
> The test as written passes, but the KafkaStreams#close(Duration) call at the 
> end times out and leaves StreamThreads running.
> From some cursory investigation it appears that this is happening:
> 1. The consumer calls the StreamsPartitionAssignor, which calls 
> TaskManager#handleRebalanceStart as a side-effect
> 2. handleRebalanceStart sets the rebalanceInProgress flag
> 3. This flag is checked by StreamThread.runLoop, and causes the loop to 
> remain running.
> 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, 
> because the topic does not exist
> 5. Because no partitions are ever assigned, 
> TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag
>  
> This log message is printed in a tight loop while the close is ongoing and 
> the consumer is being polled with zero duration:
> {noformat}
> [2023-11-15 11:42:43,661] WARN [Consumer clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer, groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics] Received unknown topic or partition error in fetch for partition unique_topic_prefix-topology-1-store-repartition-0 (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
> {noformat}
> Practically, this means that this test leaks two StreamThreads and the 
> associated clients and sockets, and delays the completion of the test until 
> the KafkaStreams#close(Duration) call times out.
> Either we should change the rebalanceInProgress flag to avoid getting stuck 
> in this rebalance state, or figure out a way to shut down a StreamThread 
> that is in an extended rebalance state during shutdown.
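
The sequence in the report can be illustrated with a small, self-contained toy model. This is only a sketch and not Kafka code: ToyTaskManager, loopExits, and the iteration-based "grace" limit are hypothetical names introduced for illustration, and the loop condition is an assumption derived from step 3 of the description. It shows how a flag that is set on rebalance start and cleared only on assignment keeps the loop alive after shutdown is requested, and how bounding the wait during shutdown (one possible reading of the second suggestion above) would let the loop exit.

```java
/** Toy model of the shutdown behavior described in the report; not Kafka code. */
public class RebalanceShutdownSketch {

    /** Hypothetical stand-in for the rebalanceInProgress flag handling. */
    static final class ToyTaskManager {
        private boolean rebalanceInProgress = false;

        void handleRebalanceStart() { rebalanceInProgress = true; }      // step 2
        void handleRebalanceComplete() { rebalanceInProgress = false; }  // step 5
        boolean rebalanceInProgress() { return rebalanceInProgress; }
    }

    /**
     * Simulates the run loop after shutdown has been requested.
     *
     * @param assignmentArrives whether the simulated poll ever delivers an assignment
     * @param graceIterations   iterations to wait for the rebalance during shutdown;
     *                          a negative value means wait forever (reported behavior)
     * @param maxIterations     stands in for the close(Duration) timeout
     * @return true if the loop exits before maxIterations is reached
     */
    static boolean loopExits(boolean assignmentArrives, int graceIterations, int maxIterations) {
        ToyTaskManager taskManager = new ToyTaskManager();
        taskManager.handleRebalanceStart();   // steps 1-2: the assignor marks a rebalance as started
        boolean shutdownRequested = true;     // close(Duration) has already been called

        for (int i = 0; i < maxIterations; i++) {
            boolean graceExpired = graceIterations >= 0 && i >= graceIterations;
            // Step 3 (assumed condition): keep running while a rebalance is pending,
            // unless the hypothetical shutdown grace period has run out.
            boolean keepRunning =
                !shutdownRequested || (taskManager.rebalanceInProgress() && !graceExpired);
            if (!keepRunning) {
                return true;
            }
            // Simulated zero-duration poll. Steps 4-5: the flag is cleared only if
            // an assignment is actually delivered.
            if (assignmentArrives) {
                taskManager.handleRebalanceComplete();
            }
        }
        return false; // loop still running: models close(Duration) timing out
    }

    public static void main(String[] args) {
        System.out.println(loopExits(true, -1, 100));   // topic exists: true
        System.out.println(loopExits(false, -1, 100));  // missing topic, unbounded wait: false
        System.out.println(loopExits(false, 10, 100));  // missing topic, bounded wait: true
    }
}
```

In this model the second call corresponds to the leaked thread in the test, and the third shows the effect of giving up on the pending rebalance once shutdown has been requested. Whether a bounded wait or a change to how the flag is set is the better fix in the real code is left open here.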



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
