This is an automated email from the ASF dual-hosted git repository.

bowenli86 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 61f81bd0 [FLINK-39514] Release dynamic Kafka split outputs on metadata 
shrink (#245)
61f81bd0 is described below

commit 61f81bd0e66d4a8e12048fbf96283650871ad8aa
Author: lnbest0707 <[email protected]>
AuthorDate: Fri Apr 24 09:11:49 2026 -0700

    [FLINK-39514] Release dynamic Kafka split outputs on metadata shrink (#245)
    
    - Release Dynamic Kafka Source split-local outputs when metadata refresh 
removes assigned splits.
    - Defer releases until the runtime ReaderOutput exists for restored/pending 
splits filtered during startup.
    - Add reader tests for both assigned split removal and restored pending 
split cleanup.
---
 .../source/reader/DynamicKafkaSourceReader.java    | 30 +++++++++++
 .../reader/DynamicKafkaSourceReaderTest.java       | 63 ++++++++++++++++++++++
 2 files changed, 93 insertions(+)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index ca153895..3e4d3cf6 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -93,12 +93,14 @@ public class DynamicKafkaSourceReader<T> implements 
SourceReader<T, DynamicKafka
     private final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;
     private final Map<String, Properties> clustersProperties;
     private final List<DynamicKafkaSourceSplit> pendingSplits;
+    private final Set<String> pendingSplitOutputReleases;
 
     private MultipleFuturesAvailabilityHelper availabilityHelper;
     private int availabilityHelperSize;
     private boolean isActivelyConsumingSplits;
     private boolean isNoMoreSplits;
     private AtomicBoolean restartingReaders;
+    private ReaderOutput<T> latestReaderOutput;
 
     public DynamicKafkaSourceReader(
             SourceReaderContext readerContext,
@@ -121,6 +123,7 @@ public class DynamicKafkaSourceReader<T> implements 
SourceReader<T, DynamicKafka
         this.isActivelyConsumingSplits = false;
         this.restartingReaders = new AtomicBoolean();
         this.clustersProperties = new HashMap<>();
+        this.pendingSplitOutputReleases = new HashSet<>();
     }
 
     /**
@@ -137,6 +140,9 @@ public class DynamicKafkaSourceReader<T> implements 
SourceReader<T, DynamicKafka
 
     @Override
     public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception 
{
+        latestReaderOutput = readerOutput;
+        releasePendingSplitOutputs(readerOutput);
+
         // at startup, do not return end of input if metadata event has not 
been received
         if (clusterReaderMap.isEmpty()) {
             return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
@@ -183,6 +189,9 @@ public class DynamicKafkaSourceReader<T> implements 
SourceReader<T, DynamicKafka
     @Override
     public void addSplits(List<DynamicKafkaSourceSplit> splits) {
         logger.info("Adding splits to reader {}: {}", 
readerContext.getIndexOfSubtask(), splits);
+        for (DynamicKafkaSourceSplit split : splits) {
+            pendingSplitOutputReleases.remove(split.splitId());
+        }
 
         // at startup, don't add splits until we get confirmation from 
enumerator of the current
         // metadata
@@ -291,6 +300,7 @@ public class DynamicKafkaSourceReader<T> implements 
SourceReader<T, DynamicKafka
                         .computeIfAbsent(split.getKafkaClusterId(), (ignore) 
-> new ArrayList<>())
                         .add(split);
             } else {
+                releaseOrDeferSplitOutput(split.splitId());
                 logger.info("Skipping outdated split due to metadata changes: 
{}", split);
             }
         }
@@ -345,6 +355,7 @@ public class DynamicKafkaSourceReader<T> implements 
SourceReader<T, DynamicKafka
                                                 isSplitForActiveClusters(
                                                         pendingSplit, 
newClustersAndTopics);
                                         if (!splitValid) {
+                                            
releaseOrDeferSplitOutput(pendingSplit.splitId());
                                             logger.info(
                                                     "Removing invalid split 
for reader: {}",
                                                     pendingSplit);
@@ -361,6 +372,25 @@ public class DynamicKafkaSourceReader<T> implements 
SourceReader<T, DynamicKafka
         }
     }
 
+    private void releaseOrDeferSplitOutput(String splitId) {
+        if (latestReaderOutput == null) {
+            pendingSplitOutputReleases.add(splitId);
+        } else {
+            latestReaderOutput.releaseOutputForSplit(splitId);
+        }
+    }
+
+    private void releasePendingSplitOutputs(ReaderOutput<T> readerOutput) {
+        if (pendingSplitOutputReleases.isEmpty()) {
+            return;
+        }
+
+        for (String splitId : pendingSplitOutputReleases) {
+            readerOutput.releaseOutputForSplit(splitId);
+        }
+        pendingSplitOutputReleases.clear();
+    }
+
     private static boolean isSplitForActiveClusters(
             DynamicKafkaSourceSplit split, Map<String, Set<String>> metadata) {
         return metadata.containsKey(split.getKafkaClusterId())
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
index b4226cfd..f7c91fb5 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
@@ -76,6 +76,19 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class DynamicKafkaSourceReaderTest extends 
SourceReaderTestBase<DynamicKafkaSourceSplit> {
     private static final String TOPIC = "DynamicKafkaSourceReaderTest";
 
+    private static class TrackingReaderOutput<E> extends 
TestingReaderOutput<E> {
+        private final List<String> releasedSplitIds = new ArrayList<>();
+
+        @Override
+        public void releaseOutputForSplit(String splitId) {
+            releasedSplitIds.add(splitId);
+        }
+
+        private List<String> releasedSplitIds() {
+            return releasedSplitIds;
+        }
+    }
+
     // we are testing two clusters and SourceReaderTestBase expects there to 
be a total of 10 splits
     private static final int NUM_SPLITS_PER_CLUSTER = 5;
 
@@ -122,9 +135,59 @@ public class DynamicKafkaSourceReaderTest extends 
SourceReaderTestBase<DynamicKa
                     splits.stream()
                             .filter(split -> 
!split.getKafkaClusterId().equals(kafkaClusterId0))
                             .collect(Collectors.toList());
+            List<String> cluster0SplitIds =
+                    splits.stream()
+                            .filter(split -> 
split.getKafkaClusterId().equals(kafkaClusterId0))
+                            .map(DynamicKafkaSourceSplit::splitId)
+                            .collect(Collectors.toList());
             assertThat(reader.snapshotState(-1))
                     .as("The splits should not contain any split related to 
cluster 0")
                     
.containsExactlyInAnyOrderElementsOf(splitsWithoutCluster0);
+
+            TrackingReaderOutput<Integer> readerOutput = new 
TrackingReaderOutput<>();
+            assertThat(readerOutput.releasedSplitIds()).isEmpty();
+            reader.pollNext(readerOutput);
+            assertThat(readerOutput.releasedSplitIds())
+                    .as(
+                            "Pending restored split outputs should be released 
once ReaderOutput exists")
+                    .containsExactlyInAnyOrderElementsOf(cluster0SplitIds);
+        }
+    }
+
+    @Test
+    void testMetadataRemovalReleasesAssignedSplitOutputs() throws Exception {
+        TestingReaderContext context = new TestingReaderContext();
+        try (DynamicKafkaSourceReader<Integer> reader = 
createReaderWithoutStart(context)) {
+            reader.start();
+            
reader.handleSourceEvents(DynamicKafkaSourceTestHelper.getMetadataUpdateEvent(TOPIC));
+
+            DynamicKafkaSourceSplit cluster0Split =
+                    new DynamicKafkaSourceSplit(
+                            kafkaClusterId0,
+                            new KafkaPartitionSplit(
+                                    new TopicPartition(TOPIC, 0),
+                                    0L,
+                                    KafkaPartitionSplit.NO_STOPPING_OFFSET));
+            DynamicKafkaSourceSplit cluster1Split =
+                    new DynamicKafkaSourceSplit(
+                            kafkaClusterId1,
+                            new KafkaPartitionSplit(
+                                    new TopicPartition(TOPIC, 0),
+                                    0L,
+                                    KafkaPartitionSplit.NO_STOPPING_OFFSET));
+            reader.addSplits(ImmutableList.of(cluster0Split, cluster1Split));
+
+            TrackingReaderOutput<Integer> readerOutput = new 
TrackingReaderOutput<>();
+            reader.pollNext(readerOutput);
+
+            KafkaStream kafkaStream = 
DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC);
+            kafkaStream.getClusterMetadataMap().remove(kafkaClusterId0);
+            reader.handleSourceEvents(new 
MetadataUpdateEvent(Collections.singleton(kafkaStream)));
+
+            assertThat(readerOutput.releasedSplitIds())
+                    .as(
+                            "Removed assigned splits should stop contributing 
to split-local watermarks")
+                    .containsExactly(cluster0Split.splitId());
         }
     }
 

Reply via email to