Repository: flink
Updated Branches:
  refs/heads/master fcac882d2 -> f41eb4b1e
[FLINK-7327] [futures] Replace Flink's future with Java 8's CompletableFuture in StreamRecordQueueEntry This closes #4442. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f41eb4b1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f41eb4b1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f41eb4b1 Branch: refs/heads/master Commit: f41eb4b1ea0112cce4b4edb4a25037fafa2aac23 Parents: fcac882 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jul 31 21:31:26 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 14:04:43 2017 +0200 ---------------------------------------------------------------------- .../api/operators/async/AsyncWaitOperator.java | 10 ++++---- .../async/queue/OrderedStreamElementQueue.java | 10 ++++---- .../async/queue/StreamElementQueueEntry.java | 24 ++++++++------------ .../async/queue/StreamRecordQueueEntry.java | 8 +++---- .../queue/UnorderedStreamElementQueue.java | 10 ++++---- .../async/queue/WatermarkQueueEntry.java | 10 ++++---- 6 files changed, 29 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java index 56c199d..a0f626e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import 
org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.datastream.AsyncDataStream; @@ -217,12 +216,11 @@ public class AsyncWaitOperator<IN, OUT> // Cancel the timer once we've completed the stream record buffer entry. This will remove // the register trigger task - streamRecordBufferEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() { - @Override - public void accept(StreamElementQueueEntry<Collection<OUT>> value) { + streamRecordBufferEntry.onComplete( + (StreamElementQueueEntry<Collection<OUT>> value) -> { timerFuture.cancel(true); - } - }, executor); + }, + executor); } addAsyncBufferEntry(streamRecordBufferEntry); http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java index e573fc1..5133809 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.concurrent.AcceptFunction; import 
org.apache.flink.streaming.api.operators.async.OperatorActions; import org.apache.flink.util.Preconditions; @@ -193,9 +192,8 @@ public class OrderedStreamElementQueue implements StreamElementQueue { queue.addLast(streamElementQueueEntry); - streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() { - @Override - public void accept(StreamElementQueueEntry<T> value) { + streamElementQueueEntry.onComplete( + (StreamElementQueueEntry<T> value) -> { try { onCompleteHandler(value); } catch (InterruptedException e) { @@ -206,8 +204,8 @@ public class OrderedStreamElementQueue implements StreamElementQueue { operatorActions.failOperator(new Exception("Could not complete the " + "stream element queue entry: " + value + '.', t)); } - } - }, executor); + }, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java index 67b1f0f..c59e012 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java @@ -19,13 +19,12 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.BiFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.util.Preconditions; +import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.function.Consumer; /** * Entry class for the {@link StreamElementQueue}. The stream element queue entry stores the @@ -63,23 +62,18 @@ public abstract class StreamElementQueueEntry<T> implements AsyncResult { * @param executor to run the complete function */ public void onComplete( - final AcceptFunction<StreamElementQueueEntry<T>> completeFunction, + final Consumer<StreamElementQueueEntry<T>> completeFunction, Executor executor) { final StreamElementQueueEntry<T> thisReference = this; - getFuture().handleAsync(new BiFunction<T, Throwable, Void>() { - @Override - public Void apply(T t, Throwable throwable) { - // call the complete function for normal completion as well as exceptional completion - // see FLINK-6435 - completeFunction.accept(thisReference); - - return null; - } - }, executor); + getFuture().whenCompleteAsync( + // call the complete function for normal completion as well as exceptional completion + // see FLINK-6435 + (value, throwable) -> completeFunction.accept(thisReference), + executor); } - protected abstract Future<T> getFuture(); + protected abstract CompletableFuture<T> getFuture(); @Override public final boolean isWatermark() { http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java index 708bf17..2aca10e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java @@ -19,14 +19,12 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.Collection; +import java.util.concurrent.CompletableFuture; /** * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts @@ -52,7 +50,7 @@ public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collect hasTimestamp = streamRecord.hasTimestamp(); timestamp = streamRecord.getTimestamp(); - resultFuture = new FlinkCompletableFuture<>(); + resultFuture = new CompletableFuture<>(); } @Override @@ -71,7 +69,7 @@ public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collect } @Override - protected Future<Collection<OUT>> getFuture() { + protected CompletableFuture<Collection<OUT>> getFuture() { return resultFuture; } http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java index e6f71bf..e2c3426 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.streaming.api.operators.async.OperatorActions; import org.apache.flink.util.Preconditions; @@ -285,9 +284,8 @@ public class UnorderedStreamElementQueue implements StreamElementQueue { lastSet.add(streamElementQueueEntry); } - streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() { - @Override - public void accept(StreamElementQueueEntry<T> value) { + streamElementQueueEntry.onComplete( + (StreamElementQueueEntry<T> value) -> { try { onCompleteHandler(value); } catch (InterruptedException e) { @@ -299,8 +297,8 @@ public class UnorderedStreamElementQueue implements StreamElementQueue { operatorActions.failOperator(new Exception("Could not complete the " + "stream element queue entry: " + value + '.', t)); } - } - }, executor); + }, + executor); numberEntries++; } http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java index 1f48303..c9c6e74 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java @@ -19,22 +19,22 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.streaming.api.watermark.Watermark; +import java.util.concurrent.CompletableFuture; + /** * {@link StreamElementQueueEntry} implementation for the {@link Watermark}. */ @Internal public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult { - private final Future<Watermark> future; + private final CompletableFuture<Watermark> future; public WatermarkQueueEntry(Watermark watermark) { super(watermark); - this.future = FlinkCompletableFuture.completed(watermark); + this.future = CompletableFuture.completedFuture(watermark); } @Override @@ -43,7 +43,7 @@ public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> impl } @Override - protected Future<Watermark> getFuture() { + protected CompletableFuture<Watermark> getFuture() { return future; } }