This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 1352bed47b9 [FLINK-37266][python] Fix the issue that Python 
dependencies options doesn't work for PythonDriver
1352bed47b9 is described below

commit 1352bed47b96eb9c796fbbcc49d2c3b74129b628
Author: Dian Fu <[email protected]>
AuthorDate: Thu Oct 30 21:06:52 2025 +0800

    [FLINK-37266][python] Fix the issue that Python dependencies options 
doesn't work for PythonDriver
    
    This closes #27176.
---
 .../apache/flink/client/python/PythonDriver.java   |  2 +
 .../flink/client/python/PythonDriverOptions.java   | 11 +++++
 .../python/PythonDriverOptionsParserFactory.java   | 22 +++++++++-
 .../PythonDriverOptionsParserFactoryTest.java      | 48 ++++++++++++++++++++++
 .../flink/client/python/PythonDriverTest.java      |  8 +++-
 5 files changed, 88 insertions(+), 3 deletions(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
index 005fb04af0e..d60ec563300 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
@@ -71,6 +71,8 @@ public final class PythonDriver {
         // also get its configuration from batch environments.
         Configuration config = 
ExecutionEnvironment.getExecutionEnvironment().getConfiguration();
 
+        config.addAll(pythonDriverOptions.getPythonDependencyConfig());
+
         // start gateway server
         GatewayServer gatewayServer = PythonEnvUtils.startGatewayServer();
         PythonEnvUtils.setGatewayServer(gatewayServer);
diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptions.java
 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptions.java
index 792af77b421..6ce5456471a 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptions.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptions.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.client.python;
 
+import org.apache.flink.configuration.Configuration;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -29,12 +31,19 @@ import static java.util.Objects.requireNonNull;
 /** Options for the {@link PythonDriver}. */
 final class PythonDriverOptions {
 
+    private final Configuration pythonDependencyConfig;
+
     @Nullable private final String entryPointModule;
 
     @Nullable private final String entryPointScript;
 
     @Nonnull private final List<String> programArgs;
 
+    @Nonnull
+    Configuration getPythonDependencyConfig() {
+        return pythonDependencyConfig;
+    }
+
     @Nullable
     String getEntryPointModule() {
         return entryPointModule;
@@ -50,9 +59,11 @@ final class PythonDriverOptions {
     }
 
     PythonDriverOptions(
+            @Nonnull Configuration pythonDependencyConfig,
             @Nullable String entryPointModule,
             @Nullable String entryPointScript,
             List<String> programArgs) {
+        this.pythonDependencyConfig = requireNonNull(pythonDependencyConfig);
         this.entryPointModule = entryPointModule;
         this.entryPointScript = entryPointScript;
         this.programArgs = requireNonNull(programArgs, "programArgs");
diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptionsParserFactory.java
 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptionsParserFactory.java
index ded5577642a..f3d5b0bd61d 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptionsParserFactory.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptionsParserFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.client.python;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.util.PythonDependencyUtils;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
 import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
 
@@ -26,7 +28,13 @@ import org.apache.commons.cli.Options;
 
 import javax.annotation.Nonnull;
 
+import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
+import static 
org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION;
+import static 
org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYTHON_PATH;
 import static org.apache.flink.client.cli.CliFrontendParser.PY_OPTION;
 
 /**
@@ -40,6 +48,12 @@ final class PythonDriverOptionsParserFactory implements 
ParserResultFactory<Pyth
         final Options options = new Options();
         options.addOption(PY_OPTION);
         options.addOption(PYMODULE_OPTION);
+        options.addOption(PYFILES_OPTION);
+        options.addOption(PYREQUIREMENTS_OPTION);
+        options.addOption(PYARCHIVE_OPTION);
+        options.addOption(PYEXEC_OPTION);
+        options.addOption(PYCLIENTEXEC_OPTION);
+        options.addOption(PYTHON_PATH);
         return options;
     }
 
@@ -67,7 +81,13 @@ final class PythonDriverOptionsParserFactory implements 
ParserResultFactory<Pyth
                     "The Python entry point has not been specified. It can be 
specified with options -py or -pym");
         }
 
+        Configuration pythonDependencyConfig =
+                
PythonDependencyUtils.parsePythonDependencyConfiguration(commandLine);
+
         return new PythonDriverOptions(
-                entryPointModule, entryPointScript, commandLine.getArgList());
+                pythonDependencyConfig,
+                entryPointModule,
+                entryPointScript,
+                commandLine.getArgList());
     }
 }
diff --git 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverOptionsParserFactoryTest.java
 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverOptionsParserFactoryTest.java
index 939cbbb894f..180879f68d7 100644
--- 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverOptionsParserFactoryTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverOptionsParserFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.python;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 
@@ -25,8 +26,15 @@ import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
+import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES;
+import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE;
+import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE;
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+import static org.apache.flink.python.PythonOptions.PYTHON_PATH;
+import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for the {@link PythonDriverOptionsParserFactory}. */
 class PythonDriverOptionsParserFactoryTest {
@@ -66,6 +74,46 @@ class PythonDriverOptionsParserFactoryTest {
                 .isInstanceOf(FlinkParseException.class);
     }
 
+    @Test
+    void testPythonDependencies() throws FlinkParseException {
+        final String[] args = {
+            "--python",
+            "xxx.py",
+            "-pyfs",
+            "dep1.zip,dep2.zip",
+            "-pyreq",
+            "requirements.txt",
+            "-pyarch",
+            "venv.zip",
+            "-pyexec",
+            "python3.9",
+            "-pyclientexec",
+            "python3.9",
+            "--pyPythonPath",
+            "/python/lib64/python3.9",
+            "--input",
+            "in.txt",
+        };
+
+        final PythonDriverOptions pythonCommandOptions = 
commandLineParser.parse(args);
+
+        assertTrue(pythonCommandOptions.getEntryPointScript().isPresent());
+        
assertThat(pythonCommandOptions.getEntryPointScript().get()).isEqualTo("xxx.py");
+
+        // verify the python program arguments
+        final List<String> programArgs = pythonCommandOptions.getProgramArgs();
+        assertThat(programArgs).containsExactly("--input", "in.txt");
+
+        // verify python dependencies
+        Configuration configuration = 
pythonCommandOptions.getPythonDependencyConfig();
+        
assertThat(configuration.get(PYTHON_FILES)).isEqualTo("dep1.zip,dep2.zip");
+        
assertThat(configuration.get(PYTHON_REQUIREMENTS)).isEqualTo("requirements.txt");
+        assertThat(configuration.get(PYTHON_ARCHIVES)).isEqualTo("venv.zip");
+        
assertThat(configuration.get(PYTHON_EXECUTABLE)).isEqualTo("python3.9");
+        
assertThat(configuration.get(PYTHON_CLIENT_EXECUTABLE)).isEqualTo("python3.9");
+        
assertThat(configuration.get(PYTHON_PATH)).isEqualTo("/python/lib64/python3.9");
+    }
+
     private void verifyPythonDriverOptionsParsing(final String[] args) throws 
FlinkParseException {
         final PythonDriverOptions pythonCommandOptions = 
commandLineParser.parse(args);
 
diff --git 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
index d8b174e7f3b..6aebd7cc8ec 100644
--- 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.client.python;
 
+import org.apache.flink.configuration.Configuration;
+
 import org.junit.jupiter.api.Test;
 import py4j.GatewayServer;
 
@@ -50,7 +52,8 @@ class PythonDriverTest {
         args.add("--input");
         args.add("in.txt");
 
-        PythonDriverOptions pythonDriverOptions = new 
PythonDriverOptions("xxx", null, args);
+        PythonDriverOptions pythonDriverOptions =
+                new PythonDriverOptions(new Configuration(), "xxx", null, 
args);
         List<String> commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions);
         // verify the generated commands
         assertThat(commands).containsExactly("-u", "-m", "xxx", "--input", 
"in.txt");
@@ -62,7 +65,8 @@ class PythonDriverTest {
         args.add("--input");
         args.add("in.txt");
 
-        PythonDriverOptions pythonDriverOptions = new 
PythonDriverOptions(null, "xxx.py", args);
+        PythonDriverOptions pythonDriverOptions =
+                new PythonDriverOptions(new Configuration(), null, "xxx.py", 
args);
         List<String> commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions);
         assertThat(commands).containsExactly("-u", "xxx.py", "--input", 
"in.txt");
     }

Reply via email to