[FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC

This closes #5903.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e884a3a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e884a3a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e884a3a4

Branch: refs/heads/master
Commit: e884a3a4f6ba738fac66846488f931cf85f2e2fc
Parents: c7d5910
Author: zentol <[email protected]>
Authored: Mon Apr 23 12:35:51 2018 +0200
Committer: zentol <[email protected]>
Committed: Wed May 2 15:18:07 2018 +0200

----------------------------------------------------------------------
 .../webmonitor/WebSubmissionExtension.java      |  23 +---
 .../webmonitor/handlers/JarRunHandler.java      |  65 ++++++++---
 .../handlers/JarRunMessageParameters.java       |  12 +--
 .../webmonitor/handlers/JarRunHandlerTest.java  | 108 +++++++++++++++++++
 .../runtime/webmonitor/WebMonitorUtils.java     |   3 +-
 5 files changed, 171 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
index bf3bc34..991005c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
@@ -20,10 +20,8 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
@@ -53,27 +51,15 @@ public class WebSubmissionExtension implements 
WebMonitorExtension {
 
        private final ArrayList<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> webSubmissionHandlers;
 
-       private final RestClusterClient<?> restClusterClient;
-
        public WebSubmissionExtension(
                        Configuration configuration,
                        CompletableFuture<String> restAddressFuture,
-                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
                        Map<String, String> responseHeaders,
                        Path jarDir,
                        Executor executor,
                        Time timeout) throws Exception {
 
-               final SettableLeaderRetrievalService 
settableLeaderRetrievalService = new SettableLeaderRetrievalService();
-               restAddressFuture.thenAccept(restAddress -> 
settableLeaderRetrievalService.notifyListener(
-                       restAddress,
-                       HighAvailabilityServices.DEFAULT_LEADER_ID));
-
-               restClusterClient = new RestClusterClient<>(
-                       configuration,
-                       "WebSubmissionHandlers",
-                       settableLeaderRetrievalService);
-
                webSubmissionHandlers = new ArrayList<>(5);
 
                final JarUploadHandler jarUploadHandler = new JarUploadHandler(
@@ -102,8 +88,7 @@ public class WebSubmissionExtension implements 
WebMonitorExtension {
                        JarRunHeaders.getInstance(),
                        jarDir,
                        configuration,
-                       executor,
-                       restClusterClient);
+                       executor);
 
                final JarDeleteHandler jarDeleteHandler = new JarDeleteHandler(
                        restAddressFuture,
@@ -134,8 +119,6 @@ public class WebSubmissionExtension implements 
WebMonitorExtension {
 
        @Override
        public CompletableFuture<Void> closeAsync() {
-               restClusterClient.shutdown();
-
                return CompletableFuture.completedFuture(null);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 09b7a8b..2e928b0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -23,27 +23,36 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.util.ScalaUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import akka.actor.AddressFromURIString;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -57,7 +66,7 @@ import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emp
  * Handler to submit jobs uploaded via the Web UI.
  */
 public class JarRunHandler extends
-               AbstractRestHandler<RestfulGateway, EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> {
+               AbstractRestHandler<DispatcherGateway, EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> {
 
        private final Path jarDir;
 
@@ -65,30 +74,26 @@ public class JarRunHandler extends
 
        private final Executor executor;
 
-       private final RestClusterClient<?> restClusterClient;
-
        public JarRunHandler(
                        final CompletableFuture<String> localRestAddress,
-                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       final GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
                        final Time timeout,
                        final Map<String, String> responseHeaders,
                        final MessageHeaders<EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> messageHeaders,
                        final Path jarDir,
                        final Configuration configuration,
-                       final Executor executor,
-                       final RestClusterClient<?> restClusterClient) {
+                       final Executor executor) {
                super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
 
                this.jarDir = requireNonNull(jarDir);
                this.configuration = requireNonNull(configuration);
                this.executor = requireNonNull(executor);
-               this.restClusterClient = requireNonNull(restClusterClient);
        }
 
        @Override
        protected CompletableFuture<JarRunResponseBody> handleRequest(
                        @Nonnull final HandlerRequest<EmptyRequestBody, 
JarRunMessageParameters> request,
-                       @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
+                       @Nonnull final DispatcherGateway gateway) throws 
RestHandlerException {
 
                final String pathParameter = 
request.getPathParameter(JarIdPathParameter.class);
                final Path jarFile = jarDir.resolve(pathParameter);
@@ -105,9 +110,32 @@ public class JarRunHandler extends
                        savepointRestoreSettings,
                        parallelism);
 
-               return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
-                       .submitJob(jobGraph)
-                       .thenApply((jobSubmitResponseBody -> new 
JarRunResponseBody(jobGraph.getJobID()))))
+               CompletableFuture<Integer> blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+               CompletableFuture<JobGraph> jarUploadFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+                       final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+                       final List<PermanentBlobKey> keys;
+                       try {
+                               keys = BlobClient.uploadJarFiles(address, 
configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+                       } catch (IOException ioe) {
+                               throw new CompletionException(new 
FlinkException("Could not upload job jar files.", ioe));
+                       }
+
+                       for (PermanentBlobKey key : keys) {
+                               jobGraph.addBlob(key);
+                       }
+
+                       return jobGraph;
+               });
+
+               CompletableFuture<Acknowledge> jobSubmissionFuture = 
jarUploadFuture.thenCompose(jobGraph -> {
+                       // we have to enable queued scheduling because slots will be allocated lazily
+                       jobGraph.setAllowQueuedScheduling(true);
+                       return gateway.submitJob(jobGraph, timeout);
+               });
+
+               return jobSubmissionFuture
+                       .thenCombine(jarUploadFuture, (ack, jobGraph) -> new 
JarRunResponseBody(jobGraph.getJobID()))
                        .exceptionally(throwable -> {
                                throw new CompletionException(new 
RestHandlerException(
                                        throwable.getMessage(),
@@ -160,4 +188,15 @@ public class JarRunHandler extends
                        return jobGraph;
                }, executor);
        }
+
+       private static String getDispatcherHost(DispatcherGateway gateway) {
+               String dispatcherAddress = gateway.getAddress();
+               final Optional<String> host = 
ScalaUtils.toJava(AddressFromURIString.parse(dispatcherAddress).host());
+
+               return host.orElseGet(() -> {
+                       // if the dispatcher address does not contain a host part, then assume it's running
+                       // on the same machine as the handler
+                       return "localhost";
+               });
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
index 2d9428c..78267db 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
@@ -31,17 +31,17 @@ import java.util.Collections;
  */
 public class JarRunMessageParameters extends MessageParameters {
 
-       private final JarIdPathParameter jarIdPathParameter = new 
JarIdPathParameter();
+       public final JarIdPathParameter jarIdPathParameter = new 
JarIdPathParameter();
 
-       private final ProgramArgsQueryParameter programArgsQueryParameter = new 
ProgramArgsQueryParameter();
+       public final ProgramArgsQueryParameter programArgsQueryParameter = new 
ProgramArgsQueryParameter();
 
-       private final EntryClassQueryParameter entryClassQueryParameter = new 
EntryClassQueryParameter();
+       public final EntryClassQueryParameter entryClassQueryParameter = new 
EntryClassQueryParameter();
 
-       private final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
+       public final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
 
-       private final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
+       public final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
 
-       private final SavepointPathQueryParameter savepointPathQueryParameter = 
new SavepointPathQueryParameter();
+       public final SavepointPathQueryParameter savepointPathQueryParameter = 
new SavepointPathQueryParameter();
 
        @Override
        public Collection<MessagePathParameter<?>> getPathParameters() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
new file mode 100644
index 0000000..aefe4f1
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link JarRunHandler}.
+ */
+public class JarRunHandlerTest {
+
+       @ClassRule
+       public static final TemporaryFolder TMP = new TemporaryFolder();
+
+       @Test
+       public void testRunJar() throws Exception {
+               Path uploadDir = TMP.newFolder().toPath();
+
+               Path actualUploadDir = uploadDir.resolve("flink-web-upload");
+               Files.createDirectory(actualUploadDir);
+
+               Path emptyJar = actualUploadDir.resolve("empty.jar");
+               Files.createFile(emptyJar);
+
+               Configuration config = new Configuration();
+               config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString());
+
+               MiniClusterResource clusterResource = new MiniClusterResource(
+                       new 
MiniClusterResource.MiniClusterResourceConfiguration(
+                               config,
+                               1,
+                               1
+                       ),
+                       MiniClusterResource.MiniClusterType.NEW
+               );
+               clusterResource.before();
+
+               try {
+                       Configuration clientConfig = 
clusterResource.getClientConfiguration();
+                       RestClient client = new 
RestClient(RestClientConfiguration.fromConfiguration(clientConfig), 
TestingUtils.defaultExecutor());
+
+                       try {
+                               JarRunHeaders headers = 
JarRunHeaders.getInstance();
+                               JarRunMessageParameters parameters = 
headers.getUnresolvedMessageParameters();
+                               
parameters.jarIdPathParameter.resolve(emptyJar.getFileName().toString());
+
+                               String host = 
clientConfig.getString(RestOptions.ADDRESS);
+                               int port = 
clientConfig.getInteger(RestOptions.PORT);
+
+                               try {
+                                       client.sendRequest(host, port, headers, 
parameters, EmptyRequestBody.getInstance())
+                                               .get();
+                               } catch (Exception e) {
+                                       Optional<RestClientException> expected 
= ExceptionUtils.findThrowable(e, RestClientException.class);
+                                       if (expected.isPresent()) {
+                                               // implies the job was actually 
submitted
+                                               
assertTrue(expected.get().getMessage().contains("ProgramInvocationException"));
+                                               // implies the jar was registered for the job graph (otherwise the jar name would not occur in the exception)
+                                               // implies the jar was uploaded (otherwise the file would not be found at all)
+                                               
assertTrue(expected.get().getMessage().contains("empty.jar'. zip file is 
empty"));
+                                       } else {
+                                               throw e;
+                                       }
+                               }
+                       } finally {
+                               client.shutdown(Time.milliseconds(10));
+                       }
+               } finally {
+                       clusterResource.after();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 24ecf0c..4b27534 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -219,7 +220,7 @@ public final class WebMonitorUtils {
         * @throws FlinkException if the web submission extension could not be 
loaded
         */
        public static WebMonitorExtension loadWebSubmissionExtension(
-                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
                        CompletableFuture<String> restAddressFuture,
                        Time timeout,
                        Map<String, String> responseHeaders,

Reply via email to