This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b0bfe8b044d3414138be784e710f68bc3f998f6e Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Tue Sep 22 11:43:58 2020 +0200 [backport] Relevant test mock changes from "[FLINK-18907][test] Refactor MockSourceReader" Make the synchronisation around availability easier to understand. --- .../connector/source/mocks/MockSourceReader.java | 53 +++++++++++++--------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index bdccf88..d474e1f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -23,28 +23,30 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.core.io.InputStatus; +import javax.annotation.concurrent.GuardedBy; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; /** * A mock {@link SourceReader} for unit tests. */ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> { - private final AtomicReference<CompletableFuture<Void>> availableRef; - private List<MockSourceSplit> assignedSplits; - private List<SourceEvent> receivedSourceEvents; + private final List<MockSourceSplit> assignedSplits = new ArrayList<>(); + private final List<SourceEvent> receivedSourceEvents = new ArrayList<>(); + private int currentSplitIndex = 0; private boolean started; private boolean closed; + @GuardedBy("this") + private CompletableFuture<Void> availableFuture; + public MockSourceReader() { - this.assignedSplits = new ArrayList<>(); - this.receivedSourceEvents = new ArrayList<>(); this.started = false; this.closed = false; - this.availableRef = new AtomicReference<>(); + this.availableFuture = CompletableFuture.completedFuture(null); } @Override @@ -66,9 +68,13 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> if (currentSplitIndex < assignedSplits.size()) { sourceOutput.collect(assignedSplits.get(currentSplitIndex).getNext(false)[0]); return InputStatus.MORE_AVAILABLE; - } else { + } else if (finished) { // In case no split has available record, return depending on whether all the splits has finished. - return finished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE; + return InputStatus.END_OF_INPUT; + } + else { + markUnavailable(); + return InputStatus.NOTHING_AVAILABLE; } } @@ -78,19 +84,14 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } @Override - public CompletableFuture<Void> isAvailable() { - if (currentSplitIndex >= assignedSplits.size()) { - CompletableFuture<Void> future = new CompletableFuture<>(); - availableRef.compareAndSet(null, future); - return availableRef.get(); - } else { - return CompletableFuture.completedFuture(null); - } + public synchronized CompletableFuture<Void> isAvailable() { + return availableFuture; } @Override public void addSplits(List<MockSourceSplit> splits) { assignedSplits.addAll(splits); + markAvailable(); } @Override @@ -103,13 +104,23 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> this.closed = true; } + private synchronized void markUnavailable() { + if (availableFuture.isDone()) { + availableFuture = new CompletableFuture<>(); + } + } + // --------------- methods for unit tests --------------- public void markAvailable() { - CompletableFuture<Void> future = availableRef.get(); - if (future != null) { - future.complete(null); - availableRef.set(null); + CompletableFuture<?> toNotify = null; + synchronized (this) { + if (!availableFuture.isDone()) { + toNotify = availableFuture; + } + } + if (toNotify != null) { + toNotify.complete(null); } }