This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d0ce5349fdf [FLINK-34580][rest] Do not erase "pipeline.classpaths" config during REST job deploy d0ce5349fdf is described below commit d0ce5349fdf1a611518eba20a169c475ee0b46c5 Author: Ferenc Csaky <ferenc.cs...@pm.me> AuthorDate: Tue Mar 5 13:59:57 2024 +0100 [FLINK-34580][rest] Do not erase "pipeline.classpaths" config during REST job deploy --- .../flink/client/program/PackagedProgram.java | 6 + .../webmonitor/handlers/utils/JarHandlerUtils.java | 33 +++++- .../handlers/utils/JarHandlerUtilsTest.java | 131 +++++++++++++++------ 3 files changed, 128 insertions(+), 42 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index f7f5df28d1b..f2bdb9a2e3e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -18,6 +18,7 @@ package org.apache.flink.client.program; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.client.ClientUtils; import org.apache.flink.configuration.Configuration; @@ -683,6 +684,11 @@ public class PackagedProgram implements AutoCloseable { return this; } + @VisibleForTesting + public List<URL> getUserClassPaths() { + return userClassPaths; + } + public PackagedProgram build() throws ProgramInvocationException { if (jarFile == null && entryPointClassName == null) { throw new IllegalArgumentException( diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java index c5e0f492c96..9b3ff11752a 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java @@ -46,6 +46,7 @@ import org.slf4j.Logger; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; @@ -182,17 +183,22 @@ public class JarHandlerUtils { } try { - return PackagedProgram.newBuilder() - .setJarFile(jarFile.toFile()) - .setEntryPointClassName(entryClass) - .setConfiguration(configuration) - .setArguments(programArgs.toArray(new String[0])) - .build(); + return initPackagedProgramBuilder(configuration).build(); } catch (final ProgramInvocationException e) { throw new CompletionException(e); } } + @VisibleForTesting + PackagedProgram.Builder initPackagedProgramBuilder(Configuration configuration) { + return PackagedProgram.newBuilder() + .setJarFile(jarFile.toFile()) + .setEntryPointClassName(entryClass) + .setConfiguration(configuration) + .setUserClassPaths(getClasspaths(configuration)) + .setArguments(programArgs.toArray(new String[0])); + } + @VisibleForTesting String getEntryClass() { return entryClass; @@ -214,6 +220,21 @@ public class JarHandlerUtils { } } + private static List<URL> getClasspaths(Configuration configuration) { + try { + return ConfigUtils.decodeListFromConfig( + configuration, PipelineOptions.CLASSPATHS, URL::new); + } catch (MalformedURLException e) { + throw new CompletionException( + new RestHandlerException( + String.format( + "Failed to extract '%s' as URLs. Provided value: %s", + PipelineOptions.CLASSPATHS.key(), + configuration.get(PipelineOptions.CLASSPATHS)), + HttpResponseStatus.BAD_REQUEST)); + } + } + /** Parse program arguments in jar run or plan request. 
*/ private static <R extends JarRequestBody, M extends MessageParameters> List<String> getProgramArgs(HandlerRequest<R> request, Logger log) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java index 53643cabe95..d90ae902b05 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java @@ -19,7 +19,10 @@ package org.apache.flink.runtime.webmonitor.handlers.utils; import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.webmonitor.handlers.JarPlanRequestBody; @@ -31,18 +34,27 @@ import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.net.URL; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link JarHandlerUtils}. 
*/ class JarHandlerUtilsTest { private static final Logger LOG = LoggerFactory.getLogger(JarHandlerUtilsTest.class); + @TempDir private Path tempDir; + @Test void testTokenizeNonQuoted() { final List<String> arguments = JarHandlerUtils.tokenizeArguments("--foo bar"); @@ -65,17 +77,14 @@ class JarHandlerUtilsTest { } @Test - void testFromRequestDefaults(@TempDir Path tmp) throws RestHandlerException { - final JarRunMessageParameters parameters = - JarRunHeaders.getInstance().getUnresolvedMessageParameters(); - - parameters.jarIdPathParameter.resolve("someJar"); + void testFromRequestDefaults() throws Exception { + final JarRunMessageParameters parameters = getDummyMessageParameters(); final HandlerRequest<JarPlanRequestBody> request = HandlerRequest.create(new JarPlanRequestBody(), parameters); final JarHandlerUtils.JarHandlerContext jarHandlerContext = - JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG); + JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); assertThat(jarHandlerContext.getEntryClass()).isNull(); assertThat(jarHandlerContext.getProgramArgs()).isEmpty(); assertThat(jarHandlerContext.getParallelism()) @@ -84,25 +93,12 @@ class JarHandlerUtilsTest { } @Test - void testFromRequestRequestBody(@TempDir Path tmp) throws RestHandlerException { - final JarRunMessageParameters parameters = - JarRunHeaders.getInstance().getUnresolvedMessageParameters(); - - parameters.jarIdPathParameter.resolve("someJar"); - - final JarPlanRequestBody requestBody = - new JarPlanRequestBody( - "entry-class", - null, - Arrays.asList("arg1", "arg2"), - 37, - JobID.generate(), - null); - final HandlerRequest<JarPlanRequestBody> request = - HandlerRequest.create(requestBody, parameters); + void testFromRequestRequestBody() throws Exception { + final JarPlanRequestBody requestBody = getDummyJarPlanRequestBody("entry-class", 37, null); + final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody); final 
JarHandlerUtils.JarHandlerContext jarHandlerContext = - JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG); + JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); assertThat(jarHandlerContext.getEntryClass()).isEqualTo(requestBody.getEntryClassName()); assertThat(jarHandlerContext.getProgramArgs()) .containsExactlyElementsOf(requestBody.getProgramArgumentsList()); @@ -111,28 +107,91 @@ class JarHandlerUtilsTest { } @Test - void testFromRequestWithParallelismConfig(@TempDir Path tmp) throws RestHandlerException { + void testFromRequestWithParallelismConfig() throws Exception { final int parallelism = 37; - final JarRunMessageParameters parameters = - JarRunHeaders.getInstance().getUnresolvedMessageParameters(); - - parameters.jarIdPathParameter.resolve("someJar"); - final JarPlanRequestBody requestBody = - new JarPlanRequestBody( + getDummyJarPlanRequestBody( "entry-class", null, - Arrays.asList("arg1", "arg2"), - null, - JobID.generate(), Collections.singletonMap( CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism))); - final HandlerRequest<JarPlanRequestBody> request = - HandlerRequest.create(requestBody, parameters); + final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody); final JarHandlerUtils.JarHandlerContext jarHandlerContext = - JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG); + JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); assertThat(jarHandlerContext.getParallelism()).isEqualTo(parallelism); } + + @Test + void testClasspathsConfigNotErased() throws Exception { + final JarPlanRequestBody requestBody = + getDummyJarPlanRequestBody( + null, + null, + Collections.singletonMap( + PipelineOptions.CLASSPATHS.key(), + "file:/tmp/some.jar;file:/tmp/another.jar")); + + final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody); + + final JarHandlerUtils.JarHandlerContext jarHandlerContext = + 
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); + + final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration(); + final PackagedProgram.Builder builder = + jarHandlerContext.initPackagedProgramBuilder(originalConfig); + final List<String> retrievedClasspaths = + builder.getUserClassPaths().stream() + .map(URL::toString) + .collect(Collectors.toList()); + + assertThat(retrievedClasspaths).isEqualTo(originalConfig.get(PipelineOptions.CLASSPATHS)); + } + + @Test + void testMalformedClasspathsConfig() throws Exception { + final JarPlanRequestBody requestBody = + getDummyJarPlanRequestBody( + null, + null, + Collections.singletonMap( + PipelineOptions.CLASSPATHS.key(), "invalid|:/jar")); + final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody); + + final JarHandlerUtils.JarHandlerContext jarHandlerContext = + JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); + + final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration(); + + assertThatThrownBy(() -> jarHandlerContext.initPackagedProgramBuilder(originalConfig)) + .satisfies(anyCauseMatches(RestHandlerException.class, "invalid|:/jar")); + } + + private HandlerRequest<JarPlanRequestBody> getDummyRequest( + @Nullable JarPlanRequestBody requestBody) { + return HandlerRequest.create( + requestBody == null ? 
new JarPlanRequestBody() : requestBody, + getDummyMessageParameters()); + } + + private JarRunMessageParameters getDummyMessageParameters() { + final JarRunMessageParameters parameters = + JarRunHeaders.getInstance().getUnresolvedMessageParameters(); + + parameters.jarIdPathParameter.resolve("someJar"); + + return parameters; + } + + private JarPlanRequestBody getDummyJarPlanRequestBody( + String entryClass, Integer parallelism, Map<String, String> flinkConfiguration) { + return new JarPlanRequestBody( + entryClass, + null, + Arrays.asList("arg1", "arg2"), + parallelism, + JobID.generate(), + flinkConfiguration); + } }