This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch v1.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git
commit 723b34a56012bf857a5e7da540adfe12a44bfe4c
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Wed Jul 12 17:12:06 2023 +0800

    [FLINK-32348][connectors/mongodb][tests] Fix MongoDB tests are flaky and time out

    This closes #13.
---
 .../mongodb/source/enumerator/MongoSourceEnumerator.java | 13 +++++++++----
 .../source/reader/split/MongoScanSourceSplitReader.java | 1 -
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
index d13a843..951c527 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java
@@ -97,6 +97,15 @@ public class MongoSourceEnumerator
                 continue;
             }
 
+            // close idle readers
+            if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
+                context.signalNoMoreSplits(nextAwaiting);
+                awaitingReader.remove();
+                LOG.info(
+                        "All scan splits have been assigned, closing idle reader {}", nextAwaiting);
+                continue;
+            }
+
             Optional<MongoSourceSplit> split = splitAssigner.getNext();
             if (split.isPresent()) {
                 final MongoSourceSplit mongoSplit = split.get();
@@ -104,10 +113,6 @@ public class MongoSourceEnumerator
                 awaitingReader.remove();
                 LOG.info("Assign split {} to subtask {}", mongoSplit, nextAwaiting);
                 break;
-            } else if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
-                LOG.info("All splits have been assigned");
-                context.registeredReaders().keySet().forEach(context::signalNoMoreSplits);
-                break;
             } else {
                 // there is no available splits by now, skip assigning
                 break;
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
index 4702f94..134fe73 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
@@ -119,7 +119,6 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader<MongoS
             throw new IOException("Scan records form MongoDB failed", e);
         } finally {
             if (finished) {
-                currentSplit = null;
                 closeCursor();
             }
         }