XComp commented on code in PR #28201:
URL: https://github.com/apache/flink/pull/28201#discussion_r3371877747


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
 
     @Override
     public void grantLeadership(UUID leaderSessionID) {
-        runIfStateRunning(
-                () -> startJobMasterServiceProcessAsync(leaderSessionID),
-                "starting a new JobMasterServiceProcess");
+        synchronized (lock) {

Review Comment:
   why are you inlining the `runIfStateRunning` method here instead of keeping 
it? We just have to edit the callback that's passed in, don't we?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -355,17 +426,48 @@ private void confirmLeadership(
     private void forwardResultFuture(
             UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult> 
resultFuture) {
         resultFuture.whenComplete(
-                (jobManagerRunnerResult, throwable) ->
-                        runIfValidLeader(
-                                leaderSessionId,
-                                () -> onJobCompletion(jobManagerRunnerResult, 
throwable),
-                                "result future forwarding"));
+                (jobManagerRunnerResult, throwable) -> {
+                    rememberGloballyTerminalResultIfCurrentProcess(
+                            leaderSessionId, jobManagerRunnerResult);
+                    runIfValidLeader(
+                            leaderSessionId,
+                            () -> onJobCompletion(jobManagerRunnerResult, 
throwable),
+                            "result future forwarding");
+                });
+    }
+
+    private void rememberGloballyTerminalResultIfCurrentProcess(

Review Comment:
   ```suggestion
       private void cacheGloballyTerminalResultIfCurrentProcess(
   ```
   nit: maybe, stick to one term here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
 
     @Override
     public void grantLeadership(UUID leaderSessionID) {
-        runIfStateRunning(
-                () -> startJobMasterServiceProcessAsync(leaderSessionID),
-                "starting a new JobMasterServiceProcess");
+        synchronized (lock) {
+            if (!isRunning()) {
+                LOG.debug(
+                        "Ignore 'starting a new JobMasterServiceProcess' 
because the leadership runner is no longer running.");
+                return;
+            }
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused -> 
flushPendingOrStartNewProcessAsync(leaderSessionID));
+            handleAsyncOperationError(sequentialOperation, "Could not start 
the job manager.");
+        }
+    }
+
+    private CompletableFuture<Void> flushPendingOrStartNewProcessAsync(UUID 
leaderSessionId) {

Review Comment:
   ```suggestion
       private CompletableFuture<Void> 
handleCachedGloballyTerminalJobOrStartNewProcessAsync(UUID leaderSessionId) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -355,17 +426,48 @@ private void confirmLeadership(
     private void forwardResultFuture(
             UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult> 
resultFuture) {
         resultFuture.whenComplete(
-                (jobManagerRunnerResult, throwable) ->
-                        runIfValidLeader(
-                                leaderSessionId,
-                                () -> onJobCompletion(jobManagerRunnerResult, 
throwable),
-                                "result future forwarding"));
+                (jobManagerRunnerResult, throwable) -> {
+                    rememberGloballyTerminalResultIfCurrentProcess(

Review Comment:
   ```suggestion
                       // The JobManagerResult needs to be cached for the 
current process even if the leadership was lost
                       rememberGloballyTerminalResultIfCurrentProcess(
   ```
   We should make it explicit why we're not guarding this method call via the 
leader election grant.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -355,17 +426,48 @@ private void confirmLeadership(
     private void forwardResultFuture(
             UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult> 
resultFuture) {
         resultFuture.whenComplete(
-                (jobManagerRunnerResult, throwable) ->
-                        runIfValidLeader(
-                                leaderSessionId,
-                                () -> onJobCompletion(jobManagerRunnerResult, 
throwable),
-                                "result future forwarding"));
+                (jobManagerRunnerResult, throwable) -> {
+                    rememberGloballyTerminalResultIfCurrentProcess(
+                            leaderSessionId, jobManagerRunnerResult);
+                    runIfValidLeader(
+                            leaderSessionId,
+                            () -> onJobCompletion(jobManagerRunnerResult, 
throwable),
+                            "result future forwarding");
+                });
+    }
+
+    private void rememberGloballyTerminalResultIfCurrentProcess(

Review Comment:
   ```suggestion
       private void cacheGloballyTerminalResultIfCurrentProcess(
   ```
   nit: maybe, stick to one term here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -246,17 +249,26 @@ void 
testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationErro
 
     @Nonnull
     private ExecutionGraphInfo createFailedExecutionGraphInfo(FlinkException 
testException) {
+        return createExecutionGraphInfo(JobStatus.FAILED, testException);
+    }
+
+    @Nonnull

Review Comment:
   nit: Let's remove the @Nonnull annotations instead of introducing a new one. 
The non-null case should be the default one.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
 
     @Override
     public void grantLeadership(UUID leaderSessionID) {
-        runIfStateRunning(
-                () -> startJobMasterServiceProcessAsync(leaderSessionID),
-                "starting a new JobMasterServiceProcess");
+        synchronized (lock) {
+            if (!isRunning()) {
+                LOG.debug(
+                        "Ignore 'starting a new JobMasterServiceProcess' 
because the leadership runner is no longer running.");
+                return;
+            }
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused -> 
flushPendingOrStartNewProcessAsync(leaderSessionID));
+            handleAsyncOperationError(sequentialOperation, "Could not start 
the job manager.");
+        }
+    }
+
+    private CompletableFuture<Void> flushPendingOrStartNewProcessAsync(UUID 
leaderSessionId) {
+        final JobManagerRunnerResult cachedTerminalResult;
+        synchronized (lock) {
+            if (!isRunning()) {
+                return FutureUtils.completedVoidFuture();
+            }
+            cachedTerminalResult = takePendingTerminalResult();
+            if (cachedTerminalResult != null) {
+                state = State.JOB_COMPLETED;
+            }
+        }
+
+        if (cachedTerminalResult != null) {
+            LOG.info(
+                    "Flushing previously observed globally terminal result for 
job {} on re-grant; not starting a new {}. Job state: {}.",
+                    getJobID(),
+                    JobMasterServiceProcess.class.getSimpleName(),
+                    cachedTerminalResult
+                            .getExecutionGraphInfo()
+                            .getArchivedExecutionGraph()
+                            .getState());
+            resultFuture.complete(cachedTerminalResult);
+            return FutureUtils.completedVoidFuture();
+        }
+
+        return jobResultStore
+                .hasJobResultEntryAsync(getJobID())
+                .thenCompose(
+                        hasJobResult ->
+                                hasJobResult
+                                        ? 
handleJobAlreadyDoneIfValidLeader(leaderSessionId)
+                                        : 
createNewJobMasterServiceProcessIfValidLeader(
+                                                leaderSessionId));
     }
 
     @GuardedBy("lock")
-    private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenCompose(
-                        unused ->
-                                jobResultStore
-                                        .hasJobResultEntryAsync(getJobID())
-                                        .thenCompose(
-                                                hasJobResult -> {
-                                                    if (hasJobResult) {
-                                                        return 
handleJobAlreadyDoneIfValidLeader(
-                                                                
leaderSessionId);
-                                                    } else {
-                                                        return 
createNewJobMasterServiceProcessIfValidLeader(
-                                                                
leaderSessionId);
-                                                    }
-                                                }));
-        handleAsyncOperationError(sequentialOperation, "Could not start the 
job manager.");
+    private JobManagerRunnerResult takePendingTerminalResult() {
+        final JobManagerRunnerResult terminalResult = pendingTerminalResult;
+        pendingTerminalResult = null;
+        if (terminalResult != null) {
+            currentJobMasterServiceProcessLeaderId = null;
+        }
+        return terminalResult;
+    }
+
+    private void completeResultFutureAfterClose() {
+        JobManagerRunnerResult closeResult;
+        synchronized (lock) {
+            closeResult = takePendingTerminalResult();
+            if (closeResult == null) {
+                closeResult =
+                        JobManagerRunnerResult.forSuccess(
+                                
createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED));
+                currentJobMasterServiceProcessLeaderId = null;

Review Comment:
   We reset `currentJobMasterServiceProcessLeaderId` here if `closeResult` is 
null (i.e. no terminal result is cached) and in `takePendingTerminalResult` if 
there is a terminal result cached. Can we remove the reset in 
`takePendingTerminalResult` and move the reset here out of the `if` 
block?
   
   That makes `takePendingTerminalResult` a single-purpose method and makes the 
code easier to follow.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -62,13 +62,21 @@
  * <p>All leadership operations are serialized. This means that granting the 
leadership has to
  * complete before the leadership can be revoked and vice versa.
  *
- * <p>The {@link #resultFuture} can be completed with the following values: * *
+ * <p>The {@link #resultFuture} can be completed with the following values:
  *
  * <ul>
  *   <li>{@link JobManagerRunnerResult} to signal an initialization failure of 
the {@link
- *       JobMasterService} or the completion of a job
+ *       JobMasterService}, the completion of a job, or a globally terminal 
result observed before
+ *       leadership revocation could be forwarded
  *   <li>{@link Exception} to signal an unexpected failure
  * </ul>
+ *
+ * <p>To close the race between a globally terminal result and a leadership 
revocation that strips
+ * the forwarded result (see FLINK-39704), terminal results are cached in 
{@link
+ * #pendingTerminalResult} the moment they are observed. The cache is 
populated by {@link
+ * #rememberGloballyTerminalResultIfCurrentProcess}, drained by either {@link 
#grantLeadership} (on
+ * re-grant) or {@link #completeResultFutureAfterClose} (on close), and 
cleared by {@link
+ * #onJobCompletion} when forwarding succeeds normally.

Review Comment:
   I feel like this paragraph shouldn't live in the class's JavaDoc. It's kind 
of an internal detail. Can we move the comment next to the newly added fields 
instead? WDYT?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
 
     @Override
     public void grantLeadership(UUID leaderSessionID) {
-        runIfStateRunning(
-                () -> startJobMasterServiceProcessAsync(leaderSessionID),
-                "starting a new JobMasterServiceProcess");
+        synchronized (lock) {
+            if (!isRunning()) {
+                LOG.debug(
+                        "Ignore 'starting a new JobMasterServiceProcess' 
because the leadership runner is no longer running.");
+                return;
+            }
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused -> 
flushPendingOrStartNewProcessAsync(leaderSessionID));
+            handleAsyncOperationError(sequentialOperation, "Could not start 
the job manager.");
+        }
+    }
+
+    private CompletableFuture<Void> flushPendingOrStartNewProcessAsync(UUID 
leaderSessionId) {
+        final JobManagerRunnerResult cachedTerminalResult;
+        synchronized (lock) {
+            if (!isRunning()) {
+                return FutureUtils.completedVoidFuture();
+            }
+            cachedTerminalResult = takePendingTerminalResult();
+            if (cachedTerminalResult != null) {
+                state = State.JOB_COMPLETED;
+            }
+        }
+
+        if (cachedTerminalResult != null) {
+            LOG.info(
+                    "Flushing previously observed globally terminal result for 
job {} on re-grant; not starting a new {}. Job state: {}.",
+                    getJobID(),
+                    JobMasterServiceProcess.class.getSimpleName(),
+                    cachedTerminalResult
+                            .getExecutionGraphInfo()
+                            .getArchivedExecutionGraph()
+                            .getState());
+            resultFuture.complete(cachedTerminalResult);
+            return FutureUtils.completedVoidFuture();
+        }
+
+        return jobResultStore
+                .hasJobResultEntryAsync(getJobID())
+                .thenCompose(
+                        hasJobResult ->
+                                hasJobResult
+                                        ? 
handleJobAlreadyDoneIfValidLeader(leaderSessionId)
+                                        : 
createNewJobMasterServiceProcessIfValidLeader(
+                                                leaderSessionId));
     }
 
     @GuardedBy("lock")
-    private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenCompose(
-                        unused ->
-                                jobResultStore
-                                        .hasJobResultEntryAsync(getJobID())
-                                        .thenCompose(
-                                                hasJobResult -> {
-                                                    if (hasJobResult) {
-                                                        return 
handleJobAlreadyDoneIfValidLeader(
-                                                                
leaderSessionId);
-                                                    } else {
-                                                        return 
createNewJobMasterServiceProcessIfValidLeader(
-                                                                
leaderSessionId);
-                                                    }
-                                                }));
-        handleAsyncOperationError(sequentialOperation, "Could not start the 
job manager.");
+    private JobManagerRunnerResult takePendingTerminalResult() {
+        final JobManagerRunnerResult terminalResult = pendingTerminalResult;
+        pendingTerminalResult = null;
+        if (terminalResult != null) {
+            currentJobMasterServiceProcessLeaderId = null;
+        }
+        return terminalResult;
+    }
+
+    private void completeResultFutureAfterClose() {
+        JobManagerRunnerResult closeResult;
+        synchronized (lock) {
+            closeResult = takePendingTerminalResult();
+            if (closeResult == null) {
+                closeResult =
+                        JobManagerRunnerResult.forSuccess(
+                                
createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED));
+                currentJobMasterServiceProcessLeaderId = null;

Review Comment:
   ```suggestion
                   // resetting leader ID to prevent a concurrent 
rememberGloballyTerminalResultIfCurrentProcess call from updating the 
JobManagerResult
                   currentJobMasterServiceProcessLeaderId = null;
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java:
##########
@@ -268,6 +268,34 @@ void testSuccessOnTerminalState() {
                                         == JobStatus.FINISHED);
     }
 
+    @Test

Review Comment:
   test coverage: We lack a test which verifies that we do not update the 
result if the leaderSessionId is wrong (essentially, the else branch of [this 
if 
condition](https://github.com/apache/flink/blob/3b56b6c4afaecf81e6130226bd0e54030656a627/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L445)).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -355,17 +426,48 @@ private void confirmLeadership(
     private void forwardResultFuture(
             UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult> 
resultFuture) {
         resultFuture.whenComplete(
-                (jobManagerRunnerResult, throwable) ->
-                        runIfValidLeader(
-                                leaderSessionId,
-                                () -> onJobCompletion(jobManagerRunnerResult, 
throwable),
-                                "result future forwarding"));
+                (jobManagerRunnerResult, throwable) -> {
+                    rememberGloballyTerminalResultIfCurrentProcess(
+                            leaderSessionId, jobManagerRunnerResult);
+                    runIfValidLeader(
+                            leaderSessionId,
+                            () -> onJobCompletion(jobManagerRunnerResult, 
throwable),
+                            "result future forwarding");
+                });
+    }
+
+    private void rememberGloballyTerminalResultIfCurrentProcess(
+            UUID leaderSessionId, JobManagerRunnerResult 
jobManagerRunnerResult) {
+        synchronized (lock) {
+            if (resultFuture.isDone()) {
+                return;
+            }
+            if (leaderSessionId.equals(currentJobMasterServiceProcessLeaderId)
+                    && isGloballyTerminalResult(jobManagerRunnerResult)) {

Review Comment:
   ```suggestion
                       // initialization failures should still result in the 
job being suspended - no need to cache the JobManagerRunnerResult to trigger 
job startup retry after failover
                       && isGloballyTerminalResult(jobManagerRunnerResult)) {
   ```
   it's probably worth adding some context on the why here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
 
     @Override
     public void grantLeadership(UUID leaderSessionID) {
-        runIfStateRunning(
-                () -> startJobMasterServiceProcessAsync(leaderSessionID),
-                "starting a new JobMasterServiceProcess");
+        synchronized (lock) {
+            if (!isRunning()) {
+                LOG.debug(
+                        "Ignore 'starting a new JobMasterServiceProcess' 
because the leadership runner is no longer running.");
+                return;
+            }
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused -> 
flushPendingOrStartNewProcessAsync(leaderSessionID));
+            handleAsyncOperationError(sequentialOperation, "Could not start 
the job manager.");
+        }
+    }
+
+    private CompletableFuture<Void> flushPendingOrStartNewProcessAsync(UUID 
leaderSessionId) {
+        final JobManagerRunnerResult cachedTerminalResult;
+        synchronized (lock) {
+            if (!isRunning()) {
+                return FutureUtils.completedVoidFuture();
+            }
+            cachedTerminalResult = takePendingTerminalResult();
+            if (cachedTerminalResult != null) {
+                state = State.JOB_COMPLETED;
+            }
+        }
+
+        if (cachedTerminalResult != null) {
+            LOG.info(
+                    "Flushing previously observed globally terminal result for 
job {} on re-grant; not starting a new {}. Job state: {}.",
+                    getJobID(),
+                    JobMasterServiceProcess.class.getSimpleName(),
+                    cachedTerminalResult
+                            .getExecutionGraphInfo()
+                            .getArchivedExecutionGraph()
+                            .getState());
+            resultFuture.complete(cachedTerminalResult);
+            return FutureUtils.completedVoidFuture();
+        }
+
+        return jobResultStore
+                .hasJobResultEntryAsync(getJobID())
+                .thenCompose(
+                        hasJobResult ->
+                                hasJobResult
+                                        ? 
handleJobAlreadyDoneIfValidLeader(leaderSessionId)
+                                        : 
createNewJobMasterServiceProcessIfValidLeader(
+                                                leaderSessionId));
     }
 
     @GuardedBy("lock")
-    private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenCompose(
-                        unused ->
-                                jobResultStore
-                                        .hasJobResultEntryAsync(getJobID())
-                                        .thenCompose(
-                                                hasJobResult -> {
-                                                    if (hasJobResult) {
-                                                        return 
handleJobAlreadyDoneIfValidLeader(
-                                                                
leaderSessionId);
-                                                    } else {
-                                                        return 
createNewJobMasterServiceProcessIfValidLeader(
-                                                                
leaderSessionId);
-                                                    }
-                                                }));
-        handleAsyncOperationError(sequentialOperation, "Could not start the 
job manager.");
+    private JobManagerRunnerResult takePendingTerminalResult() {
+        final JobManagerRunnerResult terminalResult = pendingTerminalResult;
+        pendingTerminalResult = null;
+        if (terminalResult != null) {
+            currentJobMasterServiceProcessLeaderId = null;

Review Comment:
   is resetting `currentJobMasterServiceProcessLeaderId` actually necessary or 
just added for consistency purposes? 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to