This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7111e33f4 [Bug] Fix parquet.compression not working in parquet
format (#1502)
7111e33f4 is described below
commit 7111e33f454cfa4d838d3d9d963b2e642bd5926b
Author: Aitozi <[email protected]>
AuthorDate: Wed Jul 5 20:41:13 2023 +0800
[Bug] Fix parquet.compression not working in parquet format (#1502)
---
.../parquet/writer/RowDataParquetBuilder.java | 23 ++++++++++++-------
.../format/parquet/ParquetFileFormatTest.java | 26 ++++++++++++----------
.../format/parquet/ParquetReadWriteTest.java | 4 +++-
3 files changed, 32 insertions(+), 21 deletions(-)
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
index a9cdc2156..e4874f579 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
@@ -28,6 +28,8 @@ import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
+import javax.annotation.Nullable;
+
import java.io.IOException;
/** A {@link ParquetBuilder} for {@link InternalRow}. */
@@ -45,15 +47,8 @@ public class RowDataParquetBuilder implements
ParquetBuilder<InternalRow> {
public ParquetWriter<InternalRow> createWriter(OutputFile out, String
compression)
throws IOException {
- String compressName = CompressionCodecName.SNAPPY.name();
- if (null != compression) {
- compressName = compression;
- }
-
return new ParquetRowDataBuilder(out, rowType)
- .withCompressionCodec(
- CompressionCodecName.fromConf(
-
conf.getString(ParquetOutputFormat.COMPRESSION, compressName)))
+
.withCompressionCodec(CompressionCodecName.fromConf(getCompression(compression)))
.withRowGroupSize(
conf.getLong(
ParquetOutputFormat.BLOCK_SIZE,
ParquetWriter.DEFAULT_BLOCK_SIZE))
@@ -80,4 +75,16 @@ public class RowDataParquetBuilder implements
ParquetBuilder<InternalRow> {
ParquetProperties.DEFAULT_WRITER_VERSION.toString())))
.build();
}
+
+ public String getCompression(@Nullable String compression) {
+ String compressName;
+ if (null != compression) {
+ compressName = compression;
+ } else {
+ compressName =
+ conf.getString(
+ ParquetOutputFormat.COMPRESSION,
CompressionCodecName.SNAPPY.name());
+ }
+ return compressName;
+ }
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
index da53a7781..0dd03f9b0 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
@@ -61,24 +62,25 @@ public class ParquetFileFormatTest {
@Test
public void testDefaultCompressionCodecName() {
+ // no parquet.compression and no file.compression
Options conf = new Options();
-
assertThat(getCompressionCodec(conf)).isEqualTo(CompressionCodec.SNAPPY.name());
+ RowDataParquetBuilder builder =
+ new RowDataParquetBuilder(
+ new RowType(new ArrayList<>()),
getParquetConfiguration(conf));
+
assertThat(builder.getCompression(null)).isEqualTo(CompressionCodec.SNAPPY.name());
}
@Test
- public void testSpecifiedCompressionCodecName() {
- String lz4 = CompressionCodec.LZ4.name();
+ public void testFileCompressionHigherPreference() {
Options conf = new Options();
+ String lz4 = CompressionCodec.LZ4.name();
conf.setString(ParquetOutputFormat.COMPRESSION, lz4);
- assertThat(getCompressionCodec(conf)).isEqualTo(lz4);
- }
-
- private String getCompressionCodec(Options conf) {
- Options formatOptions = conf.removePrefix(IDENTIFIER + ".");
- ParquetFileFormat parquet =
- new ParquetFileFormatFactory().create(new
FormatContext(formatOptions, 1024));
- return getParquetConfiguration(parquet.formatOptions())
- .getString(ParquetOutputFormat.COMPRESSION, null);
+ RowDataParquetBuilder builder =
+ new RowDataParquetBuilder(
+ new RowType(new ArrayList<>()),
+ getParquetConfiguration(conf.removePrefix(IDENTIFIER +
".")));
+ assertThat(builder.getCompression(null)).isEqualTo(lz4);
+
assertThat(builder.getCompression("SNAPPY")).isEqualTo(CompressionCodec.SNAPPY.name());
}
@Test
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 429614366..6542af029 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -297,8 +297,10 @@ public class ParquetReadWriteTest {
conf.setInteger("parquet.block.size", rowGroupSize);
ParquetWriterFactory factory =
new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE,
conf));
+ String[] candidates = new String[] {"snappy", "zstd", "gzip"};
+ String compress = candidates[new Random().nextInt(3)];
FormatWriter writer =
- factory.create(new LocalFileIO().newOutputStream(path, false),
"SNAPPY");
+ factory.create(new LocalFileIO().newOutputStream(path, false),
compress);
for (InternalRow row : rows) {
writer.addElement(row);
}