Repository: flink
Updated Branches:
  refs/heads/master 4a9f19b9f -> 4378ac3ae


[FLINK-7322] [futures] Replace Flink's futures with Java 8's CompletableFuture in CheckpointCoordinator

Fix failing JobManagerITCase

This closes #4436.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4378ac3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4378ac3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4378ac3a

Branch: refs/heads/master
Commit: 4378ac3ae36f12c8678d2090f7c344832d6d0761
Parents: 4a9f19b
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jul 31 19:05:22 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 13:54:50 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 41 ++++++++++----------
 .../runtime/checkpoint/PendingCheckpoint.java   |  9 ++---
 .../flink/runtime/jobmanager/JobManager.scala   | 11 +++---
 .../checkpoint/CheckpointCoordinatorTest.java   | 20 +++++-----
 .../checkpoint/PendingCheckpointTest.java       |  4 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  7 ++--
 .../testingUtils/TestingJobManagerLike.scala    |  3 +-
 7 files changed, 48 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3e36158..5cab7f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -26,9 +26,6 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -58,6 +55,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -362,7 +360,7 @@ public class CheckpointCoordinator {
         *                               configured
         * @throws Exception             Failures during triggering are 
forwarded
         */
-       public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, 
String targetDirectory) throws Exception {
+       public CompletableFuture<CompletedCheckpoint> triggerSavepoint(long 
timestamp, String targetDirectory) throws Exception {
                checkNotNull(targetDirectory, "Savepoint target directory");
 
                CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
@@ -377,29 +375,30 @@ public class CheckpointCoordinator {
                        savepointDirectory,
                        false);
 
-               Future<CompletedCheckpoint> result;
+               CompletableFuture<CompletedCheckpoint> result;
 
                if (triggerResult.isSuccess()) {
                        result = 
triggerResult.getPendingCheckpoint().getCompletionFuture();
                } else {
                        Throwable cause = new Exception("Failed to trigger 
savepoint: " + triggerResult.getFailureReason().message());
-                       result = 
FlinkCompletableFuture.completedExceptionally(cause);
+                       result = new CompletableFuture<>();
+                       result.completeExceptionally(cause);
+                       return result;
                }
 
                // Make sure to remove the created base directory on Exceptions
-               result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-                       @Override
-                       public Void apply(Throwable value) {
-                               try {
-                                       
SavepointStore.deleteSavepointDirectory(savepointDirectory);
-                               } catch (Throwable t) {
-                                       LOG.warn("Failed to delete savepoint 
directory " + savepointDirectory
-                                               + " after failed savepoint.", 
t);
+               result.whenCompleteAsync(
+                       (CompletedCheckpoint checkpoint, Throwable throwable) 
-> {
+                               if (throwable != null) {
+                                       try {
+                                               
SavepointStore.deleteSavepointDirectory(savepointDirectory);
+                                       } catch (Throwable t) {
+                                               LOG.warn("Failed to delete 
savepoint directory " + savepointDirectory
+                                                       + " after failed 
savepoint.", t);
+                                       }
                                }
-
-                               return null;
-                       }
-               }, executor);
+                       },
+                       executor);
 
                return result;
        }
@@ -427,7 +426,7 @@ public class CheckpointCoordinator {
         */
        @VisibleForTesting
        @Internal
-       public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, 
CheckpointOptions options) throws Exception {
+       public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long 
timestamp, CheckpointOptions options) throws Exception {
                switch (options.getCheckpointType()) {
                        case SAVEPOINT:
                                return triggerSavepoint(timestamp, 
options.getTargetLocation());
@@ -440,7 +439,9 @@ public class CheckpointCoordinator {
                                        return 
triggerResult.getPendingCheckpoint().getCompletionFuture();
                                } else {
                                        Throwable cause = new Exception("Failed 
to trigger checkpoint: " + triggerResult.getFailureReason().message());
-                                       return 
FlinkCompletableFuture.completedExceptionally(cause);
+                                       CompletableFuture<CompletedCheckpoint> 
failedResult = new CompletableFuture<>();
+                                       
failedResult.completeExceptionally(cause);
+                                       return failedResult;
                                }
 
                        default:

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 0633fec..3472fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -47,6 +45,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 
@@ -104,7 +103,7 @@ public class PendingCheckpoint {
        private final String targetDirectory;
 
        /** The promise to fulfill once the checkpoint has been completed. */
-       private final FlinkCompletableFuture<CompletedCheckpoint> 
onCompletionPromise;
+       private final CompletableFuture<CompletedCheckpoint> 
onCompletionPromise;
 
        /** The executor for potentially blocking I/O operations, like state 
disposal */
        private final Executor executor;
@@ -149,7 +148,7 @@ public class PendingCheckpoint {
                this.operatorStates = new HashMap<>();
                this.masterState = new ArrayList<>();
                this.acknowledgedTasks = new 
HashSet<>(verticesToConfirm.size());
-               this.onCompletionPromise = new FlinkCompletableFuture<>();
+               this.onCompletionPromise = new CompletableFuture<>();
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -249,7 +248,7 @@ public class PendingCheckpoint {
         *
         * @return A future to the completed checkpoint
         */
-       public Future<CompletedCheckpoint> getCompletionFuture() {
+       public CompletableFuture<CompletedCheckpoint> getCompletionFuture() {
                return onCompletionPromise;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3128cfc..a6712ad 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.jobmanager
 import java.io.IOException
 import java.net._
 import java.util.UUID
-import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
+import java.util.concurrent.{Future => JavaFuture, _}
+import java.util.function.BiFunction
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -38,13 +39,13 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
 import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, 
SavepointStore}
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, 
BiFunction, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, 
Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, 
LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -58,7 +59,6 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
 import org.apache.flink.runtime.jobmaster.JobMaster
 import org.apache.flink.runtime.leaderelection.{LeaderContender, 
LeaderElectionService}
-import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, 
JOB_MANAGER_NAME}
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -80,12 +80,11 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskexecutor.TaskExecutor
-import org.apache.flink.runtime.taskexecutor.TaskExecutor.TASK_MANAGER_NAME
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
-import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
+import org.apache.flink.util.{InstantiationUtil, NetUtils}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 186a819..e78152a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -76,6 +75,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1446,7 +1446,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                // trigger the first checkpoint. this should succeed
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
-               Future<CompletedCheckpoint> savepointFuture = 
coord.triggerSavepoint(timestamp, savepointDir);
+               CompletableFuture<CompletedCheckpoint> savepointFuture = 
coord.triggerSavepoint(timestamp, savepointDir);
                assertFalse(savepointFuture.isDone());
 
                // validate that we have a pending savepoint
@@ -1601,7 +1601,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
                // Trigger savepoint and checkpoint
-               Future<CompletedCheckpoint> savepointFuture1 = 
coord.triggerSavepoint(timestamp, savepointDir);
+               CompletableFuture<CompletedCheckpoint> savepointFuture1 = 
coord.triggerSavepoint(timestamp, savepointDir);
                long savepointId1 = counter.getLast();
                assertEquals(1, coord.getNumberOfPendingCheckpoints());
 
@@ -1626,7 +1626,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                long checkpointId3 = counter.getLast();
                assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
-               Future<CompletedCheckpoint> savepointFuture2 = 
coord.triggerSavepoint(timestamp + 4, savepointDir);
+               CompletableFuture<CompletedCheckpoint> savepointFuture2 = 
coord.triggerSavepoint(timestamp + 4, savepointDir);
                long savepointId2 = counter.getLast();
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
@@ -1911,7 +1911,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        null,
                        Executors.directExecutor());
 
-               List<Future<CompletedCheckpoint>> savepointFutures = new 
ArrayList<>();
+               List<CompletableFuture<CompletedCheckpoint>> savepointFutures = 
new ArrayList<>();
 
                int numSavepoints = 5;
 
@@ -1923,7 +1923,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                }
 
                // After triggering multiple savepoints, all should in progress
-               for (Future<CompletedCheckpoint> savepointFuture : 
savepointFutures) {
+               for (CompletableFuture<CompletedCheckpoint> savepointFuture : 
savepointFutures) {
                        assertFalse(savepointFuture.isDone());
                }
 
@@ -1934,7 +1934,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                }
 
                // After ACKs, all should be completed
-               for (Future<CompletedCheckpoint> savepointFuture : 
savepointFutures) {
+               for (CompletableFuture<CompletedCheckpoint> savepointFuture : 
savepointFutures) {
                        assertTrue(savepointFuture.isDone());
                }
        }
@@ -1966,10 +1966,10 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
-               Future<CompletedCheckpoint> savepoint0 = 
coord.triggerSavepoint(0, savepointDir);
+               CompletableFuture<CompletedCheckpoint> savepoint0 = 
coord.triggerSavepoint(0, savepointDir);
                assertFalse("Did not trigger savepoint", savepoint0.isDone());
 
-               Future<CompletedCheckpoint> savepoint1 = 
coord.triggerSavepoint(1, savepointDir);
+               CompletableFuture<CompletedCheckpoint> savepoint1 = 
coord.triggerSavepoint(1, savepointDir);
                assertFalse("Did not trigger savepoint", savepoint1.isDone());
        }
 
@@ -3600,7 +3600,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                assertTrue(1 == 
completedCheckpointStore.getNumberOfRetainedCheckpoints());
 
                // trigger a savepoint --> this should not have any effect on 
the CompletedCheckpointStore
-               Future<CompletedCheckpoint> savepointFuture = 
checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
+               CompletableFuture<CompletedCheckpoint> savepointFuture = 
checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
 
                checkpointCoordinator.receiveAcknowledgeMessage(
                        new AcknowledgeCheckpoint(

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index a96b597..7d103d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -40,6 +39,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 
@@ -134,7 +134,7 @@ public class PendingCheckpointTest {
 
                // Abort declined
                PendingCheckpoint pending = createPendingCheckpoint(props, 
"ignored");
-               Future<CompletedCheckpoint> future = 
pending.getCompletionFuture();
+               CompletableFuture<CompletedCheckpoint> future = 
pending.getCompletionFuture();
 
                assertFalse(future.isDone());
                pending.abortDeclined();

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 5fb9ddf..e209608 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager
 
+import java.util.concurrent.CompletableFuture
+
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
@@ -25,7 +27,6 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, 
CompletedCheckpoint}
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import 
org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, 
JobCheckpointingSettings}
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, 
JobVertex, ScheduleMode}
@@ -913,7 +914,7 @@ class JobManagerITCase(_system: ActorSystem)
           doThrow(new Exception("Expected Test Exception"))
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
-          val savepointPathPromise = new 
FlinkCompletableFuture[CompletedCheckpoint]()
+          val savepointPathPromise = new 
CompletableFuture[CompletedCheckpoint]()
           doReturn(savepointPathPromise)
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
@@ -982,7 +983,7 @@ class JobManagerITCase(_system: ActorSystem)
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
 
-          val savepointPromise = new 
FlinkCompletableFuture[CompletedCheckpoint]()
+          val savepointPromise = new CompletableFuture[CompletedCheckpoint]()
           doReturn(savepointPromise)
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 3d3af95..cd88133 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.testingUtils
 
+import java.util.function.BiFunction
+
 import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
@@ -25,7 +27,6 @@ import org.apache.flink.runtime.FlinkActor
 import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.concurrent.BiFunction
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager

Reply via email to