This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new d3977d2 [FLINK-20195][coordination] Deduplicate jobs for overview d3977d2 is described below commit d3977d2912ed284aa1f7cd7c216232f9bcff73e5 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Sun Dec 12 12:20:41 2021 +0100 [FLINK-20195][coordination] Deduplicate jobs for overview --- .../flink/runtime/dispatcher/Dispatcher.java | 9 +- .../flink/runtime/dispatcher/DispatcherTest.java | 129 +++++++++++++++++++++ .../runtime/jobmaster/TestingJobManagerRunner.java | 24 +++- 3 files changed, 154 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index e08fe58..9560612 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -580,13 +580,12 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher return combinedJobDetails.thenApply( (Collection<JobDetails> runningJobDetails) -> { - final Collection<JobDetails> allJobDetails = - new ArrayList<>(completedJobDetails.size() + runningJobDetails.size()); + final Map<JobID, JobDetails> deduplicatedJobs = new HashMap<>(); - allJobDetails.addAll(runningJobDetails); - allJobDetails.addAll(completedJobDetails); + completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job)); + runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job)); - return new MultipleJobsDetails(allJobDetails); + return new MultipleJobsDetails(deduplicatedJobs.values()); }); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 3923565..fb36a9e 100755 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -97,10 +98,13 @@ import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Optional; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -815,6 +819,107 @@ public class DispatcherTest extends AbstractDispatcherTest { } @Test + public void testRequestMultipleJobDetails_returnsSuspendedJobs() throws Exception { + final JobManagerRunnerFactory blockingJobMaster = + new QueuedJobManagerRunnerFactory( + completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED)); + + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + dispatcherGateway.requestJobResult(jobId, TIMEOUT).get(); + + assertOnlyContainsSingleJobWithState( + JobStatus.SUSPENDED, 
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get()); + } + + @Test + public void testRequestMultipleJobDetails_returnsRunningOverSuspendedJob() throws Exception { + final JobManagerRunnerFactory blockingJobMaster = + new QueuedJobManagerRunnerFactory( + completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED), + runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING)); + + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + + // run first job, which completes with SUSPENDED + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + dispatcherGateway.requestJobResult(jobId, TIMEOUT).get(); + + // run second job, which stays in RUNNING + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + assertOnlyContainsSingleJobWithState( + JobStatus.RUNNING, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get()); + } + + @Test + public void testRequestMultipleJobDetails_returnsFinishedOverSuspendedJob() throws Exception { + final JobManagerRunnerFactory blockingJobMaster = + new QueuedJobManagerRunnerFactory( + completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED), + completedJobManagerRunnerWithJobStatus(JobStatus.FINISHED)); + + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + + // run first job, which completes with SUSPENDED + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + dispatcherGateway.requestJobResult(jobId, TIMEOUT).get(); + + // run second job, which completes with FINISHED + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + assertOnlyContainsSingleJobWithState( + JobStatus.FINISHED, 
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get()); + } + + private JobManagerRunner runningJobManagerRunnerWithJobStatus( + final JobStatus currentJobStatus) { + Preconditions.checkArgument(!currentJobStatus.isTerminalState()); + + return TestingJobManagerRunner.newBuilder() + .setJobId(jobId) + .setJobDetailsFunction( + () -> + JobDetails.createDetailsForJob( + new ArchivedExecutionGraphBuilder() + .setJobID(jobId) + .setState(currentJobStatus) + .build())) + .build(); + } + + private JobManagerRunner completedJobManagerRunnerWithJobStatus( + final JobStatus finalJobStatus) { + Preconditions.checkArgument(finalJobStatus.isTerminalState()); + + return TestingJobManagerRunner.newBuilder() + .setJobId(jobId) + .setResultFuture( + CompletableFuture.completedFuture( + JobManagerRunnerResult.forSuccess( + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setJobID(jobId) + .setState(finalJobStatus) + .build())))) + .build(); + } + + private static void assertOnlyContainsSingleJobWithState( + final JobStatus expectedJobStatus, final MultipleJobsDetails multipleJobsDetails) { + final Collection<JobDetails> finishedJobDetails = multipleJobsDetails.getJobs(); + assertEquals(1, finishedJobDetails.size()); + assertEquals(expectedJobStatus, finishedJobDetails.iterator().next().getStatus()); + } + + @Test public void testJobDataAreCleanedUpInCorrectOrderOnFinishedJob() throws Exception { testJobDataAreCleanedUpInCorrectOrder(JobStatus.FINISHED); } @@ -1179,6 +1284,30 @@ public class DispatcherTest extends AbstractDispatcherTest { } } + private static class QueuedJobManagerRunnerFactory implements JobManagerRunnerFactory { + + private final Queue<JobManagerRunner> resultFutureQueue; + + private QueuedJobManagerRunnerFactory(JobManagerRunner... 
resultFutureQueue) { + this.resultFutureQueue = new ArrayDeque<>(Arrays.asList(resultFutureQueue)); + } + + @Override + public JobManagerRunner createJobManagerRunner( + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + JobManagerSharedServices jobManagerServices, + JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + FatalErrorHandler fatalErrorHandler, + long initializationTimestamp) + throws Exception { + return resultFutureQueue.remove(); + } + } + private static class FinishingJobManagerRunnerFactory implements JobManagerRunnerFactory { private final CompletableFuture<JobManagerRunnerResult> resultFuture; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java index e6d613c..0c0994d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.util.Preconditions; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; /** Testing implementation of the {@link JobManagerRunner}. 
*/ public class TestingJobManagerRunner implements JobManagerRunner { @@ -47,6 +48,8 @@ public class TestingJobManagerRunner implements JobManagerRunner { private final CompletableFuture<JobManagerRunnerResult> resultFuture; + private final Supplier<JobDetails> jobDetailsFunction; + private final OneShotLatch closeAsyncCalledLatch = new OneShotLatch(); private JobStatus jobStatus = JobStatus.INITIALIZING; @@ -55,11 +58,13 @@ public class TestingJobManagerRunner implements JobManagerRunner { JobID jobId, boolean blockingTermination, CompletableFuture<JobMasterGateway> jobMasterGatewayFuture, - CompletableFuture<JobManagerRunnerResult> resultFuture) { + CompletableFuture<JobManagerRunnerResult> resultFuture, + Supplier<JobDetails> jobDetailsFunction) { this.jobId = jobId; this.blockingTermination = blockingTermination; this.jobMasterGatewayFuture = jobMasterGatewayFuture; this.resultFuture = resultFuture; + this.jobDetailsFunction = jobDetailsFunction; this.terminationFuture = new CompletableFuture<>(); final ExecutionGraphInfo suspendedExecutionGraphInfo = @@ -103,7 +108,7 @@ public class TestingJobManagerRunner implements JobManagerRunner { @Override public CompletableFuture<JobDetails> requestJobDetails(Time timeout) { - throw new UnsupportedOperationException(); + return CompletableFuture.completedFuture(jobDetailsFunction.get()); } @Override @@ -166,6 +171,10 @@ public class TestingJobManagerRunner implements JobManagerRunner { private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = new CompletableFuture<>(); private CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>(); + private Supplier<JobDetails> jobDetailsFunction = + () -> { + throw new UnsupportedOperationException(); + }; private Builder() { // No-op. 
@@ -194,10 +203,19 @@ public class TestingJobManagerRunner implements JobManagerRunner { return this; } + public Builder setJobDetailsFunction(Supplier<JobDetails> jobDetailsFunction) { + this.jobDetailsFunction = Preconditions.checkNotNull(jobDetailsFunction); + return this; + } + public TestingJobManagerRunner build() { Preconditions.checkNotNull(jobId); return new TestingJobManagerRunner( - jobId, blockingTermination, jobMasterGatewayFuture, resultFuture); + jobId, + blockingTermination, + jobMasterGatewayFuture, + resultFuture, + jobDetailsFunction); } } }