[FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC This closes #5903.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e884a3a4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e884a3a4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e884a3a4 Branch: refs/heads/master Commit: e884a3a4f6ba738fac66846488f931cf85f2e2fc Parents: c7d5910 Author: zentol <[email protected]> Authored: Mon Apr 23 12:35:51 2018 +0200 Committer: zentol <[email protected]> Committed: Wed May 2 15:18:07 2018 +0200 ---------------------------------------------------------------------- .../webmonitor/WebSubmissionExtension.java | 23 +--- .../webmonitor/handlers/JarRunHandler.java | 65 ++++++++--- .../handlers/JarRunMessageParameters.java | 12 +-- .../webmonitor/handlers/JarRunHandlerTest.java | 108 +++++++++++++++++++ .../runtime/webmonitor/WebMonitorUtils.java | 3 +- 5 files changed, 171 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java index bf3bc34..991005c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java @@ -20,10 +20,8 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler; import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders; @@ -53,27 +51,15 @@ public class WebSubmissionExtension implements WebMonitorExtension { private final ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers; - private final RestClusterClient<?> restClusterClient; - public WebSubmissionExtension( Configuration configuration, CompletableFuture<String> restAddressFuture, - GatewayRetriever<? extends RestfulGateway> leaderRetriever, + GatewayRetriever<? extends DispatcherGateway> leaderRetriever, Map<String, String> responseHeaders, Path jarDir, Executor executor, Time timeout) throws Exception { - final SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(); - restAddressFuture.thenAccept(restAddress -> settableLeaderRetrievalService.notifyListener( - restAddress, - HighAvailabilityServices.DEFAULT_LEADER_ID)); - - restClusterClient = new RestClusterClient<>( - configuration, - "WebSubmissionHandlers", - settableLeaderRetrievalService); - webSubmissionHandlers = new ArrayList<>(5); final JarUploadHandler jarUploadHandler = new JarUploadHandler( @@ -102,8 +88,7 @@ public class WebSubmissionExtension implements WebMonitorExtension { JarRunHeaders.getInstance(), jarDir, configuration, - executor, - restClusterClient); + executor); final JarDeleteHandler jarDeleteHandler = new JarDeleteHandler( restAddressFuture, @@ -134,8 +119,6 @@ public class WebSubmissionExtension implements WebMonitorExtension { @Override public CompletableFuture<Void> closeAsync() { - restClusterClient.shutdown(); - return CompletableFuture.completedFuture(null); } http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 09b7a8b..2e928b0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -23,27 +23,36 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; -import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.util.ScalaUtils; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.FlinkException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import akka.actor.AddressFromURIString; + import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -57,7 +66,7 @@ import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emp * Handler to submit jobs uploaded via the Web UI. */ public class JarRunHandler extends - AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> { + AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> { private final Path jarDir; @@ -65,30 +74,26 @@ public class JarRunHandler extends private final Executor executor; - private final RestClusterClient<?> restClusterClient; - public JarRunHandler( final CompletableFuture<String> localRestAddress, - final GatewayRetriever<? extends RestfulGateway> leaderRetriever, + final GatewayRetriever<? extends DispatcherGateway> leaderRetriever, final Time timeout, final Map<String, String> responseHeaders, final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders, final Path jarDir, final Configuration configuration, - final Executor executor, - final RestClusterClient<?> restClusterClient) { + final Executor executor) { super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); this.jarDir = requireNonNull(jarDir); this.configuration = requireNonNull(configuration); this.executor = requireNonNull(executor); - this.restClusterClient = requireNonNull(restClusterClient); } @Override protected CompletableFuture<JarRunResponseBody> handleRequest( @Nonnull final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request, - @Nonnull final RestfulGateway gateway) throws RestHandlerException { + @Nonnull final DispatcherGateway gateway) throws RestHandlerException { final String pathParameter = request.getPathParameter(JarIdPathParameter.class); final Path jarFile = jarDir.resolve(pathParameter); @@ -105,9 +110,32 @@ public class JarRunHandler extends savepointRestoreSettings, parallelism); - return jobGraphFuture.thenCompose(jobGraph -> restClusterClient - .submitJob(jobGraph) - .thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID())))) + CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort); + final List<PermanentBlobKey> keys; + try { + keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars()); + } catch (IOException ioe) { + throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe)); + } + + for (PermanentBlobKey key : keys) { + jobGraph.addBlob(key); + } + + return jobGraph; + }); + + CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> { + // we have to enable queued scheduling because slots will be allocated lazily + jobGraph.setAllowQueuedScheduling(true); + return gateway.submitJob(jobGraph, timeout); + }); + + return jobSubmissionFuture + .thenCombine(jarUploadFuture, (ack, jobGraph) -> new JarRunResponseBody(jobGraph.getJobID())) .exceptionally(throwable -> { throw new CompletionException(new RestHandlerException( throwable.getMessage(), @@ -160,4 +188,15 @@ public class JarRunHandler extends return jobGraph; }, executor); } + + private static String getDispatcherHost(DispatcherGateway gateway) { + String dispatcherAddress = gateway.getAddress(); + final Optional<String> host = ScalaUtils.toJava(AddressFromURIString.parse(dispatcherAddress).host()); + + return host.orElseGet(() -> { + // if the dispatcher address does not contain a host part, then assume it's running + // on the same machine as the handler + return "localhost"; + }); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java index 2d9428c..78267db 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java @@ -31,17 +31,17 @@ import java.util.Collections; */ public class JarRunMessageParameters extends MessageParameters { - private final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); + public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); - private final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter(); + public final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter(); - private final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter(); + public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter(); - private final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter(); + public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter(); - private final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter(); + public final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter(); - private final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter(); + public final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter(); @Override public Collection<MessagePathParameter<?>> getPathParameters() { http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java new file mode 100644 index 0000000..aefe4f1 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.util.RestClientException; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link JarRunHandler}. + */ +public class JarRunHandlerTest { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + @Test + public void testRunJar() throws Exception { + Path uploadDir = TMP.newFolder().toPath(); + + Path actualUploadDir = uploadDir.resolve("flink-web-upload"); + Files.createDirectory(actualUploadDir); + + Path emptyJar = actualUploadDir.resolve("empty.jar"); + Files.createFile(emptyJar); + + Configuration config = new Configuration(); + config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString()); + + MiniClusterResource clusterResource = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + config, + 1, + 1 + ), + MiniClusterResource.MiniClusterType.NEW + ); + clusterResource.before(); + + try { + Configuration clientConfig = clusterResource.getClientConfiguration(); + RestClient client = new RestClient(RestClientConfiguration.fromConfiguration(clientConfig), TestingUtils.defaultExecutor()); + + try { + JarRunHeaders headers = JarRunHeaders.getInstance(); + JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters(); + parameters.jarIdPathParameter.resolve(emptyJar.getFileName().toString()); + + String host = clientConfig.getString(RestOptions.ADDRESS); + int port = clientConfig.getInteger(RestOptions.PORT); + + try { + client.sendRequest(host, port, headers, parameters, EmptyRequestBody.getInstance()) + .get(); + } catch (Exception e) { + Optional<RestClientException> expected = ExceptionUtils.findThrowable(e, RestClientException.class); + if (expected.isPresent()) { + // implies the job was actually submitted + assertTrue(expected.get().getMessage().contains("ProgramInvocationException")); + // implies the jar was registered for the job graph (otherwise the jar name would not occur in the exception) + // implies the jar was uploaded (otherwise the file would not be found at all) + assertTrue(expected.get().getMessage().contains("empty.jar'. zip file is empty")); + } else { + throw e; + } + } + } finally { + client.shutdown(Time.milliseconds(10)); + } + } finally { + clusterResource.after(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e884a3a4/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 24ecf0c..4b27534 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -219,7 +220,7 @@ public final class WebMonitorUtils { * @throws FlinkException if the web submission extension could not be loaded */ public static WebMonitorExtension loadWebSubmissionExtension( - GatewayRetriever<? extends RestfulGateway> leaderRetriever, + GatewayRetriever<? extends DispatcherGateway> leaderRetriever, CompletableFuture<String> restAddressFuture, Time timeout, Map<String, String> responseHeaders,
