This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 013b337329f [FLINK-33360][connector/common] Clean up finishedReaders after switch to next Enumerator
013b337329f is described below

commit 013b337329ff4732e62d631cac8429cb68dd4776
Author: fengjiajie <laputafa...@gmail.com>
AuthorDate: Tue Oct 31 15:20:52 2023 +0800

    [FLINK-33360][connector/common] Clean up finishedReaders after switch to next Enumerator
    
    This closes #23616.
---
 .../source/hybrid/HybridSourceSplitEnumerator.java |  1 +
 .../hybrid/HybridSourceSplitEnumeratorTest.java    | 36 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index b1eeec1327c..df8aaf67d95 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -259,6 +259,7 @@ public class HybridSourceSplitEnumerator
         if (currentEnumerator != null) {
             try {
                 currentEnumerator.close();
+                finishedReaders.clear();
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
index fcde32811f4..8b068d645b6 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -252,6 +252,42 @@ public class HybridSourceSplitEnumeratorTest {
         assertThat(context.hasNoMoreSplits(0)).isTrue();
     }
 
+    @Test
+    public void testMultiSubtaskSwitchEnumerator() {
+        context = new MockSplitEnumeratorContext<>(2);
+        source =
+                HybridSource.builder(MOCK_SOURCE)
+                        .addSource(MOCK_SOURCE)
+                        .addSource(MOCK_SOURCE)
+                        .build();
+
+        enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
+        enumerator.start();
+
+        registerReader(context, enumerator, SUBTASK0);
+        registerReader(context, enumerator, SUBTASK1);
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
+        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
+
+        assertThat(getCurrentSourceIndex(enumerator)).isEqualTo(0);
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
+        assertThat(getCurrentSourceIndex(enumerator)).isEqualTo(0);
+        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0));
+        assertThat(getCurrentSourceIndex(enumerator))
+                .as("all reader finished source-0")
+                .isEqualTo(1);
+
+        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(1));
+        assertThat(getCurrentSourceIndex(enumerator))
+                .as(
+                        "only reader-0 has finished reading, reader-1 is not yet done, so do not switch to the next source")
+                .isEqualTo(1);
+        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(1));
+        assertThat(getCurrentSourceIndex(enumerator))
+                .as("all reader finished source-1")
+                .isEqualTo(2);
+    }
+
     private static class UnderlyingEnumeratorWrapper
             implements SplitEnumerator<MockSourceSplit, Object> {
         private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);

Reply via email to