This is an automated email from the ASF dual-hosted git repository.

xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cedb558bba5 [FLINK-37308] Support pauseOrResumeSplits in HybridSource 
(#26483)
cedb558bba5 is described below

commit cedb558bba5388c727ab9ce4778d8d2a1c2bb6aa
Author: Xingcan Cui <[email protected]>
AuthorDate: Wed Nov 5 11:52:19 2025 -0500

    [FLINK-37308] Support pauseOrResumeSplits in HybridSource (#26483)
---
 .../base/source/hybrid/HybridSourceReader.java     |  7 ++++++
 .../base/source/hybrid/HybridSourceReaderTest.java | 27 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
index 2f113078fe3..18c049f7c05 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -247,4 +248,10 @@ public class HybridSourceReader<T> implements 
SourceReader<T, HybridSourceSplit>
             addSplits(splits);
         }
     }
+
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
+        currentReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
index d1acc86e636..7cf1a63fa9b 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -310,6 +310,33 @@ class HybridSourceReaderTest {
         reader.close();
     }
 
+    @Test
+    void testPauseResumeCurrentSourceSplits() throws Exception {
+        TestingReaderContext readerContext = new TestingReaderContext();
+        MockBaseSource source =
+                new MockBaseSource(1, 1, Boundedness.BOUNDED) {
+                    @Override
+                    public SourceReader<Integer, MockSourceSplit> createReader(
+                            SourceReaderContext readerContext) {
+                        return Mockito.spy(super.createReader(readerContext));
+                    }
+                };
+
+        HybridSourceReader<Integer> reader = new 
HybridSourceReader<>(readerContext);
+
+        reader.start();
+        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
+        reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
+        SourceReader<Integer, MockSourceSplit> underlyingReader = 
currentReader(reader);
+
+        reader.pauseOrResumeSplits(
+                Collections.singletonList("foo"), 
Collections.singletonList("bar"));
+        Mockito.verify(underlyingReader)
+                .pauseOrResumeSplits(
+                        Collections.singletonList("foo"), 
Collections.singletonList("bar"));
+        reader.close();
+    }
+
     private static SourceReader<Integer, MockSourceSplit> currentReader(
             HybridSourceReader<?> reader) {
         return Whitebox.getInternalState(reader, "currentReader");

Reply via email to