This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d4464005416e317bd6974a78948d5a727e8baffc Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Mar 30 14:44:39 2023 +0200 [hotfix] Remove unused createdJobManagerRunnerLatch --- .../flink/runtime/dispatcher/DispatcherTest.java | 46 ++++++---------------- 1 file changed, 12 insertions(+), 34 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index f3f30e5b440..9849ecd1dba 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -119,7 +119,6 @@ import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -146,8 +145,6 @@ public class DispatcherTest extends AbstractDispatcherTest { private TestingLeaderElectionService jobMasterLeaderElectionService; - private CountDownLatch createdJobManagerRunnerLatch; - /** Instance under test. */ private TestingDispatcher dispatcher; @@ -158,7 +155,6 @@ public class DispatcherTest extends AbstractDispatcherTest { jobId = jobGraph.getJobID(); jobMasterLeaderElectionService = new TestingLeaderElectionService(); haServices.setJobMasterLeaderElectionService(jobId, jobMasterLeaderElectionService); - createdJobManagerRunnerLatch = new CountDownLatch(2); } @Nonnull @@ -196,8 +192,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); @@ -234,8 +229,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); final CompletableFuture<Acknowledge> submitFuture = @@ -252,9 +246,7 @@ public class DispatcherTest extends AbstractDispatcherTest { public void testDuplicateJobSubmissionWithRunningJobId() throws Exception { dispatcher = createTestingDispatcherBuilder() - .setJobManagerRunnerFactory( - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)) + .setJobManagerRunnerFactory(new ExpectedJobIdJobManagerRunnerFactory(jobId)) .setRecoveredJobs(Collections.singleton(jobGraph)) .build(rpcService); dispatcher.start(); @@ -291,8 +283,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); @@ -371,8 +362,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); @@ -571,8 +561,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); @@ -606,8 +595,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); @@ -629,8 +617,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); @@ -818,8 +805,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); @@ -1058,8 +1044,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 1); @@ -1138,8 +1123,7 @@ public class DispatcherTest extends AbstractDispatcherTest { createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory( - jobId, createdJobManagerRunnerLatch)); + new ExpectedJobIdJobManagerRunnerFactory(jobId)); final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); final JobID failedJobId = new JobID(); @@ -1359,12 +1343,8 @@ public class DispatcherTest extends AbstractDispatcherTest { private final JobID expectedJobId; - private final CountDownLatch createdJobManagerRunnerLatch; - - private ExpectedJobIdJobManagerRunnerFactory( - JobID expectedJobId, CountDownLatch createdJobManagerRunnerLatch) { + private ExpectedJobIdJobManagerRunnerFactory(JobID expectedJobId) { this.expectedJobId = expectedJobId; - this.createdJobManagerRunnerLatch = createdJobManagerRunnerLatch; } @Override @@ -1381,8 +1361,6 @@ public class DispatcherTest extends AbstractDispatcherTest { throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); - createdJobManagerRunnerLatch.countDown(); - return JobMasterServiceLeadershipRunnerFactory.INSTANCE.createJobManagerRunner( jobGraph, configuration,