[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787677#comment-17787677
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15834:
------------------------------------------------

Yeah, great analysis, thanks [~gharris1727] 

I'm a bit confused by point #4, however – is this a change in behavior 
(possibly related to KIP-848)? It's my understanding that the 
#onPartitionsAssigned callback is guaranteed to be invoked whether or not the 
set of newly assigned partitions is empty. This is in contrast with the 
#onPartitionsRevoked and #onPartitionsLost callbacks, which are only invoked 
when the set of partitions to act upon is non-empty.

I think one could argue that this inconsistency is not ideal, but the behavior 
of always invoking #onPartitionsAssigned is a stated guarantee in the public 
contract of ConsumerRebalanceListener. See [this 
paragraph|https://github.com/apache/kafka/blob/254335d24ab6b6d13142dcdb53fec3856c16de9e/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L67]
 of the javadocs. In other words, I don't think we can change this without a 
KIP, and if this behavior was modified recently then we need to revert that 
change until a KIP is accepted.
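
To make the contract concrete, here is a rough sketch of the behavior as I understand it. It does not use the real kafka-clients classes: the Listener interface, FlagListener, and completeRebalance are hypothetical stand-ins, partitions are plain strings, and the flag only mirrors the rebalanceInProgress flag described in the ticket. The actual consumer internals are more involved (for example, the timing of the revoked callback differs between rebalance protocols).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class RebalanceCallbackSketch {

    // Hypothetical stand-in for ConsumerRebalanceListener; partitions are plain strings.
    public interface Listener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsAssigned(Collection<String> partitions);
    }

    // Hypothetical listener mirroring the flag from the ticket:
    // only onPartitionsAssigned clears rebalanceInProgress.
    public static class FlagListener implements Listener {
        public boolean rebalanceInProgress = false;
        public final List<String> events = new ArrayList<>();

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            events.add("revoked" + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<String> partitions) {
            events.add("assigned" + partitions);
            rebalanceInProgress = false;
        }
    }

    // Simplified model of the documented contract: the revoked callback is
    // skipped for an empty set, the assigned callback is always invoked.
    public static void completeRebalance(Listener listener,
                                         Collection<String> revoked,
                                         Collection<String> assigned) {
        if (!revoked.isEmpty()) {
            listener.onPartitionsRevoked(revoked);
        }
        listener.onPartitionsAssigned(assigned);
    }

    public static void main(String[] args) {
        FlagListener listener = new FlagListener();
        listener.rebalanceInProgress = true; // as set at rebalance start

        // Nothing revoked and nothing assigned, e.g. the topic does not exist.
        completeRebalance(listener, List.of(), List.of());

        // prints "[assigned[]] rebalanceInProgress=false"
        System.out.println(listener.events + " rebalanceInProgress=" + listener.rebalanceInProgress);
    }
}
```

If the assigned callback were skipped for an empty assignment, the flag in this sketch would stay set, which matches the stuck state described in the ticket.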

> Subscribing to non-existent topic blocks StreamThread from stopping
> -------------------------------------------------------------------
>
>                 Key: KAFKA-15834
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15834
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.6.0
>            Reporter: Greg Harris
>            Priority: Major
>
> In 
> NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics
>  a topology is created which references an input topic which does not exist. 
> The test as written passes, but the KafkaStreams#close(Duration) at the end 
> times out, and leaves StreamThreads running.
> From some cursory investigation it appears that this is happening:
> 1. The consumer calls the StreamsPartitionAssignor, which calls 
> TaskManager#handleRebalanceStart as a side-effect
> 2. handleRebalanceStart sets the rebalanceInProgress flag
> 3. This flag is checked by StreamThread.runLoop, and causes the loop to 
> remain running.
> 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, 
> because the topic does not exist
> 5. Because no partitions are ever assigned, the 
> TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag
>  
> This log message is printed in a tight loop while the close is ongoing and 
> the consumer is being polled with zero duration:
> {noformat}
> [2023-11-15 11:42:43,661] WARN [Consumer 
> clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer,
>  
> groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics]
>  Received unknown topic or partition error in fetch for partition 
> unique_topic_prefix-topology-1-store-repartition-0 
> (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
> {noformat}
> Practically, this means that this test leaks two StreamThreads and the 
> associated clients and sockets, and delays the completion of the test until 
> the KafkaStreams#close(Duration) call times out.
> Either we should change the rebalanceInProgress flag to avoid getting stuck 
> in this rebalance state, or figure out a way to shut down a StreamThread 
> that is in an extended rebalance state during shutdown.


