This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new 5207fe5 [FLINK-25022][rest] Run jars in separate threads 5207fe5 is described below commit 5207fe560dc6a054beb0eb0a25af009215ca9f23 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Sat Dec 4 14:24:26 2021 +0100 [FLINK-25022][rest] Run jars in separate threads Use a dedicated thread to run each jar, so that pooled threads can't keep references to user-code (e.g., in a ThreadLocal). --- .../util/concurrent/SeparateThreadExecutor.java | 39 ++++++ .../runtime/webmonitor/WebSubmissionExtension.java | 57 +++++++- .../webmonitor/handlers/JarMessageParameters.java | 4 +- .../runtime/webmonitor/handlers/JarRunHandler.java | 4 +- .../webmonitor/handlers/JarUploadHandler.java | 4 +- .../webmonitor/WebSubmissionExtensionTest.java | 145 +++++++++++++++++++++ 6 files changed, 245 insertions(+), 8 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java new file mode 100644 index 0000000..3d1ecba --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.concurrent; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; + +/** An {@link Executor} that runs every runnable in a separate thread. */ +public final class SeparateThreadExecutor implements Executor { + private final ThreadFactory threadFactory; + + public SeparateThreadExecutor(ThreadFactory threadFactory) { + this.threadFactory = Preconditions.checkNotNull(threadFactory); + } + + @Override + public void execute(@Nonnull Runnable command) { + threadFactory.newThread(command).start(); + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java index 3d411c7..0829502 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.webmonitor; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.deployment.application.ApplicationRunner; import org.apache.flink.client.deployment.application.DetachedApplicationRunner; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.dispatcher.DispatcherGateway; @@ -36,6 +38,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.SeparateThreadExecutor; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; @@ -45,6 +49,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.function.Supplier; /** Container for the web submission handlers. */ public class WebSubmissionExtension implements WebMonitorExtension { @@ -52,6 +57,10 @@ public class WebSubmissionExtension implements WebMonitorExtension { private final ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers; + // for easier access during testing + private final JarUploadHandler jarUploadHandler; + private final JarRunHandler jarRunHandler; + public WebSubmissionExtension( Configuration configuration, GatewayRetriever<? extends DispatcherGateway> leaderRetriever, @@ -61,10 +70,38 @@ public class WebSubmissionExtension implements WebMonitorExtension { Executor executor, Time timeout) throws Exception { + this( + configuration, + leaderRetriever, + responseHeaders, + localAddressFuture, + jarDir, + executor, + timeout, + () -> new DetachedApplicationRunner(true)); + } + + @VisibleForTesting + WebSubmissionExtension( + Configuration configuration, + GatewayRetriever<? extends DispatcherGateway> leaderRetriever, + Map<String, String> responseHeaders, + CompletableFuture<String> localAddressFuture, + Path jarDir, + Executor executor, + Time timeout, + Supplier<ApplicationRunner> applicationRunnerSupplier) + throws Exception { webSubmissionHandlers = new ArrayList<>(); - final JarUploadHandler jarUploadHandler = + final Executor jarRunExecutor = + new SeparateThreadExecutor( + new ExecutorThreadFactory.Builder() + .setPoolName("flink-jar-runner") + .build()); + + jarUploadHandler = new JarUploadHandler( leaderRetriever, timeout, @@ -84,7 +121,7 @@ public class WebSubmissionExtension implements WebMonitorExtension { configuration, executor); - final JarRunHandler jarRunHandler = + jarRunHandler = new JarRunHandler( leaderRetriever, timeout, @@ -92,8 +129,8 @@ public class WebSubmissionExtension implements WebMonitorExtension { JarRunHeaders.getInstance(), jarDir, configuration, - executor, - () -> new DetachedApplicationRunner(true)); + jarRunExecutor, + applicationRunnerSupplier); final JarDeleteHandler jarDeleteHandler = new JarDeleteHandler( @@ -112,7 +149,7 @@ public class WebSubmissionExtension implements WebMonitorExtension { JarPlanGetHeaders.getInstance(), jarDir, configuration, - executor); + jarRunExecutor); final JarPlanHandler postJarPlanHandler = new JarPlanHandler( @@ -141,4 +178,14 @@ public class WebSubmissionExtension implements WebMonitorExtension { public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers() { return webSubmissionHandlers; } + + @VisibleForTesting + JarUploadHandler getJarUploadHandler() { + return jarUploadHandler; + } + + @VisibleForTesting + JarRunHandler getJarRunHandler() { + return jarRunHandler; + } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java index 6201d04..280387f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.MessagePathParameter; import org.apache.flink.runtime.rest.messages.MessageQueryParameter; @@ -29,7 +30,8 @@ import java.util.Collections; /** Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}. */ abstract class JarMessageParameters extends MessageParameters { - final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); + @VisibleForTesting + public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 0ceb636..b74ee66 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.client.deployment.application.ApplicationRunner; import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; @@ -82,7 +83,8 @@ public class JarRunHandler } @Override - protected CompletableFuture<JarRunResponseBody> handleRequest( + @VisibleForTesting + public CompletableFuture<JarRunResponseBody> handleRequest( @Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request, @Nonnull final DispatcherGateway gateway) throws RestHandlerException { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java index 3dfe94f..4ac3e89 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; @@ -68,7 +69,8 @@ public class JarUploadHandler } @Override - protected CompletableFuture<JarUploadResponseBody> handleRequest( + @VisibleForTesting + public CompletableFuture<JarUploadResponseBody> handleRequest( @Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull final RestfulGateway gateway) throws RestHandlerException { diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java new file mode 100644 index 0000000..cd5a2e8 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.deployment.application.ApplicationRunner; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.webmonitor.handlers.JarIdPathParameter; +import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler; +import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters; +import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody; +import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +/** Tests for the {@link WebSubmissionExtension}. */ +public class WebSubmissionExtensionTest { + + private static final String JAR_NAME = "output-test-program.jar"; + + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void applicationsRunInSeparateThreads() throws Exception { + final Path tempDir = temporaryFolder.getRoot().toPath(); + + final Path uploadDir = Files.createDirectories(tempDir.resolve("uploadDir")); + // create a copy because the upload handler moves uploaded jars (because it assumes it to be + // a temporary file) + final Path jarFile = + Files.copy( + Paths.get(System.getProperty("targetDir")).resolve(JAR_NAME), + tempDir.resolve("app.jar")); + + final DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder().build(); + + final ThreadCapturingApplicationRunner threadCapturingApplicationRunner = + new ThreadCapturingApplicationRunner(); + + final WebSubmissionExtension webSubmissionExtension = + new WebSubmissionExtension( + new Configuration(), + () -> CompletableFuture.completedFuture(dispatcherGateway), + Collections.emptyMap(), + new CompletableFuture<>(), + uploadDir, + Executors.directExecutor(), + Time.of(5, TimeUnit.SECONDS), + () -> threadCapturingApplicationRunner); + + final String jarPath = uploadJar(webSubmissionExtension, jarFile, dispatcherGateway); + final String jarId = Paths.get(jarPath).getFileName().toString(); + + final JarRunHandler jarRunHandler = webSubmissionExtension.getJarRunHandler(); + + final Map<String, String> pathParameters = new HashMap<>(); + pathParameters.put(JarIdPathParameter.KEY, jarId); + final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> runRequest = + new HandlerRequest( + new JarRunRequestBody(), + new JarRunMessageParameters(), + pathParameters, + Collections.emptyMap()); + + // run several applications in sequence, and verify that each thread is unique + int numApplications = 20; + for (int i = 0; i < numApplications; i++) { + jarRunHandler.handleRequest(runRequest, dispatcherGateway).get(); + } + assertEquals(numApplications, threadCapturingApplicationRunner.getThreads().size()); + } + + private static String uploadJar( + WebSubmissionExtension extension, Path jarFile, DispatcherGateway dispatcherGateway) + throws Exception { + final JarUploadHandler jarUploadHandler = extension.getJarUploadHandler(); + + final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> uploadRequest = + new HandlerRequest<>( + EmptyRequestBody.getInstance(), + EmptyMessageParameters.getInstance(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.singletonList(jarFile.toFile())); + + return jarUploadHandler.handleRequest(uploadRequest, dispatcherGateway).get().getFilename(); + } + + private static class ThreadCapturingApplicationRunner implements ApplicationRunner { + + private final Set<Thread> threads = Collections.newSetFromMap(new IdentityHashMap<>()); + + @Override + public List<JobID> run( + DispatcherGateway dispatcherGateway, + PackagedProgram program, + Configuration configuration) { + threads.add(Thread.currentThread()); + return Collections.singletonList(new JobID()); + } + + public Collection<Thread> getThreads() { + return threads; + } + } +}