This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new 8c293c7 [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications 8c293c7 is described below commit 8c293c7e03bf9c5221ae50843a3af445276afa24 Author: Thomas Weise <t...@apache.org> AuthorDate: Thu Aug 26 14:22:01 2021 -0700 [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications --- .../base/source/hybrid/HybridSourceReader.java | 14 +++++++++ .../source/hybrid/HybridSourceSplitEnumerator.java | 10 +++++++ .../base/source/hybrid/HybridSourceReaderTest.java | 34 ++++++++++++++++++++++ .../hybrid/HybridSourceSplitEnumeratorTest.java | 21 +++++++++++++ 4 files changed, 79 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index cdd2e8b..28d4011 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -121,6 +121,20 @@ public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit> } @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (currentReader != null) { + currentReader.notifyCheckpointComplete(checkpointId); + } + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + if (currentReader != null) { + currentReader.notifyCheckpointAborted(checkpointId); + } + } + + @Override public CompletableFuture<Void> isAvailable() { return availabilityFuture; } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java index 5a2c010..0f2b036 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java @@ -176,6 +176,16 @@ public class HybridSourceSplitEnumerator } @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + currentEnumerator.notifyCheckpointComplete(checkpointId); + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + currentEnumerator.notifyCheckpointAborted(checkpointId); + } + + @Override public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { LOG.debug( "handleSourceEvent {} subtask={} pendingSplits={}", diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index 3bf77f3..7882333 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -33,6 +33,7 @@ import org.apache.flink.mock.Whitebox; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.util.Collections; import java.util.HashMap; @@ -165,6 +166,39 @@ public class HybridSourceReaderTest { reader.close(); } + @Test + public void testDefaultMethodDelegation() throws Exception { + TestingReaderContext readerContext = new TestingReaderContext(); + TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>(); + MockBaseSource source = + new MockBaseSource(1, 1, Boundedness.BOUNDED) { + @Override + public SourceReader<Integer, MockSourceSplit> createReader( + SourceReaderContext readerContext) { + return Mockito.spy(super.createReader(readerContext)); + } + }; + + Map<Integer, Source> switchedSources = new HashMap<>(); + + HybridSourceReader<Integer> reader = + new HybridSourceReader<>(readerContext, switchedSources); + + reader.start(); + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); + Assert.assertEquals(source, switchedSources.get(0)); + SourceReader<Integer, MockSourceSplit> underlyingReader = currentReader(reader); + + reader.notifyCheckpointComplete(1); + Mockito.verify(underlyingReader).notifyCheckpointComplete(1); + + reader.notifyCheckpointAborted(1); + Mockito.verify(underlyingReader).notifyCheckpointAborted(1); + + reader.close(); + } + private static SourceReader<Integer, MockSourceSplit> currentReader( HybridSourceReader<?> reader) { return (SourceReader) Whitebox.getInternalState(reader, "currentReader"); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java index 2a8fd57..59a1836 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.mock.Whitebox; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; import java.util.ArrayList; @@ -176,6 +177,26 @@ public class HybridSourceSplitEnumeratorTest { Assert.assertEquals(1, ((List) enumeratorState.getWrappedState()).size()); } + @Test + public void testDefaultMethodDelegation() throws Exception { + setupEnumeratorAndTriggerSourceSwitch(); + SplitEnumerator<MockSourceSplit, Object> underlyingEnumeratorSpy = + Mockito.spy( + (SplitEnumerator<MockSourceSplit, Object>) + Whitebox.getInternalState(enumerator, "currentEnumerator")); + Whitebox.setInternalState(enumerator, "currentEnumerator", underlyingEnumeratorSpy); + + enumerator.notifyCheckpointComplete(1); + Mockito.verify(underlyingEnumeratorSpy).notifyCheckpointComplete(1); + + enumerator.notifyCheckpointAborted(2); + Mockito.verify(underlyingEnumeratorSpy).notifyCheckpointAborted(2); + + SwitchSourceEvent se = new SwitchSourceEvent(0, null, false); + enumerator.handleSourceEvent(0, se); + Mockito.verify(underlyingEnumeratorSpy).handleSourceEvent(0, se); + } + private static class UnderlyingEnumeratorWrapper implements SplitEnumerator<MockSourceSplit, Object> { private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);