This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b75338e8e794feb37ea8c3cd1d1b8c2f12d58f72
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Fri Sep 8 11:31:18 2023 +0200

    [FLINK-33059][connectors][format][filesystem] Generalize compression test to all compression formats
---
 .../flink/api/common/io/FileInputFormat.java       |  4 +++
 .../flink/api/common/io/FileInputFormatTest.java   | 32 ++++++++++++----------
 .../org/apache/flink/testutils/TestFileUtils.java  | 21 ++++++++++++++
 3 files changed, 42 insertions(+), 15 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 3a3ad24c9df..6a2a78e6cc2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -157,6 +157,10 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
         }
     }
 
+    /**
+     * Returns the file extensions (without the leading {@code '.'}) that have an entry in the
+     * inflater input stream factory registry, i.e. the extensions this format treats as
+     * compressed.
+     *
+     * @return an unmodifiable snapshot of the registered compression file extensions
+     */
+    public static Set<String> getSupportedCompressionFormats() {
+        // Copy instead of handing out keySet(): Map#keySet is a live view backed by the
+        // registry and (for a mutable map) supports removal, so exposing it would let callers
+        // silently unregister decompressors for the whole JVM.
+        return java.util.Collections.unmodifiableSet(
+                new java.util.HashSet<>(INFLATER_INPUT_STREAM_FACTORIES.keySet()));
+    }
+
     /**
      * Returns the extension of a file name (!= a path).
      *
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 8b233c85cca..09b4607c548 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.common.io;
 
+import java.util.Set;
+
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
@@ -592,31 +594,29 @@ public class FileInputFormatTest {
     //  Unsplittable input files
     // ------------------------------------------------------------------------
 
-    // ---- Tests for .deflate ---------
+    // ---- Tests for compressed files  ---------
 
     /**
      * Create directory with files with .deflate extension and see if it 
creates a split for each
      * file. Each split has to start from the beginning.
      */
     @Test
-    public void testFileInputSplit() {
+    public void testFileInputFormatWithCompression() {
         try {
             String tempFile =
-                    TestFileUtils.createTempFileDirExtension(
-                            temporaryFolder.newFolder(),
-                            ".deflate",
-                            "some",
-                            "stupid",
-                            "meaningless",
-                            "files");
+                    TestFileUtils.createTempTextFileDirCompressionFormats(
+                            temporaryFolder.newFolder());
             final DummyFileInputFormat format = new DummyFileInputFormat();
             format.setFilePath(tempFile);
             format.configure(new Configuration());
             FileInputSplit[] splits = format.createInputSplits(2);
-            Assert.assertEquals(4, splits.length);
+            final Set<String> supportedCompressionFormats =
+                    FileInputFormat.getSupportedCompressionFormats();
+            Assert.assertEquals(supportedCompressionFormats.size(), 
splits.length);
             for (FileInputSplit split : splits) {
                 Assert.assertEquals(
-                        -1L, split.getLength()); // unsplittable deflate files 
have this size as a
+                        FileInputFormat.READ_WHOLE_SPLIT_FLAG,
+                        split.getLength()); // unsplittable compressed files 
have this size as a
                 // flag for "read whole file"
                 Assert.assertEquals(0L, split.getStart()); // always read from 
the beginning.
             }
@@ -630,12 +630,14 @@ public class FileInputFormatTest {
             formatMixed.setFilePath(tempFile);
             formatMixed.configure(new Configuration());
             FileInputSplit[] splitsMixed = formatMixed.createInputSplits(2);
-            Assert.assertEquals(5, splitsMixed.length);
+            Assert.assertEquals(supportedCompressionFormats.size() + 1, 
splitsMixed.length);
             for (FileInputSplit split : splitsMixed) {
-                if (split.getPath().getName().endsWith(".deflate")) {
+                final String extension =
+                        
FileInputFormat.extractFileExtension(split.getPath().getName());
+                if (supportedCompressionFormats.contains(extension)) {
                     Assert.assertEquals(
-                            -1L,
-                            split.getLength()); // unsplittable deflate files 
have this size as a
+                            FileInputFormat.READ_WHOLE_SPLIT_FLAG,
+                            split.getLength()); // unsplittable compressed 
files have this size as a
                     // flag for "read whole file"
                     Assert.assertEquals(0L, split.getStart()); // always read 
from the beginning.
                 } else {
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java 
b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
index 71c3825416f..9dedf0cc37c 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
@@ -26,6 +26,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.Files;
 
+import org.apache.flink.api.common.io.FileInputFormat;
+
 public class TestFileUtils {
 
     private static final String FILE_PREFIX = "flink_test_";
@@ -136,6 +138,25 @@ public class TestFileUtils {
         return f.toURI().toString();
     }
 
+    /**
+     * Creates a fresh, randomly named directory under {@code tempDir} containing one small text
+     * file per extension returned by {@link FileInputFormat#getSupportedCompressionFormats()}.
+     *
+     * <p>The file contents are plain, uncompressed text; only the file extension marks them as
+     * compressed. That is enough for tests that inspect split creation, but not for tests that
+     * actually read and decompress the data.
+     *
+     * @param tempDir the parent directory in which the new directory is created
+     * @return the URI string of the created directory
+     * @throws IOException if the directory or one of the files cannot be created
+     */
+    public static String createTempTextFileDirCompressionFormats(File tempDir)
+            throws IOException {
+        File dir;
+        do {
+            dir = new File(tempDir, randomFileName(FILE_SUFFIX));
+        } while (dir.exists());
+        if (!dir.mkdirs()) {
+            throw new IOException("Could not create temporary directory " + dir);
+        }
+        // Registered before the children on purpose: File#deleteOnExit deletes in reverse
+        // registration order, so the files go first and the then-empty directory last.
+        dir.deleteOnExit();
+
+        for (String extension : FileInputFormat.getSupportedCompressionFormats()) {
+            File child = new File(dir, randomFileName("." + extension));
+            child.deleteOnExit();
+
+            try (BufferedWriter out = new BufferedWriter(new FileWriter(child))) {
+                out.write("random text " + Math.random());
+            }
+        }
+        return dir.toURI().toString();
+    }
+
     public static String randomFileName() {
         return randomFileName(FILE_SUFFIX);
     }

Reply via email to