Repository: flink
Updated Branches:
  refs/heads/master fcac882d2 -> f41eb4b1e


[FLINK-7327] [futures] Replace Flink's future with Java 8's CompletableFuture 
in StreamRecordQueueEntry

This closes #4442.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f41eb4b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f41eb4b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f41eb4b1

Branch: refs/heads/master
Commit: f41eb4b1ea0112cce4b4edb4a25037fafa2aac23
Parents: fcac882
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jul 31 21:31:26 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 14:04:43 2017 +0200

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java  | 10 ++++----
 .../async/queue/OrderedStreamElementQueue.java  | 10 ++++----
 .../async/queue/StreamElementQueueEntry.java    | 24 ++++++++------------
 .../async/queue/StreamRecordQueueEntry.java     |  8 +++----
 .../queue/UnorderedStreamElementQueue.java      | 10 ++++----
 .../async/queue/WatermarkQueueEntry.java        | 10 ++++----
 6 files changed, 29 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 56c199d..a0f626e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -217,12 +216,11 @@ public class AsyncWaitOperator<IN, OUT>
 
                        // Cancel the timer once we've completed the stream 
record buffer entry. This will remove
                        // the register trigger task
-                       streamRecordBufferEntry.onComplete(new 
AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() {
-                               @Override
-                               public void 
accept(StreamElementQueueEntry<Collection<OUT>> value) {
+                       streamRecordBufferEntry.onComplete(
+                               (StreamElementQueueEntry<Collection<OUT>> 
value) -> {
                                        timerFuture.cancel(true);
-                               }
-                       }, executor);
+                               },
+                               executor);
                }
 
                addAsyncBufferEntry(streamRecordBufferEntry);

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
index e573fc1..5133809 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.util.Preconditions;
 
@@ -193,9 +192,8 @@ public class OrderedStreamElementQueue implements 
StreamElementQueue {
 
                queue.addLast(streamElementQueueEntry);
 
-               streamElementQueueEntry.onComplete(new 
AcceptFunction<StreamElementQueueEntry<T>>() {
-                       @Override
-                       public void accept(StreamElementQueueEntry<T> value) {
+               streamElementQueueEntry.onComplete(
+                       (StreamElementQueueEntry<T> value) -> {
                                try {
                                        onCompleteHandler(value);
                                } catch (InterruptedException e) {
@@ -206,8 +204,8 @@ public class OrderedStreamElementQueue implements 
StreamElementQueue {
                                        operatorActions.failOperator(new 
Exception("Could not complete the " +
                                                "stream element queue entry: " 
+ value + '.', t));
                                }
-                       }
-               }, executor);
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
index 67b1f0f..c59e012 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -19,13 +19,12 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.util.Preconditions;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 
 /**
  * Entry class for the {@link StreamElementQueue}. The stream element queue 
entry stores the
@@ -63,23 +62,18 @@ public abstract class StreamElementQueueEntry<T> implements 
AsyncResult {
         * @param executor to run the complete function
         */
        public void onComplete(
-                       final AcceptFunction<StreamElementQueueEntry<T>> 
completeFunction,
+                       final Consumer<StreamElementQueueEntry<T>> 
completeFunction,
                        Executor executor) {
                final StreamElementQueueEntry<T> thisReference = this;
 
-               getFuture().handleAsync(new BiFunction<T, Throwable, Void>() {
-                       @Override
-                       public Void apply(T t, Throwable throwable) {
-                               // call the complete function for normal 
completion as well as exceptional completion
-                               // see FLINK-6435
-                               completeFunction.accept(thisReference);
-
-                               return null;
-                       }
-               }, executor);
+               getFuture().whenCompleteAsync(
+                       // call the complete function for normal completion as 
well as exceptional completion
+                       // see FLINK-6435
+                       (value, throwable) -> 
completeFunction.accept(thisReference),
+                       executor);
        }
 
-       protected abstract Future<T> getFuture();
+       protected abstract CompletableFuture<T> getFuture();
 
        @Override
        public final boolean isWatermark() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
index 708bf17..2aca10e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -19,14 +19,12 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. 
This class also acts
@@ -52,7 +50,7 @@ public class StreamRecordQueueEntry<OUT> extends 
StreamElementQueueEntry<Collect
                hasTimestamp = streamRecord.hasTimestamp();
                timestamp = streamRecord.getTimestamp();
 
-               resultFuture = new FlinkCompletableFuture<>();
+               resultFuture = new CompletableFuture<>();
        }
 
        @Override
@@ -71,7 +69,7 @@ public class StreamRecordQueueEntry<OUT> extends 
StreamElementQueueEntry<Collect
        }
 
        @Override
-       protected Future<Collection<OUT>> getFuture() {
+       protected CompletableFuture<Collection<OUT>> getFuture() {
                return resultFuture;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
index e6f71bf..e2c3426 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.util.Preconditions;
 
@@ -285,9 +284,8 @@ public class UnorderedStreamElementQueue implements 
StreamElementQueue {
                        lastSet.add(streamElementQueueEntry);
                }
 
-               streamElementQueueEntry.onComplete(new 
AcceptFunction<StreamElementQueueEntry<T>>() {
-                       @Override
-                       public void accept(StreamElementQueueEntry<T> value) {
+               streamElementQueueEntry.onComplete(
+                       (StreamElementQueueEntry<T> value) -> {
                                try {
                                        onCompleteHandler(value);
                                } catch (InterruptedException e) {
@@ -299,8 +297,8 @@ public class UnorderedStreamElementQueue implements 
StreamElementQueue {
                                        operatorActions.failOperator(new 
Exception("Could not complete the " +
                                                "stream element queue entry: " 
+ value + '.', t));
                                }
-                       }
-               }, executor);
+                       },
+                       executor);
 
                numberEntries++;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
index 1f48303..c9c6e74 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@@ -19,22 +19,22 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * {@link StreamElementQueueEntry} implementation for the {@link Watermark}.
  */
 @Internal
 public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> 
implements AsyncWatermarkResult {
 
-       private final Future<Watermark> future;
+       private final CompletableFuture<Watermark> future;
 
        public WatermarkQueueEntry(Watermark watermark) {
                super(watermark);
 
-               this.future = FlinkCompletableFuture.completed(watermark);
+               this.future = CompletableFuture.completedFuture(watermark);
        }
 
        @Override
@@ -43,7 +43,7 @@ public class WatermarkQueueEntry extends 
StreamElementQueueEntry<Watermark> impl
        }
 
        @Override
-       protected Future<Watermark> getFuture() {
+       protected CompletableFuture<Watermark> getFuture() {
                return future;
        }
 }

Reply via email to