This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a534853 Fix Parquet write properties forwarded from table properties
(#183)
a534853 is described below
commit a534853b1d40bebd4227943f1abe5e29264675a2
Author: Ryan Rupp <[email protected]>
AuthorDate: Fri May 10 17:45:45 2019 -0500
Fix Parquet write properties forwarded from table properties (#183)
This updates Iceberg to use ParquetProperties directly instead of
configuration properties.
---
.../java/org/apache/iceberg/parquet/Parquet.java | 39 ++++++----
.../org/apache/iceberg/parquet/ParquetWriter.java | 8 +-
.../iceberg/parquet/BaseParquetWritingTest.java | 67 ++++++++++++++++
.../org/apache/iceberg/parquet/TestParquet.java | 90 ++++++++++++++++++++++
.../apache/iceberg/parquet/TestParquetMetrics.java | 20 +----
5 files changed, 184 insertions(+), 40 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 22c5f18..7ffc818 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -42,6 +42,8 @@ import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
@@ -144,13 +146,6 @@ public class Parquet {
}
}
- void forwardConfig(String parquetProperty, String icebergProperty, String
defaultValue) {
- String value = config.getOrDefault(icebergProperty, defaultValue);
- if (value != null) {
- set(parquetProperty, value);
- }
- }
-
public <D> FileAppender<D> build() throws IOException {
Preconditions.checkNotNull(schema, "Schema is required");
Preconditions.checkNotNull(name, "Table name is required and cannot be
null");
@@ -158,13 +153,15 @@ public class Parquet {
// add the Iceberg schema to keyValueMetadata
meta("iceberg.schema", SchemaParser.toJson(schema));
- // add Parquet configuration
- forwardConfig("parquet.block.size",
- PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
- forwardConfig("parquet.page.size",
- PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT);
- forwardConfig("parquet.dictionary.page.size",
- PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT);
+ // Map Iceberg properties to pass down to the Parquet writer
+ int rowGroupSize = Integer.parseInt(config.getOrDefault(
+ PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT));
+ int pageSize = Integer.parseInt(config.getOrDefault(
+ PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT));
+ int dictionaryPageSize = Integer.parseInt(config.getOrDefault(
+ PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT));
+
+ WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
set("parquet.avro.write-old-list-structure", "false");
MessageType type = ParquetSchemaUtil.convert(schema, name);
@@ -183,18 +180,26 @@ public class Parquet {
conf.set(entry.getKey(), entry.getValue());
}
- long rowGroupSize = Long.parseLong(config.getOrDefault(
- PARQUET_ROW_GROUP_SIZE_BYTES,
PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT));
+ ParquetProperties parquetProperties = ParquetProperties.builder()
+ .withWriterVersion(writerVersion)
+ .withPageSize(pageSize)
+ .withDictionaryPageSize(dictionaryPageSize)
+ .build();
+
return new org.apache.iceberg.parquet.ParquetWriter<>(
- conf, file, schema, rowGroupSize, metadata, createWriterFunc,
codec());
+ conf, file, schema, rowGroupSize, metadata, createWriterFunc,
codec(), parquetProperties);
} else {
return new ParquetWriteAdapter<>(new
ParquetWriteBuilder<D>(ParquetIO.file(file))
+ .withWriterVersion(writerVersion)
.setType(type)
.setConfig(config)
.setKeyValueMetadata(metadata)
.setWriteSupport(getWriteSupport(type))
.withCompressionCodec(codec())
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE) // TODO: support
modes
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withDictionaryPageSize(dictionaryPageSize)
.build());
}
}
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 6366f2a..8ad126e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -63,9 +63,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private final OutputFile output;
private final long targetRowGroupSize;
private final Map<String, String> metadata;
- private final ParquetProperties props = ParquetProperties.builder()
- .withWriterVersion(PARQUET_1_0)
- .build();
+ private final ParquetProperties props;
private final CodecFactory.BytesCompressor compressor;
private final MessageType parquetSchema;
private final ParquetValueWriter<T> model;
@@ -81,9 +79,11 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable
{
ParquetWriter(Configuration conf, OutputFile output, Schema schema, long
rowGroupSize,
Map<String, String> metadata,
Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
- CompressionCodecName codec) {
+ CompressionCodecName codec,
+ ParquetProperties properties) {
this.output = output;
this.targetRowGroupSize = rowGroupSize;
+ this.props = properties;
this.metadata = ImmutableMap.copyOf(metadata);
this.compressor = new CodecFactory(conf,
props.getPageSizeThreshold()).getCompressor(codec);
this.parquetSchema = convert(schema, "table");
diff --git
a/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java
b/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java
new file mode 100644
index 0000000..d587b64
--- /dev/null
+++
b/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.Files.localOutput;
+
+/**
+ * Base utility test class for tests that need to write Parquet files
+ */
+public abstract class BaseParquetWritingTest {
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ File writeRecords(Schema schema, GenericData.Record... records) throws
IOException {
+ return writeRecords(schema, Collections.emptyMap(), null, records);
+ }
+
+ File writeRecords(
+ Schema schema, Map<String, String> properties,
+ Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
+ GenericData.Record... records) throws IOException {
+ File tmpFolder = temp.newFolder("parquet");
+ String filename = UUID.randomUUID().toString();
+ File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
+ try (FileAppender<GenericData.Record> writer =
Parquet.write(localOutput(file))
+ .schema(schema)
+ .setAll(properties)
+ .createWriterFunc(createWriterFunc)
+ .build()) {
+ writer.addAll(Lists.newArrayList(records));
+ }
+ return file;
+ }
+}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
new file mode 100644
index 0000000..9429e45
--- /dev/null
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestParquet extends BaseParquetWritingTest {
+
+ @Test
+ public void testRowGroupSizeConfigurable() throws IOException {
+ // Without an explicit writer function
+ File parquetFile = generateFileWithTwoRowGroups(null);
+
+ try (ParquetFileReader reader =
ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
+ Assert.assertEquals(2, reader.getRowGroups().size());
+ }
+ }
+
+ @Test
+ public void testRowGroupSizeConfigurableWithWriter() throws IOException {
+ File parquetFile =
generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter);
+
+ try (ParquetFileReader reader =
ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
+ Assert.assertEquals(2, reader.getRowGroups().size());
+ }
+ }
+
+ private File generateFileWithTwoRowGroups(Function<MessageType,
ParquetValueWriter<?>> createWriterFunc)
+ throws IOException {
+ Schema schema = new Schema(
+ optional(1, "intCol", IntegerType.get())
+ );
+
+ int minimumRowGroupRecordCount = 100;
+ int desiredRecordCount = minimumRowGroupRecordCount + 1;
+
+ List<GenericData.Record> records = new ArrayList<>(desiredRecordCount);
+ org.apache.avro.Schema avroSchema =
AvroSchemaUtil.convert(schema.asStruct());
+ for (int i = 1; i <= desiredRecordCount; i++) {
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put("intCol", i);
+ records.add(record);
+ }
+
+ // Force multiple row groups by making the byte size very small
// Note there's also minimumRowGroupRecordCount which cannot be configured
so we have to write
+ // at least that many records for a new row group to occur
+ return writeRecords(
+ schema,
+ ImmutableMap.of(
+ PARQUET_ROW_GROUP_SIZE_BYTES,
+ Integer.toString(minimumRowGroupRecordCount * Integer.BYTES)),
+ createWriterFunc,
+ records.toArray(new GenericData.Record[] {}));
+ }
+}
diff --git
a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
index 229e77a..ec03850 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
@@ -32,11 +32,9 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericFixed;
import org.apache.commons.io.Charsets;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.BinaryType;
import org.apache.iceberg.types.Types.BooleanType;
@@ -55,20 +53,15 @@ import org.apache.iceberg.types.Types.TimeType;
import org.apache.iceberg.types.Types.TimestampType;
import org.apache.iceberg.types.Types.UUIDType;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import static org.apache.iceberg.Files.localInput;
-import static org.apache.iceberg.Files.localOutput;
import static org.apache.iceberg.types.Conversions.fromByteBuffer;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
-public class TestParquetMetrics {
+public class TestParquetMetrics extends BaseParquetWritingTest {
- @Rule
- public TemporaryFolder temp = new TemporaryFolder();
private final UUID uuid = UUID.randomUUID();
private final GenericFixed fixed = new GenericData.Fixed(
org.apache.avro.Schema.createFixed("fixedCol", null, null, 4),
@@ -291,15 +284,4 @@ public class TestParquetMetrics {
upperBounds.containsKey(fieldId) ? fromByteBuffer(type,
upperBounds.get(fieldId)) : null);
}
- private File writeRecords(Schema schema, Record... records) throws
IOException {
- File tmpFolder = temp.newFolder("parquet");
- String filename = UUID.randomUUID().toString();
- File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
- try (FileAppender<Record> writer = Parquet.write(localOutput(file))
- .schema(schema)
- .build()) {
- writer.addAll(Lists.newArrayList(records));
- }
- return file;
- }
}