This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 00f2c62749206dff95ae6fd02f0e3d8b8530be89 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Sat Feb 5 17:30:23 2022 +0100 [FLINK-25817] Unify logic between TaskExecutor.requestSlot and TaskExecutor.tryLoadLocalAllocationSnapshots This commit unifies the logic between TaskExecutor.requestSlot and TaskExecutor.tryLoadLocalAllocationSnapshots. This helps to reduce maintenance costs since both methods use the same logic. This closes #18237. --- .../flink/runtime/taskexecutor/TaskExecutor.java | 48 +++++++++++++--------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 7e4afb2..94a2262 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -1080,10 +1080,28 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { slotId, jobId, targetAddress, allocationId, resourceProfile)); try { - allocateSlot(slotId, jobId, allocationId, resourceProfile); - } catch (SlotAllocationException sae) { - return FutureUtils.completedExceptionally(sae); + final boolean isConnected = + allocateSlotForJob(jobId, slotId, allocationId, resourceProfile, targetAddress); + + if (isConnected) { + offerSlotsToJobManager(jobId); + } + + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (SlotAllocationException e) { + log.debug("Could not allocate slot for allocation id {}.", allocationId, e); + return FutureUtils.completedExceptionally(e); } + } + + private boolean allocateSlotForJob( + JobID jobId, + SlotID slotId, + AllocationID allocationId, + ResourceProfile resourceProfile, + String targetAddress) + throws SlotAllocationException { + allocateSlot(slotId, jobId, allocationId, resourceProfile); final JobTable.Job job; @@ 
-1109,15 +1127,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { onFatalError(new Exception("Could not free slot " + slotId)); } - return FutureUtils.completedExceptionally( - new SlotAllocationException("Could not create new job.", e)); - } - - if (job.isConnected()) { - offerSlotsToJobManager(jobId); + throw new SlotAllocationException("Could not create new job.", e); } - return CompletableFuture.completedFuture(Acknowledge.get()); + return job.isConnected(); } private TaskExecutorJobServices registerNewJobAndCreateServices( @@ -2052,19 +2065,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { final Set<AllocationID> allocatedSlots = new HashSet<>(); for (SlotAllocationSnapshot slotAllocationSnapshot : slotAllocationSnapshots) { try { - allocateSlot( - slotAllocationSnapshot.getSlotID(), + allocateSlotForJob( slotAllocationSnapshot.getJobId(), + slotAllocationSnapshot.getSlotID(), slotAllocationSnapshot.getAllocationId(), - slotAllocationSnapshot.getResourceProfile()); + slotAllocationSnapshot.getResourceProfile(), + slotAllocationSnapshot.getJobTargetAddress()); - jobTable.getOrCreateJob( - slotAllocationSnapshot.getJobId(), - () -> - registerNewJobAndCreateServices( - slotAllocationSnapshot.getJobId(), - slotAllocationSnapshot.getJobTargetAddress())); - } catch (Exception e) { + } catch (SlotAllocationException e) { log.debug("Cannot reallocate restored slot {}.", slotAllocationSnapshot, e); }