This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new e5570e3e33a [FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint
e5570e3e33a is described below

commit e5570e3e33ac33fd1b31d38c86ac6a291e7bc47e
Author: Qishang Zhong <zhongqish...@gmail.com>
AuthorDate: Sat Aug 13 11:21:58 2022 +0800

    [FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint
---
 .../source/hybrid/HybridSourceSplitEnumerator.java    |  6 +++++-
 .../connector/base/source/hybrid/SwitchedSources.java |  8 +++++++-
 .../hybrid/HybridSourceSplitEnumeratorTest.java       | 19 +++++++++++++++++++
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index d27de221af8..61baabeb941 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -218,7 +218,11 @@ public class HybridSourceSplitEnumerator
             }
 
             if (subtaskSourceIndex < currentSourceIndex) {
-                subtaskSourceIndex++;
+                // find initial or next index for the reader
+                subtaskSourceIndex =
+                        subtaskSourceIndex == -1
+                                ? switchedSources.getFirstSourceIndex()
+                                : ++subtaskSourceIndex;
                 sendSwitchSourceEvent(subtaskId, subtaskSourceIndex);
                 return;
             }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
index 7911612d258..68128d1faed 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
@@ -25,10 +25,12 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 /** Sources that participated in switching with cached serializers. */
 class SwitchedSources {
-    private final Map<Integer, Source> sources = new HashMap<>();
+    private final SortedMap<Integer, Source> sources = new TreeMap<>();
     private final Map<Integer, SimpleVersionedSerializer<SourceSplit>> cachedSerializers =
             new HashMap<>();
 
@@ -45,4 +47,8 @@ class SwitchedSources {
     public void put(int sourceIndex, Source source) {
         sources.put(sourceIndex, Preconditions.checkNotNull(source));
     }
+
+    public int getFirstSourceIndex() {
+        return sources.firstKey();
+    }
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
index 7bcf69c5e72..8b5cb096586 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -182,6 +182,25 @@ public class HybridSourceSplitEnumeratorTest {
                 Matchers.iterableWithSize(1));
     }
 
+    @Test
+    public void testRestoreEnumeratorAfterFirstSourceWithoutRestoredSplits() throws Exception {
+        setupEnumeratorAndTriggerSourceSwitch();
+        HybridSourceEnumeratorState enumeratorState = enumerator.snapshotState(0);
+        MockSplitEnumerator underlyingEnumerator = getCurrentEnumerator(enumerator);
+        Assert.assertThat(
+                (List<MockSourceSplit>) Whitebox.getInternalState(underlyingEnumerator, "splits"),
+                Matchers.iterableWithSize(0));
+        enumerator =
+                (HybridSourceSplitEnumerator) source.restoreEnumerator(context, enumeratorState);
+        enumerator.start();
+        // subtask starts at -1 since it has no splits after restore
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
+        underlyingEnumerator = getCurrentEnumerator(enumerator);
+        Assert.assertThat(
+                (List<MockSourceSplit>) Whitebox.getInternalState(underlyingEnumerator, "splits"),
+                Matchers.iterableWithSize(0));
+    }
+
     @Test
     public void testDefaultMethodDelegation() throws Exception {
         setupEnumeratorAndTriggerSourceSwitch();

Reply via email to