[FLINK-9041][tests] Refactor StreamTaskTest to use Java 8 CompletableFuture instead of Scala/Akka Promise
This closes #5912. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40e412ae Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40e412ae Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40e412ae Branch: refs/heads/master Commit: 40e412ae233046a5f6f38cf86288ab5185d9d194 Parents: 3242214 Author: Andrey Zagrebin <[email protected]> Authored: Wed Apr 25 11:02:09 2018 +0200 Committer: zentol <[email protected]> Committed: Wed May 2 15:18:06 2018 +0200 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTaskTest.java | 41 +++++++------------- pom.xml | 2 +- 2 files changed, 15 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/40e412ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index caea662..73a575e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.CheckpointingOptions; @@ -110,7 +111,6 @@ import org.apache.flink.util.ExceptionUtils; import 
org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; -import akka.dispatch.Futures; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -125,6 +125,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.io.Closeable; import java.io.IOException; import java.io.ObjectInputStream; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -135,16 +136,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; -import scala.concurrent.impl.Promise; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -178,7 +174,7 @@ public class StreamTaskTest extends TestLogger { */ @Test public void testEarlyCanceling() throws Exception { - Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); + Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2)); StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setOperatorID(new OperatorID(4711L, 42L)); cfg.setStreamOperator(new SlowlyDeserializingOperator()); @@ -194,7 +190,7 @@ public class StreamTaskTest extends TestLogger { Future<ExecutionState> running = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING); // wait until the task thread reached state RUNNING - ExecutionState executionState = Await.result(running, deadline.timeLeft()); + ExecutionState executionState = running.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS); // make sure the task is really running if (executionState != ExecutionState.RUNNING) { @@ -208,7 +204,7 @@ public class StreamTaskTest extends TestLogger { Future<ExecutionState> canceling = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING); - executionState = Await.result(canceling, deadline.timeLeft()); + executionState = canceling.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); // the task should reach state canceled eventually assertTrue(executionState == ExecutionState.CANCELING || @@ -858,25 +854,17 @@ public class StreamTaskTest extends TestLogger { private ExecutionState executionState = null; - private final PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = new PriorityQueue<>( - 1, - new Comparator<Tuple2<ExecutionState, Promise<ExecutionState>>>() { - @Override - public int compare(Tuple2<ExecutionState, Promise<ExecutionState>> o1, Tuple2<ExecutionState, Promise<ExecutionState>> o2) { - return o1.f0.ordinal() - o2.f0.ordinal(); - } - }); + private final PriorityQueue<Tuple2<ExecutionState, CompletableFuture<ExecutionState>>> priorityQueue = + new PriorityQueue<>(1, Comparator.comparingInt(o -> o.f0.ordinal())); - public Future<ExecutionState> notifyWhenExecutionState(ExecutionState executionState) { + Future<ExecutionState> notifyWhenExecutionState(ExecutionState executionState) { synchronized (priorityQueue) { if (this.executionState != null && this.executionState.ordinal() >= executionState.ordinal()) { - return Futures.<ExecutionState>successful(executionState); + return CompletableFuture.completedFuture(executionState); } else { - Promise<ExecutionState> promise = new Promise.DefaultPromise<ExecutionState>(); - + CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); priorityQueue.offer(Tuple2.of(executionState, promise)); - - return promise.future(); + return promise; } } } @@ -887,9 +875,8 @@ public class StreamTaskTest extends 
TestLogger { this.executionState = taskExecutionState.getExecutionState(); while (!priorityQueue.isEmpty() && priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) { - Promise<ExecutionState> promise = priorityQueue.poll().f1; - - promise.success(executionState); + CompletableFuture<ExecutionState> promise = priorityQueue.poll().f1; + promise.complete(executionState); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/40e412ae/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index baf4fda..3191da3 100644 --- a/pom.xml +++ b/pom.xml @@ -452,7 +452,7 @@ under the License. <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> - <version>3.18.2-GA</version> + <version>3.19.0-GA</version> </dependency> <!-- joda time is pulled in different versions by different transitive dependencies-->
