Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199198445 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception { @Test public void testSuccessfulJobSubmission() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class); JobSubmitHandler handler = new JobSubmitHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, RpcUtils.INF_TIMEOUT, - Collections.emptyMap()); + Collections.emptyMap(), + TestingUtils.defaultExecutor()); - JobGraph job = new JobGraph("testjob"); - JobSubmitRequestBody request = new JobSubmitRequestBody(job); + JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList()); - handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway) + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway) .get(); } + + @Test + public void testRejectionOnCountMismatch() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath(); + + DispatcherGateway mockGateway = mock(DispatcherGateway.class); --- End diff -- Maybe we could replace it by the `TestingDispatcherGateway`
---