This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new 791c1b9 [FLINK-24300] SourceOperator#getAvailableFuture reuses future 791c1b9 is described below commit 791c1b9c0816a0a990c2e705b0cafc474fef97ca Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Thu Sep 16 14:12:04 2021 +0200 [FLINK-24300] SourceOperator#getAvailableFuture reuses future Callers of SourceOperator#getAvailableFuture might call the method multiple times even if the returned future does not complete. Before this commit, each call created a new combined future from the SourceReader#isAvailable future and the forcedStop future. If the underlying SourceReader#isAvailable future has not changed, this operation is unnecessary. What is even worse, each such operation added another entry onto the source reader's availability future stack, which caused a performance regression. This commit reuses the combined future if the underlying SourceReader#isAvailable future has not changed. This closes #17303 --- .../streaming/api/operators/SourceOperator.java | 33 ++++++++++++++++++---- .../api/operators/SourceOperatorTest.java | 12 ++++++++ 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index bcfb639..72ae6cd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -137,7 +137,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private OperatingMode operatingMode; private final CompletableFuture<Void> finished = new CompletableFuture<>(); - private final CompletableFuture<Void> forcedStop = new CompletableFuture<>(); + private final SourceOperatorAvailabilityHelper availabilityHelper = + new
SourceOperatorAvailabilityHelper(); private enum OperatingMode { READING, @@ -312,7 +313,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr case OUTPUT_NOT_INITIALIZED: case READING: this.operatingMode = OperatingMode.SOURCE_STOPPED; - forcedStop.complete(null); + availabilityHelper.forceStop(); break; } return finished; @@ -391,10 +392,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr switch (operatingMode) { case OUTPUT_NOT_INITIALIZED: case READING: - CompletableFuture<Void> sourceReaderAvailable = sourceReader.isAvailable(); - return sourceReaderAvailable == AvailabilityProvider.AVAILABLE - ? sourceReaderAvailable - : CompletableFuture.anyOf(sourceReaderAvailable, forcedStop); + return availabilityHelper.update(sourceReader.isAvailable()); case SOURCE_STOPPED: case DATA_FINISHED: return AvailabilityProvider.AVAILABLE; @@ -457,4 +455,27 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr ListState<SplitT> getReaderState() { return readerState; } + + private static class SourceOperatorAvailabilityHelper { + private final CompletableFuture<Void> forcedStopFuture = new CompletableFuture<>(); + private CompletableFuture<Void> currentReaderFuture; + private CompletableFuture<?> currentCombinedFuture; + + public CompletableFuture<?> update(CompletableFuture<Void> sourceReaderFuture) { + if (sourceReaderFuture == AvailabilityProvider.AVAILABLE) { + return sourceReaderFuture; + } else if (sourceReaderFuture == currentReaderFuture) { + return currentCombinedFuture; + } else { + currentReaderFuture = sourceReaderFuture; + currentCombinedFuture = + CompletableFuture.anyOf(forcedStopFuture, sourceReaderFuture); + return currentCombinedFuture; + } + } + + public void forceStop() { + this.forcedStopFuture.complete(null); + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 3bf4671..aa0fb7e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -61,6 +62,9 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -211,6 +215,14 @@ public class SourceOperatorTest { assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0)); } + @Test + public void testSameAvailabilityFuture() { + final CompletableFuture<?> initialFuture = operator.getAvailableFuture(); + final CompletableFuture<?> secondFuture = operator.getAvailableFuture(); + assertThat(initialFuture, not(sameInstance(AvailabilityProvider.AVAILABLE))); + assertThat(secondFuture, sameInstance(initialFuture)); + } + // ---------------- helper methods ------------------------- private StateInitializationContext getStateContext() throws Exception {