This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch v1.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git

commit 723b34a56012bf857a5e7da540adfe12a44bfe4c
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Wed Jul 12 17:12:06 2023 +0800

    [FLINK-32348][connectors/mongodb][tests] Fix MongoDB tests are flaky and time out
    
    This closes #13.
---
 .../mongodb/source/enumerator/MongoSourceEnumerator.java    | 13 +++++++++----
 .../source/reader/split/MongoScanSourceSplitReader.java     |  1 -
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
index d13a843..951c527 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
@@ -97,6 +97,15 @@ public class MongoSourceEnumerator
                 continue;
             }
 
+            // close idle readers
+            if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
+                context.signalNoMoreSplits(nextAwaiting);
+                awaitingReader.remove();
+                LOG.info(
+                        "All scan splits have been assigned, closing idle reader {}", nextAwaiting);
+                continue;
+            }
+
             Optional<MongoSourceSplit> split = splitAssigner.getNext();
             if (split.isPresent()) {
                 final MongoSourceSplit mongoSplit = split.get();
@@ -104,10 +113,6 @@ public class MongoSourceEnumerator
                 awaitingReader.remove();
                 LOG.info("Assign split {} to subtask {}", mongoSplit, nextAwaiting);
                 break;
-            } else if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
-                LOG.info("All splits have been assigned");
-                context.registeredReaders().keySet().forEach(context::signalNoMoreSplits);
-                break;
             } else {
                 // there is no available splits by now, skip assigning
                 break;
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
index 4702f94..134fe73 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
@@ -119,7 +119,6 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader<MongoS
             throw new IOException("Scan records form MongoDB failed", e);
         } finally {
             if (finished) {
-                currentSplit = null;
                 closeCursor();
             }
         }

Reply via email to