This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new e5570e3e33a [FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint e5570e3e33a is described below commit e5570e3e33ac33fd1b31d38c86ac6a291e7bc47e Author: Qishang Zhong <zhongqish...@gmail.com> AuthorDate: Sat Aug 13 11:21:58 2022 +0800 [FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint --- .../source/hybrid/HybridSourceSplitEnumerator.java | 6 +++++- .../connector/base/source/hybrid/SwitchedSources.java | 8 +++++++- .../hybrid/HybridSourceSplitEnumeratorTest.java | 19 +++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java index d27de221af8..61baabeb941 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java @@ -218,7 +218,11 @@ public class HybridSourceSplitEnumerator } if (subtaskSourceIndex < currentSourceIndex) { - subtaskSourceIndex++; + // find initial or next index for the reader + subtaskSourceIndex = + subtaskSourceIndex == -1 + ? switchedSources.getFirstSourceIndex() + : ++subtaskSourceIndex; sendSwitchSourceEvent(subtaskId, subtaskSourceIndex); return; } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java index 7911612d258..68128d1faed 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java @@ -25,10 +25,12 @@ import org.apache.flink.util.Preconditions; import java.util.HashMap; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; /** Sources that participated in switching with cached serializers. */ class SwitchedSources { - private final Map<Integer, Source> sources = new HashMap<>(); + private final SortedMap<Integer, Source> sources = new TreeMap<>(); private final Map<Integer, SimpleVersionedSerializer<SourceSplit>> cachedSerializers = new HashMap<>(); @@ -45,4 +47,8 @@ class SwitchedSources { public void put(int sourceIndex, Source source) { sources.put(sourceIndex, Preconditions.checkNotNull(source)); } + + public int getFirstSourceIndex() { + return sources.firstKey(); + } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java index 7bcf69c5e72..8b5cb096586 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java @@ -182,6 +182,25 @@ public class HybridSourceSplitEnumeratorTest { Matchers.iterableWithSize(1)); } + @Test + public void testRestoreEnumeratorAfterFirstSourceWithoutRestoredSplits() throws Exception { + setupEnumeratorAndTriggerSourceSwitch(); + HybridSourceEnumeratorState enumeratorState = enumerator.snapshotState(0); + MockSplitEnumerator underlyingEnumerator = getCurrentEnumerator(enumerator); + Assert.assertThat( + (List<MockSourceSplit>) Whitebox.getInternalState(underlyingEnumerator, "splits"), + Matchers.iterableWithSize(0)); + enumerator = + (HybridSourceSplitEnumerator) source.restoreEnumerator(context, enumeratorState); + enumerator.start(); + // subtask starts at -1 since it has no splits after restore + enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1)); + underlyingEnumerator = getCurrentEnumerator(enumerator); + Assert.assertThat( + (List<MockSourceSplit>) Whitebox.getInternalState(underlyingEnumerator, "splits"), + Matchers.iterableWithSize(0)); + } + @Test public void testDefaultMethodDelegation() throws Exception { setupEnumeratorAndTriggerSourceSwitch();