This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7111e33f4 [Bug] Fix parquet.compression not working in parquet 
format (#1502)
7111e33f4 is described below

commit 7111e33f454cfa4d838d3d9d963b2e642bd5926b
Author: Aitozi <[email protected]>
AuthorDate: Wed Jul 5 20:41:13 2023 +0800

    [Bug] Fix parquet.compression not working in parquet format (#1502)
---
 .../parquet/writer/RowDataParquetBuilder.java      | 23 ++++++++++++-------
 .../format/parquet/ParquetFileFormatTest.java      | 26 ++++++++++++----------
 .../format/parquet/ParquetReadWriteTest.java       |  4 +++-
 3 files changed, 32 insertions(+), 21 deletions(-)

diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
index a9cdc2156..e4874f579 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
@@ -28,6 +28,8 @@ import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.OutputFile;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /** A {@link ParquetBuilder} for {@link InternalRow}. */
@@ -45,15 +47,8 @@ public class RowDataParquetBuilder implements 
ParquetBuilder<InternalRow> {
     public ParquetWriter<InternalRow> createWriter(OutputFile out, String 
compression)
             throws IOException {
 
-        String compressName = CompressionCodecName.SNAPPY.name();
-        if (null != compression) {
-            compressName = compression;
-        }
-
         return new ParquetRowDataBuilder(out, rowType)
-                .withCompressionCodec(
-                        CompressionCodecName.fromConf(
-                                
conf.getString(ParquetOutputFormat.COMPRESSION, compressName)))
+                
.withCompressionCodec(CompressionCodecName.fromConf(getCompression(compression)))
                 .withRowGroupSize(
                         conf.getLong(
                                 ParquetOutputFormat.BLOCK_SIZE, 
ParquetWriter.DEFAULT_BLOCK_SIZE))
@@ -80,4 +75,16 @@ public class RowDataParquetBuilder implements 
ParquetBuilder<InternalRow> {
                                         
ParquetProperties.DEFAULT_WRITER_VERSION.toString())))
                 .build();
     }
+
+    public String getCompression(@Nullable String compression) {
+        String compressName;
+        if (null != compression) {
+            compressName = compression;
+        } else {
+            compressName =
+                    conf.getString(
+                            ParquetOutputFormat.COMPRESSION, 
CompressionCodecName.SNAPPY.name());
+        }
+        return compressName;
+    }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
index da53a7781..0dd03f9b0 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet;
 
 import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.options.Options;
@@ -61,24 +62,25 @@ public class ParquetFileFormatTest {
 
     @Test
     public void testDefaultCompressionCodecName() {
+        // no parquet.compression and no file.compression
         Options conf = new Options();
-        
assertThat(getCompressionCodec(conf)).isEqualTo(CompressionCodec.SNAPPY.name());
+        RowDataParquetBuilder builder =
+                new RowDataParquetBuilder(
+                        new RowType(new ArrayList<>()), 
getParquetConfiguration(conf));
+        
assertThat(builder.getCompression(null)).isEqualTo(CompressionCodec.SNAPPY.name());
     }
 
     @Test
-    public void testSpecifiedCompressionCodecName() {
-        String lz4 = CompressionCodec.LZ4.name();
+    public void testFileCompressionHigherPreference() {
         Options conf = new Options();
+        String lz4 = CompressionCodec.LZ4.name();
         conf.setString(ParquetOutputFormat.COMPRESSION, lz4);
-        assertThat(getCompressionCodec(conf)).isEqualTo(lz4);
-    }
-
-    private String getCompressionCodec(Options conf) {
-        Options formatOptions = conf.removePrefix(IDENTIFIER + ".");
-        ParquetFileFormat parquet =
-                new ParquetFileFormatFactory().create(new 
FormatContext(formatOptions, 1024));
-        return getParquetConfiguration(parquet.formatOptions())
-                .getString(ParquetOutputFormat.COMPRESSION, null);
+        RowDataParquetBuilder builder =
+                new RowDataParquetBuilder(
+                        new RowType(new ArrayList<>()),
+                        getParquetConfiguration(conf.removePrefix(IDENTIFIER + 
".")));
+        assertThat(builder.getCompression(null)).isEqualTo(lz4);
+        
assertThat(builder.getCompression("SNAPPY")).isEqualTo(CompressionCodec.SNAPPY.name());
     }
 
     @Test
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 429614366..6542af029 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -297,8 +297,10 @@ public class ParquetReadWriteTest {
         conf.setInteger("parquet.block.size", rowGroupSize);
         ParquetWriterFactory factory =
                 new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE, 
conf));
+        String[] candidates = new String[] {"snappy", "zstd", "gzip"};
+        String compress = candidates[new Random().nextInt(3)];
         FormatWriter writer =
-                factory.create(new LocalFileIO().newOutputStream(path, false), 
"SNAPPY");
+                factory.create(new LocalFileIO().newOutputStream(path, false), 
compress);
         for (InternalRow row : rows) {
             writer.addElement(row);
         }

Reply via email to