This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d0ce5349fdf [FLINK-34580][rest] Do not erase "pipeline.classpaths" config during REST job deploy
d0ce5349fdf is described below

commit d0ce5349fdf1a611518eba20a169c475ee0b46c5
Author: Ferenc Csaky <ferenc.cs...@pm.me>
AuthorDate: Tue Mar 5 13:59:57 2024 +0100

    [FLINK-34580][rest] Do not erase "pipeline.classpaths" config during REST job deploy
---
 .../flink/client/program/PackagedProgram.java      |   6 +
 .../webmonitor/handlers/utils/JarHandlerUtils.java |  33 +++++-
 .../handlers/utils/JarHandlerUtilsTest.java        | 131 +++++++++++++++------
 3 files changed, 128 insertions(+), 42 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index f7f5df28d1b..f2bdb9a2e3e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.configuration.Configuration;
@@ -683,6 +684,11 @@ public class PackagedProgram implements AutoCloseable {
             return this;
         }
 
+        @VisibleForTesting
+        public List<URL> getUserClassPaths() {
+            return userClassPaths;
+        }
+
         public PackagedProgram build() throws ProgramInvocationException {
             if (jarFile == null && entryPointClassName == null) {
                 throw new IllegalArgumentException(
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
index c5e0f492c96..9b3ff11752a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
@@ -46,6 +46,7 @@ import org.slf4j.Logger;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -182,17 +183,22 @@ public class JarHandlerUtils {
             }
 
             try {
-                return PackagedProgram.newBuilder()
-                        .setJarFile(jarFile.toFile())
-                        .setEntryPointClassName(entryClass)
-                        .setConfiguration(configuration)
-                        .setArguments(programArgs.toArray(new String[0]))
-                        .build();
+                return initPackagedProgramBuilder(configuration).build();
             } catch (final ProgramInvocationException e) {
                 throw new CompletionException(e);
             }
         }
 
+        @VisibleForTesting
+        PackagedProgram.Builder initPackagedProgramBuilder(Configuration configuration) {
+            return PackagedProgram.newBuilder()
+                    .setJarFile(jarFile.toFile())
+                    .setEntryPointClassName(entryClass)
+                    .setConfiguration(configuration)
+                    .setUserClassPaths(getClasspaths(configuration))
+                    .setArguments(programArgs.toArray(new String[0]));
+        }
+
         @VisibleForTesting
         String getEntryClass() {
             return entryClass;
@@ -214,6 +220,21 @@ public class JarHandlerUtils {
         }
     }
 
+    private static List<URL> getClasspaths(Configuration configuration) {
+        try {
+            return ConfigUtils.decodeListFromConfig(
+                    configuration, PipelineOptions.CLASSPATHS, URL::new);
+        } catch (MalformedURLException e) {
+            throw new CompletionException(
+                    new RestHandlerException(
+                            String.format(
+                                    "Failed to extract '%s' as URLs. Provided value: %s",
+                                    PipelineOptions.CLASSPATHS.key(),
+                                    configuration.get(PipelineOptions.CLASSPATHS)),
+                            HttpResponseStatus.BAD_REQUEST));
+        }
+    }
+
     /** Parse program arguments in jar run or plan request. */
     private static <R extends JarRequestBody, M extends MessageParameters>
             List<String> getProgramArgs(HandlerRequest<R> request, Logger log)
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java
index 53643cabe95..d90ae902b05 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java
@@ -19,7 +19,10 @@
 package org.apache.flink.runtime.webmonitor.handlers.utils;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.webmonitor.handlers.JarPlanRequestBody;
@@ -31,18 +34,27 @@ import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.net.URL;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link JarHandlerUtils}. */
 class JarHandlerUtilsTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(JarHandlerUtilsTest.class);
 
+    @TempDir private Path tempDir;
+
     @Test
     void testTokenizeNonQuoted() {
         final List<String> arguments = JarHandlerUtils.tokenizeArguments("--foo bar");
@@ -65,17 +77,14 @@ class JarHandlerUtilsTest {
     }
 
     @Test
-    void testFromRequestDefaults(@TempDir Path tmp) throws RestHandlerException {
-        final JarRunMessageParameters parameters =
-                JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-
-        parameters.jarIdPathParameter.resolve("someJar");
+    void testFromRequestDefaults() throws Exception {
+        final JarRunMessageParameters parameters = getDummyMessageParameters();
 
         final HandlerRequest<JarPlanRequestBody> request =
                 HandlerRequest.create(new JarPlanRequestBody(), parameters);
 
         final JarHandlerUtils.JarHandlerContext jarHandlerContext =
-                JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG);
+                JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
         assertThat(jarHandlerContext.getEntryClass()).isNull();
         assertThat(jarHandlerContext.getProgramArgs()).isEmpty();
         assertThat(jarHandlerContext.getParallelism())
@@ -84,25 +93,12 @@ class JarHandlerUtilsTest {
     }
 
     @Test
-    void testFromRequestRequestBody(@TempDir Path tmp) throws RestHandlerException {
-        final JarRunMessageParameters parameters =
-                JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-
-        parameters.jarIdPathParameter.resolve("someJar");
-
-        final JarPlanRequestBody requestBody =
-                new JarPlanRequestBody(
-                        "entry-class",
-                        null,
-                        Arrays.asList("arg1", "arg2"),
-                        37,
-                        JobID.generate(),
-                        null);
-        final HandlerRequest<JarPlanRequestBody> request =
-                HandlerRequest.create(requestBody, parameters);
+    void testFromRequestRequestBody() throws Exception {
+        final JarPlanRequestBody requestBody = getDummyJarPlanRequestBody("entry-class", 37, null);
+        final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);
 
         final JarHandlerUtils.JarHandlerContext jarHandlerContext =
-                JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG);
+                JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
         assertThat(jarHandlerContext.getEntryClass()).isEqualTo(requestBody.getEntryClassName());
         assertThat(jarHandlerContext.getProgramArgs())
                 .containsExactlyElementsOf(requestBody.getProgramArgumentsList());
@@ -111,28 +107,91 @@ class JarHandlerUtilsTest {
     }
 
     @Test
-    void testFromRequestWithParallelismConfig(@TempDir Path tmp) throws RestHandlerException {
+    void testFromRequestWithParallelismConfig() throws Exception {
         final int parallelism = 37;
-        final JarRunMessageParameters parameters =
-                JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-
-        parameters.jarIdPathParameter.resolve("someJar");
-
         final JarPlanRequestBody requestBody =
-                new JarPlanRequestBody(
+                getDummyJarPlanRequestBody(
                         "entry-class",
                         null,
-                        Arrays.asList("arg1", "arg2"),
-                        null,
-                        JobID.generate(),
                         Collections.singletonMap(
                                 CoreOptions.DEFAULT_PARALLELISM.key(),
                                 String.valueOf(parallelism)));
-        final HandlerRequest<JarPlanRequestBody> request =
-                HandlerRequest.create(requestBody, parameters);
+        final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);
 
         final JarHandlerUtils.JarHandlerContext jarHandlerContext =
-                JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG);
+                JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
         assertThat(jarHandlerContext.getParallelism()).isEqualTo(parallelism);
     }
+
+    @Test
+    void testClasspathsConfigNotErased() throws Exception {
+        final JarPlanRequestBody requestBody =
+                getDummyJarPlanRequestBody(
+                        null,
+                        null,
+                        Collections.singletonMap(
+                                PipelineOptions.CLASSPATHS.key(),
+                                "file:/tmp/some.jar;file:/tmp/another.jar"));
+
+        final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);
+
+        final JarHandlerUtils.JarHandlerContext jarHandlerContext =
+                JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
+
+        final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration();
+        final PackagedProgram.Builder builder =
+                jarHandlerContext.initPackagedProgramBuilder(originalConfig);
+        final List<String> retrievedClasspaths =
+                builder.getUserClassPaths().stream()
+                        .map(URL::toString)
+                        .collect(Collectors.toList());
+
+        assertThat(retrievedClasspaths).isEqualTo(originalConfig.get(PipelineOptions.CLASSPATHS));
+    }
+
+    @Test
+    void testMalformedClasspathsConfig() throws Exception {
+        final JarPlanRequestBody requestBody =
+                getDummyJarPlanRequestBody(
+                        null,
+                        null,
+                        Collections.singletonMap(
+                                PipelineOptions.CLASSPATHS.key(), "invalid|:/jar"));
+        final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);
+
+        final JarHandlerUtils.JarHandlerContext jarHandlerContext =
+                JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
+
+        final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration();
+
+        assertThatThrownBy(() -> jarHandlerContext.initPackagedProgramBuilder(originalConfig))
+                .satisfies(anyCauseMatches(RestHandlerException.class, "invalid|:/jar"));
+    }
+
+    private HandlerRequest<JarPlanRequestBody> getDummyRequest(
+            @Nullable JarPlanRequestBody requestBody) {
+        return HandlerRequest.create(
+                requestBody == null ? new JarPlanRequestBody() : requestBody,
+                getDummyMessageParameters());
+    }
+
+    private JarRunMessageParameters getDummyMessageParameters() {
+        final JarRunMessageParameters parameters =
+                JarRunHeaders.getInstance().getUnresolvedMessageParameters();
+
+        parameters.jarIdPathParameter.resolve("someJar");
+
+        return parameters;
+    }
+
+    private JarPlanRequestBody getDummyJarPlanRequestBody(
+            String entryClass, Integer parallelism, Map<String, String> flinkConfiguration) {
+        return new JarPlanRequestBody(
+                entryClass,
+                null,
+                Arrays.asList("arg1", "arg2"),
+                parallelism,
+                JobID.generate(),
+                flinkConfiguration);
+    }
 }

Reply via email to