This is an automated email from the ASF dual-hosted git repository.
bowenli86 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 61f81bd0 [FLINK-39514] Release dynamic Kafka split outputs on metadata shrink (#245)
61f81bd0 is described below
commit 61f81bd0e66d4a8e12048fbf96283650871ad8aa
Author: lnbest0707 <[email protected]>
AuthorDate: Fri Apr 24 09:11:49 2026 -0700
[FLINK-39514] Release dynamic Kafka split outputs on metadata shrink (#245)
- Release Dynamic Kafka Source split-local outputs when metadata refresh
removes assigned splits.
- Defer releases until the runtime ReaderOutput exists for restored/pending
splits filtered during startup.
- Add reader tests for both assigned split removal and restored pending
split cleanup.
---
.../source/reader/DynamicKafkaSourceReader.java | 30 +++++++++++
.../reader/DynamicKafkaSourceReaderTest.java | 63 ++++++++++++++++++++++
2 files changed, 93 insertions(+)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index ca153895..3e4d3cf6 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -93,12 +93,14 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
private final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;
private final Map<String, Properties> clustersProperties;
private final List<DynamicKafkaSourceSplit> pendingSplits;
+ private final Set<String> pendingSplitOutputReleases;
private MultipleFuturesAvailabilityHelper availabilityHelper;
private int availabilityHelperSize;
private boolean isActivelyConsumingSplits;
private boolean isNoMoreSplits;
private AtomicBoolean restartingReaders;
+ private ReaderOutput<T> latestReaderOutput;
public DynamicKafkaSourceReader(
SourceReaderContext readerContext,
@@ -121,6 +123,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
this.isActivelyConsumingSplits = false;
this.restartingReaders = new AtomicBoolean();
this.clustersProperties = new HashMap<>();
+ this.pendingSplitOutputReleases = new HashSet<>();
}
/**
@@ -137,6 +140,9 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
@Override
public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception
{
+ latestReaderOutput = readerOutput;
+ releasePendingSplitOutputs(readerOutput);
+
// at startup, do not return end of input if metadata event has not
been received
if (clusterReaderMap.isEmpty()) {
return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
@@ -183,6 +189,9 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
@Override
public void addSplits(List<DynamicKafkaSourceSplit> splits) {
logger.info("Adding splits to reader {}: {}",
readerContext.getIndexOfSubtask(), splits);
+ for (DynamicKafkaSourceSplit split : splits) {
+ pendingSplitOutputReleases.remove(split.splitId());
+ }
// at startup, don't add splits until we get confirmation from
enumerator of the current
// metadata
@@ -291,6 +300,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
.computeIfAbsent(split.getKafkaClusterId(), (ignore)
-> new ArrayList<>())
.add(split);
} else {
+ releaseOrDeferSplitOutput(split.splitId());
logger.info("Skipping outdated split due to metadata changes:
{}", split);
}
}
@@ -345,6 +355,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
isSplitForActiveClusters(
pendingSplit,
newClustersAndTopics);
if (!splitValid) {
+
releaseOrDeferSplitOutput(pendingSplit.splitId());
logger.info(
"Removing invalid split
for reader: {}",
pendingSplit);
@@ -361,6 +372,25 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
}
}
+ private void releaseOrDeferSplitOutput(String splitId) {
+ if (latestReaderOutput == null) {
+ pendingSplitOutputReleases.add(splitId);
+ } else {
+ latestReaderOutput.releaseOutputForSplit(splitId);
+ }
+ }
+
+ private void releasePendingSplitOutputs(ReaderOutput<T> readerOutput) {
+ if (pendingSplitOutputReleases.isEmpty()) {
+ return;
+ }
+
+ for (String splitId : pendingSplitOutputReleases) {
+ readerOutput.releaseOutputForSplit(splitId);
+ }
+ pendingSplitOutputReleases.clear();
+ }
+
private static boolean isSplitForActiveClusters(
DynamicKafkaSourceSplit split, Map<String, Set<String>> metadata) {
return metadata.containsKey(split.getKafkaClusterId())
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
index b4226cfd..f7c91fb5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
@@ -76,6 +76,19 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class DynamicKafkaSourceReaderTest extends
SourceReaderTestBase<DynamicKafkaSourceSplit> {
private static final String TOPIC = "DynamicKafkaSourceReaderTest";
+ private static class TrackingReaderOutput<E> extends
TestingReaderOutput<E> {
+ private final List<String> releasedSplitIds = new ArrayList<>();
+
+ @Override
+ public void releaseOutputForSplit(String splitId) {
+ releasedSplitIds.add(splitId);
+ }
+
+ private List<String> releasedSplitIds() {
+ return releasedSplitIds;
+ }
+ }
+
// we are testing two clusters and SourceReaderTestBase expects there to
be a total of 10 splits
private static final int NUM_SPLITS_PER_CLUSTER = 5;
@@ -122,9 +135,59 @@ public class DynamicKafkaSourceReaderTest extends SourceReaderTestBase<DynamicKa
splits.stream()
.filter(split ->
!split.getKafkaClusterId().equals(kafkaClusterId0))
.collect(Collectors.toList());
+ List<String> cluster0SplitIds =
+ splits.stream()
+ .filter(split ->
split.getKafkaClusterId().equals(kafkaClusterId0))
+ .map(DynamicKafkaSourceSplit::splitId)
+ .collect(Collectors.toList());
assertThat(reader.snapshotState(-1))
.as("The splits should not contain any split related to
cluster 0")
.containsExactlyInAnyOrderElementsOf(splitsWithoutCluster0);
+
+ TrackingReaderOutput<Integer> readerOutput = new
TrackingReaderOutput<>();
+ assertThat(readerOutput.releasedSplitIds()).isEmpty();
+ reader.pollNext(readerOutput);
+ assertThat(readerOutput.releasedSplitIds())
+ .as(
+ "Pending restored split outputs should be released
once ReaderOutput exists")
+ .containsExactlyInAnyOrderElementsOf(cluster0SplitIds);
+ }
+ }
+
+ @Test
+ void testMetadataRemovalReleasesAssignedSplitOutputs() throws Exception {
+ TestingReaderContext context = new TestingReaderContext();
+ try (DynamicKafkaSourceReader<Integer> reader =
createReaderWithoutStart(context)) {
+ reader.start();
+
reader.handleSourceEvents(DynamicKafkaSourceTestHelper.getMetadataUpdateEvent(TOPIC));
+
+ DynamicKafkaSourceSplit cluster0Split =
+ new DynamicKafkaSourceSplit(
+ kafkaClusterId0,
+ new KafkaPartitionSplit(
+ new TopicPartition(TOPIC, 0),
+ 0L,
+ KafkaPartitionSplit.NO_STOPPING_OFFSET));
+ DynamicKafkaSourceSplit cluster1Split =
+ new DynamicKafkaSourceSplit(
+ kafkaClusterId1,
+ new KafkaPartitionSplit(
+ new TopicPartition(TOPIC, 0),
+ 0L,
+ KafkaPartitionSplit.NO_STOPPING_OFFSET));
+ reader.addSplits(ImmutableList.of(cluster0Split, cluster1Split));
+
+ TrackingReaderOutput<Integer> readerOutput = new
TrackingReaderOutput<>();
+ reader.pollNext(readerOutput);
+
+ KafkaStream kafkaStream =
DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC);
+ kafkaStream.getClusterMetadataMap().remove(kafkaClusterId0);
+ reader.handleSourceEvents(new
MetadataUpdateEvent(Collections.singleton(kafkaStream)));
+
+ assertThat(readerOutput.releasedSplitIds())
+ .as(
+ "Removed assigned splits should stop contributing
to split-local watermarks")
+ .containsExactly(cluster0Split.splitId());
}
}