Repository: flink Updated Branches: refs/heads/master a6c8953eb -> d2a8e3741
[FLINK-7332] [futures] Replace Flink's futures with Java 8's CompletableFuture in TaskExecutor This closes #4448. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2a8e374 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2a8e374 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2a8e374 Branch: refs/heads/master Commit: d2a8e37415eb34ca9cb8b2d8c22a33aa99b494a6 Parents: a6c8953 Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Aug 1 10:46:40 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Aug 1 23:13:56 2017 +0200 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskExecutor.java | 100 +++++++++---------- 1 file changed, 46 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d2a8e374/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 4c4b0a7..aa4d6d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -27,9 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; @@ -771,55 +768,51 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { reservedSlots.add(offer); } - Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots( - getResourceID(), - reservedSlots, - leaderId, - taskManagerConfiguration.getTimeout()); - - Future<Void> acceptedSlotsAcceptFuture = acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() { - @Override - public void accept(Iterable<SlotOffer> acceptedSlots) { - // check if the response is still valid - if (isJobManagerConnectionValid(jobId, leaderId)) { - // mark accepted slots active - for (SlotOffer acceptedSlot : acceptedSlots) { - reservedSlots.remove(acceptedSlot); - } - - final Exception e = new Exception("The slot was rejected by the JobManager."); + CompletableFuture<Iterable<SlotOffer>> acceptedSlotsFuture = FutureUtils.toJava( + jobMasterGateway.offerSlots( + getResourceID(), + reservedSlots, + leaderId, + taskManagerConfiguration.getTimeout())); + + acceptedSlotsFuture.whenCompleteAsync( + (Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> { + if (throwable != null) { + if (throwable instanceof TimeoutException) { + log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering."); + // We ran into a timeout. Try again. + offerSlotsToJobManager(jobId); + } else { + log.warn("Slot offering to JobManager failed. Freeing the slots " + + "and returning them to the ResourceManager.", throwable); - for (SlotOffer rejectedSlot: reservedSlots) { - freeSlot(rejectedSlot.getAllocationId(), e); + // We encountered an exception. Free the slots and return them to the RM. + for (SlotOffer reservedSlot: reservedSlots) { + freeSlot(reservedSlot.getAllocationId(), throwable); + } } } else { - // discard the response since there is a new leader for the job - log.debug("Discard offer slot response since there is a new leader " + - "for the job {}.", jobId); - } - } - }, getMainThreadExecutor()); + // check if the response is still valid + if (isJobManagerConnectionValid(jobId, leaderId)) { + // mark accepted slots active + for (SlotOffer acceptedSlot : acceptedSlots) { + reservedSlots.remove(acceptedSlot); + } - acceptedSlotsAcceptFuture.exceptionally(new ApplyFunction<Throwable, Void>() { - @Override - public Void apply(Throwable throwable) { - if (throwable instanceof TimeoutException) { - log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering."); - // We ran into a timeout. Try again. - offerSlotsToJobManager(jobId); - } else { - log.warn("Slot offering to JobManager failed. Freeing the slots " + - "and returning them to the ResourceManager.", throwable); + final Exception e = new Exception("The slot was rejected by the JobManager."); - // We encountered an exception. Free the slots and return them to the RM. - for (SlotOffer reservedSlot: reservedSlots) { - freeSlot(reservedSlot.getAllocationId(), throwable); + for (SlotOffer rejectedSlot : reservedSlots) { + freeSlot(rejectedSlot.getAllocationId(), e); + } + } else { + // discard the response since there is a new leader for the job + log.debug("Discard offer slot response since there is a new leader " + + "for the job {}.", jobId); } } + }, + getMainThreadExecutor()); - return null; - } - }); } else { log.debug("There are no unassigned slots for the job {}.", jobId); } @@ -992,17 +985,16 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { { final ExecutionAttemptID executionAttemptID = taskExecutionState.getID(); - Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState( - jobMasterLeaderId, taskExecutionState); + CompletableFuture<Acknowledge> futureAcknowledge = FutureUtils.toJava( + jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, taskExecutionState)); - futureAcknowledge.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { - @Override - public Void apply(Throwable value) { - failTask(executionAttemptID, value); - - return null; - } - }, getMainThreadExecutor()); + futureAcknowledge.whenCompleteAsync( + (ack, throwable) -> { + if (throwable != null) { + failTask(executionAttemptID, throwable); + } + }, + getMainThreadExecutor()); } private void unregisterTaskAndNotifyFinalState(