This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 592e97f  ORC: Grow list and map child vectors with a growth factor of 3 (#2218)
592e97f is described below

commit 592e97f5daf0716ec6f57b4f049dd191d8094fad
Author: Shardul Mahadik <[email protected]>
AuthorDate: Fri Feb 5 18:04:47 2021 -0800

    ORC: Grow list and map child vectors with a growth factor of 3 (#2218)
---
 .../apache/iceberg/data/orc/GenericOrcWriters.java | 13 ++-
 .../apache/iceberg/flink/data/FlinkOrcWriters.java | 13 ++-
 .../iceberg/spark/data/SparkOrcValueWriters.java   | 13 ++-
 .../IcebergSourceNestedListDataBenchmark.java      | 56 ++++++++++++
 ...ebergSourceNestedListORCDataWriteBenchmark.java | 99 ++++++++++++++++++++++
 ...gSourceNestedListParquetDataWriteBenchmark.java | 89 +++++++++++++++++++
 6 files changed, 274 insertions(+), 9 deletions(-)

diff --git 
a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java 
b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index e4eff78..6589424 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -461,7 +461,7 @@ public class GenericOrcWriters {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.child.ensureSize(cv.childCount, true);
+      growColumnVector(cv.child, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         element.write((int) (e + cv.offsets[rowId]), value.get(e), cv.child);
@@ -502,8 +502,8 @@ public class GenericOrcWriters {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.keys.ensureSize(cv.childCount, true);
-      cv.values.ensureSize(cv.childCount, true);
+      growColumnVector(cv.keys, cv.childCount);
+      growColumnVector(cv.values, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int pos = (int) (e + cv.offsets[rowId]);
@@ -517,4 +517,11 @@ public class GenericOrcWriters {
       return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
     }
   }
+
+  private static void growColumnVector(ColumnVector cv, int requestedSize) {
+    if (cv.isNull.length < requestedSize) {
+      // Use growth factor of 3 to avoid frequent array allocations
+      cv.ensureSize(requestedSize * 3, true);
+    }
+  }
 }
diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java 
b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
index f088cb5..758d73d 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
@@ -249,7 +249,7 @@ class FlinkOrcWriters {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough.
-      cv.child.ensureSize(cv.childCount, true);
+      growColumnVector(cv.child, cv.childCount);
 
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         Object value = elementGetter.getElementOrNull(data, e);
@@ -295,8 +295,8 @@ class FlinkOrcWriters {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.keys.ensureSize(cv.childCount, true);
-      cv.values.ensureSize(cv.childCount, true);
+      growColumnVector(cv.keys, cv.childCount);
+      growColumnVector(cv.values, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int pos = (int) (e + cv.offsets[rowId]);
@@ -348,4 +348,11 @@ class FlinkOrcWriters {
       return writers.stream().flatMap(OrcValueWriter::metrics);
     }
   }
+
+  private static void growColumnVector(ColumnVector cv, int requestedSize) {
+    if (cv.isNull.length < requestedSize) {
+      // Use growth factor of 3 to avoid frequent array allocations
+      cv.ensureSize(requestedSize * 3, true);
+    }
+  }
 }
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index 78e013e..4b40750 100644
--- 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -273,7 +273,7 @@ class SparkOrcValueWriters {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.child.ensureSize(cv.childCount, true);
+      growColumnVector(cv.child, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         writer.write((int) (e + cv.offsets[rowId]), e, value, cv.child);
@@ -306,8 +306,8 @@ class SparkOrcValueWriters {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.keys.ensureSize(cv.childCount, true);
-      cv.values.ensureSize(cv.childCount, true);
+      growColumnVector(cv.keys, cv.childCount);
+      growColumnVector(cv.values, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int pos = (int) (e + cv.offsets[rowId]);
@@ -321,4 +321,11 @@ class SparkOrcValueWriters {
       return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
     }
   }
+
+  private static void growColumnVector(ColumnVector cv, int requestedSize) {
+    if (cv.isNull.length < requestedSize) {
+      // Use growth factor of 3 to avoid frequent array allocations
+      cv.ensureSize(requestedSize * 3, true);
+    }
+  }
 }
diff --git 
a/spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java
 
b/spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java
new file mode 100644
index 0000000..369a150
--- /dev/null
+++ 
b/spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class IcebergSourceNestedListDataBenchmark extends 
IcebergSourceBenchmark {
+
+  @Override
+  protected Configuration initHadoopConf() {
+    return new Configuration();
+  }
+
+  @Override
+  protected final Table initTable() {
+    Schema schema = new Schema(
+        required(0, "id", Types.LongType.get()),
+        optional(1, "outerlist", Types.ListType.ofOptional(2,
+            Types.StructType.of(required(3, "innerlist", 
Types.ListType.ofRequired(4, Types.StringType.get())))
+        ))
+    );
+    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+    HadoopTables tables = new HadoopTables(hadoopConf());
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    return tables.create(schema, partitionSpec, properties, 
newTableLocation());
+  }
+}
diff --git 
a/spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
 
b/spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
new file mode 100644
index 0000000..a1106cb
--- /dev/null
+++ 
b/spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.orc;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.IcebergSourceNestedListDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.spark.sql.functions.array_repeat;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of writing nested list ORC data using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark2:jmh
+ *       -PjmhIncludeRegex=IcebergSourceNestedListORCDataWriteBenchmark
+ *       
-PjmhOutputPath=benchmark/iceberg-source-nested-list-orc-data-write-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceNestedListORCDataWriteBenchmark extends 
IcebergSourceNestedListDataBenchmark {
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Param({"2000", "20000"})
+  private int numRows;
+
+  @Benchmark
+  @Threads(1)
+  public void writeIceberg() {
+    String tableLocation = table().location();
+    benchmarkData().write().format("iceberg").option("write-format", "orc")
+        .mode(SaveMode.Append).save(tableLocation);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeIcebergDictionaryOff() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put("orc.dictionary.key.threshold", "0");
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      benchmarkData().write().format("iceberg").option("write-format", "orc")
+          .mode(SaveMode.Append).save(tableLocation);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeFileSource() {
+    benchmarkData().write().mode(SaveMode.Append).orc(dataLocation());
+  }
+
+  private Dataset<Row> benchmarkData() {
+    return spark().range(numRows)
+        .withColumn("outerlist", array_repeat(struct(
+            expr("array_repeat(CAST(id AS string), 1000) AS innerlist")),
+            10))
+        .coalesce(1);
+  }
+}
diff --git 
a/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
 
b/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
new file mode 100644
index 0000000..160b370
--- /dev/null
+++ 
b/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.IcebergSourceNestedListDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.spark.sql.functions.array_repeat;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of writing nested list Parquet data using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark2:jmh
+ *       -PjmhIncludeRegex=IcebergSourceNestedListParquetDataWriteBenchmark
+ *       
-PjmhOutputPath=benchmark/iceberg-source-nested-list-parquet-data-write-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceNestedListParquetDataWriteBenchmark extends 
IcebergSourceNestedListDataBenchmark {
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Param({"2000", "20000"})
+  private int numRows;
+
+  @Benchmark
+  @Threads(1)
+  public void writeIceberg() {
+    String tableLocation = table().location();
+    
benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeFileSource() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip");
+    withSQLConf(conf, () -> 
benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation()));
+  }
+
+  private Dataset<Row> benchmarkData() {
+    return spark().range(numRows)
+        .withColumn("outerlist", array_repeat(struct(
+            expr("array_repeat(CAST(id AS string), 1000) AS innerlist")),
+            10))
+        .coalesce(1);
+  }
+}

Reply via email to