Bowen Li created FLINK-39979:
--------------------------------

             Summary: DynamicKafkaSource does not mark reader idle when 
metadata removal leaves a subtask with no active subreaders
                 Key: FLINK-39979
                 URL: https://issues.apache.org/jira/browse/FLINK-39979
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 2.2.0
            Reporter: Bowen Li


When DynamicKafkaSource consumes a metadata update that removes all assigned 
splits for a source subtask, the subtask can end up with an empty 
{{clusterReaderMap}}. The reader releases removed split outputs, but 
because there is no child {{KafkaSourceReader}} left, no child can call 
{{markIdle()}}.

The aggregate idleness logic currently only treats the reader as idle when 
{{clusterReaderMap}} is non-empty and every child reader is idle. That misses 
the “all children removed after metadata update” case, so the source subtask 
can remain non-idle and hold back watermarks. In production this manifested as 
frozen watermarks and delayed partition release after a large analytics topic 
migration removed old clusters.
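
To make the gap concrete, the check described above can be modeled as a 
small standalone predicate. This is only a sketch, not the connector's 
code: the class name, the method name, and the Map<String, Boolean> model of 
per-cluster child idleness are assumptions made for illustration.

```java
import java.util.Map;

public class AggregateIdlenessSketch {

    // Hypothetical stand-in for the aggregate check described in this issue:
    // idle only when the map is non-empty AND every child reader is idle.
    // Key = cluster id, value = whether that child reader reported idle.
    public static boolean isAggregateIdle(Map<String, Boolean> childIdleByCluster) {
        return !childIdleByCluster.isEmpty()
                && childIdleByCluster.values().stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        // All remaining children idle: aggregate is idle.
        System.out.println(isAggregateIdle(Map.of("cluster-a", true)));
        // One child still active: aggregate is not idle.
        System.out.println(isAggregateIdle(Map.of("cluster-a", true, "cluster-b", false)));
        // Every child removed by a metadata update: the non-empty guard makes
        // the check false, so nothing marks the subtask idle.
        System.out.println(isAggregateIdle(Map.of()));
    }
}
```

In this model the empty map is indistinguishable from "a child is still 
active", which is the case the report describes.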

*Expected Behavior*
After metadata removal leaves a DynamicKafkaSourceReader with no active 
subreaders, the reader should mark its aggregate output idle so it does not 
block downstream watermarks.

*Actual Behavior*
The reader returns {{NOTHING_AVAILABLE}}, but does not mark idle because 
{{clusterReaderMap.isEmpty()}} makes the aggregate idleness check false.

*Impact*
Large cluster/topic removals can freeze watermarks for affected subtasks even 
when Kafka lag is low, delaying StreamFlink partition release and triggering 
false/high-severity data freshness alerts.

*Proposed Fix*
Treat an empty {{clusterReaderMap}} as idle once the reader has already 
received metadata / started active consumption. Keep startup behavior unchanged 
so a fresh reader with no metadata yet does not incorrectly become idle.
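
One possible shape for this fix is sketched below. It is a simplified model 
under stated assumptions, not a patch against the connector: the class, the 
{{metadataReceived}} flag, and {{onMetadataUpdate}} are hypothetical names, 
and child readers are reduced to a map of cluster id to idle state.

```java
import java.util.HashMap;
import java.util.Map;

public class EmptyReaderIdlenessSketch {

    // Key = cluster id, value = whether that child reader reported idle.
    private final Map<String, Boolean> childIdleByCluster = new HashMap<>();

    // Hypothetical flag: set once the reader has seen its first metadata update.
    private boolean metadataReceived = false;

    // Replaces the set of active children, as a metadata update would.
    public void onMetadataUpdate(Map<String, Boolean> activeChildren) {
        metadataReceived = true;
        childIdleByCluster.clear();
        childIdleByCluster.putAll(activeChildren);
    }

    public boolean isAggregateIdle() {
        if (childIdleByCluster.isEmpty()) {
            // Empty after metadata was received: nothing left to read, so idle.
            // Empty before any metadata: startup, keep the reader non-idle.
            return metadataReceived;
        }
        return childIdleByCluster.values().stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        EmptyReaderIdlenessSketch reader = new EmptyReaderIdlenessSketch();
        System.out.println(reader.isAggregateIdle()); // startup, no metadata yet

        reader.onMetadataUpdate(Map.of("cluster-a", false));
        System.out.println(reader.isAggregateIdle()); // one active child

        reader.onMetadataUpdate(Map.of());
        System.out.println(reader.isAggregateIdle()); // all children removed
    }
}
```

Gating the empty-map case on a "has received metadata" flag is what keeps 
startup behavior unchanged in this sketch; whether the real reader should 
key off metadata receipt or off the first split assignment is left open here.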



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
