This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c38b7e68633 [FLINK-39260][python] Support compression formats other than '.zip' for config option 'python.files' (#27780)
c38b7e68633 is described below

commit c38b7e6863352c59698fb2897425d36cf911523e
Author: RaoraoXiong <[email protected]>
AuthorDate: Sun Mar 29 18:05:16 2026 +0800

    [FLINK-39260][python] Support compression formats other than '.zip' for config option 'python.files' (#27780)
---
 .../org/apache/flink/util/CompressionUtils.java    | 35 ++++++++++++++++
 .../apache/flink/client/python/PythonEnvUtils.java | 11 +++---
 .../env/AbstractPythonEnvironmentManager.java      | 15 ++++---
 .../flink/python/util/CompressionUtilsTest.java    | 46 ++++++++++++++++++++++
 4 files changed, 94 insertions(+), 13 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/CompressionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/CompressionUtils.java
index 06fb73be955..413226c8427 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CompressionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CompressionUtils.java
@@ -53,6 +53,41 @@ public class CompressionUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CompressionUtils.class);
 
+    /**
+     * Checks, case-insensitively, if the given file name has a supported archive extension.
+     *
+     * @param fileName the file name to check (lowercased with Locale.ROOT before matching)
+     * @return true if the name ends with .zip, .jar, .tar, .tar.gz or .tgz
+     */
+    public static boolean isCompressedFile(String fileName) {
+        String lowerCaseFileName = fileName.toLowerCase(java.util.Locale.ROOT);
+        return lowerCaseFileName.endsWith(".zip")
+                || lowerCaseFileName.endsWith(".jar")
+                || lowerCaseFileName.endsWith(".tar")
+                || lowerCaseFileName.endsWith(".tar.gz")
+                || lowerCaseFileName.endsWith(".tgz");
+    }
+
+    /**
+     * Gets the base name of the file without the extension. Handles compound 
extensions like
+     * .tar.gz.
+     *
+     * @param fileName the file name to process
+     * @return the base name without extension
+     */
+    public static String getBaseNameWithoutExtension(String fileName) {
+        // .tar.gz is a compound extension and needs special handling to 
remove the entire suffix
+        if (fileName.toLowerCase().endsWith(".tar.gz")) {
+            return fileName.substring(0, fileName.length() - 
".tar.gz".length());
+        }
+        // For other formats (.zip, .jar, .tar, .tgz, etc.), use lastIndexOf 
to handle uniformly
+        int lastDotIndex = fileName.lastIndexOf('.');
+        if (lastDotIndex > 0) {
+            return fileName.substring(0, lastDotIndex);
+        }
+        return fileName;
+    }
+
     public static void extractFile(
             String srcFilePath, String targetDirPath, String originalFileName) 
throws IOException {
         if (hasOneOfSuffixes(originalFileName, ".zip", ".jar")) {
diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index a9e151eb425..94c6db2656c 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -312,14 +312,15 @@ final class PythonEnvUtils {
                 // add the parent directory of .py file itself to PYTHONPATH
                 pythonPathList.add(targetPath.getParent().toString());
             } else if 
(Files.isRegularFile(Paths.get(targetPath.toString()).toRealPath())
-                    && sourceFileName.endsWith(".zip")) {
-                // expand the zip file and add the root directory to PYTHONPATH
-                // as not all zip files are importable
+                    && CompressionUtils.isCompressedFile(sourceFileName)) {
+                // expand the compressed file and add the root directory to 
PYTHONPATH
+                // as not all compressed files are importable
                 Path targetDirectory =
                         new Path(
                                 targetPath.getParent(),
-                                sourceFileName.substring(0, 
sourceFileName.lastIndexOf(".")));
-                FileUtils.expandDirectory(targetPath, targetDirectory);
+                                
CompressionUtils.getBaseNameWithoutExtension(sourceFileName));
+                CompressionUtils.extractFile(
+                        targetPath.toString(), targetDirectory.toString(), 
sourceFileName);
                 pythonPathList.add(targetDirectory.toString());
             } else {
                 pythonPathList.add(targetPath.toString());
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
 
b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
index f895f492e8b..1652e51b4de 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
@@ -309,20 +309,19 @@ public abstract class AbstractPythonEnvironmentManager 
implements PythonEnvironm
                 // If the python file is file with suffix .py, add the parent 
directory to
                 // PYTHONPATH.
                 pythonPath = String.join(File.separator, filesDirectory, 
distributedCacheFileName);
-            } else if (pythonFile.isFile() && originFileName.endsWith(".zip")) 
{
-                // Expand the zip file and add the root directory to PYTHONPATH
-                // as not all zip files are importable
+            } else if (pythonFile.isFile() && 
CompressionUtils.isCompressedFile(originFileName)) {
+                // Expand the compressed file and add the root directory to 
PYTHONPATH
+                // as not all compressed files are importable
                 org.apache.flink.core.fs.Path targetDirectory =
                         new org.apache.flink.core.fs.Path(
                                 filesDirectory,
                                 String.join(
                                         File.separator,
                                         distributedCacheFileName,
-                                        originFileName.substring(
-                                                0, 
originFileName.lastIndexOf("."))));
-                FileUtils.expandDirectory(
-                        new 
org.apache.flink.core.fs.Path(pythonFile.getAbsolutePath()),
-                        targetDirectory);
+                                        
CompressionUtils.getBaseNameWithoutExtension(
+                                                originFileName)));
+                CompressionUtils.extractFile(
+                        pythonFile.getAbsolutePath(), 
targetDirectory.toString(), originFileName);
                 pythonPath = targetDirectory.toString();
             } else {
                 pythonPath =
diff --git 
a/flink-python/src/test/java/org/apache/flink/python/util/CompressionUtilsTest.java
 
b/flink-python/src/test/java/org/apache/flink/python/util/CompressionUtilsTest.java
index 7edaafb67e7..e53172511a7 100644
--- 
a/flink-python/src/test/java/org/apache/flink/python/util/CompressionUtilsTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/python/util/CompressionUtilsTest.java
@@ -155,4 +155,50 @@ class CompressionUtilsTest {
         }
         return mode;
     }
+
+    @Test
+    void testIsCompressedFile() {
+        // Each supported extension, written in lowercase, is recognized
+        assertThat(CompressionUtils.isCompressedFile("a.zip")).isTrue();
+        assertThat(CompressionUtils.isCompressedFile("a.jar")).isTrue();
+        assertThat(CompressionUtils.isCompressedFile("a.tar")).isTrue();
+        assertThat(CompressionUtils.isCompressedFile("a.tar.gz")).isTrue();
+        assertThat(CompressionUtils.isCompressedFile("a.tgz")).isTrue();
+
+        // Matching ignores the case of the extension
+        assertThat(CompressionUtils.isCompressedFile("A.ZIP")).isTrue();
+        assertThat(CompressionUtils.isCompressedFile("A.Tar.Gz")).isTrue();
+
+        // A leading directory path does not affect the result
+        assertThat(CompressionUtils.isCompressedFile("/tmp/a.zip")).isTrue();
+
+        // Unsupported extensions (including a bare .gz) and names without an extension
+        assertThat(CompressionUtils.isCompressedFile("a.txt")).isFalse();
+        assertThat(CompressionUtils.isCompressedFile("a.gz")).isFalse();
+        assertThat(CompressionUtils.isCompressedFile("a.rar")).isFalse();
+        assertThat(CompressionUtils.isCompressedFile("archive")).isFalse();
+    }
+
+    @Test
+    void testGetBaseNameWithoutExtension() {
+        // Single-part archive extensions are stripped
+        assertThat(CompressionUtils.getBaseNameWithoutExtension("a.zip")).isEqualTo("a");
+        assertThat(CompressionUtils.getBaseNameWithoutExtension("a.jar")).isEqualTo("a");
+        assertThat(CompressionUtils.getBaseNameWithoutExtension("a.tar")).isEqualTo("a");
+        assertThat(CompressionUtils.getBaseNameWithoutExtension("a.tgz")).isEqualTo("a");
+
+        // The compound .tar.gz extension is stripped as a whole, regardless of case
+        assertThat(CompressionUtils.getBaseNameWithoutExtension("a.tar.gz")).isEqualTo("a");
+        assertThat(CompressionUtils.getBaseNameWithoutExtension("a.TAR.GZ")).isEqualTo("a");
+
+        // Dots inside the base name are preserved; only the extension is removed
+        assertThat(CompressionUtils.getBaseNameWithoutExtension("x.y.zip")).isEqualTo("x.y");
+        assertThat(CompressionUtils.getBaseNameWithoutExtension("x.y.tar.gz")).isEqualTo("x.y");
+
+        // No extension, or only a leading dot (hidden file): the leading-dot name is kept
+        assertThat(CompressionUtils.getBaseNameWithoutExtension("archive")).isEqualTo("archive");
+        assertThat(CompressionUtils.getBaseNameWithoutExtension(".hidden")).isEqualTo(".hidden");
+        assertThat(CompressionUtils.getBaseNameWithoutExtension(".hidden.zip"))
+                .isEqualTo(".hidden");
+    }
 }

Reply via email to