This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 592e97f ORC: Grow list and map child vectors with a growth factor of 3 (#2218)
592e97f is described below
commit 592e97f5daf0716ec6f57b4f049dd191d8094fad
Author: Shardul Mahadik <[email protected]>
AuthorDate: Fri Feb 5 18:04:47 2021 -0800
ORC: Grow list and map child vectors with a growth factor of 3 (#2218)
---
.../apache/iceberg/data/orc/GenericOrcWriters.java | 13 ++-
.../apache/iceberg/flink/data/FlinkOrcWriters.java | 13 ++-
.../iceberg/spark/data/SparkOrcValueWriters.java | 13 ++-
.../IcebergSourceNestedListDataBenchmark.java | 56 ++++++++++++
...ebergSourceNestedListORCDataWriteBenchmark.java | 99 ++++++++++++++++++++++
...gSourceNestedListParquetDataWriteBenchmark.java | 89 +++++++++++++++++++
6 files changed, 274 insertions(+), 9 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index e4eff78..6589424 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -461,7 +461,7 @@ public class GenericOrcWriters {
cv.offsets[rowId] = cv.childCount;
cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
// make sure the child is big enough
- cv.child.ensureSize(cv.childCount, true);
+ growColumnVector(cv.child, cv.childCount);
// Add each element
for (int e = 0; e < cv.lengths[rowId]; ++e) {
element.write((int) (e + cv.offsets[rowId]), value.get(e), cv.child);
@@ -502,8 +502,8 @@ public class GenericOrcWriters {
cv.offsets[rowId] = cv.childCount;
cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
// make sure the child is big enough
- cv.keys.ensureSize(cv.childCount, true);
- cv.values.ensureSize(cv.childCount, true);
+ growColumnVector(cv.keys, cv.childCount);
+ growColumnVector(cv.values, cv.childCount);
// Add each element
for (int e = 0; e < cv.lengths[rowId]; ++e) {
int pos = (int) (e + cv.offsets[rowId]);
@@ -517,4 +517,11 @@ public class GenericOrcWriters {
return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
}
}
+
+ private static void growColumnVector(ColumnVector cv, int requestedSize) {
+ if (cv.isNull.length < requestedSize) {
+ // Use growth factor of 3 to avoid frequent array allocations
+ cv.ensureSize(requestedSize * 3, true);
+ }
+ }
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
index f088cb5..758d73d 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
@@ -249,7 +249,7 @@ class FlinkOrcWriters {
cv.offsets[rowId] = cv.childCount;
cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
// make sure the child is big enough.
- cv.child.ensureSize(cv.childCount, true);
+ growColumnVector(cv.child, cv.childCount);
for (int e = 0; e < cv.lengths[rowId]; ++e) {
Object value = elementGetter.getElementOrNull(data, e);
@@ -295,8 +295,8 @@ class FlinkOrcWriters {
cv.offsets[rowId] = cv.childCount;
cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
// make sure the child is big enough
- cv.keys.ensureSize(cv.childCount, true);
- cv.values.ensureSize(cv.childCount, true);
+ growColumnVector(cv.keys, cv.childCount);
+ growColumnVector(cv.values, cv.childCount);
// Add each element
for (int e = 0; e < cv.lengths[rowId]; ++e) {
int pos = (int) (e + cv.offsets[rowId]);
@@ -348,4 +348,11 @@ class FlinkOrcWriters {
return writers.stream().flatMap(OrcValueWriter::metrics);
}
}
+
+ private static void growColumnVector(ColumnVector cv, int requestedSize) {
+ if (cv.isNull.length < requestedSize) {
+ // Use growth factor of 3 to avoid frequent array allocations
+ cv.ensureSize(requestedSize * 3, true);
+ }
+ }
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index 78e013e..4b40750 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -273,7 +273,7 @@ class SparkOrcValueWriters {
cv.offsets[rowId] = cv.childCount;
cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
// make sure the child is big enough
- cv.child.ensureSize(cv.childCount, true);
+ growColumnVector(cv.child, cv.childCount);
// Add each element
for (int e = 0; e < cv.lengths[rowId]; ++e) {
writer.write((int) (e + cv.offsets[rowId]), e, value, cv.child);
@@ -306,8 +306,8 @@ class SparkOrcValueWriters {
cv.offsets[rowId] = cv.childCount;
cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
// make sure the child is big enough
- cv.keys.ensureSize(cv.childCount, true);
- cv.values.ensureSize(cv.childCount, true);
+ growColumnVector(cv.keys, cv.childCount);
+ growColumnVector(cv.values, cv.childCount);
// Add each element
for (int e = 0; e < cv.lengths[rowId]; ++e) {
int pos = (int) (e + cv.offsets[rowId]);
@@ -321,4 +321,11 @@ class SparkOrcValueWriters {
return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
}
}
+
+ private static void growColumnVector(ColumnVector cv, int requestedSize) {
+ if (cv.isNull.length < requestedSize) {
+ // Use growth factor of 3 to avoid frequent array allocations
+ cv.ensureSize(requestedSize * 3, true);
+ }
+ }
}
diff --git a/spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java b/spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java
new file mode 100644
index 0000000..369a150
--- /dev/null
+++ b/spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class IcebergSourceNestedListDataBenchmark extends IcebergSourceBenchmark {
+
+ @Override
+ protected Configuration initHadoopConf() {
+ return new Configuration();
+ }
+
+ @Override
+ protected final Table initTable() {
+ Schema schema = new Schema(
+ required(0, "id", Types.LongType.get()),
+ optional(1, "outerlist", Types.ListType.ofOptional(2,
+ Types.StructType.of(required(3, "innerlist", Types.ListType.ofRequired(4, Types.StringType.get())))
+ ))
+ );
+ PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+ HadoopTables tables = new HadoopTables(hadoopConf());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+ return tables.create(schema, partitionSpec, properties, newTableLocation());
+ }
+}
diff --git a/spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java b/spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
new file mode 100644
index 0000000..a1106cb
--- /dev/null
+++ b/spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.orc;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.IcebergSourceNestedListDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.spark.sql.functions.array_repeat;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of writing nested ORC data using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ * ./gradlew :iceberg-spark2:jmh
+ * -PjmhIncludeRegex=IcebergSourceNestedListORCDataWriteBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-source-nested-list-orc-data-write-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceNestedListORCDataWriteBenchmark extends IcebergSourceNestedListDataBenchmark {
+
+ @Setup
+ public void setupBenchmark() {
+ setupSpark();
+ }
+
+ @TearDown
+ public void tearDownBenchmark() throws IOException {
+ tearDownSpark();
+ cleanupFiles();
+ }
+
+ @Param({"2000", "20000"})
+ private int numRows;
+
+ @Benchmark
+ @Threads(1)
+ public void writeIceberg() {
+ String tableLocation = table().location();
+ benchmarkData().write().format("iceberg").option("write-format", "orc")
+ .mode(SaveMode.Append).save(tableLocation);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeIcebergDictionaryOff() {
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.put("orc.dictionary.key.threshold", "0");
+ withTableProperties(tableProperties, () -> {
+ String tableLocation = table().location();
+ benchmarkData().write().format("iceberg").option("write-format", "orc")
+ .mode(SaveMode.Append).save(tableLocation);
+ });
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeFileSource() {
+ benchmarkData().write().mode(SaveMode.Append).orc(dataLocation());
+ }
+
+ private Dataset<Row> benchmarkData() {
+ return spark().range(numRows)
+ .withColumn("outerlist", array_repeat(struct(
+ expr("array_repeat(CAST(id AS string), 1000) AS innerlist")),
+ 10))
+ .coalesce(1);
+ }
+}
diff --git a/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java b/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
new file mode 100644
index 0000000..160b370
--- /dev/null
+++ b/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.IcebergSourceNestedListDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.spark.sql.functions.array_repeat;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of writing nested Parquet data using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ * ./gradlew :iceberg-spark2:jmh
+ * -PjmhIncludeRegex=IcebergSourceNestedListParquetDataWriteBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-source-nested-list-parquet-data-write-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceNestedListParquetDataWriteBenchmark extends IcebergSourceNestedListDataBenchmark {
+
+ @Setup
+ public void setupBenchmark() {
+ setupSpark();
+ }
+
+ @TearDown
+ public void tearDownBenchmark() throws IOException {
+ tearDownSpark();
+ cleanupFiles();
+ }
+
+ @Param({"2000", "20000"})
+ private int numRows;
+
+ @Benchmark
+ @Threads(1)
+ public void writeIceberg() {
+ String tableLocation = table().location();
+ benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeFileSource() {
+ Map<String, String> conf = Maps.newHashMap();
+ conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip");
+ withSQLConf(conf, () -> benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation()));
+ }
+
+ private Dataset<Row> benchmarkData() {
+ return spark().range(numRows)
+ .withColumn("outerlist", array_repeat(struct(
+ expr("array_repeat(CAST(id AS string), 1000) AS innerlist")),
+ 10))
+ .coalesce(1);
+ }
+}