This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 00f2c62749206dff95ae6fd02f0e3d8b8530be89
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Sat Feb 5 17:30:23 2022 +0100

    [FLINK-25817] Unify logic between TaskExecutor.requestSlot and TaskExecutor.tryLoadLocalAllocationSnapshots
    
    This commit unifies the logic between TaskExecutor.requestSlot and TaskExecutor.tryLoadLocalAllocationSnapshots.
    This helps to reduce maintenance costs since both methods use the same logic.
    
    This closes #18237.
---
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 48 +++++++++++++---------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7e4afb2..94a2262 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1080,10 +1080,28 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
                         slotId, jobId, targetAddress, allocationId, 
resourceProfile));
 
         try {
-            allocateSlot(slotId, jobId, allocationId, resourceProfile);
-        } catch (SlotAllocationException sae) {
-            return FutureUtils.completedExceptionally(sae);
+            final boolean isConnected =
+                    allocateSlotForJob(jobId, slotId, allocationId, 
resourceProfile, targetAddress);
+
+            if (isConnected) {
+                offerSlotsToJobManager(jobId);
+            }
+
+            return CompletableFuture.completedFuture(Acknowledge.get());
+        } catch (SlotAllocationException e) {
+            log.debug("Could not allocate slot for allocation id {}.", 
allocationId, e);
+            return FutureUtils.completedExceptionally(e);
         }
+    }
+
+    private boolean allocateSlotForJob(
+            JobID jobId,
+            SlotID slotId,
+            AllocationID allocationId,
+            ResourceProfile resourceProfile,
+            String targetAddress)
+            throws SlotAllocationException {
+        allocateSlot(slotId, jobId, allocationId, resourceProfile);
 
         final JobTable.Job job;
 
@@ -1109,15 +1127,10 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
                 onFatalError(new Exception("Could not free slot " + slotId));
             }
 
-            return FutureUtils.completedExceptionally(
-                    new SlotAllocationException("Could not create new job.", 
e));
-        }
-
-        if (job.isConnected()) {
-            offerSlotsToJobManager(jobId);
+            throw new SlotAllocationException("Could not create new job.", e);
         }
 
-        return CompletableFuture.completedFuture(Acknowledge.get());
+        return job.isConnected();
     }
 
     private TaskExecutorJobServices registerNewJobAndCreateServices(
@@ -2052,19 +2065,14 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
         final Set<AllocationID> allocatedSlots = new HashSet<>();
         for (SlotAllocationSnapshot slotAllocationSnapshot : 
slotAllocationSnapshots) {
             try {
-                allocateSlot(
-                        slotAllocationSnapshot.getSlotID(),
+                allocateSlotForJob(
                         slotAllocationSnapshot.getJobId(),
+                        slotAllocationSnapshot.getSlotID(),
                         slotAllocationSnapshot.getAllocationId(),
-                        slotAllocationSnapshot.getResourceProfile());
+                        slotAllocationSnapshot.getResourceProfile(),
+                        slotAllocationSnapshot.getJobTargetAddress());
 
-                jobTable.getOrCreateJob(
-                        slotAllocationSnapshot.getJobId(),
-                        () ->
-                                registerNewJobAndCreateServices(
-                                        slotAllocationSnapshot.getJobId(),
-                                        
slotAllocationSnapshot.getJobTargetAddress()));
-            } catch (Exception e) {
+            } catch (SlotAllocationException e) {
                 log.debug("Cannot reallocate restored slot {}.", 
slotAllocationSnapshot, e);
             }
 

Reply via email to