[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6203


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199867852
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,61 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture jobGraphFileFuture = 
CompletableFuture.supplyAsync(() -> {
+   try {
+   final java.nio.file.Path jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+   objectOut.writeObject(jobGraph);
+   }
+   return jobGraphFile;
+   } catch (IOException e) {
+   throw new CompletionException(new 
FlinkException("Failed to serialize JobGraph.", e));
+   }
+   }, executorService);
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   CompletableFuture>> requestFuture = 
jobGraphFileFuture.thenApply(jobGraphFile -> {
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
-   try {
-   
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
-   }
+   filesToUpload.add(new FileUpload(jobGraphFile, 
RestConstants.CONTENT_TYPE_BINARY));
 
-   return jobGraph;
-   });
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new 
FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   CompletableFuture submissionFuture = 
jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Map.Entry artifacts : 
jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new 
JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new 
Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new 
FileUpload(Paths.get(artifacts.getValue().filePath), 
RestConstants.CONTENT_TYPE_BINARY));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new 
JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not create JobSubmitRequestBody.", ioe));
-   }
-   });
+   final JobSubmitRequestBody requestBody = new 
JobSubmitRequestBody(
+   jobGraphFile.getFileName().toString(),
+   jarFileNames,
+   artifactFileNames);
+
+   return Tuple2.of(requestBody, 
Collections.unmodifiableCollection(filesToUpload));
+   });
+
+   final CompletableFuture submissionFuture 
= requestFuture.thenCompose(
+   requestAndFileUploads -> sendRetriableRequest(
+   JobSubmitHeaders.getInstance(),
+   EmptyMessageParameters.getInstance(),
+   requestAndFileUploads.f0,
+   requestAndFileUploads.f1,
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199866326
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,61 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture jobGraphFileFuture = 
CompletableFuture.supplyAsync(() -> {
+   try {
+   final java.nio.file.Path jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+   objectOut.writeObject(jobGraph);
+   }
+   return jobGraphFile;
+   } catch (IOException e) {
+   throw new CompletionException(new 
FlinkException("Failed to serialize JobGraph.", e));
+   }
+   }, executorService);
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   CompletableFuture>> requestFuture = 
jobGraphFileFuture.thenApply(jobGraphFile -> {
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
-   try {
-   
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
-   }
+   filesToUpload.add(new FileUpload(jobGraphFile, 
RestConstants.CONTENT_TYPE_BINARY));
 
-   return jobGraph;
-   });
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new 
FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   CompletableFuture submissionFuture = 
jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Map.Entry artifacts : 
jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new 
JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new 
Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new 
FileUpload(Paths.get(artifacts.getValue().filePath), 
RestConstants.CONTENT_TYPE_BINARY));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new 
JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not create JobSubmitRequestBody.", ioe));
-   }
-   });
+   final JobSubmitRequestBody requestBody = new 
JobSubmitRequestBody(
+   jobGraphFile.getFileName().toString(),
+   jarFileNames,
+   artifactFileNames);
+
+   return Tuple2.of(requestBody, 
Collections.unmodifiableCollection(filesToUpload));
+   });
+
+   final CompletableFuture submissionFuture 
= requestFuture.thenCompose(
+   requestAndFileUploads -> sendRetriableRequest(
+   JobSubmitHeaders.getInstance(),
+   EmptyMessageParameters.getInstance(),
+   requestAndFileUploads.f0,
+   requestAndFileUploads.f1,
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199794599
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString())));
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+   } catch (IOException e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Could not upload job files.",
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199793741
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,61 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture jobGraphFileFuture = 
CompletableFuture.supplyAsync(() -> {
+   try {
+   final java.nio.file.Path jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+   objectOut.writeObject(jobGraph);
+   }
+   return jobGraphFile;
+   } catch (IOException e) {
+   throw new CompletionException(new 
FlinkException("Failed to serialize JobGraph.", e));
+   }
+   }, executorService);
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   CompletableFuture>> requestFuture = 
jobGraphFileFuture.thenApply(jobGraphFile -> {
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
-   try {
-   
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
-   }
+   filesToUpload.add(new FileUpload(jobGraphFile, 
RestConstants.CONTENT_TYPE_BINARY));
 
-   return jobGraph;
-   });
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new 
FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   CompletableFuture submissionFuture = 
jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Map.Entry artifacts : 
jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new 
JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new 
Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new 
FileUpload(Paths.get(artifacts.getValue().filePath), 
RestConstants.CONTENT_TYPE_BINARY));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new 
JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not create JobSubmitRequestBody.", ioe));
-   }
-   });
+   final JobSubmitRequestBody requestBody = new 
JobSubmitRequestBody(
+   jobGraphFile.getFileName().toString(),
+   jarFileNames,
+   artifactFileNames);
+
+   return Tuple2.of(requestBody, 
Collections.unmodifiableCollection(filesToUpload));
+   });
+
+   final CompletableFuture submissionFuture 
= requestFuture.thenCompose(
+   requestAndFileUploads -> sendRetriableRequest(
+   JobSubmitHeaders.getInstance(),
+   EmptyMessageParameters.getInstance(),
+   requestAndFileUploads.f0,
+   requestAndFileUploads.f1,
+

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199780706
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -105,7 +106,8 @@ protected void respondToRequest(ChannelHandlerContext 
ctx, HttpRequest httpReque
messageHeaders.getResponseStatusCode(),
responseHeaders);
}
-   });
+   }).whenComplete((P resp, Throwable throwable) -> 
processingFinishedFuture.complete(null));
--- End diff --

I think we are swallowing potential exceptions here. I think it would be 
better to do something like
```
return response.whenComplete(...).thenApply(ignored -> null)
```

That way we would also get rid of the `processingFinishedFuture`.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199780490
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext 
ctx, HttpRequest httpReque
response = FutureUtils.completedExceptionally(e);
}
 
+   CompletableFuture processingFinishedFuture = new 
CompletableFuture<>();
response.whenComplete((P resp, Throwable throwable) -> {
--- End diff --

This is a good point.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199741252
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -18,64 +18,118 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 
 /**
  * Request for submitting a job.
  *
- * We currently require the job-jars to be uploaded through the 
blob-server.
+ * This request only contains the names of files that must be present 
on the server, and defines how these files are
+ * interpreted.
  */
 public final class JobSubmitRequestBody implements RequestBody {
 
-   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+   private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+   private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+   private static final String FIELD_NAME_JOB_ARTIFACTS = 
"jobArtifactFileNames";
 
-   /**
-* The serialized job graph.
-*/
-   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
-   public final byte[] serializedJobGraph;
+   @JsonProperty(FIELD_NAME_JOB_GRAPH)
+   public final String jobGraphFileName;
 
-   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
-   this(serializeJobGraph(jobGraph));
-   }
+   @JsonProperty(FIELD_NAME_JOB_JARS)
+   public final Collection jarFileNames;
+
+   @JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+   public final Collection artifactFileNames;
 
-   @JsonCreator
--- End diff --

revert


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199456197
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -119,15 +113,17 @@ public void testSuccessfulJobSubmission() throws 
Exception {
}
}
 
-   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
-   when(mockGateway.getHostname()).thenReturn("localhost");
-   
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
-   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
-   GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
+   TestingDispatcherGateway.Builder builder = new 
TestingDispatcherGateway.Builder();
--- End diff --

This block is written the way it is since methods that the 
`TestingDispatcherGateway.Builder` inherits return a 
`TestingRestfulGateway.Builder`, which also applies to `build`.
It's a bit cumbersome, but I couldn't find a solution that doesn't include 
copying the entire `TestingRestfulGateway.Builder`.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199450747
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
+   // TODO: need configurable location
--- End diff --

Will do once the PR is merged.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199439130
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
+   
objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new 
FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new CompletionException("Failed 
to serialize JobGraph.", e);
}
 
-   return jobGraph;
-   });
-
-   CompletableFuture submissionFuture = 
jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new 
FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new 
JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not create JobSubmitRequestBody.", ioe));
+   for (Map.Entry artifacts : 
jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new 
JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new 
Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new 
FileUpload(Paths.get(artifacts.getValue().filePath), 
RestConstants.CONTENT_TYPE_BINARY));
}
-   });
+
+   final CompletableFuture 
submitFuture = sendRetriableRequest(
--- End diff --

This is a slightly more extensive change since the cleanup needs access to 
the `jobGraphFile`. I'll see what I can do.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199427903
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext 
ctx, HttpRequest httpReque
response = FutureUtils.completedExceptionally(e);
}
 
+   CompletableFuture processingFinishedFuture = new 
CompletableFuture<>();
response.whenComplete((P resp, Throwable throwable) -> {
--- End diff --

Seems odd to modify the signature to make the code prettier.
There's no use-case for the handlers to return anything but null, so why 
even allow it? Doesn't this go against the principle of using the most 
restrictive interface?


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199333574
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was 
%s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < 
nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, 
nameToFile);
+
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = 
getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> 
artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())){
+   Collection jarBlobKeys = 

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199236398
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was 
%s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < 
nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, 
nameToFile);
+
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = 
getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> 
artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())){
+   Collection jarBlobKeys = 

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199197307
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -47,8 +65,30 @@
  */
 public class JobSubmitHandlerTest extends TestLogger {
 
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+   private static BlobServer blobServer;
+
+   @BeforeClass
+   public static void setup() throws IOException {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+   blobServer = new BlobServer(config, new VoidBlobStore());
+   blobServer.start();
+   }
+
+   @AfterClass
+   public static void teardown() throws IOException {
+   if (blobServer != null) {
+   blobServer.close();
+   }
+   }
+
@Test
public void testSerializationFailureHandling() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
--- End diff --

Here we could use instead `() -> new CompletableFuture()`


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199181211
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
+   
objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new 
FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new CompletionException("Failed 
to serialize JobGraph.", e);
}
 
-   return jobGraph;
-   });
-
-   CompletableFuture submissionFuture = 
jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new 
FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new 
JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not create JobSubmitRequestBody.", ioe));
+   for (Map.Entry artifacts : 
jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new 
JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new 
Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new 
FileUpload(Paths.get(artifacts.getValue().filePath), 
RestConstants.CONTENT_TYPE_BINARY));
}
-   });
+
+   final CompletableFuture 
submitFuture = sendRetriableRequest(
+   JobSubmitHeaders.getInstance(),
+   EmptyMessageParameters.getInstance(),
+   new JobSubmitRequestBody(
+   
jobGraphFile.getFileName().toString(),
+   jarFileNames,
+   artifactFileNames),
+   filesToUpload,
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199178551
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
--- End diff --

I think we could combine these to `try` statement to `new 
ObjectOutputStream(Files.newOutputStream(jobGraphFile))`


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199187606
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was 
%s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < 
nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, 
nameToFile);
--- End diff --

maybe make `final`.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199187175
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
--- End diff --

Let's add a type `(File file) -> ...`


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199177926
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
+   // TODO: need configurable location
--- End diff --

Let's create a follow up story for it and remove the TODO


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199195443
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
--- End diff --

Moreover we would no longer have to prefix the Flink `Path` type because we 
no longer have an ambiguity.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199199366
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws 
Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
 
JobSubmitHandler handler = new JobSubmitHandler(

CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
-   Collections.emptyMap());
+   Collections.emptyMap(),
+   TestingUtils.defaultExecutor());
 
-   JobGraph job = new JobGraph("testjob");
-   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+   JobSubmitRequestBody request = new 
JobSubmitRequestBody(jobGraphFile.getFileName().toString(), 
Collections.emptyList(), Collections.emptyList());
 
-   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway)
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance(), Collections.emptyMap(), 
Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), 
mockGateway)
.get();
}
+
+   @Test
+   public void testRejectionOnCountMismatch() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+   final Path countExceedingFile = 
TEMPORARY_FOLDER.newFile().toPath();
+
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
+   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   
CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT,
+   Collections.emptyMap(),
+   TestingUtils.defaultExecutor());
+
+   JobSubmitRequestBody request = new 
JobSubmitRequestBody(jobGraphFile.getFileName().toString(), 
Collections.emptyList(), Collections.emptyList());
+
+   try {
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance(), Collections.emptyMap(), 
Collections.emptyMap(), Arrays.asList(jobGraphFile.toFile(), 
countExceedingFile.toFile())), mockGateway)
+   .get();
+   } catch (Exception e) {
+   ExceptionUtils.findThrowable(e, candidate -> candidate 
instanceof RestHandlerException && candidate.getMessage().contains("count"));
+   }
+   }
+
+   @Test
+   public void testFileHandling() throws Exception {
+   final String dcEntryName = "entry";
+
+   CompletableFuture submittedJobGraphFuture = new 
CompletableFuture<>();
+   DispatcherGateway dispatcherGateway = new 
TestingDispatcherGateway.Builder()
+   .setBlobServerPort(blobServer.getPort())
+

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199188278
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was 
%s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < 
nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, 
nameToFile);
+
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = 
getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> 
artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
--- End diff --

Let's add types to the lambda parameters.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199189017
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was 
%s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < 
nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, 
nameToFile);
+
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = 
getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> 
artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())){
+   Collection jarBlobKeys = 

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199198445
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws 
Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
 
JobSubmitHandler handler = new JobSubmitHandler(

CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
-   Collections.emptyMap());
+   Collections.emptyMap(),
+   TestingUtils.defaultExecutor());
 
-   JobGraph job = new JobGraph("testjob");
-   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+   JobSubmitRequestBody request = new 
JobSubmitRequestBody(jobGraphFile.getFileName().toString(), 
Collections.emptyList(), Collections.emptyList());
 
-   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway)
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance(), Collections.emptyMap(), 
Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), 
mockGateway)
.get();
}
+
+   @Test
+   public void testRejectionOnCountMismatch() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+   final Path countExceedingFile = 
TEMPORARY_FOLDER.newFile().toPath();
+
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
--- End diff --

Maybe we could replace it by the `TestingDispatcherGateway`


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199191206
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was 
%s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < 
nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, 
nameToFile);
+
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = 
getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> 
artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())){
+   Collection jarBlobKeys = 

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199195959
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
 ---
@@ -152,6 +152,7 @@ protected void respondToRequest(ChannelHandlerContext 
ctx, HttpRequest httpReque
},
ctx.executor());
 
+   CompletableFuture processingFinishedFuture = new 
CompletableFuture<>();
--- End diff --

Let's remove this future and simply return the `whenComplete` return value.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199197524
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws 
Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
--- End diff --

These tries could be collapsed.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199198501
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws 
Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
--- End diff --

Maybe replace by `TestingDispatcherGateway`.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199196388
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -18,64 +18,118 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 
 /**
  * Request for submitting a job.
  *
- * We currently require the job-jars to be uploaded through the 
blob-server.
+ * This request only contains the names of files that must be present 
on the server, and defines how these files are
+ * interpreted.
  */
 public final class JobSubmitRequestBody implements RequestBody {
 
-   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+   private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+   private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+   private static final String FIELD_NAME_JOB_ARTIFACTS = 
"jobArtifactFileNames";
 
-   /**
-* The serialized job graph.
-*/
-   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
-   public final byte[] serializedJobGraph;
+   @JsonProperty(FIELD_NAME_JOB_GRAPH)
+   public final String jobGraphFileName;
 
-   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
-   this(serializeJobGraph(jobGraph));
-   }
+   @JsonProperty(FIELD_NAME_JOB_JARS)
+   public final Collection jarFileNames;
+
+   @JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+   public final Collection artifactFileNames;
 
-   @JsonCreator
public JobSubmitRequestBody(
-   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] 
serializedJobGraph) {
-   this.serializedJobGraph = 
Preconditions.checkNotNull(serializedJobGraph);
+   @JsonProperty(FIELD_NAME_JOB_GRAPH) String 
jobGraphFileName,
+   @JsonProperty(FIELD_NAME_JOB_JARS) Collection 
jarFileNames,
+   @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) 
Collection artifactFileNames) {
+   this.jobGraphFileName = jobGraphFileName;
+   this.jarFileNames = jarFileNames;
+   this.artifactFileNames = artifactFileNames;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobSubmitRequestBody that = (JobSubmitRequestBody) o;
+   return Objects.equals(jobGraphFileName, that.jobGraphFileName) 
&&
+   Objects.equals(jarFileNames, that.jarFileNames) &&
+   Objects.equals(artifactFileNames, 
that.artifactFileNames);
}
 
@Override
public int hashCode() {
-   return 71 * Arrays.hashCode(this.serializedJobGraph);
+   return Objects.hash(jobGraphFileName, jarFileNames, 
artifactFileNames);
}
 
@Override
-   public boolean equals(Object object) {
-   if (object instanceof JobSubmitRequestBody) {
-   JobSubmitRequestBody other = (JobSubmitRequestBody) 
object;
-   return Arrays.equals(this.serializedJobGraph, 
other.serializedJobGraph);
-   }
-   return false;
+   public String toString() {
+   return "JobSubmitRequestBody{" +
+   "jobGraphFileName='" + jobGraphFileName + '\'' +
+   ", jarFileNames=" + jarFileNames +
+   ", artifactFileNames=" + artifactFileNames +
+   '}';
}
 
-   private static byte[] serializeJobGraph(JobGraph jobGraph) throws 
IOException {
-   try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 
* 1024)) {
-   ObjectOutputStream out = new ObjectOutputStream(baos);
+   /**
+* Descriptor for a distributed cache file.
+*/
+   public static class DistributedCacheFile {
+   private static final String FIELD_NAME_ENTRY_NAME = "entryName";
+   private static final String FIELD_NAME_FILE_NAME = "fileName";

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199195175
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
--- End diff --

I think we can completely remove the `nio.Path` dependency if we use 
`Path::fromLocalFile` with `Path` being Flink's path implementation. The 
benefit would be that we would not have to change types in the 
`getJarFilesToUpload` and `getArtifactFilesToUpload` methods. Instead of 
`Files.newInputStream(jobGraphFile)` we would have to call 
`FileSystem.getLocalFileSystem().open(jobGraphFile)`.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199190292
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString(;
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+   } catch (IOException e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Could not upload job files.",
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199182588
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
+   
objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new 
FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new CompletionException("Failed 
to serialize JobGraph.", e);
}
 
-   return jobGraph;
-   });
-
-   CompletableFuture submissionFuture = 
jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new 
FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new 
JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not create JobSubmitRequestBody.", ioe));
+   for (Map.Entry artifacts : 
jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new 
JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new 
Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new 
FileUpload(Paths.get(artifacts.getValue().filePath), 
RestConstants.CONTENT_TYPE_BINARY));
}
-   });
+
+   final CompletableFuture 
submitFuture = sendRetriableRequest(
--- End diff --

Shall we split up the resource preparation and sending the actual request 
into different steps? Then this `Supplier` would be much simpler:
```
CompletableFuture> 
requestFuture = CompletableFuture.supplyAsync(() -> ..., executorService);
CompletableFuture submissionFuture = 
requestFuture.thenCompose(requestBody -> sendRetriableRequest());
submissionFuture.whenComplete((ignoredA, ignoredB) -> cleanupFiles());
return submissionFuture;
```


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199198055
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws 
Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
 
JobSubmitHandler handler = new JobSubmitHandler(

CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
-   Collections.emptyMap());
+   Collections.emptyMap(),
+   TestingUtils.defaultExecutor());
 
-   JobGraph job = new JobGraph("testjob");
-   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+   JobSubmitRequestBody request = new 
JobSubmitRequestBody(jobGraphFile.getFileName().toString(), 
Collections.emptyList(), Collections.emptyList());
 
-   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway)
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance(), Collections.emptyMap(), 
Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), 
mockGateway)
.get();
}
+
+   @Test
+   public void testRejectionOnCountMismatch() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new 
ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+   final Path countExceedingFile = 
TEMPORARY_FOLDER.newFile().toPath();
+
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
+   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
--- End diff --

This is duplicate code, can we avoid it?


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199192714
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
--- End diff --

Let's call it `FILE_TYPE_JOB_GRAPH`


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199187385
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
--- End diff --

Alternatively we could also call `File::getName`


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199186445
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext 
ctx, HttpRequest httpReque
response = FutureUtils.completedExceptionally(e);
}
 
+   CompletableFuture processingFinishedFuture = new 
CompletableFuture<>();
response.whenComplete((P resp, Throwable throwable) -> {
--- End diff --

Can't we simply return the result of the `whenComplete` stage here? We 
would just need to change the return type of `respondToRequest` to 
`CompletableFuture`.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199188036
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was 
%s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < 
nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, 
nameToFile);
+
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = 
getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
--- End diff --

In `getPathAndAssertUpload` the parameters are in the other order. Would be 
cool to make it consistent.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199179398
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
+   
objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new 
FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new CompletionException("Failed 
to serialize JobGraph.", e);
--- End diff --

I think the failure message `"Failed to serialize JobGraph."` should go to a 
dedicated exception because completion exceptions can be filtered out. `throw 
new CompletionException(new FlinkException("Failed to serialize JobGraph.", e))`


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199094230
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture<JobSubmitResponseBody> 
handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection<Path> uploadedFiles = request.getUploadedFiles();
--- End diff --

What about `ClientUtils`, should they also accept `File` collections?


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199077598
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
--- End diff --

So how far do you propose to go? Should `HandlerRequest` also receive a 
`Collection`, or convert the existing `Collection`? Should 
`FileUploads#getUploadedFiles` return a `Collection`?


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r198178787
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,42 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
+
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List keys;
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, 
flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, 
flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not upload job files.", ioe));
+   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
+   
objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new 
FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new RuntimeException("lol", e);
--- End diff --

needs a proper exception


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r198033040
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
--- End diff --

I agree that nio `Paths` are more powerful. I'm just wondering whether we 
actually need this flexibility here. Usually it is a good idea to make the 
interface as restrictive as possible and widen it on demand.

Moreover, there is also Flink's `Path` which comes with a bit of different 
semantics. For example, you have the safety net for closing open file streams 
which is also used to interrupt I/O operations which are otherwise not 
interruptible. Mixing the nio paths in there, might give the false impression 
that this also applies to them.

So I'm just saying that we should make this decision consciously and not 
only based on which type is more convenient to be used.


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197735309
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
--- End diff --

I was wondering when you might bring that up :)

I'm not a fan of exposing `File`.
* whether it _really is a local file_ shouldn't be relevant to the handler
* nio Paths are more flexible than files; for example if the 
DistributedCache were to return files we wouldn't have to extract zips as we 
could mount it with a `ZipFileSystem` instead
* always having to convert using `File#toPath` is rather tedious


---


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197493894
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection<Path> uploadedFiles = request.getUploadedFiles();
+   Map<String, Path> nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection<org.apache.flink.core.fs.Path> jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection<Tuple2<String, org.apache.flink.core.fs.Path>> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString())));
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map<String, DistributedCache.DistributedCacheEntry> 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture<JobGraph> jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture<Integer> blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture<JobGraph> finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection<PermanentBlobKey> jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection<Tuple2<String, PermanentBlobKey>> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+   } catch (IOException e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Could not upload job files.",
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197492045
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString(;
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+   } catch (IOException e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Could not upload job files.",
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197487718
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString(;
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+   } catch (IOException e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Could not upload job files.",
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197477484
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString(;
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+   } catch (IOException e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Could not upload job files.",
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197461746
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString(;
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+   } catch (IOException e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Could not upload job files.",
+   

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197456037
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString(;
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+   } catch (IOException e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Could not upload job files.",
+