This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 91e1314feda [FLINK-30596][coordination] Fix duplicate jobs when submitting with the same jobId
91e1314feda is described below

commit 91e1314fedafc7a23fc239dcbb7907ec2b32d1bb
Author: Mohsen Rezaei <7763122+mohsenrezaei...@users.noreply.github.com>
AuthorDate: Wed Jun 28 08:45:37 2023 -0700

    [FLINK-30596][coordination] Fix duplicate jobs when submitting with the same jobId
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 25 +++++++---
 .../flink/runtime/dispatcher/DispatcherTest.java   | 53 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1dad8df9ad1..7f09b61795c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -173,6 +173,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
     @Nullable private final String metricServiceQueryAddress;
 
     private final Map<JobID, CompletableFuture<Void>> jobManagerRunnerTerminationFutures;
+    private final Set<JobID> submittedAndWaitingTerminationJobIDs;
 
     protected final CompletableFuture<ApplicationStatus> shutDownFuture;
 
@@ -278,6 +279,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
 
         this.jobManagerRunnerTerminationFutures =
                 new HashMap<>(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY);
+        this.submittedAndWaitingTerminationJobIDs = new HashSet<>();
 
         this.shutDownFuture = new CompletableFuture<>();
 
@@ -544,14 +546,16 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
     }
 
     /**
-     * Checks whether the given job has already been submitted or executed.
+     * Checks whether the given job has already been submitted, executed, or awaiting termination.
      *
      * @param jobId identifying the submitted job
      * @return true if the job has already been submitted (is running) or has been executed
      * @throws FlinkException if the job scheduling status cannot be retrieved
      */
     private boolean isDuplicateJob(JobID jobId) throws FlinkException {
-        return isInGloballyTerminalState(jobId) || jobManagerRunnerRegistry.isRegistered(jobId);
+        return isInGloballyTerminalState(jobId)
+                || jobManagerRunnerRegistry.isRegistered(jobId)
+                || submittedAndWaitingTerminationJobIDs.contains(jobId);
     }
 
     /**
@@ -593,9 +597,17 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
     private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
         applyParallelismOverrides(jobGraph);
         log.info("Submitting job '{}' ({}).", jobGraph.getName(), jobGraph.getJobID());
+
+        // track as an outstanding job
+        submittedAndWaitingTerminationJobIDs.add(jobGraph.getJobID());
+
         return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
                 .handle((ignored, throwable) -> handleTermination(jobGraph.getJobID(), throwable))
-                .thenCompose(Function.identity());
+                .thenCompose(Function.identity())
+                .whenComplete(
+                        (ignored, throwable) ->
+                                // job is done processing, whether failed or finished
+                                submittedAndWaitingTerminationJobIDs.remove(jobGraph.getJobID()));
     }
 
     private CompletableFuture<Acknowledge> handleTermination(
@@ -1424,13 +1436,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                                             throwable));
                         });
 
-        return jobManagerTerminationFuture.thenAcceptAsync(
+        return FutureUtils.thenAcceptAsyncIfNotDone(
+                jobManagerTerminationFuture,
+                getMainThreadExecutor(),
                 FunctionUtils.uncheckedConsumer(
                         (ignored) -> {
                             jobManagerRunnerTerminationFutures.remove(jobId);
                             action.accept(jobGraph);
-                        }),
-                getMainThreadExecutor());
+                        }));
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index f3f30e5b440..fe541bf7161 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -111,6 +111,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -229,6 +230,58 @@ public class DispatcherTest extends AbstractDispatcherTest {
         assertDuplicateJobSubmission();
     }
 
+    @Test
+    public void testDuplicateJobSubmissionIsDetectedOnSimultaneousSubmission() throws Exception {
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        new TestingJobMasterServiceLeadershipRunnerFactory());
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        final int numThreads = 5;
+        final CountDownLatch prepareLatch = new CountDownLatch(numThreads);
+        final OneShotLatch startLatch = new OneShotLatch();
+
+        final Collection<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
+        final Collection<Thread> threads = new ArrayList<>();
+        for (int x = 0; x < numThreads; x++) {
+            threads.add(
+                    new Thread(
+                            () -> {
+                                try {
+                                    prepareLatch.countDown();
+                                    startLatch.await();
+                                    dispatcherGateway.submitJob(jobGraph, TIMEOUT).join();
+                                } catch (Throwable t) {
+                                    exceptions.add(t);
+                                }
+                            }));
+        }
+
+        // start worker threads and trigger job submissions
+        threads.forEach(Thread::start);
+        prepareLatch.await();
+        startLatch.trigger();
+
+        // wait for the job submissions to happen
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        // verify the job was actually submitted
+        dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).join();
+
+        // verify that all but one submission failed as duplicates
+        Assertions.assertThat(exceptions)
+                .hasSize(numThreads - 1)
+                .allSatisfy(
+                        t ->
+                                Assertions.assertThat(t)
+                                        .hasCauseInstanceOf(DuplicateJobSubmissionException.class));
+    }
+
     private void assertDuplicateJobSubmission() throws Exception {
         dispatcher =
                 createAndStartDispatcher(