Repository: flink Updated Branches: refs/heads/master 4a9f19b9f -> 4378ac3ae
[FLINK-7322] [futures] Replace Flink's futures with Java 8's CompletableFuture in CheckpointCoordinator Fix failing JobManagerITCase This closes #4436. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4378ac3a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4378ac3a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4378ac3a Branch: refs/heads/master Commit: 4378ac3ae36f12c8678d2090f7c344832d6d0761 Parents: 4a9f19b Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jul 31 19:05:22 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 13:54:50 2017 +0200 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 41 ++++++++++---------- .../runtime/checkpoint/PendingCheckpoint.java | 9 ++--- .../flink/runtime/jobmanager/JobManager.scala | 11 +++--- .../checkpoint/CheckpointCoordinatorTest.java | 20 +++++----- .../checkpoint/PendingCheckpointTest.java | 4 +- .../runtime/jobmanager/JobManagerITCase.scala | 7 ++-- .../testingUtils/TestingJobManagerLike.scala | 3 +- 7 files changed, 48 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 3e36158..5cab7f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -26,9 +26,6 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -58,6 +55,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -362,7 +360,7 @@ public class CheckpointCoordinator { * configured * @throws Exception Failures during triggering are forwarded */ - public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception { + public CompletableFuture<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception { checkNotNull(targetDirectory, "Savepoint target directory"); CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); @@ -377,29 +375,30 @@ public class CheckpointCoordinator { savepointDirectory, false); - Future<CompletedCheckpoint> result; + CompletableFuture<CompletedCheckpoint> result; if (triggerResult.isSuccess()) { result = triggerResult.getPendingCheckpoint().getCompletionFuture(); } else { Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message()); - result = FlinkCompletableFuture.completedExceptionally(cause); + result = new CompletableFuture<>(); + result.completeExceptionally(cause); + return result; } // Make sure to remove the created base directory on Exceptions - result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { - @Override - public Void apply(Throwable value) { - try { - SavepointStore.deleteSavepointDirectory(savepointDirectory); - } catch (Throwable t) { - LOG.warn("Failed to delete savepoint directory " + savepointDirectory - + " after failed savepoint.", t); + result.whenCompleteAsync( + (CompletedCheckpoint checkpoint, Throwable throwable) -> { + if (throwable != null) { + try { + SavepointStore.deleteSavepointDirectory(savepointDirectory); + } catch (Throwable t) { + LOG.warn("Failed to delete savepoint directory " + savepointDirectory + + " after failed savepoint.", t); + } } - - return null; - } - }, executor); + }, + executor); return result; } @@ -427,7 +426,7 @@ public class CheckpointCoordinator { */ @VisibleForTesting @Internal - public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception { + public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception { switch (options.getCheckpointType()) { case SAVEPOINT: return triggerSavepoint(timestamp, options.getTargetLocation()); @@ -440,7 +439,9 @@ public class CheckpointCoordinator { return triggerResult.getPendingCheckpoint().getCompletionFuture(); } else { Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message()); - return FlinkCompletableFuture.completedExceptionally(cause); + CompletableFuture<CompletedCheckpoint> failedResult = new CompletableFuture<>(); + failedResult.completeExceptionally(cause); + return failedResult; } default: http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 0633fec..3472fc2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -47,6 +45,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -104,7 +103,7 @@ public class PendingCheckpoint { private final String targetDirectory; /** The promise to fulfill once the checkpoint has been completed. */ - private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise; + private final CompletableFuture<CompletedCheckpoint> onCompletionPromise; /** The executor for potentially blocking I/O operations, like state disposal */ private final Executor executor; @@ -149,7 +148,7 @@ public class PendingCheckpoint { this.operatorStates = new HashMap<>(); this.masterState = new ArrayList<>(); this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size()); - this.onCompletionPromise = new FlinkCompletableFuture<>(); + this.onCompletionPromise = new CompletableFuture<>(); } // -------------------------------------------------------------------------------------------- @@ -249,7 +248,7 @@ public class PendingCheckpoint { * * @return A future to the completed checkpoint */ - public Future<CompletedCheckpoint> getCompletionFuture() { + public CompletableFuture<CompletedCheckpoint> getCompletionFuture() { return onCompletionPromise; } http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 3128cfc..a6712ad 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -21,7 +21,8 @@ package org.apache.flink.runtime.jobmanager import java.io.IOException import java.net._ import java.util.UUID -import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _} +import java.util.concurrent.{Future => JavaFuture, _} +import java.util.function.BiFunction import akka.actor.Status.{Failure, Success} import akka.actor._ @@ -38,13 +39,13 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.{BlobServer, BlobStore} import org.apache.flink.runtime.checkpoint._ -import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore} +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.client._ import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, BiFunction, Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -58,7 +59,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway import org.apache.flink.runtime.jobmaster.JobMaster import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService} -import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME} import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -80,12 +80,11 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.taskexecutor.TaskExecutor -import org.apache.flink.runtime.taskexecutor.TaskExecutor.TASK_MANAGER_NAME import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} -import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils} +import org.apache.flink.util.{InstantiationUtil, NetUtils} import scala.collection.JavaConverters._ import scala.collection.mutable http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 186a819..e78152a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -76,6 +75,7 @@ import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -1446,7 +1446,7 @@ public class CheckpointCoordinatorTest extends TestLogger { // trigger the first checkpoint. this should succeed String savepointDir = tmpFolder.newFolder().getAbsolutePath(); - Future<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir); + CompletableFuture<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir); assertFalse(savepointFuture.isDone()); // validate that we have a pending savepoint @@ -1601,7 +1601,7 @@ public class CheckpointCoordinatorTest extends TestLogger { String savepointDir = tmpFolder.newFolder().getAbsolutePath(); // Trigger savepoint and checkpoint - Future<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir); + CompletableFuture<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir); long savepointId1 = counter.getLast(); assertEquals(1, coord.getNumberOfPendingCheckpoints()); @@ -1626,7 +1626,7 @@ public class CheckpointCoordinatorTest extends TestLogger { long checkpointId3 = counter.getLast(); assertEquals(2, coord.getNumberOfPendingCheckpoints()); - Future<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir); + CompletableFuture<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir); long savepointId2 = counter.getLast(); assertEquals(3, coord.getNumberOfPendingCheckpoints()); @@ -1911,7 +1911,7 @@ public class CheckpointCoordinatorTest extends TestLogger { null, Executors.directExecutor()); - List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>(); + List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>(); int numSavepoints = 5; @@ -1923,7 +1923,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } // After triggering multiple savepoints, all should in progress - for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) { + for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) { assertFalse(savepointFuture.isDone()); } @@ -1934,7 +1934,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } // After ACKs, all should be completed - for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) { + for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) { assertTrue(savepointFuture.isDone()); } } @@ -1966,10 +1966,10 @@ public class CheckpointCoordinatorTest extends TestLogger { String savepointDir = tmpFolder.newFolder().getAbsolutePath(); - Future<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir); + CompletableFuture<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir); assertFalse("Did not trigger savepoint", savepoint0.isDone()); - Future<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir); + CompletableFuture<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir); assertFalse("Did not trigger savepoint", savepoint1.isDone()); } @@ -3600,7 +3600,7 @@ public class CheckpointCoordinatorTest extends TestLogger { assertTrue(1 == completedCheckpointStore.getNumberOfRetainedCheckpoints()); // trigger a savepoint --> this should not have any effect on the CompletedCheckpointStore - Future<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir); + CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir); checkpointCoordinator.receiveAcknowledgeMessage( new AcknowledgeCheckpoint( http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index a96b597..7d103d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -40,6 +39,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -134,7 +134,7 @@ public class PendingCheckpointTest { // Abort declined PendingCheckpoint pending = createPendingCheckpoint(props, "ignored"); - Future<CompletedCheckpoint> future = pending.getCompletionFuture(); + CompletableFuture<CompletedCheckpoint> future = pending.getCompletionFuture(); assertFalse(future.isDone()); pending.abortDeclined(); http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 5fb9ddf..e209608 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -18,6 +18,8 @@ package org.apache.flink.runtime.jobmanager +import java.util.concurrent.CompletableFuture + import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout @@ -25,7 +27,6 @@ import org.apache.flink.api.common.JobID import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint} import org.apache.flink.runtime.client.JobExecutionException -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture import org.apache.flink.runtime.io.network.partition.ResultPartitionType import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobCheckpointingSettings} import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode} @@ -913,7 +914,7 @@ class JobManagerITCase(_system: ActorSystem) doThrow(new Exception("Expected Test Exception")) .when(checkpointCoordinator) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) - val savepointPathPromise = new FlinkCompletableFuture[CompletedCheckpoint]() + val savepointPathPromise = new CompletableFuture[CompletedCheckpoint]() doReturn(savepointPathPromise) .when(checkpointCoordinator) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) @@ -982,7 +983,7 @@ class JobManagerITCase(_system: ActorSystem) .when(checkpointCoordinator) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) - val savepointPromise = new FlinkCompletableFuture[CompletedCheckpoint]() + val savepointPromise = new CompletableFuture[CompletedCheckpoint]() doReturn(savepointPromise) .when(checkpointCoordinator) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index 3d3af95..cd88133 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -18,6 +18,8 @@ package org.apache.flink.runtime.testingUtils +import java.util.function.BiFunction + import akka.actor.{ActorRef, Cancellable, Terminated} import akka.pattern.{ask, pipe} import org.apache.flink.api.common.JobID @@ -25,7 +27,6 @@ import org.apache.flink.runtime.FlinkActor import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType import org.apache.flink.runtime.checkpoint.CompletedCheckpoint import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore -import org.apache.flink.runtime.concurrent.BiFunction import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager