This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0bfe8b044d3414138be784e710f68bc3f998f6e
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Tue Sep 22 11:43:58 2020 +0200

    [backport] Relevant test mock changes from "[FLINK-18907][test] Refactor MockSourceReader"
    
    Make the synchronisation around availability easier to understand.
---
 .../connector/source/mocks/MockSourceReader.java   | 53 +++++++++++++---------
 1 file changed, 32 insertions(+), 21 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index bdccf88..d474e1f 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -23,28 +23,30 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.core.io.InputStatus;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A mock {@link SourceReader} for unit tests.
  */
 public class MockSourceReader implements SourceReader<Integer, 
MockSourceSplit> {
-       private final AtomicReference<CompletableFuture<Void>> availableRef;
-       private List<MockSourceSplit> assignedSplits;
-       private List<SourceEvent> receivedSourceEvents;
+       private final List<MockSourceSplit> assignedSplits = new ArrayList<>();
+       private final List<SourceEvent> receivedSourceEvents = new 
ArrayList<>();
+
        private int currentSplitIndex = 0;
        private boolean started;
        private boolean closed;
 
+       @GuardedBy("this")
+       private CompletableFuture<Void> availableFuture;
+
        public MockSourceReader() {
-               this.assignedSplits = new ArrayList<>();
-               this.receivedSourceEvents = new ArrayList<>();
                this.started = false;
                this.closed = false;
-               this.availableRef = new AtomicReference<>();
+               this.availableFuture = CompletableFuture.completedFuture(null);
        }
 
        @Override
@@ -66,9 +68,13 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
                if (currentSplitIndex < assignedSplits.size()) {
                        
sourceOutput.collect(assignedSplits.get(currentSplitIndex).getNext(false)[0]);
                        return InputStatus.MORE_AVAILABLE;
-               } else {
+               } else if (finished) {
                        // In case no split has available record, return 
depending on whether all the splits has finished.
-                       return finished ? InputStatus.END_OF_INPUT : 
InputStatus.NOTHING_AVAILABLE;
+                       return InputStatus.END_OF_INPUT;
+               }
+               else {
+                       markUnavailable();
+                       return InputStatus.NOTHING_AVAILABLE;
                }
        }
 
@@ -78,19 +84,14 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
        }
 
        @Override
-       public CompletableFuture<Void> isAvailable() {
-               if (currentSplitIndex >= assignedSplits.size()) {
-                       CompletableFuture<Void> future = new 
CompletableFuture<>();
-                       availableRef.compareAndSet(null, future);
-                       return availableRef.get();
-               } else {
-                       return CompletableFuture.completedFuture(null);
-               }
+       public synchronized CompletableFuture<Void> isAvailable() {
+               return availableFuture;
        }
 
        @Override
        public void addSplits(List<MockSourceSplit> splits) {
                assignedSplits.addAll(splits);
+               markAvailable();
        }
 
        @Override
@@ -103,13 +104,23 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
                this.closed = true;
        }
 
+       private synchronized void markUnavailable() {
+               if (availableFuture.isDone()) {
+                       availableFuture = new CompletableFuture<>();
+               }
+       }
+
        // --------------- methods for unit tests ---------------
 
        public void markAvailable() {
-               CompletableFuture<Void> future = availableRef.get();
-               if (future != null) {
-                       future.complete(null);
-                       availableRef.set(null);
+               CompletableFuture<?> toNotify = null;
+               synchronized (this) {
+                       if (!availableFuture.isDone()) {
+                               toNotify =  availableFuture;
+                       }
+               }
+               if (toNotify != null) {
+                       toNotify.complete(null);
                }
        }
 

Reply via email to