[FLINK-9041][tests] Refactor StreamTaskTest to use java 8 CompletableFuture 
instead of scala/akka Promise

This closes #5912.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40e412ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40e412ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40e412ae

Branch: refs/heads/master
Commit: 40e412ae233046a5f6f38cf86288ab5185d9d194
Parents: 3242214
Author: Andrey Zagrebin <[email protected]>
Authored: Wed Apr 25 11:02:09 2018 +0200
Committer: zentol <[email protected]>
Committed: Wed May 2 15:18:06 2018 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTaskTest.java | 41 +++++++-------------
 pom.xml                                         |  2 +-
 2 files changed, 15 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40e412ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index caea662..73a575e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -110,7 +111,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
-import akka.dispatch.Futures;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -125,6 +125,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -135,16 +136,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -178,7 +174,7 @@ public class StreamTaskTest extends TestLogger {
         */
        @Test
        public void testEarlyCanceling() throws Exception {
-               Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
+               Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2));
                StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setOperatorID(new OperatorID(4711L, 42L));
                cfg.setStreamOperator(new SlowlyDeserializingOperator());
@@ -194,7 +190,7 @@ public class StreamTaskTest extends TestLogger {
                Future<ExecutionState> running = 
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
 
                // wait until the task thread reached state RUNNING
-               ExecutionState executionState = Await.result(running, 
deadline.timeLeft());
+               ExecutionState executionState = 
running.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
                // make sure the task is really running
                if (executionState != ExecutionState.RUNNING) {
@@ -208,7 +204,7 @@ public class StreamTaskTest extends TestLogger {
 
                Future<ExecutionState> canceling = 
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
 
-               executionState = Await.result(canceling, deadline.timeLeft());
+               executionState = canceling.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
 
                // the task should reach state canceled eventually
                assertTrue(executionState == ExecutionState.CANCELING ||
@@ -858,25 +854,17 @@ public class StreamTaskTest extends TestLogger {
 
                private ExecutionState executionState = null;
 
-               private final PriorityQueue<Tuple2<ExecutionState, 
Promise<ExecutionState>>> priorityQueue = new PriorityQueue<>(
-                       1,
-                       new Comparator<Tuple2<ExecutionState, 
Promise<ExecutionState>>>() {
-                               @Override
-                               public int compare(Tuple2<ExecutionState, 
Promise<ExecutionState>> o1, Tuple2<ExecutionState, Promise<ExecutionState>> 
o2) {
-                                       return o1.f0.ordinal() - 
o2.f0.ordinal();
-                               }
-                       });
+               private final PriorityQueue<Tuple2<ExecutionState, 
CompletableFuture<ExecutionState>>> priorityQueue =
+                       new PriorityQueue<>(1, Comparator.comparingInt(o -> 
o.f0.ordinal()));
 
-               public Future<ExecutionState> 
notifyWhenExecutionState(ExecutionState executionState) {
+               Future<ExecutionState> notifyWhenExecutionState(ExecutionState 
executionState) {
                        synchronized (priorityQueue) {
                                if (this.executionState != null && 
this.executionState.ordinal() >= executionState.ordinal()) {
-                                       return 
Futures.<ExecutionState>successful(executionState);
+                                       return 
CompletableFuture.completedFuture(executionState);
                                } else {
-                                       Promise<ExecutionState> promise = new 
Promise.DefaultPromise<ExecutionState>();
-
+                                       CompletableFuture<ExecutionState> 
promise = new CompletableFuture<>();
                                        
priorityQueue.offer(Tuple2.of(executionState, promise));
-
-                                       return promise.future();
+                                       return promise;
                                }
                        }
                }
@@ -887,9 +875,8 @@ public class StreamTaskTest extends TestLogger {
                                this.executionState = 
taskExecutionState.getExecutionState();
 
                                while (!priorityQueue.isEmpty() && 
priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) {
-                                       Promise<ExecutionState> promise = 
priorityQueue.poll().f1;
-
-                                       promise.success(executionState);
+                                       CompletableFuture<ExecutionState> 
promise = priorityQueue.poll().f1;
+                                       promise.complete(executionState);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/40e412ae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index baf4fda..3191da3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -452,7 +452,7 @@ under the License.
                        <dependency>
                                <groupId>org.javassist</groupId>
                                <artifactId>javassist</artifactId>
-                               <version>3.18.2-GA</version>
+                               <version>3.19.0-GA</version>
                        </dependency>
 
                        <!-- joda time is pulled in different versions by 
different transitive dependencies-->

Reply via email to