Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r199199366
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
    @@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws 
Exception {
     
        @Test
        public void testSuccessfulJobSubmission() throws Exception {
    +           final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
    +           try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
    +                   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
    +                           objectOut.writeObject(new JobGraph("testjob"));
    +                   }
    +           }
    +
                DispatcherGateway mockGateway = mock(DispatcherGateway.class);
    +           when(mockGateway.getHostname()).thenReturn("localhost");
    +           
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
                when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
                GatewayRetriever<DispatcherGateway> mockGatewayRetriever = 
mock(GatewayRetriever.class);
     
                JobSubmitHandler handler = new JobSubmitHandler(
                        
CompletableFuture.completedFuture("http://localhost:1234"),
                        mockGatewayRetriever,
                        RpcUtils.INF_TIMEOUT,
    -                   Collections.emptyMap());
    +                   Collections.emptyMap(),
    +                   TestingUtils.defaultExecutor());
     
    -           JobGraph job = new JobGraph("testjob");
    -           JobSubmitRequestBody request = new JobSubmitRequestBody(job);
    +           JobSubmitRequestBody request = new 
JobSubmitRequestBody(jobGraphFile.getFileName().toString(), 
Collections.emptyList(), Collections.emptyList());
     
    -           handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway)
    +           handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance(), Collections.emptyMap(), 
Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), 
mockGateway)
                        .get();
        }
    +
    +   @Test
    +   public void testRejectionOnCountMismatch() throws Exception {
    +           final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
    +           try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
    +                   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
    +                           objectOut.writeObject(new JobGraph("testjob"));
    +                   }
    +           }
    +           final Path countExceedingFile = 
TEMPORARY_FOLDER.newFile().toPath();
    +
    +           DispatcherGateway mockGateway = mock(DispatcherGateway.class);
    +           when(mockGateway.getHostname()).thenReturn("localhost");
    +           
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
    +           when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
    +           GatewayRetriever<DispatcherGateway> mockGatewayRetriever = 
mock(GatewayRetriever.class);
    +
    +           JobSubmitHandler handler = new JobSubmitHandler(
    +                   
CompletableFuture.completedFuture("http://localhost:1234"),
    +                   mockGatewayRetriever,
    +                   RpcUtils.INF_TIMEOUT,
    +                   Collections.emptyMap(),
    +                   TestingUtils.defaultExecutor());
    +
    +           JobSubmitRequestBody request = new 
JobSubmitRequestBody(jobGraphFile.getFileName().toString(), 
Collections.emptyList(), Collections.emptyList());
    +
    +           try {
    +                   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance(), Collections.emptyMap(), 
Collections.emptyMap(), Arrays.asList(jobGraphFile.toFile(), 
countExceedingFile.toFile())), mockGateway)
    +                           .get();
    +           } catch (Exception e) {
    +                   ExceptionUtils.findThrowable(e, candidate -> candidate 
instanceof RestHandlerException && candidate.getMessage().contains("count"));
    +           }
    +   }
    +
    +   @Test
    +   public void testFileHandling() throws Exception {
    +           final String dcEntryName = "entry";
    +
    +           CompletableFuture<JobGraph> submittedJobGraphFuture = new 
CompletableFuture<>();
    +           DispatcherGateway dispatcherGateway = new 
TestingDispatcherGateway.Builder()
    +                   .setBlobServerPort(blobServer.getPort())
    +                   .setSubmitFunction(submittedJobGraph -> {
    +                           
submittedJobGraphFuture.complete(submittedJobGraph);
    +                           return 
CompletableFuture.completedFuture(Acknowledge.get());
    +                   })
    +                   .build();
    +
    +           GatewayRetriever<DispatcherGateway> gatewayRetriever = new 
TestGatewayRetriever(dispatcherGateway);
    +
    +           JobSubmitHandler handler = new JobSubmitHandler(
    +                   
CompletableFuture.completedFuture("http://localhost:1234"),
    +                   gatewayRetriever,
    +                   RpcUtils.INF_TIMEOUT,
    +                   Collections.emptyMap(),
    +                   TestingUtils.defaultExecutor());
    +
    +           final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
    +           final Path jarFile = TEMPORARY_FOLDER.newFile().toPath();
    +           final Path artifactFile = TEMPORARY_FOLDER.newFile().toPath();
    +
    +           final JobGraph jobGraph = new JobGraph();
    +           // the entry that should be updated
    +           jobGraph.addUserArtifact(dcEntryName, new 
DistributedCache.DistributedCacheEntry("random", false));
    +           try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
    +                   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
    +                           objectOut.writeObject(jobGraph);
    +                   }
    +           }
    +
    +           JobSubmitRequestBody request = new JobSubmitRequestBody(
    +                   jobGraphFile.getFileName().toString(),
    +                   
Collections.singletonList(jarFile.getFileName().toString()),
    +                   Collections.singleton(new 
JobSubmitRequestBody.DistributedCacheFile(dcEntryName, 
artifactFile.getFileName().toString())));
    +
    +           handler.handleRequest(new HandlerRequest<>(
    +                           request,
    +                           EmptyMessageParameters.getInstance(),
    +                           Collections.emptyMap(),
    +                           Collections.emptyMap(),
    +                           Arrays.asList(jobGraphFile.toFile(), 
jarFile.toFile(), artifactFile.toFile())), dispatcherGateway)
    +                   .get();
    +
    +           Assert.assertTrue("No JobGraph was submitted.", 
submittedJobGraphFuture.isDone());
    +           final JobGraph submittedJobGraph = 
submittedJobGraphFuture.get();
    +           Assert.assertEquals(1, 
submittedJobGraph.getUserJarBlobKeys().size());
    +           Assert.assertEquals(1, 
submittedJobGraph.getUserArtifacts().size());
    +           
Assert.assertNotNull(submittedJobGraph.getUserArtifacts().get(dcEntryName).blobKey);
    --- End diff --
    
    Just a side note: Hamcrest offers somewhat more expressive assertions, which
in many cases generate better failure messages. E.g.,
`assertThat(submittedJobGraph.getUserArtifacts(), hasSize(1))`.


---

Reply via email to