Repository: flink
Updated Branches:
  refs/heads/master a6c8953eb -> d2a8e3741


[FLINK-7332] [futures] Replace Flink's futures with Java 8's CompletableFuture in TaskExecutor

This closes #4448.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2a8e374
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2a8e374
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2a8e374

Branch: refs/heads/master
Commit: d2a8e37415eb34ca9cb8b2d8c22a33aa99b494a6
Parents: a6c8953
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Aug 1 10:46:40 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 1 23:13:56 2017 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      | 100 +++++++++----------
 1 file changed, 46 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d2a8e374/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4c4b0a7..aa4d6d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -27,9 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -771,55 +768,51 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                        reservedSlots.add(offer);
                                }
 
-                               Future<Iterable<SlotOffer>> acceptedSlotsFuture 
= jobMasterGateway.offerSlots(
-                                       getResourceID(),
-                                       reservedSlots,
-                                       leaderId,
-                                       taskManagerConfiguration.getTimeout());
-
-                               Future<Void> acceptedSlotsAcceptFuture = 
acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() {
-                                       @Override
-                                       public void accept(Iterable<SlotOffer> 
acceptedSlots) {
-                                               // check if the response is 
still valid
-                                               if 
(isJobManagerConnectionValid(jobId, leaderId)) {
-                                                       // mark accepted slots 
active
-                                                       for (SlotOffer 
acceptedSlot : acceptedSlots) {
-                                                               
reservedSlots.remove(acceptedSlot);
-                                                       }
-
-                                                       final Exception e = new 
Exception("The slot was rejected by the JobManager.");
+                               CompletableFuture<Iterable<SlotOffer>> 
acceptedSlotsFuture = FutureUtils.toJava(
+                                       jobMasterGateway.offerSlots(
+                                               getResourceID(),
+                                               reservedSlots,
+                                               leaderId,
+                                               
taskManagerConfiguration.getTimeout()));
+
+                               acceptedSlotsFuture.whenCompleteAsync(
+                                       (Iterable<SlotOffer> acceptedSlots, 
Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       if (throwable 
instanceof TimeoutException) {
+                                                               log.info("Slot 
offering to JobManager did not finish in time. Retrying the slot offering.");
+                                                               // We ran into 
a timeout. Try again.
+                                                               
offerSlotsToJobManager(jobId);
+                                                       } else {
+                                                               log.warn("Slot 
offering to JobManager failed. Freeing the slots " +
+                                                                       "and 
returning them to the ResourceManager.", throwable);
 
-                                                       for (SlotOffer 
rejectedSlot: reservedSlots) {
-                                                               
freeSlot(rejectedSlot.getAllocationId(), e);
+                                                               // We 
encountered an exception. Free the slots and return them to the RM.
+                                                               for (SlotOffer 
reservedSlot: reservedSlots) {
+                                                                       
freeSlot(reservedSlot.getAllocationId(), throwable);
+                                                               }
                                                        }
                                                } else {
-                                                       // discard the response 
since there is a new leader for the job
-                                                       log.debug("Discard 
offer slot response since there is a new leader " +
-                                                               "for the job 
{}.", jobId);
-                                               }
-                                       }
-                               }, getMainThreadExecutor());
+                                                       // check if the 
response is still valid
+                                                       if 
(isJobManagerConnectionValid(jobId, leaderId)) {
+                                                               // mark 
accepted slots active
+                                                               for (SlotOffer 
acceptedSlot : acceptedSlots) {
+                                                                       
reservedSlots.remove(acceptedSlot);
+                                                               }
 
-                               acceptedSlotsAcceptFuture.exceptionally(new 
ApplyFunction<Throwable, Void>() {
-                                       @Override
-                                       public Void apply(Throwable throwable) {
-                                               if (throwable instanceof 
TimeoutException) {
-                                                       log.info("Slot offering 
to JobManager did not finish in time. Retrying the slot offering.");
-                                                       // We ran into a 
timeout. Try again.
-                                                       
offerSlotsToJobManager(jobId);
-                                               } else {
-                                                       log.warn("Slot offering 
to JobManager failed. Freeing the slots " +
-                                                                       "and 
returning them to the ResourceManager.", throwable);
+                                                               final Exception 
e = new Exception("The slot was rejected by the JobManager.");
 
-                                                       // We encountered an 
exception. Free the slots and return them to the RM.
-                                                       for (SlotOffer 
reservedSlot: reservedSlots) {
-                                                               
freeSlot(reservedSlot.getAllocationId(), throwable);
+                                                               for (SlotOffer 
rejectedSlot : reservedSlots) {
+                                                                       
freeSlot(rejectedSlot.getAllocationId(), e);
+                                                               }
+                                                       } else {
+                                                               // discard the 
response since there is a new leader for the job
+                                                               
log.debug("Discard offer slot response since there is a new leader " +
+                                                                       "for 
the job {}.", jobId);
                                                        }
                                                }
+                                       },
+                                       getMainThreadExecutor());
 
-                                               return null;
-                                       }
-                               });
                        } else {
                                log.debug("There are no unassigned slots for 
the job {}.", jobId);
                        }
@@ -992,17 +985,16 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        {
                final ExecutionAttemptID executionAttemptID = 
taskExecutionState.getID();
 
-               Future<Acknowledge> futureAcknowledge = 
jobMasterGateway.updateTaskExecutionState(
-                               jobMasterLeaderId, taskExecutionState);
+               CompletableFuture<Acknowledge> futureAcknowledge = 
FutureUtils.toJava(
+                       
jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, 
taskExecutionState));
 
-               futureAcknowledge.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
-                       @Override
-                       public Void apply(Throwable value) {
-                               failTask(executionAttemptID, value);
-
-                               return null;
-                       }
-               }, getMainThreadExecutor());
+               futureAcknowledge.whenCompleteAsync(
+                       (ack, throwable) -> {
+                               if (throwable != null) {
+                                       failTask(executionAttemptID, throwable);
+                               }
+                       },
+                       getMainThreadExecutor());
        }
 
        private void unregisterTaskAndNotifyFinalState(

Reply via email to