[ https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Greg Harris resolved KAFKA-15834.
---------------------------------
    Resolution: Fixed

> Subscribing to non-existent topic blocks StreamThread from stopping
> -------------------------------------------------------------------
>
>                 Key: KAFKA-15834
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15834
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.6.0
>            Reporter: Greg Harris
>            Priority: Major
>             Fix For: 3.8.0
>
>
> In NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics,
> a topology is created that references an input topic which does not exist.
> The test as written passes, but the KafkaStreams#close(Duration) call at the end
> times out and leaves StreamThreads running.
> A cursory investigation suggests the following sequence:
> 1. The consumer calls the StreamsPartitionAssignor, which calls
> TaskManager#handleRebalanceStart as a side effect.
> 2. handleRebalanceStart sets the rebalanceInProgress flag.
> 3. StreamThread#runLoop checks this flag, and while it is set the loop keeps running.
> 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned,
> because the topic does not exist.
> 5. Because no partitions are ever assigned, TaskManager#handleRebalanceComplete
> never clears the rebalanceInProgress flag.
>
> While the close is ongoing and the consumer is being polled with zero duration,
> the following message is logged in a tight loop:
> {noformat}
> [2023-11-15 11:42:43,661] WARN [Consumer clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer, groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics] Received unknown topic or partition error in fetch for partition unique_topic_prefix-topology-1-store-repartition-0 (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
> {noformat}
> In practice, this test leaks two StreamThreads along with their clients and
> sockets, and its completion is delayed until the KafkaStreams#close(Duration)
> call times out.
> We should either change how the rebalanceInProgress flag is managed so that a
> StreamThread cannot get stuck in this rebalance state, or find a way to shut
> down a StreamThread that is still in an extended rebalance state during shutdown.
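The five-step sequence in the report can be reproduced with a small model. The following is a simplified, single-threaded Java sketch, not the actual Kafka Streams code: `StuckRebalanceModel`, `TaskManagerModel`, `runLoop`, `maxPollsInRebalance` and `safetyCap` are hypothetical names, and the sketch assumes the run loop's exit condition has the form "running, or a rebalance is in progress" as described in step 3. It also shows one hypothetical way to realize the second remedy (bounding how long a shutting-down thread keeps polling mid-rebalance); this is an illustration only and is not necessarily how the fix shipped in 3.8.0 works.

```java
/**
 * Simplified, single-threaded model of the stuck-shutdown sequence in KAFKA-15834.
 * All names here are hypothetical stand-ins, not real Kafka Streams internals.
 */
public class StuckRebalanceModel {

    /** Hypothetical stand-in for TaskManager's rebalanceInProgress bookkeeping. */
    static final class TaskManagerModel {
        private boolean rebalanceInProgress = false;

        void handleRebalanceStart() { rebalanceInProgress = true; }      // step 2: flag set
        void handleRebalanceComplete() { rebalanceInProgress = false; }  // only reached on assignment
        boolean rebalanceInProgress() { return rebalanceInProgress; }
    }

    /**
     * Models the thread's run loop after close() has been requested and returns
     * the number of poll iterations executed before the loop exits.
     *
     * @param topicExists         whether the subscribed input topic exists
     * @param maxPollsInRebalance hypothetical bound on polls while shutting down
     *                            mid-rebalance (Integer.MAX_VALUE = unbounded, as in the bug)
     * @param safetyCap           stops the simulation so an unbounded loop still ends
     */
    static int runLoop(boolean topicExists, int maxPollsInRebalance, int safetyCap) {
        TaskManagerModel taskManager = new TaskManagerModel();
        boolean running = false; // shutdown has already been requested

        // Step 1: the assignor calls handleRebalanceStart as a side effect.
        taskManager.handleRebalanceStart();

        int polls = 0;
        // Step 3: the loop keeps going while a rebalance is in progress.
        while ((running || taskManager.rebalanceInProgress()) && polls < safetyCap) {
            polls++; // stands in for one zero-duration consumer poll
            if (topicExists) {
                // Partitions are assigned, so the assignment callback clears the flag.
                taskManager.handleRebalanceComplete();
            }
            // Step 4: with a missing topic, no assignment callback ever fires,
            // so only the hypothetical bound below can end the loop.
            if (!running && polls >= maxPollsInRebalance) {
                break;
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        int cap = 1_000;
        System.out.println("topic exists:          " + runLoop(true, Integer.MAX_VALUE, cap));
        System.out.println("topic missing:         " + runLoop(false, Integer.MAX_VALUE, cap));
        System.out.println("missing, bounded (10): " + runLoop(false, 10, cap));
    }
}
```

Running `main` shows the loop exiting after a single poll when the topic exists, spinning until the 1,000-poll safety cap when it is missing (the analogue of `close(Duration)` timing out), and stopping after 10 polls when the hypothetical bound is applied.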