This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 791c1b9  [FLINK-24300] SourceOperator#getAvailableFuture reuses future
791c1b9 is described below

commit 791c1b9c0816a0a990c2e705b0cafc474fef97ca
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Thu Sep 16 14:12:04 2021 +0200

    [FLINK-24300] SourceOperator#getAvailableFuture reuses future
    
    Callers of SourceOperator#getAvailableFuture might call the method
    multiple times even if the returned future does not complete. Before
    this commit, each call created a new combined future from the
    SourceReader#isAvailable future and the forcedStop one. If the
    underlying SourceReader#isAvailable future has not changed, this
    operation is unnecessary. Even worse, each such operation adds another
    entry onto the source reader's availability future stack, which caused
    a performance regression.
    
    The commit reuses the combined future if the underlying
    SourceReader#isAvailable future has not changed.
    
    This closes #17303
---
 .../streaming/api/operators/SourceOperator.java    | 33 ++++++++++++++++++----
 .../api/operators/SourceOperatorTest.java          | 12 ++++++++
 2 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index bcfb639..72ae6cd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -137,7 +137,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
     private OperatingMode operatingMode;
 
     private final CompletableFuture<Void> finished = new CompletableFuture<>();
-    private final CompletableFuture<Void> forcedStop = new 
CompletableFuture<>();
+    private final SourceOperatorAvailabilityHelper availabilityHelper =
+            new SourceOperatorAvailabilityHelper();
 
     private enum OperatingMode {
         READING,
@@ -312,7 +313,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
             case OUTPUT_NOT_INITIALIZED:
             case READING:
                 this.operatingMode = OperatingMode.SOURCE_STOPPED;
-                forcedStop.complete(null);
+                availabilityHelper.forceStop();
                 break;
         }
         return finished;
@@ -391,10 +392,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         switch (operatingMode) {
             case OUTPUT_NOT_INITIALIZED:
             case READING:
-                CompletableFuture<Void> sourceReaderAvailable = 
sourceReader.isAvailable();
-                return sourceReaderAvailable == AvailabilityProvider.AVAILABLE
-                        ? sourceReaderAvailable
-                        : CompletableFuture.anyOf(sourceReaderAvailable, 
forcedStop);
+                return availabilityHelper.update(sourceReader.isAvailable());
             case SOURCE_STOPPED:
             case DATA_FINISHED:
                 return AvailabilityProvider.AVAILABLE;
@@ -457,4 +455,27 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
     ListState<SplitT> getReaderState() {
         return readerState;
     }
+
+    private static class SourceOperatorAvailabilityHelper {
+        private final CompletableFuture<Void> forcedStopFuture = new 
CompletableFuture<>();
+        private CompletableFuture<Void> currentReaderFuture;
+        private CompletableFuture<?> currentCombinedFuture;
+
+        public CompletableFuture<?> update(CompletableFuture<Void> 
sourceReaderFuture) {
+            if (sourceReaderFuture == AvailabilityProvider.AVAILABLE) {
+                return sourceReaderFuture;
+            } else if (sourceReaderFuture == currentReaderFuture) {
+                return currentCombinedFuture;
+            } else {
+                currentReaderFuture = sourceReaderFuture;
+                currentCombinedFuture =
+                        CompletableFuture.anyOf(forcedStopFuture, 
sourceReaderFuture);
+                return currentCombinedFuture;
+            }
+        }
+
+        public void forceStop() {
+            this.forcedStopFuture.complete(null);
+        }
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 3bf4671..aa0fb7e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.AvailabilityProvider;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -61,6 +62,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -211,6 +215,14 @@ public class SourceOperatorTest {
         assertEquals(100L, (long) 
mockSourceReader.getAbortedCheckpoints().get(0));
     }
 
+    @Test
+    public void testSameAvailabilityFuture() {
+        final CompletableFuture<?> initialFuture = 
operator.getAvailableFuture();
+        final CompletableFuture<?> secondFuture = 
operator.getAvailableFuture();
+        assertThat(initialFuture, 
not(sameInstance(AvailabilityProvider.AVAILABLE)));
+        assertThat(secondFuture, sameInstance(initialFuture));
+    }
+
     // ---------------- helper methods -------------------------
 
     private StateInitializationContext getStateContext() throws Exception {

Reply via email to