This is an automated email from the ASF dual-hosted git repository.
xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cedb558bba5 [FLINK-37308] Support pauseOrResumeSplits in HybridSource
(#26483)
cedb558bba5 is described below
commit cedb558bba5388c727ab9ce4778d8d2a1c2bb6aa
Author: Xingcan Cui <[email protected]>
AuthorDate: Wed Nov 5 11:52:19 2025 -0500
[FLINK-37308] Support pauseOrResumeSplits in HybridSource (#26483)
---
.../base/source/hybrid/HybridSourceReader.java | 7 ++++++
.../base/source/hybrid/HybridSourceReaderTest.java | 27 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
index 2f113078fe3..18c049f7c05 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -247,4 +248,10 @@ public class HybridSourceReader<T> implements
SourceReader<T, HybridSourceSplit>
addSplits(splits);
}
}
+
+ @Override
+ public void pauseOrResumeSplits(
+ Collection<String> splitsToPause, Collection<String>
splitsToResume) {
+ currentReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
+ }
}
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
index d1acc86e636..7cf1a63fa9b 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -310,6 +310,33 @@ class HybridSourceReaderTest {
reader.close();
}
+ @Test
+ void testPauseResumeCurrentSourceSplits() throws Exception {
+ TestingReaderContext readerContext = new TestingReaderContext();
+ MockBaseSource source =
+ new MockBaseSource(1, 1, Boundedness.BOUNDED) {
+ @Override
+ public SourceReader<Integer, MockSourceSplit> createReader(
+ SourceReaderContext readerContext) {
+ return Mockito.spy(super.createReader(readerContext));
+ }
+ };
+
+ HybridSourceReader<Integer> reader = new
HybridSourceReader<>(readerContext);
+
+ reader.start();
+ assertAndClearSourceReaderFinishedEvent(readerContext, -1);
+ reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
+ SourceReader<Integer, MockSourceSplit> underlyingReader =
currentReader(reader);
+
+ reader.pauseOrResumeSplits(
+ Collections.singletonList("foo"),
Collections.singletonList("bar"));
+ Mockito.verify(underlyingReader)
+ .pauseOrResumeSplits(
+ Collections.singletonList("foo"),
Collections.singletonList("bar"));
+ reader.close();
+ }
+
private static SourceReader<Integer, MockSourceSplit> currentReader(
HybridSourceReader<?> reader) {
return Whitebox.getInternalState(reader, "currentReader");