This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a534853  Fix Parquet write properties forwarded from table properties 
(#183)
a534853 is described below

commit a534853b1d40bebd4227943f1abe5e29264675a2
Author: Ryan Rupp <[email protected]>
AuthorDate: Fri May 10 17:45:45 2019 -0500

    Fix Parquet write properties forwarded from table properties (#183)
    
    This updates Iceberg to use ParquetProperties directly instead of 
configuration properties.
---
 .../java/org/apache/iceberg/parquet/Parquet.java   | 39 ++++++----
 .../org/apache/iceberg/parquet/ParquetWriter.java  |  8 +-
 .../iceberg/parquet/BaseParquetWritingTest.java    | 67 ++++++++++++++++
 .../org/apache/iceberg/parquet/TestParquet.java    | 90 ++++++++++++++++++++++
 .../apache/iceberg/parquet/TestParquetMetrics.java | 20 +----
 5 files changed, 184 insertions(+), 40 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 22c5f18..7ffc818 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -42,6 +42,8 @@ import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetReader;
@@ -144,13 +146,6 @@ public class Parquet {
       }
     }
 
-    void forwardConfig(String parquetProperty, String icebergProperty, String 
defaultValue) {
-      String value = config.getOrDefault(icebergProperty, defaultValue);
-      if (value != null) {
-        set(parquetProperty, value);
-      }
-    }
-
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be 
null");
@@ -158,13 +153,15 @@ public class Parquet {
       // add the Iceberg schema to keyValueMetadata
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
-      // add Parquet configuration
-      forwardConfig("parquet.block.size",
-          PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
-      forwardConfig("parquet.page.size",
-          PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT);
-      forwardConfig("parquet.dictionary.page.size",
-          PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT);
+      // Map Iceberg properties to pass down to the Parquet writer
+      int rowGroupSize = Integer.parseInt(config.getOrDefault(
+          PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT));
+      int pageSize = Integer.parseInt(config.getOrDefault(
+          PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT));
+      int dictionaryPageSize = Integer.parseInt(config.getOrDefault(
+          PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT));
+
+      WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
 
       set("parquet.avro.write-old-list-structure", "false");
       MessageType type = ParquetSchemaUtil.convert(schema, name);
@@ -183,18 +180,26 @@ public class Parquet {
           conf.set(entry.getKey(), entry.getValue());
         }
 
-        long rowGroupSize = Long.parseLong(config.getOrDefault(
-            PARQUET_ROW_GROUP_SIZE_BYTES, 
PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT));
+        ParquetProperties parquetProperties = ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .withPageSize(pageSize)
+            .withDictionaryPageSize(dictionaryPageSize)
+            .build();
+
         return new org.apache.iceberg.parquet.ParquetWriter<>(
-            conf, file, schema, rowGroupSize, metadata, createWriterFunc, 
codec());
+            conf, file, schema, rowGroupSize, metadata, createWriterFunc, 
codec(), parquetProperties);
       } else {
         return new ParquetWriteAdapter<>(new 
ParquetWriteBuilder<D>(ParquetIO.file(file))
+            .withWriterVersion(writerVersion)
             .setType(type)
             .setConfig(config)
             .setKeyValueMetadata(metadata)
             .setWriteSupport(getWriteSupport(type))
             .withCompressionCodec(codec())
             .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) // TODO: support 
modes
+            .withRowGroupSize(rowGroupSize)
+            .withPageSize(pageSize)
+            .withDictionaryPageSize(dictionaryPageSize)
             .build());
       }
     }
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 6366f2a..8ad126e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -63,9 +63,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private final OutputFile output;
   private final long targetRowGroupSize;
   private final Map<String, String> metadata;
-  private final ParquetProperties props = ParquetProperties.builder()
-      .withWriterVersion(PARQUET_1_0)
-      .build();
+  private final ParquetProperties props;
   private final CodecFactory.BytesCompressor compressor;
   private final MessageType parquetSchema;
   private final ParquetValueWriter<T> model;
@@ -81,9 +79,11 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable 
{
   ParquetWriter(Configuration conf, OutputFile output, Schema schema, long 
rowGroupSize,
                 Map<String, String> metadata,
                 Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
-                CompressionCodecName codec) {
+                CompressionCodecName codec,
+                ParquetProperties properties) {
     this.output = output;
     this.targetRowGroupSize = rowGroupSize;
+    this.props = properties;
     this.metadata = ImmutableMap.copyOf(metadata);
     this.compressor = new CodecFactory(conf, 
props.getPageSizeThreshold()).getCompressor(codec);
     this.parquetSchema = convert(schema, "table");
diff --git 
a/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java 
b/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java
new file mode 100644
index 0000000..d587b64
--- /dev/null
+++ 
b/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.Files.localOutput;
+
+/**
+ * Base utility test class for tests that need to write Parquet files
+ */
+public abstract class BaseParquetWritingTest {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  File writeRecords(Schema schema, GenericData.Record... records) throws 
IOException {
+    return writeRecords(schema, Collections.emptyMap(), null, records);
+  }
+
+  File writeRecords(
+      Schema schema, Map<String, String> properties,
+      Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
+      GenericData.Record... records) throws IOException {
+    File tmpFolder = temp.newFolder("parquet");
+    String filename = UUID.randomUUID().toString();
+    File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
+    try (FileAppender<GenericData.Record> writer = 
Parquet.write(localOutput(file))
+        .schema(schema)
+        .setAll(properties)
+        .createWriterFunc(createWriterFunc)
+        .build()) {
+      writer.addAll(Lists.newArrayList(records));
+    }
+    return file;
+  }
+}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java 
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
new file mode 100644
index 0000000..9429e45
--- /dev/null
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestParquet extends BaseParquetWritingTest {
+
+  @Test
+  public void testRowGroupSizeConfigurable() throws IOException {
+    // Without an explicit writer function
+    File parquetFile = generateFileWithTwoRowGroups(null);
+
+    try (ParquetFileReader reader = 
ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
+      Assert.assertEquals(2, reader.getRowGroups().size());
+    }
+  }
+
+  @Test
+  public void testRowGroupSizeConfigurableWithWriter() throws IOException {
+    File parquetFile = 
generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter);
+
+    try (ParquetFileReader reader = 
ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
+      Assert.assertEquals(2, reader.getRowGroups().size());
+    }
+  }
+
+  private File generateFileWithTwoRowGroups(Function<MessageType, 
ParquetValueWriter<?>> createWriterFunc)
+      throws IOException {
+    Schema schema = new Schema(
+        optional(1, "intCol", IntegerType.get())
+    );
+
+    int minimumRowGroupRecordCount = 100;
+    int desiredRecordCount = minimumRowGroupRecordCount + 1;
+
+    List<GenericData.Record> records = new ArrayList<>(desiredRecordCount);
+    org.apache.avro.Schema avroSchema = 
AvroSchemaUtil.convert(schema.asStruct());
+    for (int i = 1; i <= desiredRecordCount; i++) {
+      GenericData.Record record = new GenericData.Record(avroSchema);
+      record.put("intCol", i);
+      records.add(record);
+    }
+
+    // Force multiple row groups by making the byte size very small
+    // Note there's also minimumRowGroupRecordCount which cannot be configured 
so we have to write
+    // at least that many records for a new row group to occur
+    return writeRecords(
+        schema,
+        ImmutableMap.of(
+            PARQUET_ROW_GROUP_SIZE_BYTES,
+            Integer.toString(minimumRowGroupRecordCount * Integer.BYTES)),
+        createWriterFunc,
+        records.toArray(new GenericData.Record[] {}));
+  }
+}
diff --git 
a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java 
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
index 229e77a..ec03850 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
@@ -32,11 +32,9 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.commons.io.Charsets;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.BinaryType;
 import org.apache.iceberg.types.Types.BooleanType;
@@ -55,20 +53,15 @@ import org.apache.iceberg.types.Types.TimeType;
 import org.apache.iceberg.types.Types.TimestampType;
 import org.apache.iceberg.types.Types.UUIDType;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.Files.localInput;
-import static org.apache.iceberg.Files.localOutput;
 import static org.apache.iceberg.types.Conversions.fromByteBuffer;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
-public class TestParquetMetrics {
+public class TestParquetMetrics extends BaseParquetWritingTest {
 
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
   private final UUID uuid = UUID.randomUUID();
   private final GenericFixed fixed = new GenericData.Fixed(
       org.apache.avro.Schema.createFixed("fixedCol", null, null, 4),
@@ -291,15 +284,4 @@ public class TestParquetMetrics {
         upperBounds.containsKey(fieldId) ? fromByteBuffer(type, 
upperBounds.get(fieldId)) : null);
   }
 
-  private File writeRecords(Schema schema, Record... records) throws 
IOException {
-    File tmpFolder = temp.newFolder("parquet");
-    String filename = UUID.randomUUID().toString();
-    File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
-    try (FileAppender<Record> writer = Parquet.write(localOutput(file))
-        .schema(schema)
-        .build()) {
-      writer.addAll(Lists.newArrayList(records));
-    }
-    return file;
-  }
 }

Reply via email to