Greg Harris created KAFKA-15834: ----------------------------------- Summary: Subscribing to non-existent topic blocks StreamThread from stopping Key: KAFKA-15834 URL: https://issues.apache.org/jira/browse/KAFKA-15834 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.6.0 Reporter: Greg Harris
In NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics a topology is created which references an input topic which does not exist. The test as-written passes, but the KafkaStreams#close(Duration) at the end times out, and leaves StreamsThreads running. >From some cursory investigation it appears that this is happening: 1. The consumer calls the StreamsPartitionAssignor, which calls TaskManager#handleRebalanceStart as a side-effect 2. handleRebalanceStart sets the rebalanceInProgress flag 3. This flag is checked by StreamThread.runLoop, and causes the loop to remain running. 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, because the topic does not exist 5. Because no partitions are ever assigned, the TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag This log message is printed in a tight loop while the close is ongoing and the consumer is being polled with zero duration: {noformat} [2023-11-15 11:42:43,661] WARN [Consumer clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer, groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics] Received unknown topic or partition error in fetch for partition unique_topic_prefix-topology-1-store-repartition-0 (org.apache.kafka.clients.consumer.internals.FetchCollector:321) {noformat} Practically, this means that this test leaks two StreamsThreads and the associated clients and sockets, and delays the completion of the test until the KafkaStreams#close(Duration) call times out. Either we should change the rebalanceInProgress flag to avoid getting stuck in this rebalance state, or figure out a way to shut down a StreamsThread that is in an extended rebalance state during shutdown. -- This message was sent by Atlassian Jira (v8.20.10#820010)