Bowen Li created FLINK-39979:
--------------------------------
Summary: DynamicKafkaSource does not mark reader idle when
metadata removal leaves a subtask with no active subreaders
Key: FLINK-39979
URL: https://issues.apache.org/jira/browse/FLINK-39979
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 2.2.0
Reporter: Bowen Li
When DynamicKafkaSource consumes a metadata update that removes all assigned
splits for a source subtask, the subtask can end up with an empty
{{clusterReaderMap}}. The reader releases removed split outputs, but
because there is no child {{KafkaSourceReader}} left, no child can call
{{markIdle()}}.
The aggregate idleness logic currently treats the reader as idle only when
{{clusterReaderMap}} is non-empty and every child reader is idle. That misses
the “all children removed after metadata update” case, so the source subtask
can remain non-idle and hold back watermarks. In production this manifested as
frozen watermarks and delayed partition release after a large analytics topic
migration removed old clusters.
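A minimal Java sketch of the aggregate check as described above, to make the missed case concrete. {{ChildReader}}, {{CurrentIdleness}} and {{isAggregateIdle()}} are hypothetical simplifications, not the actual DynamicKafkaSourceReader code; the real reader tracks idleness through its per-cluster {{KafkaSourceReader}} instances and its output, which this sketch reduces to a boolean per child.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a per-cluster child KafkaSourceReader; only tracks idleness.
class ChildReader {
    private boolean idle;

    void markIdle() { idle = true; }

    boolean isIdle() { return idle; }
}

// Hypothetical simplification of the aggregate idleness check described in the issue.
class CurrentIdleness {
    // Keyed by Kafka cluster id, mirroring clusterReaderMap.
    final Map<String, ChildReader> clusterReaderMap = new HashMap<>();

    boolean isAggregateIdle() {
        // Idle only if at least one child exists AND every child is idle.
        // An empty map therefore always yields false: no child is left to call markIdle().
        return !clusterReaderMap.isEmpty()
                && clusterReaderMap.values().stream().allMatch(ChildReader::isIdle);
    }
}
```

With a single idle child the check returns true; once metadata removal deletes the last entry from {{clusterReaderMap}}, it returns false permanently, which is the frozen-watermark case.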
*Expected Behavior*
After metadata removal leaves a DynamicKafkaSourceReader with no active
subreaders, the reader should mark its aggregate output idle so it does not
block downstream watermarks.
*Actual Behavior*
The reader returns {{NOTHING_AVAILABLE}} but does not mark idle, because the
aggregate idleness check evaluates to false whenever {{clusterReaderMap.isEmpty()}} is true.
*Impact*
Large cluster/topic removals can freeze watermarks for affected subtasks even
when Kafka lag is low, delaying StreamFlink partition release and triggering
false high-severity data freshness alerts.
*Proposed Fix*
Treat an empty {{clusterReaderMap}} as idle once the reader has already
received metadata or started active consumption. Keep startup behavior unchanged
so that a fresh reader with no metadata yet does not incorrectly become idle.
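The proposed fix can be sketched the same way. This is a hypothetical illustration, not a patch: the {{metadataReceived}} flag and the {{onMetadataUpdate}} hook are assumed names, and the real reader's metadata handling and idleness signalling may be structured differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a per-cluster child KafkaSourceReader; only tracks idleness.
class ChildReader {
    private boolean idle;

    void markIdle() { idle = true; }

    boolean isIdle() { return idle; }
}

// Hypothetical sketch of the proposed aggregate idleness check.
class ProposedIdleness {
    // Keyed by Kafka cluster id, mirroring clusterReaderMap.
    final Map<String, ChildReader> clusterReaderMap = new HashMap<>();
    // Assumed flag: set once the first metadata update has been applied.
    private boolean metadataReceived;

    // Assumed hook: applies a metadata update by dropping readers for removed clusters.
    void onMetadataUpdate(Set<String> activeClusters) {
        metadataReceived = true;
        clusterReaderMap.keySet().retainAll(activeClusters);
    }

    boolean isAggregateIdle() {
        if (clusterReaderMap.isEmpty()) {
            // Before any metadata: keep startup behavior (not idle).
            // After metadata removed every child: report idle so watermarks can advance.
            return metadataReceived;
        }
        // Unchanged rule when children exist: idle only if all of them are idle.
        return clusterReaderMap.values().stream().allMatch(ChildReader::isIdle);
    }
}
```

Gating on a flag rather than on the empty map alone keeps the two empty states distinct: a fresh subtask that is still waiting for metadata stays non-idle, while one whose last cluster was removed reports idle until a later update assigns it new readers.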