Repository: flink Updated Branches: refs/heads/master 748448b5f -> 7c639c600
[FLINK-7318] [futures] Replace Flink's futures in StackTraceSampleCoordinator with Java 8 CompletableFuture This closes #4431. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c639c60 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c639c60 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c639c60 Branch: refs/heads/master Commit: 7c639c600eb08923ec9dc0caaf337d70e2ac1719 Parents: 748448b Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jul 31 17:55:06 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 13:38:37 2017 +0200 ---------------------------------------------------------------------- .../webmonitor/BackPressureStatsTracker.java | 6 +-- .../webmonitor/StackTraceSampleCoordinator.java | 45 ++++++++++---------- .../BackPressureStatsTrackerTest.java | 8 ++-- .../StackTraceSampleCoordinatorITCase.java | 7 +-- .../StackTraceSampleCoordinatorTest.java | 32 +++++++------- 5 files changed, 50 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java index 894309c..26be769 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java @@ -19,8 +19,6 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.BiFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -38,8 +36,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import scala.Option; @@ -177,7 +177,7 @@ public class BackPressureStatsTracker { LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); } - Future<StackTraceSample> sample = coordinator.triggerStackTraceSample( + CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample( vertex.getTaskVertices(), numSamples, delayBetweenSamples, http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java index 5a85343..3521f58 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java @@ -19,10 +19,7 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.BiFunction; -import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -42,6 +39,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkArgument; @@ -105,7 +103,7 @@ public class StackTraceSampleCoordinator { * @return A future of the completed stack trace sample */ @SuppressWarnings("unchecked") - public Future<StackTraceSample> triggerStackTraceSample( + public CompletableFuture<StackTraceSample> triggerStackTraceSample( ExecutionVertex[] tasksToSample, int numSamples, Time delayBetweenSamples, @@ -128,15 +126,18 @@ public class StackTraceSampleCoordinator { executions[i] = execution; triggerIds[i] = execution.getAttemptId(); } else { - return FlinkCompletableFuture.completedExceptionally( - new IllegalStateException("Task " + tasksToSample[i] + CompletableFuture<StackTraceSample> result = new CompletableFuture(); + result.completeExceptionally(new IllegalStateException("Task " + tasksToSample[i] .getTaskNameWithSubtaskIndex() + " is not running.")); + return result; } } synchronized (lock) { if (isShutDown) { - return FlinkCompletableFuture.completedExceptionally(new IllegalStateException("Shut down")); + CompletableFuture<StackTraceSample> result = new CompletableFuture(); + result.completeExceptionally(new IllegalStateException("Shut down")); + return result; } final int sampleId = sampleIdCounter++; @@ -158,16 +159,16 @@ public class StackTraceSampleCoordinator { // Trigger all samples for (Execution execution: executions) { - final Future<StackTraceSampleResponse> stackTraceSampleFuture = execution.requestStackTraceSample( - sampleId, - numSamples, - delayBetweenSamples, - maxStackTraceDepth, - timeout); - - stackTraceSampleFuture.handleAsync(new BiFunction<StackTraceSampleResponse, Throwable, Void>() { - @Override - public Void apply(StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) { + final CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = FutureUtils.toJava( + execution.requestStackTraceSample( + sampleId, + numSamples, + delayBetweenSamples, + maxStackTraceDepth, + timeout)); + + stackTraceSampleFuture.handleAsync( + (StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) -> { if (stackTraceSampleResponse != null) { collectStackTraces( stackTraceSampleResponse.getSampleId(), @@ -178,8 +179,8 @@ public class StackTraceSampleCoordinator { } return null; - } - }, executor); + }, + executor); } return pending.getStackTraceSampleFuture(); @@ -321,7 +322,7 @@ public class StackTraceSampleCoordinator { this.startTime = System.currentTimeMillis(); this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect)); this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length); - this.stackTraceFuture = new FlinkCompletableFuture<>(); + this.stackTraceFuture = new CompletableFuture<>(); } int getSampleId() { @@ -388,7 +389,7 @@ public class StackTraceSampleCoordinator { } @SuppressWarnings("unchecked") - Future<StackTraceSample> getStackTraceSampleFuture() { + CompletableFuture<StackTraceSample> getStackTraceSampleFuture() { return stackTraceFuture; } } http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java index efb410d..e99d1b7 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java @@ -20,8 +20,6 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -29,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -36,6 +35,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; @@ -51,13 +51,13 @@ import static org.mockito.Mockito.when; /** * Tests for the BackPressureStatsTracker. */ -public class BackPressureStatsTrackerTest { +public class BackPressureStatsTrackerTest extends TestLogger { /** Tests simple statistics with fake stack traces. */ @Test @SuppressWarnings("unchecked") public void testTriggerStackTraceSample() throws Exception { - CompletableFuture<StackTraceSample> sampleFuture = new FlinkCompletableFuture<>(); + CompletableFuture<StackTraceSample> sampleFuture = new CompletableFuture<>(); StackTraceSampleCoordinator sampleCoordinator = mock(StackTraceSampleCoordinator.class); when(sampleCoordinator.triggerStackTraceSample( http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index e494a9b..47b43a5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -23,7 +23,6 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -44,9 +43,11 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import scala.concurrent.Await; +import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning; @@ -150,7 +151,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator( testActorSystem.dispatcher(), 60000); - Future<StackTraceSample> sampleFuture = coordinator.triggerStackTraceSample( + CompletableFuture<StackTraceSample> sampleFuture = coordinator.triggerStackTraceSample( vertex.getTaskVertices(), // Do this often so we have a good // chance of removing the job during @@ -164,7 +165,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { Thread.sleep(sleepTime); // Cancel job - scala.concurrent.Future<?> removeFuture = jm.ask( + Future<?> removeFuture = jm.ask( new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), remaining()); http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java index bf79b3f..7d8535a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java @@ -21,8 +21,6 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; @@ -31,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample; import org.apache.flink.runtime.messages.StackTraceSampleResponse; +import org.apache.flink.util.TestLogger; import akka.actor.ActorSystem; import org.junit.AfterClass; @@ -41,6 +40,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -62,7 +62,7 @@ import static org.mockito.Mockito.when; /** * Test for the {@link StackTraceSampleCoordinator}. */ -public class StackTraceSampleCoordinatorTest { +public class StackTraceSampleCoordinatorTest extends TestLogger { private static ActorSystem system; @@ -99,7 +99,7 @@ public class StackTraceSampleCoordinatorTest { Time delayBetweenSamples = Time.milliseconds(100L); int maxStackTraceDepth = 0; - Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( vertices, numSamples, delayBetweenSamples, maxStackTraceDepth); // Verify messages have been sent @@ -169,7 +169,7 @@ public class StackTraceSampleCoordinatorTest { mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true) }; - Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( vertices, 1, Time.milliseconds(100L), @@ -194,7 +194,7 @@ public class StackTraceSampleCoordinatorTest { mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false) }; - Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( vertices, 1, Time.milliseconds(100L), @@ -227,7 +227,7 @@ public class StackTraceSampleCoordinatorTest { timeout) }; - Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( vertices, 1, Time.milliseconds(100L), 0); // Wait for the timeout @@ -273,7 +273,7 @@ public class StackTraceSampleCoordinatorTest { mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), }; - Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( vertices, 1, Time.milliseconds(100L), 0); assertFalse(sampleFuture.isDone()); @@ -295,7 +295,7 @@ public class StackTraceSampleCoordinatorTest { mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), }; - Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( vertices, 1, Time.milliseconds(100L), 0); assertFalse(sampleFuture.isDone()); @@ -316,7 +316,7 @@ public class StackTraceSampleCoordinatorTest { mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), }; - Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( + CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample( vertices, 1, Time.milliseconds(100L), 0); assertFalse(sampleFuture.isDone()); @@ -350,7 +350,7 @@ public class StackTraceSampleCoordinatorTest { mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true), }; - List<Future<StackTraceSample>> sampleFutures = new ArrayList<>(); + List<CompletableFuture<StackTraceSample>> sampleFutures = new ArrayList<>(); // Trigger sampleFutures.add(coord.triggerStackTraceSample( @@ -359,7 +359,7 @@ public class StackTraceSampleCoordinatorTest { sampleFutures.add(coord.triggerStackTraceSample( vertices, 1, Time.milliseconds(100L), 0)); - for (Future<StackTraceSample> future : sampleFutures) { + for (CompletableFuture<StackTraceSample> future : sampleFutures) { assertFalse(future.isDone()); } @@ -367,12 +367,12 @@ public class StackTraceSampleCoordinatorTest { coord.shutDown(); // Verify all completed - for (Future<StackTraceSample> future : sampleFutures) { + for (CompletableFuture<StackTraceSample> future : sampleFutures) { assertTrue(future.isDone()); } // Verify new trigger returns failed future - Future<StackTraceSample> future = coord.triggerStackTraceSample( + CompletableFuture<StackTraceSample> future = coord.triggerStackTraceSample( vertices, 1, Time.milliseconds(100L), 0); assertTrue(future.isDone()); @@ -400,7 +400,7 @@ public class StackTraceSampleCoordinatorTest { .thenReturn( sendSuccess ? FlinkCompletableFuture.completed(mock(StackTraceSampleResponse.class)) : - FlinkCompletableFuture.<StackTraceSampleResponse>completedExceptionally(new Exception("Send failed"))); + FlinkCompletableFuture.completedExceptionally(new Exception("Send failed"))); ExecutionVertex vertex = mock(ExecutionVertex.class); when(vertex.getJobvertexId()).thenReturn(new JobVertexID()); @@ -415,7 +415,7 @@ public class StackTraceSampleCoordinatorTest { ScheduledExecutorService scheduledExecutorService, int timeout) { - final CompletableFuture<StackTraceSampleResponse> future = new FlinkCompletableFuture<>(); + final FlinkCompletableFuture<StackTraceSampleResponse> future = new FlinkCompletableFuture<>(); Execution exec = mock(Execution.class); when(exec.getAttemptId()).thenReturn(executionId);