This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 8c293c7  [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications
8c293c7 is described below

commit 8c293c7e03bf9c5221ae50843a3af445276afa24
Author: Thomas Weise <t...@apache.org>
AuthorDate: Thu Aug 26 14:22:01 2021 -0700

    [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications
---
 .../base/source/hybrid/HybridSourceReader.java     | 14 +++++++++
 .../source/hybrid/HybridSourceSplitEnumerator.java | 10 +++++++
 .../base/source/hybrid/HybridSourceReaderTest.java | 34 ++++++++++++++++++++++
 .../hybrid/HybridSourceSplitEnumeratorTest.java    | 21 +++++++++++++
 4 files changed, 79 insertions(+)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
index cdd2e8b..28d4011 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -121,6 +121,20 @@ public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit>
     }
 
     @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (currentReader != null) {
+            currentReader.notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        if (currentReader != null) {
+            currentReader.notifyCheckpointAborted(checkpointId);
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> isAvailable() {
         return availabilityFuture;
     }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index 5a2c010..0f2b036 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -176,6 +176,16 @@ public class HybridSourceSplitEnumerator
     }
 
     @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        currentEnumerator.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        currentEnumerator.notifyCheckpointAborted(checkpointId);
+    }
+
+    @Override
     public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
         LOG.debug(
                 "handleSourceEvent {} subtask={} pendingSplits={}",
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
index 3bf77f3..7882333 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.mock.Whitebox;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -165,6 +166,39 @@ public class HybridSourceReaderTest {
         reader.close();
     }
 
+    @Test
+    public void testDefaultMethodDelegation() throws Exception {
+        TestingReaderContext readerContext = new TestingReaderContext();
+        TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>();
+        MockBaseSource source =
+                new MockBaseSource(1, 1, Boundedness.BOUNDED) {
+                    @Override
+                    public SourceReader<Integer, MockSourceSplit> createReader(
+                            SourceReaderContext readerContext) {
+                        return Mockito.spy(super.createReader(readerContext));
+                    }
+                };
+
+        Map<Integer, Source> switchedSources = new HashMap<>();
+
+        HybridSourceReader<Integer> reader =
+                new HybridSourceReader<>(readerContext, switchedSources);
+
+        reader.start();
+        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
+        reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
+        Assert.assertEquals(source, switchedSources.get(0));
+        SourceReader<Integer, MockSourceSplit> underlyingReader = currentReader(reader);
+
+        reader.notifyCheckpointComplete(1);
+        Mockito.verify(underlyingReader).notifyCheckpointComplete(1);
+
+        reader.notifyCheckpointAborted(1);
+        Mockito.verify(underlyingReader).notifyCheckpointAborted(1);
+
+        reader.close();
+    }
+
     private static SourceReader<Integer, MockSourceSplit> currentReader(
             HybridSourceReader<?> reader) {
         return (SourceReader) Whitebox.getInternalState(reader, "currentReader");
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
index 2a8fd57..59a1836 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.mock.Whitebox;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -176,6 +177,26 @@ public class HybridSourceSplitEnumeratorTest {
         Assert.assertEquals(1, ((List) enumeratorState.getWrappedState()).size());
     }
 
+    @Test
+    public void testDefaultMethodDelegation() throws Exception {
+        setupEnumeratorAndTriggerSourceSwitch();
+        SplitEnumerator<MockSourceSplit, Object> underlyingEnumeratorSpy =
+                Mockito.spy(
+                        (SplitEnumerator<MockSourceSplit, Object>)
+                                Whitebox.getInternalState(enumerator, "currentEnumerator"));
+        Whitebox.setInternalState(enumerator, "currentEnumerator", underlyingEnumeratorSpy);
+
+        enumerator.notifyCheckpointComplete(1);
+        Mockito.verify(underlyingEnumeratorSpy).notifyCheckpointComplete(1);
+
+        enumerator.notifyCheckpointAborted(2);
+        Mockito.verify(underlyingEnumeratorSpy).notifyCheckpointAborted(2);
+
+        SwitchSourceEvent se = new SwitchSourceEvent(0, null, false);
+        enumerator.handleSourceEvent(0, se);
+        Mockito.verify(underlyingEnumeratorSpy).handleSourceEvent(0, se);
+    }
+
     private static class UnderlyingEnumeratorWrapper
             implements SplitEnumerator<MockSourceSplit, Object> {
         private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);

Reply via email to