This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c38b7e68633 [FLINK-39260][python] Support compression formats other than '.zip' for config option 'python.files' (#27780)
c38b7e68633 is described below
commit c38b7e6863352c59698fb2897425d36cf911523e
Author: RaoraoXiong <[email protected]>
AuthorDate: Sun Mar 29 18:05:16 2026 +0800
[FLINK-39260][python] Support compression formats other than '.zip' for config option 'python.files' (#27780)
---
.../org/apache/flink/util/CompressionUtils.java | 35 ++++++++++++++++
.../apache/flink/client/python/PythonEnvUtils.java | 11 +++---
.../env/AbstractPythonEnvironmentManager.java | 15 ++++---
.../flink/python/util/CompressionUtilsTest.java | 46 ++++++++++++++++++++++
4 files changed, 94 insertions(+), 13 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/CompressionUtils.java b/flink-core/src/main/java/org/apache/flink/util/CompressionUtils.java
index 06fb73be955..413226c8427 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CompressionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CompressionUtils.java
@@ -53,6 +53,41 @@ public class CompressionUtils {
private static final Logger LOG =
LoggerFactory.getLogger(CompressionUtils.class);
+ /**
+ * Checks if the given file name has a supported compressed file extension.
+ *
+ * @param fileName the file name to check
+ * @return true if the file is a compressed file (zip, jar, tar, tar.gz,
tgz)
+ */
+ public static boolean isCompressedFile(String fileName) {
+ String lowerCaseFileName = fileName.toLowerCase();
+ return lowerCaseFileName.endsWith(".zip")
+ || lowerCaseFileName.endsWith(".jar")
+ || lowerCaseFileName.endsWith(".tar")
+ || lowerCaseFileName.endsWith(".tar.gz")
+ || lowerCaseFileName.endsWith(".tgz");
+ }
+
+ /**
+ * Gets the base name of the file without the extension. Handles compound
extensions like
+ * .tar.gz.
+ *
+ * @param fileName the file name to process
+ * @return the base name without extension
+ */
+ public static String getBaseNameWithoutExtension(String fileName) {
+ // .tar.gz is a compound extension and needs special handling to
remove the entire suffix
+ if (fileName.toLowerCase().endsWith(".tar.gz")) {
+ return fileName.substring(0, fileName.length() -
".tar.gz".length());
+ }
+ // For other formats (.zip, .jar, .tar, .tgz, etc.), use lastIndexOf
to handle uniformly
+ int lastDotIndex = fileName.lastIndexOf('.');
+ if (lastDotIndex > 0) {
+ return fileName.substring(0, lastDotIndex);
+ }
+ return fileName;
+ }
+
public static void extractFile(
String srcFilePath, String targetDirPath, String originalFileName)
throws IOException {
if (hasOneOfSuffixes(originalFileName, ".zip", ".jar")) {
diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index a9e151eb425..94c6db2656c 100644
--- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -312,14 +312,15 @@ final class PythonEnvUtils {
// add the parent directory of .py file itself to PYTHONPATH
pythonPathList.add(targetPath.getParent().toString());
} else if
(Files.isRegularFile(Paths.get(targetPath.toString()).toRealPath())
- && sourceFileName.endsWith(".zip")) {
- // expand the zip file and add the root directory to PYTHONPATH
- // as not all zip files are importable
+ && CompressionUtils.isCompressedFile(sourceFileName)) {
+ // expand the compressed file and add the root directory to
PYTHONPATH
+ // as not all compressed files are importable
Path targetDirectory =
new Path(
targetPath.getParent(),
- sourceFileName.substring(0,
sourceFileName.lastIndexOf(".")));
- FileUtils.expandDirectory(targetPath, targetDirectory);
+
CompressionUtils.getBaseNameWithoutExtension(sourceFileName));
+ CompressionUtils.extractFile(
+ targetPath.toString(), targetDirectory.toString(),
sourceFileName);
pythonPathList.add(targetDirectory.toString());
} else {
pythonPathList.add(targetPath.toString());
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
index f895f492e8b..1652e51b4de 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
@@ -309,20 +309,19 @@ public abstract class AbstractPythonEnvironmentManager implements PythonEnvironm
// If the python file is file with suffix .py, add the parent
directory to
// PYTHONPATH.
pythonPath = String.join(File.separator, filesDirectory,
distributedCacheFileName);
- } else if (pythonFile.isFile() && originFileName.endsWith(".zip"))
{
- // Expand the zip file and add the root directory to PYTHONPATH
- // as not all zip files are importable
+ } else if (pythonFile.isFile() &&
CompressionUtils.isCompressedFile(originFileName)) {
+ // Expand the compressed file and add the root directory to
PYTHONPATH
+ // as not all compressed files are importable
org.apache.flink.core.fs.Path targetDirectory =
new org.apache.flink.core.fs.Path(
filesDirectory,
String.join(
File.separator,
distributedCacheFileName,
- originFileName.substring(
- 0,
originFileName.lastIndexOf("."))));
- FileUtils.expandDirectory(
- new
org.apache.flink.core.fs.Path(pythonFile.getAbsolutePath()),
- targetDirectory);
+
CompressionUtils.getBaseNameWithoutExtension(
+ originFileName)));
+ CompressionUtils.extractFile(
+ pythonFile.getAbsolutePath(),
targetDirectory.toString(), originFileName);
pythonPath = targetDirectory.toString();
} else {
pythonPath =
diff --git a/flink-python/src/test/java/org/apache/flink/python/util/CompressionUtilsTest.java b/flink-python/src/test/java/org/apache/flink/python/util/CompressionUtilsTest.java
index 7edaafb67e7..e53172511a7 100644
--- a/flink-python/src/test/java/org/apache/flink/python/util/CompressionUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/util/CompressionUtilsTest.java
@@ -155,4 +155,50 @@ class CompressionUtilsTest {
}
return mode;
}
+
+ @Test
+ void testIsCompressedFile() {
+ // Supported extensions (lowercase)
+ assertThat(CompressionUtils.isCompressedFile("a.zip")).isTrue();
+ assertThat(CompressionUtils.isCompressedFile("a.jar")).isTrue();
+ assertThat(CompressionUtils.isCompressedFile("a.tar")).isTrue();
+ assertThat(CompressionUtils.isCompressedFile("a.tar.gz")).isTrue();
+ assertThat(CompressionUtils.isCompressedFile("a.tgz")).isTrue();
+
+ // Case-insensitive
+ assertThat(CompressionUtils.isCompressedFile("A.ZIP")).isTrue();
+ assertThat(CompressionUtils.isCompressedFile("A.Tar.Gz")).isTrue();
+
+ // With path prefix
+ assertThat(CompressionUtils.isCompressedFile("/tmp/a.zip")).isTrue();
+
+ // Unsupported extensions / no extension
+ assertThat(CompressionUtils.isCompressedFile("a.txt")).isFalse();
+ assertThat(CompressionUtils.isCompressedFile("a.gz")).isFalse();
+ assertThat(CompressionUtils.isCompressedFile("a.rar")).isFalse();
+ assertThat(CompressionUtils.isCompressedFile("archive")).isFalse();
+ }
+
+ @Test
+ void testGetBaseNameWithoutExtension() {
+ // Common extensions
+
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.zip")).isEqualTo("a");
+
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.jar")).isEqualTo("a");
+
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.tar")).isEqualTo("a");
+
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.tgz")).isEqualTo("a");
+
+ // .tar.gz compound extension (case-insensitive)
+
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.tar.gz")).isEqualTo("a");
+
assertThat(CompressionUtils.getBaseNameWithoutExtension("a.TAR.GZ")).isEqualTo("a");
+
+ // Filenames with multiple dots
+
assertThat(CompressionUtils.getBaseNameWithoutExtension("x.y.zip")).isEqualTo("x.y");
+
assertThat(CompressionUtils.getBaseNameWithoutExtension("x.y.tar.gz")).isEqualTo("x.y");
+
+ // No extension / dot-prefixed hidden files
+
assertThat(CompressionUtils.getBaseNameWithoutExtension("archive")).isEqualTo("archive");
+
assertThat(CompressionUtils.getBaseNameWithoutExtension(".hidden")).isEqualTo(".hidden");
+ assertThat(CompressionUtils.getBaseNameWithoutExtension(".hidden.zip"))
+ .isEqualTo(".hidden");
+ }
}