Repository: flink
Updated Branches:
  refs/heads/master 748448b5f -> 7c639c600


[FLINK-7318] [futures] Replace Flink's futures in StackTraceSampleCoordinator 
with Java 8 CompletableFuture

This closes #4431.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c639c60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c639c60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c639c60

Branch: refs/heads/master
Commit: 7c639c600eb08923ec9dc0caaf337d70e2ac1719
Parents: 748448b
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jul 31 17:55:06 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 13:38:37 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/BackPressureStatsTracker.java    |  6 +--
 .../webmonitor/StackTraceSampleCoordinator.java | 45 ++++++++++----------
 .../BackPressureStatsTrackerTest.java           |  8 ++--
 .../StackTraceSampleCoordinatorITCase.java      |  7 +--
 .../StackTraceSampleCoordinatorTest.java        | 32 +++++++-------
 5 files changed, 50 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index 894309c..26be769 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -19,8 +19,6 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -38,8 +36,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 
 import scala.Option;
 
@@ -177,7 +177,7 @@ public class BackPressureStatsTracker {
                                                LOG.debug("Triggering stack 
trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
                                        }
 
-                                       Future<StackTraceSample> sample = 
coordinator.triggerStackTraceSample(
+                                       CompletableFuture<StackTraceSample> 
sample = coordinator.triggerStackTraceSample(
                                                        
vertex.getTaskVertices(),
                                                        numSamples,
                                                        delayBetweenSamples,

http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
index 5a85343..3521f58 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -19,10 +19,7 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -42,6 +39,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -105,7 +103,7 @@ public class StackTraceSampleCoordinator {
         * @return A future of the completed stack trace sample
         */
        @SuppressWarnings("unchecked")
-       public Future<StackTraceSample> triggerStackTraceSample(
+       public CompletableFuture<StackTraceSample> triggerStackTraceSample(
                        ExecutionVertex[] tasksToSample,
                        int numSamples,
                        Time delayBetweenSamples,
@@ -128,15 +126,18 @@ public class StackTraceSampleCoordinator {
                                executions[i] = execution;
                                triggerIds[i] = execution.getAttemptId();
                        } else {
-                               return 
FlinkCompletableFuture.completedExceptionally(
-                                       new IllegalStateException("Task " + 
tasksToSample[i]
+                               CompletableFuture<StackTraceSample> result = 
new CompletableFuture();
+                               result.completeExceptionally(new 
IllegalStateException("Task " + tasksToSample[i]
                                        .getTaskNameWithSubtaskIndex() + " is 
not running."));
+                               return result;
                        }
                }
 
                synchronized (lock) {
                        if (isShutDown) {
-                               return 
FlinkCompletableFuture.completedExceptionally(new IllegalStateException("Shut 
down"));
+                               CompletableFuture<StackTraceSample> result = 
new CompletableFuture();
+                               result.completeExceptionally(new 
IllegalStateException("Shut down"));
+                               return result;
                        }
 
                        final int sampleId = sampleIdCounter++;
@@ -158,16 +159,16 @@ public class StackTraceSampleCoordinator {
 
                        // Trigger all samples
                        for (Execution execution: executions) {
-                               final Future<StackTraceSampleResponse> 
stackTraceSampleFuture = execution.requestStackTraceSample(
-                                       sampleId,
-                                       numSamples,
-                                       delayBetweenSamples,
-                                       maxStackTraceDepth,
-                                       timeout);
-
-                               stackTraceSampleFuture.handleAsync(new 
BiFunction<StackTraceSampleResponse, Throwable, Void>() {
-                                       @Override
-                                       public Void 
apply(StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) {
+                               final 
CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = 
FutureUtils.toJava(
+                                       execution.requestStackTraceSample(
+                                               sampleId,
+                                               numSamples,
+                                               delayBetweenSamples,
+                                               maxStackTraceDepth,
+                                               timeout));
+
+                               stackTraceSampleFuture.handleAsync(
+                                       (StackTraceSampleResponse 
stackTraceSampleResponse, Throwable throwable) -> {
                                                if (stackTraceSampleResponse != 
null) {
                                                        collectStackTraces(
                                                                
stackTraceSampleResponse.getSampleId(),
@@ -178,8 +179,8 @@ public class StackTraceSampleCoordinator {
                                                }
 
                                                return null;
-                                       }
-                               }, executor);
+                                       },
+                                       executor);
                        }
 
                        return pending.getStackTraceSampleFuture();
@@ -321,7 +322,7 @@ public class StackTraceSampleCoordinator {
                        this.startTime = System.currentTimeMillis();
                        this.pendingTasks = new 
HashSet<>(Arrays.asList(tasksToCollect));
                        this.stackTracesByTask = 
Maps.newHashMapWithExpectedSize(tasksToCollect.length);
-                       this.stackTraceFuture = new FlinkCompletableFuture<>();
+                       this.stackTraceFuture = new CompletableFuture<>();
                }
 
                int getSampleId() {
@@ -388,7 +389,7 @@ public class StackTraceSampleCoordinator {
                }
 
                @SuppressWarnings("unchecked")
-               Future<StackTraceSample> getStackTraceSampleFuture() {
+               CompletableFuture<StackTraceSample> getStackTraceSampleFuture() 
{
                        return stackTraceFuture;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
index efb410d..e99d1b7 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -29,6 +27,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -36,6 +35,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
@@ -51,13 +51,13 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for the BackPressureStatsTracker.
  */
-public class BackPressureStatsTrackerTest {
+public class BackPressureStatsTrackerTest extends TestLogger {
 
        /** Tests simple statistics with fake stack traces. */
        @Test
        @SuppressWarnings("unchecked")
        public void testTriggerStackTraceSample() throws Exception {
-               CompletableFuture<StackTraceSample> sampleFuture = new 
FlinkCompletableFuture<>();
+               CompletableFuture<StackTraceSample> sampleFuture = new 
CompletableFuture<>();
 
                StackTraceSampleCoordinator sampleCoordinator = 
mock(StackTraceSampleCoordinator.class);
                when(sampleCoordinator.triggerStackTraceSample(

http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index e494a9b..47b43a5 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -44,9 +43,11 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
@@ -150,7 +151,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                                                                
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
                                                                                
testActorSystem.dispatcher(), 60000);
 
-                                                               
Future<StackTraceSample> sampleFuture = coordinator.triggerStackTraceSample(
+                                                               
CompletableFuture<StackTraceSample> sampleFuture = 
coordinator.triggerStackTraceSample(
                                                                        
vertex.getTaskVertices(),
                                                                        // Do 
this often so we have a good
                                                                        // 
chance of removing the job during
@@ -164,7 +165,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                                                                
Thread.sleep(sleepTime);
 
                                                                // Cancel job
-                                                               
scala.concurrent.Future<?> removeFuture = jm.ask(
+                                                               Future<?> 
removeFuture = jm.ask(
                                                                                
new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
                                                                                
remaining());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
index bf79b3f..7d8535a 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -31,6 +29,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
 import org.junit.AfterClass;
@@ -41,6 +40,7 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -62,7 +62,7 @@ import static org.mockito.Mockito.when;
 /**
  * Test for the {@link StackTraceSampleCoordinator}.
  */
-public class StackTraceSampleCoordinatorTest {
+public class StackTraceSampleCoordinatorTest extends TestLogger {
 
        private static ActorSystem system;
 
@@ -99,7 +99,7 @@ public class StackTraceSampleCoordinatorTest {
                Time delayBetweenSamples = Time.milliseconds(100L);
                int maxStackTraceDepth = 0;
 
-               Future<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
+               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
                                vertices, numSamples, delayBetweenSamples, 
maxStackTraceDepth);
 
                // Verify messages have been sent
@@ -169,7 +169,7 @@ public class StackTraceSampleCoordinatorTest {
                                mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.DEPLOYING, true)
                };
 
-               Future<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
+               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
                        vertices,
                        1,
                        Time.milliseconds(100L),
@@ -194,7 +194,7 @@ public class StackTraceSampleCoordinatorTest {
                                mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, false)
                };
 
-               Future<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
+               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
                        vertices,
                        1,
                        Time.milliseconds(100L),
@@ -227,7 +227,7 @@ public class StackTraceSampleCoordinatorTest {
                                        timeout)
                        };
 
-                       Future<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
+                       CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
                                vertices, 1, Time.milliseconds(100L), 0);
 
                        // Wait for the timeout
@@ -273,7 +273,7 @@ public class StackTraceSampleCoordinatorTest {
                                mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
                };
 
-               Future<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
+               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
                                vertices, 1, Time.milliseconds(100L), 0);
 
                assertFalse(sampleFuture.isDone());
@@ -295,7 +295,7 @@ public class StackTraceSampleCoordinatorTest {
                                mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
                };
 
-               Future<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
+               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
                                vertices, 1, Time.milliseconds(100L), 0);
 
                assertFalse(sampleFuture.isDone());
@@ -316,7 +316,7 @@ public class StackTraceSampleCoordinatorTest {
                                mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
                };
 
-               Future<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
+               CompletableFuture<StackTraceSample> sampleFuture = 
coord.triggerStackTraceSample(
                                vertices, 1, Time.milliseconds(100L), 0);
 
                assertFalse(sampleFuture.isDone());
@@ -350,7 +350,7 @@ public class StackTraceSampleCoordinatorTest {
                                mockExecutionVertex(new ExecutionAttemptID(), 
ExecutionState.RUNNING, true),
                };
 
-               List<Future<StackTraceSample>> sampleFutures = new 
ArrayList<>();
+               List<CompletableFuture<StackTraceSample>> sampleFutures = new 
ArrayList<>();
 
                // Trigger
                sampleFutures.add(coord.triggerStackTraceSample(
@@ -359,7 +359,7 @@ public class StackTraceSampleCoordinatorTest {
                sampleFutures.add(coord.triggerStackTraceSample(
                                vertices, 1, Time.milliseconds(100L), 0));
 
-               for (Future<StackTraceSample> future : sampleFutures) {
+               for (CompletableFuture<StackTraceSample> future : 
sampleFutures) {
                        assertFalse(future.isDone());
                }
 
@@ -367,12 +367,12 @@ public class StackTraceSampleCoordinatorTest {
                coord.shutDown();
 
                // Verify all completed
-               for (Future<StackTraceSample> future : sampleFutures) {
+               for (CompletableFuture<StackTraceSample> future : 
sampleFutures) {
                        assertTrue(future.isDone());
                }
 
                // Verify new trigger returns failed future
-               Future<StackTraceSample> future = coord.triggerStackTraceSample(
+               CompletableFuture<StackTraceSample> future = 
coord.triggerStackTraceSample(
                                vertices, 1, Time.milliseconds(100L), 0);
 
                assertTrue(future.isDone());
@@ -400,7 +400,7 @@ public class StackTraceSampleCoordinatorTest {
                        .thenReturn(
                                sendSuccess ?
                                        
FlinkCompletableFuture.completed(mock(StackTraceSampleResponse.class)) :
-                                       
FlinkCompletableFuture.<StackTraceSampleResponse>completedExceptionally(new 
Exception("Send failed")));
+                                       
FlinkCompletableFuture.completedExceptionally(new Exception("Send failed")));
 
                ExecutionVertex vertex = mock(ExecutionVertex.class);
                when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
@@ -415,7 +415,7 @@ public class StackTraceSampleCoordinatorTest {
                ScheduledExecutorService scheduledExecutorService,
                int timeout) {
 
-               final CompletableFuture<StackTraceSampleResponse> future = new 
FlinkCompletableFuture<>();
+               final FlinkCompletableFuture<StackTraceSampleResponse> future = 
new FlinkCompletableFuture<>();
 
                Execution exec = mock(Execution.class);
                when(exec.getAttemptId()).thenReturn(executionId);

Reply via email to