This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c55afa064e [core] text formats support compress (#6103)
c55afa064e is described below
commit c55afa064e9ffcdec40ca4a2c1f316cf70649fc1
Author: jerry <[email protected]>
AuthorDate: Sat Aug 23 13:18:13 2025 +0800
[core] text formats support compress (#6103)
---
.../org/apache/paimon/format/CompressionType.java | 137 +++++++++++++
.../apache/paimon/format/FormatReadWriteTest.java | 10 +-
.../org/apache/paimon/io/DataFilePathFactory.java | 30 ++-
.../apache/paimon/flink/BatchFileStoreITCase.java | 20 ++
.../apache/paimon/format/BaseTextFileReader.java | 23 ++-
.../apache/paimon/format/BaseTextFileWriter.java | 35 +++-
.../org/apache/paimon/format/TextCompression.java | 137 +++++++++++++
.../apache/paimon/format/csv/CsvFileFormat.java | 19 +-
.../apache/paimon/format/csv/CsvFileReader.java | 23 ++-
.../apache/paimon/format/csv/CsvFormatWriter.java | 36 ++--
.../apache/paimon/format/csv/CsvReaderFactory.java | 5 +-
.../apache/paimon/format/json/JsonFileFormat.java | 21 +-
.../apache/paimon/format/json/JsonFileReader.java | 16 +-
.../paimon/format/json/JsonFormatWriter.java | 12 +-
.../paimon/format/json/JsonReaderFactory.java | 5 +-
.../apache/paimon/format/BaseCompressionTest.java | 221 +++++++++++++++++++++
.../paimon/format/csv/CsvCompressionTest.java | 54 +++++
.../paimon/format/csv/CsvFileFormatTest.java | 11 +-
.../paimon/format/json/JsonCompressionTest.java | 64 ++++++
.../paimon/format/json/JsonFileFormatTest.java | 11 +-
.../org/apache/paimon/spark/SparkWriteITCase.java | 8 +-
21 files changed, 821 insertions(+), 77 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java
b/paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java
new file mode 100644
index 0000000000..236db62fb7
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java
@@ -0,0 +1,137 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.format;

import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.InlineElement;

import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

import static org.apache.paimon.options.description.TextElement.text;

/**
 * Compression types supported by Paimon file formats.
 *
 * <p>Each type carries the Hadoop codec class name used to (de)compress data and the file
 * extension appended to data file names (e.g. {@code gz}). {@link #NONE} has no codec class
 * ({@code null}) and an empty extension.
 */
public enum CompressionType implements DescribedEnum {
    NONE("none", "No compression.", null, ""),
    GZIP(
            "gzip",
            "GZIP compression using the deflate algorithm.",
            "org.apache.hadoop.io.compress.GzipCodec",
            "gz"),
    BZIP2(
            "bzip2",
            "BZIP2 compression using the Burrows-Wheeler algorithm.",
            "org.apache.hadoop.io.compress.BZip2Codec",
            "bz2"),
    DEFLATE(
            "deflate",
            "DEFLATE compression using the deflate algorithm.",
            "org.apache.hadoop.io.compress.DeflateCodec",
            "deflate"),
    SNAPPY(
            "snappy",
            "Snappy compression for fast compression and decompression.",
            "org.apache.hadoop.io.compress.SnappyCodec",
            "snappy"),
    LZ4(
            "lz4",
            "LZ4 compression for very fast compression and decompression.",
            "org.apache.hadoop.io.compress.Lz4Codec",
            "lz4"),
    ZSTD(
            "zstd",
            "Zstandard compression for high compression ratio and speed.",
            "org.apache.hadoop.io.compress.ZStandardCodec",
            "zst");

    private final String value;
    private final String className;
    private final String fileExtension;
    private final String description;

    /** Lower-cased file extensions of all real (non-{@link #NONE}) compression types. */
    private static final Set<String> SUPPORTED_EXTENSIONS;

    static {
        Set<String> extensions = new HashSet<>();
        for (CompressionType type : CompressionType.values()) {
            if (type != CompressionType.NONE
                    && type.fileExtension() != null
                    && !type.fileExtension().isEmpty()) {
                // Locale.ROOT keeps case mapping locale-independent (e.g. avoids the
                // Turkish dotless-i problem when the default locale is "tr").
                extensions.add(type.fileExtension().toLowerCase(Locale.ROOT));
            }
        }
        SUPPORTED_EXTENSIONS = Collections.unmodifiableSet(extensions);
    }

    CompressionType(String value, String description, String className, String fileExtension) {
        this.value = value;
        this.description = description;
        this.className = className;
        this.fileExtension = fileExtension;
    }

    @Override
    public String toString() {
        return value;
    }

    @Override
    public InlineElement getDescription() {
        return text(description);
    }

    /** Returns the option value of this compression type (e.g. {@code "gzip"}). */
    public String value() {
        return value;
    }

    /** Returns the Hadoop codec class name, or {@code null} for {@link #NONE}. */
    public String hadoopCodecClassName() {
        return className;
    }

    /** Returns the file extension without the leading dot; empty for {@link #NONE}. */
    public String fileExtension() {
        return fileExtension;
    }

    /**
     * Parses a compression type from its option value, case-insensitively.
     *
     * @param value the configured compression value; may be null or empty
     * @return the matching type, or {@link #NONE} for null, empty or unrecognized values
     */
    public static CompressionType fromValue(String value) {
        if (value == null || value.isEmpty()) {
            return NONE;
        }

        for (CompressionType type : CompressionType.values()) {
            if (type.value.equalsIgnoreCase(value)) {
                return type;
            }
        }
        return NONE;
    }

    /**
     * Check if the given extension is a supported compression extension.
     *
     * @param extension the file extension to check
     * @return true if the extension is a supported compression extension, false otherwise
     */
    public static boolean isSupportedExtension(String extension) {
        if (extension == null || extension.isEmpty()) {
            return false;
        }
        // Locale.ROOT: match the locale-independent lower-casing used when building the set.
        return SUPPORTED_EXTENSIONS.contains(extension.toLowerCase(Locale.ROOT));
    }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index 40dcd975fc..5e1f186c85 100644
---
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -140,6 +140,10 @@ public abstract class FormatReadWriteTest {
return true;
}
+ public String compression() {
+ return "zstd";
+ }
+
@Test
public void testNestedReadPruning() throws Exception {
if (!supportNestedReadPruning()) {
@@ -250,10 +254,10 @@ public abstract class FormatReadWriteTest {
FormatWriter writer;
PositionOutputStream out = null;
if (factory instanceof SupportsDirectWrite) {
- writer = ((SupportsDirectWrite) factory).create(fileIO, file,
"zstd");
+ writer = ((SupportsDirectWrite) factory).create(fileIO, file,
this.compression());
} else {
out = fileIO.newOutputStream(file, false);
- writer = factory.create(out, "zstd");
+ writer = factory.create(out, this.compression());
}
for (InternalRow row : rows) {
writer.addElement(row);
@@ -381,7 +385,7 @@ public abstract class FormatReadWriteTest {
Path filePath = new Path(parent, UUID.randomUUID().toString());
FormatWriterFactory writerFactory =
jsonFormat.createWriterFactory(rowType);
try (FormatWriter writer =
- writerFactory.create(fileIO.newOutputStream(filePath, false),
"none")) {
+ writerFactory.create(fileIO.newOutputStream(filePath, false),
compression())) {
for (InternalRow row : testData) {
writer.addElement(row);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index e8193866db..933825271f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -19,6 +19,7 @@
package org.apache.paimon.io;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.format.CompressionType;
import org.apache.paimon.fs.ExternalPathProvider;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileEntry;
@@ -88,8 +89,14 @@ public class DataFilePathFactory {
private String newFileName(String prefix) {
String extension;
- if (fileSuffixIncludeCompression) {
- extension = "." + fileCompression + "." + formatIdentifier;
+ if (isTextFormat(formatIdentifier)) {
+ String compressionExtension =
+ CompressionType.fromValue(fileCompression).fileExtension();
+ extension = "." + formatIdentifier + "." + compressionExtension;
+ } else if (fileSuffixIncludeCompression) {
+ String compressionExtension =
+ CompressionType.fromValue(fileCompression).fileExtension();
+ extension = "." + compressionExtension + "." + formatIdentifier;
} else {
extension = "." + formatIdentifier;
}
@@ -162,7 +169,19 @@ public class DataFilePathFactory {
throw new IllegalArgumentException(fileName + " is not a legal
file name.");
}
- return fileName.substring(index + 1);
+ String extension = fileName.substring(index + 1);
+ if (CompressionType.isSupportedExtension(extension)) {
+ int secondLastDot = fileName.lastIndexOf('.', index - 1);
+ if (secondLastDot != -1) {
+ String formatIdentifier = fileName.substring(secondLastDot +
1, index);
+ // If the format is json or csv, return that instead of the
compression extension
+ if (isTextFormat(formatIdentifier)) {
+ return formatIdentifier;
+ }
+ }
+ }
+
+ return extension;
}
public boolean isExternalPath() {
@@ -173,4 +192,9 @@ public class DataFilePathFactory {
String uuid() {
return uuid;
}
+
+ private static boolean isTextFormat(String formatIdentifier) {
+ return "json".equalsIgnoreCase(formatIdentifier)
+ || "csv".equalsIgnoreCase(formatIdentifier);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index f61f1fa1d7..0c24824ef8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -68,6 +68,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
sql("CREATE TABLE CSV (a INT, b INT, c INT) WITH
('file.format'='csv')");
sql("INSERT INTO CSV VALUES (1, 2, 3)");
assertThat(sql("SELECT * FROM CSV")).containsExactly(Row.of(1, 2, 3));
+
+ sql(
+ "CREATE TABLE CSV_GZIP (a INT, b INT, c INT) WITH
('file.format'='csv', 'file.compression'='gzip')");
+ sql("INSERT INTO CSV_GZIP VALUES (1, 2, 3)");
+ assertThat(sql("SELECT * FROM CSV_GZIP")).containsExactly(Row.of(1, 2,
3));
+ List<String> files =
+ sql("select file_path from `CSV_GZIP$files`").stream()
+ .map(r -> r.getField(0).toString())
+ .collect(Collectors.toList());
+ assertThat(files).allMatch(file -> file.endsWith("csv.gz"));
}
@Test
@@ -75,6 +85,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
sql("CREATE TABLE JSON_T (a INT, b INT, c INT) WITH
('file.format'='json')");
sql("INSERT INTO JSON_T VALUES (1, 2, 3)");
assertThat(sql("SELECT * FROM JSON_T")).containsExactly(Row.of(1, 2,
3));
+
+ sql(
+ "CREATE TABLE JSON_GZIP (a INT, b INT, c INT) WITH
('file.format'='json', 'file.compression'='gzip')");
+ sql("INSERT INTO JSON_GZIP VALUES (1, 2, 3)");
+ assertThat(sql("SELECT * FROM JSON_GZIP")).containsExactly(Row.of(1,
2, 3));
+ List<String> files =
+ sql("select file_path from `JSON_GZIP$files`").stream()
+ .map(r -> r.getField(0).toString())
+ .collect(Collectors.toList());
+ assertThat(files).allMatch(file -> file.endsWith("json.gz"));
}
@Test
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
index 3cf2ec92cd..033f9bf65d 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.types.RowType;
@@ -29,6 +30,7 @@ import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
@@ -37,17 +39,21 @@ public abstract class BaseTextFileReader implements
FileRecordReader<InternalRow
protected final Path filePath;
protected final RowType rowType;
+ protected final InputStream decompressedStream;
protected final BufferedReader bufferedReader;
protected boolean readerClosed = false;
protected BaseTextRecordIterator reader;
- protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType
rowType) throws IOException {
+ protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType
rowType, Options options)
+ throws IOException {
this.filePath = filePath;
this.rowType = rowType;
+ this.decompressedStream =
+ TextCompression.createDecompressedInputStream(
+ fileIO.newInputStream(filePath), filePath, options);
this.bufferedReader =
new BufferedReader(
- new InputStreamReader(
- fileIO.newInputStream(filePath),
StandardCharsets.UTF_8));
+ new InputStreamReader(this.decompressedStream,
StandardCharsets.UTF_8));
this.reader = createRecordIterator();
}
@@ -89,8 +95,15 @@ public abstract class BaseTextFileReader implements
FileRecordReader<InternalRow
@Override
public void close() throws IOException {
- if (!readerClosed && bufferedReader != null) {
- bufferedReader.close();
+ if (!readerClosed) {
+ // Close the buffered reader first
+ if (bufferedReader != null) {
+ bufferedReader.close();
+ }
+ // Explicitly close the decompressed stream to prevent resource
leaks
+ if (decompressedStream != null) {
+ decompressedStream.close();
+ }
readerClosed = true;
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
index 6be2123ed3..f6534afd61 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
@@ -20,10 +20,12 @@ package org.apache.paimon.format;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
import java.io.BufferedWriter;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
@@ -34,11 +36,21 @@ public abstract class BaseTextFileWriter implements
FormatWriter {
protected final BufferedWriter writer;
protected final RowType rowType;
- protected BaseTextFileWriter(PositionOutputStream outputStream, RowType
rowType) {
+ protected BaseTextFileWriter(
+ PositionOutputStream outputStream,
+ RowType rowType,
+ Options formatOptions,
+ CompressionType compressionType)
+ throws IOException {
this.outputStream = outputStream;
- this.rowType = rowType;
+ OutputStream compressedStream =
+ TextCompression.createCompressedOutputStream(
+ outputStream, compressionType, formatOptions);
this.writer =
- new BufferedWriter(new OutputStreamWriter(outputStream,
StandardCharsets.UTF_8));
+ new BufferedWriter(
+ new OutputStreamWriter(compressedStream,
StandardCharsets.UTF_8),
+ getOptimalBufferSize(compressionType));
+ this.rowType = rowType;
}
/**
@@ -61,4 +73,21 @@ public abstract class BaseTextFileWriter implements
FormatWriter {
}
return false;
}
+
+ private int getOptimalBufferSize(CompressionType compressionType) {
+ switch (compressionType) {
+ case GZIP:
+ case DEFLATE:
+ return 65536; // 64KB for deflate-based compression
+ case SNAPPY:
+ case LZ4:
+ return 131072; // 128KB for fast compression
+ case ZSTD:
+ return 262144; // 256KB for high compression ratio
+ case BZIP2:
+ return 65536; // 64KB for bzip2
+ default:
+ return 65536; // Default 64KB buffer size
+ }
+ }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java
b/paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java
new file mode 100644
index 0000000000..4994b2448a
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java
@@ -0,0 +1,137 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.format;

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.HadoopUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;

/** Utility class for handling text file compression and decompression using Hadoop codecs. */
public class TextCompression {

    private static final Logger LOG = LoggerFactory.getLogger(TextCompression.class);

    private TextCompression() {
        // Static utility class; no instances.
    }

    /**
     * Creates a compressed output stream using Hadoop's compression codecs.
     *
     * @param out The underlying output stream
     * @param compression The compression format
     * @param options Paimon options for Hadoop configuration
     * @return Compressed output stream, or {@code out} itself when no codec applies
     * @throws IOException If compression stream creation fails
     */
    public static OutputStream createCompressedOutputStream(
            PositionOutputStream out, CompressionType compression, Options options)
            throws IOException {
        Optional<CompressionCodec> codecOpt =
                getCompressionCodecByCompression(compression, options);
        if (codecOpt.isPresent()) {
            return codecOpt.get().createOutputStream(out);
        }
        // NONE, or codec unavailable at runtime: write the plain stream.
        return out;
    }

    /**
     * Creates a decompressed input stream using Hadoop's compression codecs.
     *
     * @param inputStream The underlying input stream
     * @param filePath The file path (used to detect compression from extension)
     * @param options Paimon options for Hadoop configuration
     * @return Decompressed input stream, or {@code inputStream} itself for unknown extensions
     * @throws IOException If decompression stream creation fails
     */
    public static InputStream createDecompressedInputStream(
            SeekableInputStream inputStream, Path filePath, Options options) throws IOException {
        try {
            Configuration conf = HadoopUtils.getHadoopConfiguration(options);
            CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);

            // The factory maps the file extension (e.g. ".gz") to a codec; null means
            // the file is not compressed (or the extension is unknown).
            CompressionCodec codec =
                    codecFactory.getCodec(new org.apache.hadoop.fs.Path(filePath.toString()));
            if (codec != null) {
                return codec.createInputStream(inputStream);
            }
            return inputStream;
        } catch (Throwable e) {
            throw new IOException("Failed to create decompression stream", e);
        }
    }

    /**
     * Resolves the effective compression type for a text format: the configured type when its
     * codec is usable at runtime, otherwise {@link CompressionType#NONE}.
     *
     * @param compression the configured compression value
     * @param options Paimon options for Hadoop configuration
     * @return the usable compression type, falling back to {@link CompressionType#NONE}
     */
    public static CompressionType getTextCompressionType(String compression, Options options) {
        CompressionType compressionType = CompressionType.fromValue(compression);
        Optional<CompressionCodec> codecOpt =
                getCompressionCodecByCompression(compressionType, options);
        // Reuse the already-parsed type instead of parsing the string a second time.
        return codecOpt.isPresent() ? compressionType : CompressionType.NONE;
    }

    /**
     * Gets a compression codec by compression type.
     *
     * @param compressionType The compression type
     * @param options Paimon options for Hadoop configuration
     * @return Optional CompressionCodec instance; empty for {@link CompressionType#NONE} or
     *     when the codec class is missing or unusable (e.g. missing native library)
     */
    public static Optional<CompressionCodec> getCompressionCodecByCompression(
            CompressionType compressionType, Options options) {
        if (compressionType == null || CompressionType.NONE == compressionType) {
            return Optional.empty();
        }

        try {
            Configuration conf = HadoopUtils.getHadoopConfiguration(options);
            String codecName = compressionType.hadoopCodecClassName();
            if (codecName != null) {
                Class<?> codecClass = Class.forName(codecName);
                if (CompressionCodec.class.isAssignableFrom(codecClass)) {
                    CompressionCodec codec =
                            (CompressionCodec) codecClass.getDeclaredConstructor().newInstance();
                    if (codec instanceof org.apache.hadoop.conf.Configurable) {
                        ((org.apache.hadoop.conf.Configurable) codec).setConf(conf);
                    }
                    // Probe the codec: some codecs (snappy, lz4, zstd) require native
                    // libraries that may be absent at runtime. Close the probe stream so
                    // any compressor it acquired (possibly from CodecPool) is released.
                    try (OutputStream probe =
                            codec.createOutputStream(new ByteArrayOutputStream())) {
                        return Optional.of(codec);
                    } catch (Throwable e) {
                        LOG.warn("Failed to create compression, so use none", e);
                    }
                }
            }
        } catch (Throwable e) {
            LOG.warn("Failed to create compression, so use none", e);
        }
        return Optional.empty();
    }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
index 6dce1b470a..59acbdffa6 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
@@ -19,11 +19,13 @@
package org.apache.paimon.format.csv;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.CompressionType;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.TextCompression;
import org.apache.paimon.fs.CloseShieldOutputStream;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.Options;
@@ -34,6 +36,7 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.List;
/** CSV {@link FileFormat}. */
@@ -55,12 +58,12 @@ public class CsvFileFormat extends FileFormat {
@Override
public FormatReaderFactory createReaderFactory(
RowType projectedRowType, @Nullable List<Predicate> filters) {
- return new CsvReaderFactory(projectedRowType, new CsvOptions(options));
+ return new CsvReaderFactory(projectedRowType, options);
}
@Override
public FormatWriterFactory createWriterFactory(RowType type) {
- return new CsvWriterFactory(type, new CsvOptions(options));
+ return new CsvWriterFactory(type, options);
}
@Override
@@ -103,16 +106,20 @@ public class CsvFileFormat extends FileFormat {
private static class CsvWriterFactory implements FormatWriterFactory {
private final RowType rowType;
- private final CsvOptions options;
+ private final Options options;
- public CsvWriterFactory(RowType rowType, CsvOptions options) {
+ public CsvWriterFactory(RowType rowType, Options options) {
this.rowType = rowType;
this.options = options;
}
@Override
- public FormatWriter create(PositionOutputStream out, String
compression) {
- return new CsvFormatWriter(new CloseShieldOutputStream(out),
rowType, options);
+ public FormatWriter create(PositionOutputStream out, String
compression)
+ throws IOException {
+ CompressionType compressionType =
+ TextCompression.getTextCompressionType(compression,
options);
+ return new CsvFormatWriter(
+ new CloseShieldOutputStream(out), rowType, options,
compressionType);
}
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index c85ab8208e..dbb174528a 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.BaseTextFileReader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
@@ -49,20 +50,20 @@ public class CsvFileReader extends BaseTextFileReader {
private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
new ConcurrentHashMap<>(32);
- private final CsvOptions options;
+ private final CsvOptions formatOptions;
private final CsvSchema schema;
private boolean headerSkipped = false;
- public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType,
CsvOptions options)
+ public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType,
Options options)
throws IOException {
- super(fileIO, filePath, rowType);
- this.options = options;
+ super(fileIO, filePath, rowType, options);
+ this.formatOptions = new CsvOptions(options);
this.schema =
CsvSchema.emptySchema()
- .withQuoteChar(options.quoteCharacter().charAt(0))
-
.withColumnSeparator(options.fieldDelimiter().charAt(0))
- .withEscapeChar(options.escapeCharacter().charAt(0));
- if (!options.includeHeader()) {
+
.withQuoteChar(formatOptions.quoteCharacter().charAt(0))
+
.withColumnSeparator(formatOptions.fieldDelimiter().charAt(0))
+
.withEscapeChar(formatOptions.escapeCharacter().charAt(0));
+ if (!formatOptions.includeHeader()) {
this.schema.withoutHeader();
}
}
@@ -80,7 +81,7 @@ public class CsvFileReader extends BaseTextFileReader {
@Override
protected void setupReading() throws IOException {
// Skip header if needed
- if (options.includeHeader() && !headerSkipped) {
+ if (formatOptions.includeHeader() && !headerSkipped) {
bufferedReader.readLine();
headerSkipped = true;
}
@@ -108,7 +109,7 @@ public class CsvFileReader extends BaseTextFileReader {
String field = fields[i];
// Fast path for null values
- if (field == null || field.equals(options.nullLiteral()) ||
field.isEmpty()) {
+ if (field == null || field.equals(formatOptions.nullLiteral()) ||
field.isEmpty()) {
values[i] = null;
continue;
}
@@ -122,7 +123,7 @@ public class CsvFileReader extends BaseTextFileReader {
/** Optimized field parsing with caching and fast paths for common types.
*/
private Object parseFieldOptimized(String field, DataType dataType) {
- if (field == null || field.equals(options.nullLiteral())) {
+ if (field == null || field.equals(formatOptions.nullLiteral())) {
return null;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
index 754bb5a192..87d1abb5d2 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -22,7 +22,9 @@ import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.BaseTextFileWriter;
+import org.apache.paimon.format.CompressionType;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
@@ -40,20 +42,22 @@ public class CsvFormatWriter extends BaseTextFileWriter {
private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
new ConcurrentHashMap<>(32);
- private final CsvOptions options;
+ private final CsvOptions csvOptions;
private boolean headerWritten = false;
private final StringBuilder stringBuilder;
- public CsvFormatWriter(PositionOutputStream out, RowType rowType,
CsvOptions options) {
- super(out, rowType);
- this.options = options;
+ public CsvFormatWriter(
+ PositionOutputStream out, RowType rowType, Options options,
CompressionType compression)
+ throws IOException {
+ super(out, rowType, options, compression);
+ this.csvOptions = new CsvOptions(options);
this.stringBuilder = new StringBuilder();
}
@Override
public void addElement(InternalRow element) throws IOException {
// Write header if needed
- if (options.includeHeader() && !headerWritten) {
+ if (csvOptions.includeHeader() && !headerWritten) {
writeHeader();
headerWritten = true;
}
@@ -64,7 +68,7 @@ public class CsvFormatWriter extends BaseTextFileWriter {
int fieldCount = rowType.getFieldCount();
for (int i = 0; i < fieldCount; i++) {
if (i > 0) {
- stringBuilder.append(options.fieldDelimiter());
+ stringBuilder.append(csvOptions.fieldDelimiter());
}
Object value =
@@ -72,7 +76,7 @@ public class CsvFormatWriter extends BaseTextFileWriter {
String fieldValue = escapeField(castToStringOptimized(value,
rowType.getTypeAt(i)));
stringBuilder.append(fieldValue);
}
- stringBuilder.append(options.lineDelimiter());
+ stringBuilder.append(csvOptions.lineDelimiter());
writer.write(stringBuilder.toString());
}
@@ -83,24 +87,24 @@ public class CsvFormatWriter extends BaseTextFileWriter {
int fieldCount = rowType.getFieldCount();
for (int i = 0; i < fieldCount; i++) {
if (i > 0) {
- stringBuilder.append(options.fieldDelimiter());
+ stringBuilder.append(csvOptions.fieldDelimiter());
}
stringBuilder.append(escapeField(rowType.getFieldNames().get(i)));
}
- stringBuilder.append(options.lineDelimiter());
+ stringBuilder.append(csvOptions.lineDelimiter());
writer.write(stringBuilder.toString());
}
private String escapeField(String field) {
if (field == null) {
- return options.nullLiteral();
+ return csvOptions.nullLiteral();
}
// Optimized escaping with early exit checks
boolean needsQuoting =
- field.indexOf(options.fieldDelimiter().charAt(0)) >= 0
- || field.indexOf(options.lineDelimiter().charAt(0)) >= 0
- || field.indexOf(options.quoteCharacter().charAt(0))
>= 0;
+ field.indexOf(csvOptions.fieldDelimiter().charAt(0)) >= 0
+ || field.indexOf(csvOptions.lineDelimiter().charAt(0))
>= 0
+ ||
field.indexOf(csvOptions.quoteCharacter().charAt(0)) >= 0;
if (!needsQuoting) {
return field;
@@ -109,9 +113,9 @@ public class CsvFormatWriter extends BaseTextFileWriter {
// Only escape if needed
String escaped =
field.replace(
- options.quoteCharacter(),
- options.escapeCharacter() + options.quoteCharacter());
- return options.quoteCharacter() + escaped + options.quoteCharacter();
+ csvOptions.quoteCharacter(),
+ csvOptions.escapeCharacter() +
csvOptions.quoteCharacter());
+ return csvOptions.quoteCharacter() + escaped +
csvOptions.quoteCharacter();
}
/** Optimized string casting with caching and fast paths for common types.
*/
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
index cb26e3faf8..23fcf09fd6 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format.csv;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.options.Options;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.types.RowType;
@@ -29,9 +30,9 @@ import java.io.IOException;
public class CsvReaderFactory implements FormatReaderFactory {
private final RowType rowType;
- private final CsvOptions options;
+ private final Options options;
- public CsvReaderFactory(RowType rowType, CsvOptions options) {
+ public CsvReaderFactory(RowType rowType, Options options) {
this.rowType = rowType;
this.options = options;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
index 49790fefc2..54ccf59602 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
@@ -19,11 +19,13 @@
package org.apache.paimon.format.json;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.CompressionType;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.TextCompression;
import org.apache.paimon.fs.CloseShieldOutputStream;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.Options;
@@ -34,6 +36,7 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.List;
/** JSON {@link FileFormat}. */
@@ -45,18 +48,18 @@ public class JsonFileFormat extends FileFormat {
public JsonFileFormat(FormatContext context) {
super(IDENTIFIER);
- this.options = getIdentifierPrefixOptions(context.options());
+ this.options = context.options();
}
@Override
public FormatReaderFactory createReaderFactory(
RowType projectedRowType, @Nullable List<Predicate> filters) {
- return new JsonReaderFactory(projectedRowType, new
JsonOptions(options));
+ return new JsonReaderFactory(projectedRowType, options);
}
@Override
public FormatWriterFactory createWriterFactory(RowType type) {
- return new JsonWriterFactory(type, new JsonOptions(options));
+ return new JsonWriterFactory(type, options);
}
@Override
@@ -102,16 +105,20 @@ public class JsonFileFormat extends FileFormat {
private static class JsonWriterFactory implements FormatWriterFactory {
private final RowType rowType;
- private final JsonOptions options;
+ private final Options options;
- public JsonWriterFactory(RowType rowType, JsonOptions options) {
+ public JsonWriterFactory(RowType rowType, Options options) {
this.rowType = rowType;
this.options = options;
}
@Override
- public FormatWriter create(PositionOutputStream out, String
compression) {
- return new JsonFormatWriter(new CloseShieldOutputStream(out),
rowType, options);
+ public FormatWriter create(PositionOutputStream out, String
compression)
+ throws IOException {
+ CompressionType compressionType =
+ TextCompression.getTextCompressionType(compression,
options);
+ return new JsonFormatWriter(
+ new CloseShieldOutputStream(out), rowType, options,
compressionType);
}
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index 96d5ecfa5d..b46f92835e 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.BaseTextFileReader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -53,10 +54,10 @@ public class JsonFileReader extends BaseTextFileReader {
private final JsonOptions options;
- public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType,
JsonOptions options)
+ public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType,
Options options)
throws IOException {
- super(fileIO, filePath, rowType);
- this.options = options;
+ super(fileIO, filePath, rowType, options);
+ this.options = new JsonOptions(options);
}
@Override
@@ -67,7 +68,8 @@ public class JsonFileReader extends BaseTextFileReader {
@Override
protected InternalRow parseLine(String line) throws IOException {
try {
- return convertJsonStringToRow(line, rowType, options);
+ JsonNode jsonNode =
JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(line);
+ return (InternalRow) convertJsonValue(jsonNode, rowType, options);
} catch (JsonProcessingException e) {
if (options.ignoreParseErrors()) {
return null;
@@ -88,12 +90,6 @@ public class JsonFileReader extends BaseTextFileReader {
// No additional JSON-specific iterator logic needed
}
- private InternalRow convertJsonStringToRow(String line, RowType rowType,
JsonOptions options)
- throws JsonProcessingException {
- JsonNode jsonNode =
JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(line);
- return (InternalRow) convertJsonValue(jsonNode, rowType, options);
- }
-
private Object convertJsonValue(JsonNode node, DataType dataType,
JsonOptions options) {
if (node == null || node.isNull()) {
return null;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
index de6b1a70f9..414a7eb0fd 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -25,7 +25,9 @@ import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.BaseTextFileWriter;
+import org.apache.paimon.format.CompressionType;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -52,9 +54,13 @@ public class JsonFormatWriter extends BaseTextFileWriter {
private final char lineDelimiter;
public JsonFormatWriter(
- PositionOutputStream outputStream, RowType rowType, JsonOptions
options) {
- super(outputStream, rowType);
- this.lineDelimiter = options.getLineDelimiter().charAt(0);
+ PositionOutputStream outputStream,
+ RowType rowType,
+ Options options,
+ CompressionType compressionType)
+ throws IOException {
+ super(outputStream, rowType, options, compressionType);
+ this.lineDelimiter = (new
JsonOptions(options)).getLineDelimiter().charAt(0);
}
@Override
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
index b82d77d948..709f2d345a 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.types.RowType;
@@ -31,9 +32,9 @@ import java.io.IOException;
public class JsonReaderFactory implements FormatReaderFactory {
private final RowType projectedRowType;
- private final JsonOptions options;
+ private final Options options;
- public JsonReaderFactory(RowType projectedRowType, JsonOptions options) {
+ public JsonReaderFactory(RowType projectedRowType, Options options) {
this.projectedRowType = projectedRowType;
this.options = options;
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/BaseCompressionTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/BaseCompressionTest.java
new file mode 100644
index 0000000000..7f616597d8
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/BaseCompressionTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Base class for compression tests across different file formats. */
+public abstract class BaseCompressionTest {
+
+ @TempDir protected java.nio.file.Path tempDir;
+
+ protected final RowType rowType =
+ RowType.of(
+ DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE(),
DataTypes.BOOLEAN());
+
+ protected final List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice"), 100.5,
true),
+ GenericRow.of(2, BinaryString.fromString("Bob"), 200.75,
false),
+ GenericRow.of(3, BinaryString.fromString("Charlie"),
300.25, true),
+ GenericRow.of(4, BinaryString.fromString("Diana"), 400.0,
false));
+
+ private List<CompressionType> compressionTypes =
+ Arrays.asList(
+ CompressionType.NONE,
+ CompressionType.GZIP,
+ CompressionType.BZIP2,
+ CompressionType.DEFLATE,
+ CompressionType.ZSTD);
+
+ /** Returns the file format for testing. */
+ protected abstract FileFormat createFileFormat(Options options);
+
+ /** Returns the file extension for the format. */
+ protected abstract String getFormatExtension();
+
+ @Test
+ void testCompression() {
+ compressionTypes.forEach(
+ compression -> {
+ try {
+ testCompressionRoundTrip(
+ compression.value(),
+ String.format(
+ "test_compress.%s.%s",
+ getFormatExtension(),
compression.fileExtension()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ void testCompressionInitFail() throws IOException {
+ CompressionType compression = CompressionType.SNAPPY;
+ assertThrows(
+ IOException.class,
+ () ->
+ testCompressionRoundTrip(
+ compression.value(),
+ String.format(
+ "test_compress.%s.%s",
+ getFormatExtension(),
compression.fileExtension())));
+ }
+
+ @Test
+ void testCompressionDetectionFromFileName() {
+ compressionTypes.forEach(
+ compression -> {
+ try {
+ testAutoCompressionDetection(
+ "test_auto."
+ + getFormatExtension()
+ + "."
+ + compression.fileExtension(),
+ compression.value());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ void testUnsupportedCompressionFormat() throws IOException {
+ Options options = new Options();
+ options.set(CoreOptions.FILE_COMPRESSION, "unsupported");
+ testCompressionRoundTripWithOptions(options,
"test_file_unsupported_compress");
+ }
+
+ protected void testCompressionRoundTrip(String compression, String
fileName)
+ throws IOException {
+ Options options = new Options();
+ options.set(CoreOptions.FILE_COMPRESSION, compression);
+ testCompressionRoundTripWithOptions(options, fileName);
+ }
+
+ protected void testCompressionRoundTripWithOptions(Options options, String
fileName)
+ throws IOException {
+ FileFormat format = createFileFormat(options);
+
+ // Validate the format and compression
+ format.validateDataFields(rowType);
+
+ Path filePath = new Path(tempDir.resolve(fileName).toString());
+ FileIO fileIO = new LocalFileIO();
+
+ // Write data with compression
+ FormatWriterFactory writerFactory =
format.createWriterFactory(rowType);
+ try (FormatWriter writer =
+ writerFactory.create(
+ fileIO.newOutputStream(filePath, false),
+ options.get(CoreOptions.FILE_COMPRESSION))) {
+ for (InternalRow row : testData) {
+ writer.addElement(row);
+ }
+ }
+
+ // Read data back
+ FormatReaderFactory readerFactory =
format.createReaderFactory(rowType, null);
+ List<InternalRow> readData = new ArrayList<>();
+
+ try (RecordReader<InternalRow> reader =
+ readerFactory.createReader(
+ new FormatReaderContext(fileIO, filePath,
fileIO.getFileSize(filePath)))) {
+ reader.forEachRemaining(readData::add);
+ }
+
+ // Verify data integrity
+ assertThat(readData).hasSize(testData.size());
+ for (int i = 0; i < testData.size(); i++) {
+ InternalRow expected = testData.get(i);
+ InternalRow actual = readData.get(i);
+
+ assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0));
+
assertThat(actual.getString(1).toString()).isEqualTo(expected.getString(1).toString());
+ assertThat(actual.getDouble(2)).isEqualTo(expected.getDouble(2));
+ assertThat(actual.getBoolean(3)).isEqualTo(expected.getBoolean(3));
+ }
+ }
+
+ protected void testAutoCompressionDetection(String fileName, String
compression)
+ throws IOException {
+ // Write file with compression
+ Options writeOptions = new Options();
+ writeOptions.set(CoreOptions.FILE_COMPRESSION, compression);
+
+ FileFormat format = createFileFormat(writeOptions);
+ Path filePath = new Path(tempDir.resolve(fileName).toString());
+ FileIO fileIO = new LocalFileIO();
+
+ // Write compressed data
+ FormatWriterFactory writerFactory =
format.createWriterFactory(rowType);
+ try (FormatWriter writer =
+ writerFactory.create(fileIO.newOutputStream(filePath, false),
compression)) {
+ writer.addElement(testData.get(0)); // Write just one row for this
test
+ }
+
+ // Read back with auto-detection (no compression specified in read
options)
+ Options readOptions = new Options();
+ readOptions.set(CoreOptions.FILE_COMPRESSION, "none"); // Default to
none
+
+ FileFormat readFormat = createFileFormat(readOptions);
+ FormatReaderFactory readerFactory =
readFormat.createReaderFactory(rowType, null);
+
+ List<InternalRow> readData = new ArrayList<>();
+ try (RecordReader<InternalRow> reader =
+ readerFactory.createReader(
+ new FormatReaderContext(fileIO, filePath,
fileIO.getFileSize(filePath)))) {
+ reader.forEachRemaining(readData::add);
+ }
+
+ // Should successfully read the data regardless of compression
+ assertThat(readData).hasSize(1);
+ InternalRow expected = testData.get(0);
+ InternalRow actual = readData.get(0);
+ assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0));
+
assertThat(actual.getString(1).toString()).isEqualTo(expected.getString(1).toString());
+ }
+
+ protected FormatContext createFormatContext(Options options) {
+ return new FormatContext(options, 1024, 1024);
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvCompressionTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvCompressionTest.java
new file mode 100644
index 0000000000..159aa3715e
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvCompressionTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.format.BaseCompressionTest;
+import org.apache.paimon.format.CompressionType;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+/** Test for CSV compression functionality. */
+class CsvCompressionTest extends BaseCompressionTest {
+
+ @Override
+ protected FileFormat createFileFormat(Options options) {
+ return new CsvFileFormat(createFormatContext(options));
+ }
+
+ @Override
+ protected String getFormatExtension() {
+ return "csv";
+ }
+
+ @Test
+ void testCompressionWithCustomOptions() throws IOException {
+ Options options = new Options();
+ options.set(CoreOptions.FILE_COMPRESSION,
CompressionType.GZIP.value());
+ options.set(CsvOptions.FIELD_DELIMITER, ";");
+ options.set(CsvOptions.INCLUDE_HEADER, true);
+
+ String fileName = "test_custom_options.csv.gz";
+ testCompressionRoundTripWithOptions(options, fileName);
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
index bed689bd4d..2b12874588 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -18,12 +18,14 @@
package org.apache.paimon.format.csv;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.CompressionType;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.format.FormatReadWriteTest;
@@ -61,7 +63,9 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
@Override
protected FileFormat fileFormat() {
- return new CsvFileFormatFactory().create(new FormatContext(new
Options(), 1024, 1024));
+ Options options = new Options();
+ options.set(CoreOptions.FILE_COMPRESSION, compression());
+ return new CsvFileFormatFactory().create(new FormatContext(options,
1024, 1024));
}
@Test
@@ -480,6 +484,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest
{
return false;
}
+ @Override
+ public String compression() {
+ return CompressionType.NONE.value();
+ }
+
@Override
public boolean supportDataFileWithoutExtension() {
return true;
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
new file mode 100644
index 0000000000..afd79fa878
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.format.BaseCompressionTest;
+import org.apache.paimon.format.CompressionType;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+/** Test for JSON compression functionality. */
+class JsonCompressionTest extends BaseCompressionTest {
+
+ @Override
+ protected FileFormat createFileFormat(Options options) {
+ return new JsonFileFormat(createFormatContext(options));
+ }
+
+ @Override
+ protected String getFormatExtension() {
+ return "json";
+ }
+
+ @Test
+ void testCompressionWithCustomJsonOptions() throws IOException {
+ Options options = new Options();
+ options.set(CoreOptions.FILE_COMPRESSION, "gzip");
+ options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
+ options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE,
JsonOptions.MapNullKeyMode.DROP);
+ options.set(JsonOptions.LINE_DELIMITER, "\n");
+
+ String fileName = "test_custom_json_options.json.gz";
+ testCompressionRoundTripWithOptions(options, fileName);
+ }
+
+ @Test
+ void testJsonCompressionWithComplexData() throws IOException {
+ // Test with complex JSON structures and different compression formats
+ testCompressionRoundTrip(CompressionType.GZIP.value(),
"test_complex_gzip.json.gz");
+ testCompressionRoundTrip(
+ CompressionType.DEFLATE.value(),
"test_complex_deflate.json.deflate");
+ testCompressionRoundTrip(CompressionType.NONE.value(),
"test_complex_none.json");
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
index dd38623279..82f6d10a22 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
@@ -18,11 +18,13 @@
package org.apache.paimon.format.json;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.CompressionType;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReadWriteTest;
@@ -56,7 +58,14 @@ public class JsonFileFormatTest extends FormatReadWriteTest {
@Override
protected FileFormat fileFormat() {
- return new JsonFileFormat(new FileFormatFactory.FormatContext(new
Options(), 1024, 1024));
+ Options options = new Options();
+ options.set(CoreOptions.FILE_COMPRESSION, compression());
+ return new JsonFileFormat(new FileFormatFactory.FormatContext(options,
1024, 1024));
+ }
+
+ @Override
+ public String compression() {
+ return CompressionType.NONE.value();
}
@Test
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 51c2e63b5f..d889c1ac66 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -638,8 +638,8 @@ public class SparkWriteITCase {
Assertions.assertEquals(4, files.size());
String defaultExtension = "." + "parquet";
- String newExtension = "." + "zstd" + "." + "parquet";
- // two data files end with ".parquet", two data file end with
".zstd.parquet"
+ String newExtension = "." + "zst" + "." + "parquet";
+        // two data files end with ".parquet", two data files end with ".zst.parquet"
Assertions.assertEquals(
2,
files.stream()
@@ -682,8 +682,8 @@ public class SparkWriteITCase {
.filter(name -> name.contains("changelog-"))
.collect(Collectors.toList());
String defaultExtension = "." + "parquet";
- String newExtension = "." + "zstd" + "." + "parquet";
- // one changelog file end with ".parquet", one changelog file end with
".zstd.parquet"
+ String newExtension = "." + "zst" + "." + "parquet";
+        // one changelog file ends with ".parquet", one changelog file ends with ".zst.parquet"
Assertions.assertEquals(
1,
files.stream()