This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 91e1314feda [FLINK-30596][coordination] Fix duplicate jobs when submitting with the same jobId
91e1314feda is described below

commit 91e1314fedafc7a23fc239dcbb7907ec2b32d1bb
Author: Mohsen Rezaei <7763122+mohsenrezaei...@users.noreply.github.com>
AuthorDate: Wed Jun 28 08:45:37 2023 -0700

    [FLINK-30596][coordination] Fix duplicate jobs when submitting with the same jobId
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 25 +++++++---
 .../flink/runtime/dispatcher/DispatcherTest.java   | 53 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1dad8df9ad1..7f09b61795c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -173,6 +173,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
     @Nullable private final String metricServiceQueryAddress;
 
     private final Map<JobID, CompletableFuture<Void>> 
jobManagerRunnerTerminationFutures;
+    private final Set<JobID> submittedAndWaitingTerminationJobIDs;
 
     protected final CompletableFuture<ApplicationStatus> shutDownFuture;
 
@@ -278,6 +279,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
 
         this.jobManagerRunnerTerminationFutures =
                 new HashMap<>(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY);
+        this.submittedAndWaitingTerminationJobIDs = new HashSet<>();
 
         this.shutDownFuture = new CompletableFuture<>();
 
@@ -544,14 +546,16 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
     }
 
     /**
-     * Checks whether the given job has already been submitted or executed.
+     * Checks whether the given job has already been submitted, executed, or 
awaiting termination.
      *
      * @param jobId identifying the submitted job
      * @return true if the job has already been submitted (is running) or has 
been executed
      * @throws FlinkException if the job scheduling status cannot be retrieved
      */
     private boolean isDuplicateJob(JobID jobId) throws FlinkException {
-        return isInGloballyTerminalState(jobId) || 
jobManagerRunnerRegistry.isRegistered(jobId);
+        return isInGloballyTerminalState(jobId)
+                || jobManagerRunnerRegistry.isRegistered(jobId)
+                || submittedAndWaitingTerminationJobIDs.contains(jobId);
     }
 
     /**
@@ -593,9 +597,17 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
     private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph 
jobGraph) {
         applyParallelismOverrides(jobGraph);
         log.info("Submitting job '{}' ({}).", jobGraph.getName(), 
jobGraph.getJobID());
+
+        // track as an outstanding job
+        submittedAndWaitingTerminationJobIDs.add(jobGraph.getJobID());
+
         return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, 
this::persistAndRunJob)
                 .handle((ignored, throwable) -> 
handleTermination(jobGraph.getJobID(), throwable))
-                .thenCompose(Function.identity());
+                .thenCompose(Function.identity())
+                .whenComplete(
+                        (ignored, throwable) ->
+                                // job is done processing, whether failed or 
finished
+                                
submittedAndWaitingTerminationJobIDs.remove(jobGraph.getJobID()));
     }
 
     private CompletableFuture<Acknowledge> handleTermination(
@@ -1424,13 +1436,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                                     throwable));
                                 });
 
-        return jobManagerTerminationFuture.thenAcceptAsync(
+        return FutureUtils.thenAcceptAsyncIfNotDone(
+                jobManagerTerminationFuture,
+                getMainThreadExecutor(),
                 FunctionUtils.uncheckedConsumer(
                         (ignored) -> {
                             jobManagerRunnerTerminationFutures.remove(jobId);
                             action.accept(jobGraph);
-                        }),
-                getMainThreadExecutor());
+                        }));
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index f3f30e5b440..fe541bf7161 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -111,6 +111,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -229,6 +230,58 @@ public class DispatcherTest extends AbstractDispatcherTest {
         assertDuplicateJobSubmission();
     }
 
+    @Test
+    public void testDuplicateJobSubmissionIsDetectedOnSimultaneousSubmission() 
throws Exception {
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        new TestingJobMasterServiceLeadershipRunnerFactory());
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        final int numThreads = 5;
+        final CountDownLatch prepareLatch = new CountDownLatch(numThreads);
+        final OneShotLatch startLatch = new OneShotLatch();
+
+        final Collection<Throwable> exceptions = 
Collections.synchronizedList(new ArrayList<>());
+        final Collection<Thread> threads = new ArrayList<>();
+        for (int x = 0; x < numThreads; x++) {
+            threads.add(
+                    new Thread(
+                            () -> {
+                                try {
+                                    prepareLatch.countDown();
+                                    startLatch.await();
+                                    dispatcherGateway.submitJob(jobGraph, 
TIMEOUT).join();
+                                } catch (Throwable t) {
+                                    exceptions.add(t);
+                                }
+                            }));
+        }
+
+        // start worker threads and trigger job submissions
+        threads.forEach(Thread::start);
+        prepareLatch.await();
+        startLatch.trigger();
+
+        // wait for the job submissions to happen
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        // verify the job was actually submitted
+        dispatcherGateway.requestJobStatus(jobGraph.getJobID(), 
TIMEOUT).join();
+
+        // verify that all but one submission failed as duplicates
+        Assertions.assertThat(exceptions)
+                .hasSize(numThreads - 1)
+                .allSatisfy(
+                        t ->
+                                Assertions.assertThat(t)
+                                        
.hasCauseInstanceOf(DuplicateJobSubmissionException.class));
+    }
+
     private void assertDuplicateJobSubmission() throws Exception {
         dispatcher =
                 createAndStartDispatcher(

Reply via email to