This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/vectorized-read by this push:
new 9578d06 [WIP] Vectorized read using Arrow (#319)
9578d06 is described below
commit 9578d06719b0d0498dd2b64d94e3baed6f376808
Author: Gautam <[email protected]>
AuthorDate: Fri Jul 26 14:32:10 2019 -0700
[WIP] Vectorized read using Arrow (#319)
* First cut impl of reading Parquet FileIterator into ArrowRecordBatch
based reader
* made num records per arrow batch configurable
* addressed comments
* Added docs for public methods and ArrowReader class
* Fixed javadoc
* WIP first stab at reading into Arrow and returning as InternalRow iterator
* Add publish to snapshot repository by replacing version to
`1.0-adobe-2.0-SNAPSHOT` (snapshot prefix is required by snapshot repo)
* Adding arrow schema conversion utility
* adding arrow-vector dep to tests
* [WIP] Working vectorization for primitive types. Added test for
VectorizedSparkParquetReaders.
* [WIP] Added Decimal types to vectorization
* [WIP] added remaining primitive type vectorization and tests
* [WIP] unused imports fixed
* Add argument validation to HadoopTables#create (#298)
* Install source JAR when running install target (#310)
* Bump version to 1.0-adobe-3.0-vectorized-SNAPSHOT
* Temporarily ignore applying style check
* Fixing javadoc error
* Updating versions.lock
* fixed checkstyle errors
* Revert "Bump version to 1.0-adobe-3.0-vectorized-SNAPSHOT"
This reverts commit ceae2fd7c79ce1eaaaa2eed265481cfce6fdce16.
* cleanup
---
.baseline/checkstyle/.checkstyle.xml.swp | Bin 45056 -> 0 bytes
.../apache/iceberg/arrow/reader/ArrowReader.java | 242 ++++++++++++
build.gradle | 35 +-
.../org/apache/iceberg/arrow/ArrowSchemaUtil.java | 148 ++++++++
.../org/apache/iceberg/hadoop/HadoopTables.java | 11 +-
.../apache/iceberg/arrow/ArrowSchemaUtilTest.java | 123 ++++++
.../org/apache/iceberg/data/TableScanIterable.java | 3 +-
gradle.properties | 4 +
.../java/org/apache/iceberg/parquet/Parquet.java | 17 +-
.../org/apache/iceberg/parquet/ParquetAvro.java | 6 +-
.../org/apache/iceberg/parquet/ParquetFilters.java | 4 +-
.../java/org/apache/iceberg/parquet/ParquetIO.java | 6 +-
.../apache/iceberg/parquet/ParquetIterable.java | 2 +-
.../apache/iceberg/parquet/ParquetReadSupport.java | 4 +-
.../org/apache/iceberg/parquet/ParquetReader.java | 41 +-
.../iceberg/parquet/ParquetValueReaders.java | 84 +++++
.../iceberg/parquet/ParquetWriteSupport.java | 4 +-
.../org/apache/iceberg/parquet/ParquetWriter.java | 2 +-
settings.gradle | 2 +
.../iceberg/spark/data/SparkParquetReaders.java | 6 +-
.../data/vector/VectorizedParquetValueReaders.java | 414 +++++++++++++++++++++
.../data/vector/VectorizedSparkParquetReaders.java | 245 ++++++++++++
.../apache/iceberg/spark/source/IcebergSource.java | 30 +-
.../org/apache/iceberg/spark/source/Reader.java | 175 +++++----
.../org/apache/iceberg/spark/data/RandomData.java | 4 +-
.../org/apache/iceberg/spark/data/TestHelpers.java | 31 +-
.../data/TestSparkParquetVectorizedReader.java | 132 +++++++
versions.lock | 17 +-
versions.props | 1 +
29 files changed, 1672 insertions(+), 121 deletions(-)
diff --git a/.baseline/checkstyle/.checkstyle.xml.swp
b/.baseline/checkstyle/.checkstyle.xml.swp
deleted file mode 100644
index dac6cf2..0000000
Binary files a/.baseline/checkstyle/.checkstyle.xml.swp and /dev/null differ
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java
new file mode 100644
index 0000000..f7e1f6d
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java
@@ -0,0 +1,242 @@
+package org.apache.iceberg.arrow.reader;
+
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.apache.spark.sql.execution.arrow.ArrowWriter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.TaskCompletionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+/***
+ * This is a helper class for Arrow reading. It provides two main converter
methods.
+ * These converter methods are currently used to first convert a Parquet
FileIterator
+ * into Iterator over ArrowRecordBatches. Second, the ArrowRecordBatch is made
+ * into Columnar Batch and exposed as an Iterator over InternalRow. The second
step is to
+ * done to conform to Spark's current interface. When Spark adds Arrow support
we will
+ * take the second iterator out and just return the first one.
+ */
+public class ArrowReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);
+
+ /***
+ * Accepts an iterator over ArrowRecordBatches and copies into
ColumnarBatches.
+ * Since Spark uses Iterator over InternalRow we return this over
ColumarBatch.
+ * @param arrowBatchIter
+ * @param sparkSchema
+ * @param timeZoneId
+ * @return
+ */
+ public static InternalRowOverArrowBatchIterator fromBatchIterator(
+ Iterator<ArrowRecordBatch> arrowBatchIter,
+ StructType sparkSchema,
+ String timeZoneId) {
+
+ // timeZoneId required for TimestampType in StructType
+ Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timeZoneId);
+ BufferAllocator allocator =
+ ArrowUtils.rootAllocator().newChildAllocator("fromBatchIterator", 0,
Long.MAX_VALUE);
+
+ VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);
+
+ return new InternalRowOverArrowBatchIterator(arrowBatchIter, allocator,
root);
+ }
+
+ @NotThreadSafe
+ public static class InternalRowOverArrowBatchIterator implements
Iterator<InternalRow>, Closeable {
+
+ private final Iterator<ArrowRecordBatch> arrowBatchIter;
+ private final BufferAllocator allocator;
+ private final VectorSchemaRoot root;
+
+ private Iterator<InternalRow> rowIter;
+
+ InternalRowOverArrowBatchIterator(Iterator<ArrowRecordBatch>
arrowBatchIter,
+ BufferAllocator allocator,
+ VectorSchemaRoot root) {
+
+ this.arrowBatchIter = arrowBatchIter;
+ this.allocator = allocator;
+ this.root = root;
+
+ }
+
+
+
+ @Override
+ public boolean hasNext() {
+ if (rowIter != null && rowIter.hasNext()) {
+ return true;
+ }
+ if (arrowBatchIter.hasNext()) {
+ rowIter = nextBatch();
+ return true;
+ } else {
+ try {
+ close();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Encountered an error while closing
iterator. "+ioe.getMessage(), ioe);
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public InternalRow next() {
+ return rowIter.next();
+ }
+
+ private Iterator<InternalRow> nextBatch() {
+ ArrowRecordBatch arrowRecordBatch = arrowBatchIter.next();
+ long start = System.currentTimeMillis();
+ root.setRowCount(0);
+ VectorLoader vectorLoader = new VectorLoader(root);
+ vectorLoader.load(arrowRecordBatch);
+ arrowRecordBatch.close();
+
+ List<FieldVector> fieldVectors = root.getFieldVectors();
+ ColumnVector[] columns = new ColumnVector[fieldVectors.size()];
+ for(int i=0; i<fieldVectors.size(); i++) {
+ columns[i] = new ArrowColumnVector(fieldVectors.get(i));
+ }
+
+ ColumnarBatch batch = new ColumnarBatch(columns);
+ batch.setNumRows(root.getRowCount());
+
+ LOG.info("[InternalRowOverArrowIterator] => Created Columnar Batch with
"+root.getRowCount()+ " rows" +
+ ". Took " + (System.currentTimeMillis() - start) + " milliseconds.");
+ return batch.rowIterator();
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ // arrowWriter.finish();
+ root.close();
+ allocator.close();
+ }
+
+ }
+
+ /**
+ * Acceepts Iterator over InternalRow coming in from ParqeutReader's
FileIterator
+ * and creates ArrowRecordBatches over that by collecting rows from the
input iter.
+ * Each next() call over this iterator will collect up to maxRecordsPerBatch
rows
+ * at a time and create an Arrow batch with it and returns an iterator over
that.
+ * @param rowIter
+ * @param sparkSchema
+ * @param maxRecordsPerBatch
+ * @param timezonId
+ * @return
+ */
+ public static ArrowRecordBatchIterator toBatchIterator(
+ Iterator<InternalRow> rowIter,
+ StructType sparkSchema, int maxRecordsPerBatch,
+ String timezonId) {
+
+ // StructType sparkSchema = SparkSchemaUtil.convert(icebergSchema);
+ TaskContext context = TaskContext.get();
+
+ Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timezonId);
+ BufferAllocator allocator = ArrowUtils.rootAllocator().newChildAllocator(
+ "toBatchIterator",
+ 0,
+ Long.MAX_VALUE);
+ VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);
+
+ if (context!=null) {
+ context.addTaskCompletionListener(new TaskCompletionListener() {
+ @Override
+ public void onTaskCompletion(TaskContext context) {
+ root.close();
+ allocator.close();
+ }
+ });
+ }
+
+ return new ArrowRecordBatchIterator(rowIter, root, allocator,
maxRecordsPerBatch);
+ }
+
+
+ public static class ArrowRecordBatchIterator implements
Iterator<ArrowRecordBatch>, Closeable {
+
+ final Iterator<InternalRow> rowIterator;
+ final VectorSchemaRoot root;
+ final BufferAllocator allocator;
+ final int maxRecordsPerBatch;
+ final ArrowWriter arrowWriter;
+ final VectorUnloader unloader;
+
+ ArrowRecordBatchIterator(Iterator<InternalRow> rowIterator,
+ VectorSchemaRoot root,
+ BufferAllocator allocator,
+ int maxRecordsPerBatch) {
+
+ this.unloader = new VectorUnloader(root);
+ this.arrowWriter = ArrowWriter.create(root);
+ this.rowIterator = rowIterator;
+ this.root = root;
+ this.allocator = allocator;
+ this.maxRecordsPerBatch = maxRecordsPerBatch;
+ }
+
+ @Override
+ public boolean hasNext() {
+
+ if (!rowIterator.hasNext()) {
+
+ try {
+ close();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Encountered an error while closing
iterator. "+ioe.getMessage(), ioe);
+ }
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public ArrowRecordBatch next() {
+
+ int rowCount = 0;
+
+ long start = System.currentTimeMillis();
+ while (rowIterator.hasNext() && (maxRecordsPerBatch <= 0 || rowCount <
maxRecordsPerBatch)) {
+ InternalRow row = rowIterator.next();
+ arrowWriter.write(row);
+ rowCount += 1;
+ }
+ arrowWriter.finish();
+ LOG.info("[ArrowRecordBatchIterator] => Created batch with "+rowCount+ "
rows. " +
+ "Took "+(System.currentTimeMillis() - start) + " milliseconds.");
+ ArrowRecordBatch batch = unloader.getRecordBatch();
+ return batch;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // arrowWriter.finish();
+ root.close();
+ allocator.close();
+ }
+ }
+}
diff --git a/build.gradle b/build.gradle
index 282ce61..71e8065 100644
--- a/build.gradle
+++ b/build.gradle
@@ -64,6 +64,10 @@ subprojects {
apply plugin: 'maven' // make pom files for deployment
apply plugin: 'nebula.maven-base-publish'
+ artifacts {
+ archives sourceJar
+ }
+
compileJava {
options.encoding = "UTF-8"
}
@@ -222,6 +226,14 @@ project(':iceberg-core') {
compile("org.apache.avro:avro") {
exclude group: 'org.tukaani' // xz compression is not supported
}
+ compile("org.apache.arrow:arrow-vector") {
+ exclude group: 'io.netty', module: 'netty-buffer'
+ exclude group: 'io.netty', module: 'netty-common'
+ }
+ compileOnly("org.apache.spark:spark-hive_2.11") {
+ exclude group: 'org.apache.avro', module: 'avro'
+ }
+
compile "com.fasterxml.jackson.core:jackson-databind"
compile "com.fasterxml.jackson.core:jackson-core"
@@ -237,7 +249,11 @@ project(':iceberg-data') {
dependencies {
compile project(':iceberg-api')
compile project(':iceberg-core')
+ compileOnly project(':iceberg-spark')
compileOnly project(':iceberg-parquet')
+ compileOnly("org.apache.spark:spark-hive_2.11") {
+ exclude group: 'org.apache.avro', module: 'avro'
+ }
testCompile("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
@@ -323,14 +339,12 @@ project(':iceberg-parquet') {
dependencies {
compile project(':iceberg-api')
compile project(':iceberg-core')
+ compile project(':iceberg-arrow')
- compile("org.apache.parquet:parquet-avro") {
+ compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
- // already shaded by Parquet
- exclude group: 'it.unimi.dsi'
- exclude group: 'org.codehaus.jackson'
}
-
+ compile "org.apache.parquet:parquet-avro"
compileOnly "org.apache.avro:avro"
compileOnly("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
@@ -363,6 +377,17 @@ project(':iceberg-spark') {
}
}
+project(':iceberg-arrow') {
+ dependencies {
+// compile project(':iceberg-spark')
+ compile project(':iceberg-api')
+
+ compileOnly("org.apache.spark:spark-hive_2.11") {
+ exclude group: 'org.apache.avro', module: 'avro'
+ }
+ }
+}
+
project(':iceberg-pig') {
dependencies {
compile project(':iceberg-api')
diff --git a/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
b/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
new file mode 100644
index 0000000..492af61
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+
+public class ArrowSchemaUtil {
+ static final String ORIGINAL_TYPE = "originalType";
+ static final String MAP_TYPE = "mapType";
+ static final String MAP_KEY = "key";
+ static final String MAP_VALUE = "value";
+
+ private ArrowSchemaUtil() { }
+
+ /**
+ * Convert Iceberg schema to Arrow Schema.
+ *
+ * @param schema iceberg schema
+ * @return arrow schema
+ */
+ public static Schema convert(final org.apache.iceberg.Schema schema) {
+ final ImmutableList.Builder<Field> fields = ImmutableList.builder();
+
+ for (NestedField f : schema.columns()) {
+ fields.add(convert(f));
+ }
+
+ return new Schema(fields.build());
+ }
+
+ public static Field convert(final NestedField field) {
+ final ArrowType arrowType;
+
+ final List<Field> children = Lists.newArrayList();
+ Map<String, String> metadata = null;
+
+ switch (field.type().typeId()) {
+ case BINARY:
+ arrowType = ArrowType.Binary.INSTANCE;
+ break;
+ case FIXED:
+ arrowType = new ArrowType.FixedSizeBinary(((Types.FixedType)
field.type()).length());
+ break;
+ case BOOLEAN:
+ arrowType = ArrowType.Bool.INSTANCE;
+ break;
+ case INTEGER:
+ arrowType = new ArrowType.Int(Integer.SIZE, true);
+ break;
+ case LONG:
+ arrowType = new ArrowType.Int(Long.SIZE, true);
+ break;
+ case FLOAT:
+ arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+ break;
+ case DOUBLE:
+ arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+ break;
+ case DECIMAL:
+ final Types.DecimalType decimalType = (Types.DecimalType) field.type();
+ arrowType = new ArrowType.Decimal(decimalType.precision(),
decimalType.scale());
+ break;
+ case STRING:
+ arrowType = ArrowType.Utf8.INSTANCE;
+ break;
+ case TIME:
+ arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
+ break;
+ case TIMESTAMP:
+ arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
+ break;
+ case DATE:
+ arrowType = new ArrowType.Date(DateUnit.DAY);
+ break;
+ case STRUCT:
+ final StructType struct = field.type().asStructType();
+ arrowType = ArrowType.Struct.INSTANCE;
+
+ for (NestedField nested : struct.fields()) {
+ children.add(convert(nested));
+ }
+ break;
+ case LIST:
+ final ListType listType = field.type().asListType();
+ arrowType = ArrowType.List.INSTANCE;
+
+ for (NestedField nested : listType.fields()) {
+ children.add(convert(nested));
+ }
+ break;
+ case MAP:
+ //Maps are represented as List<Struct<key, value>>
+ metadata = ImmutableMap.of(ORIGINAL_TYPE, MAP_TYPE);
+ final MapType mapType = field.type().asMapType();
+ arrowType = ArrowType.List.INSTANCE;
+
+ final List<Field> entryFields = Lists.newArrayList(
+ convert(required(0, MAP_KEY, mapType.keyType())),
+ convert(optional(0, MAP_VALUE, mapType.valueType()))
+ );
+
+ final Field entry = new Field("",
+ new FieldType(true, new ArrowType.Struct(), null), entryFields);
+ children.add(entry);
+ break;
+ default: throw new UnsupportedOperationException("Unsupported field
type: " + field);
+ }
+
+ return new Field(field.name(), new FieldType(field.isOptional(),
arrowType, null, metadata), children);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
index 553faef..e0de97d 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.hadoop;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -69,19 +71,24 @@ public class HadoopTables implements Tables, Configurable {
* location.
*
* @param schema iceberg schema used to create the table
- * @param spec partition specification
+ * @param spec partitioning spec, if null the table will be unpartitioned
+ * @param properties a string map of table properties, initialized to empty
if null
* @param location a path URI (e.g. hdfs:///warehouse/my_table)
* @return newly created table implementation
*/
@Override
public Table create(Schema schema, PartitionSpec spec, Map<String, String>
properties,
String location) {
+ Preconditions.checkNotNull(schema, "A table schema is required");
+
TableOperations ops = newTableOps(location);
if (ops.current() != null) {
throw new AlreadyExistsException("Table already exists at location: " +
location);
}
- TableMetadata metadata = TableMetadata.newTableMetadata(ops, schema, spec,
location, properties);
+ Map<String, String> tableProps = properties == null ? ImmutableMap.of() :
properties;
+ PartitionSpec partitionSpec = spec == null ? PartitionSpec.unpartitioned()
: spec;
+ TableMetadata metadata = TableMetadata.newTableMetadata(ops, schema,
partitionSpec, location, tableProps);
ops.commit(null, metadata);
return new BaseTable(ops, location);
diff --git
a/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
b/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
new file mode 100644
index 0000000..f42ebde
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow;
+
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.junit.Test;
+
+import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Bool;
+import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Date;
+import static
org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.FloatingPoint;
+import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Int;
+import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.List;
+import static
org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Timestamp;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+public class ArrowSchemaUtilTest {
+
+ @Test
+ public void convertPrimitive() {
+ Schema iceberg = new Schema(
+ optional(0, "i", Types.IntegerType.get()),
+ optional(1, "b", BooleanType.get()),
+ required(2, "d", DoubleType.get()),
+ required(3, "s", StringType.get()),
+ optional(4, "d2", DateType.get()),
+ optional(5, "ts", TimestampType.withoutZone())
+ );
+
+ org.apache.arrow.vector.types.pojo.Schema arrow =
ArrowSchemaUtil.convert(iceberg);
+
+ System.out.println(iceberg);
+ System.out.println(arrow);
+
+ validate(iceberg, arrow);
+ }
+
+ @Test
+ public void convertComplex() {
+ Schema iceberg = new Schema(
+ optional(0, "m", MapType.ofOptional(
+ 1, 2, StringType.get(),
+ LongType.get())
+ ),
+ required(3, "m2", MapType.ofOptional(
+ 4, 5, StringType.get(),
+ ListType.ofOptional(6, TimestampType.withoutZone()))
+ )
+ );
+
+ org.apache.arrow.vector.types.pojo.Schema arrow =
ArrowSchemaUtil.convert(iceberg);
+
+ System.out.println(iceberg);
+ System.out.println(arrow);
+
+ assertEquals(iceberg.columns().size(), arrow.getFields().size());
+ }
+
+ private void validate(Schema iceberg,
org.apache.arrow.vector.types.pojo.Schema arrow) {
+ assertEquals(iceberg.columns().size(), arrow.getFields().size());
+
+ for (Types.NestedField nf : iceberg.columns()) {
+ Field field = arrow.findField(nf.name());
+ assertNotNull("Missing filed: " + nf, field);
+
+ validate(nf.type(), field.getType());
+ }
+ }
+
+ private void validate(Type iceberg, ArrowType arrow) {
+ switch (iceberg.typeId()) {
+ case BOOLEAN: assertEquals(Bool, arrow.getTypeID());
+ break;
+ case INTEGER: assertEquals(Int, arrow.getTypeID());
+ break;
+ case LONG: assertEquals(Int, arrow.getTypeID());
+ break;
+ case DOUBLE: assertEquals(FloatingPoint, arrow.getTypeID());
+ break;
+ case STRING: assertEquals(ArrowType.Utf8.INSTANCE.getTypeID(),
arrow.getTypeID());
+ break;
+ case DATE: assertEquals(Date, arrow.getTypeID());
+ break;
+ case TIMESTAMP: assertEquals(Timestamp, arrow.getTypeID());
+ break;
+ case MAP: assertEquals(List, arrow.getTypeID());
+ break;
+ default: throw new UnsupportedOperationException("Check not implemented
for type: " + iceberg);
+ }
+ }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 27a625d..b8d59c9 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
class TableScanIterable extends CloseableGroup implements
CloseableIterable<Record> {
private final TableOperations ops;
@@ -89,7 +90,7 @@ class TableScanIterable extends CloseableGroup implements
CloseableIterable<Reco
case PARQUET:
Parquet.ReadBuilder parquet = Parquet.read(input)
- .project(projection)
+ .project(projection, SparkSchemaUtil.convert(projection))
.createReaderFunc(fileSchema ->
GenericParquetReaders.buildReader(projection, fileSchema))
.split(task.start(), task.length());
diff --git a/gradle.properties b/gradle.properties
index f2ff982..90a909a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,2 +1,6 @@
jmhOutputPath=build/reports/jmh/human-readable-output.txt
jmhIncludeRegex=.*
+
+artifactory_contextUrl=https://artifactory.corp.adobe.com/artifactory
+artifactory_user_p=
+artifactory_key_p=
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 494f2c5..9ce8dff 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,6 +55,7 @@ import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.types.StructType;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
@@ -273,6 +274,7 @@ public class Parquet {
private Long start = null;
private Long length = null;
private Schema schema = null;
+ private StructType sparkSchema = null;
private Expression filter = null;
private ReadSupport<?> readSupport = null;
private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
@@ -281,6 +283,7 @@ public class Parquet {
private Map<String, String> properties = Maps.newHashMap();
private boolean callInit = false;
private boolean reuseContainers = false;
+ private int maxRecordsPerBatch = 1000;
private ReadBuilder(InputFile file) {
this.file = file;
@@ -299,6 +302,12 @@ public class Parquet {
return this;
}
+ public ReadBuilder project(Schema schema, StructType sparkSchema) {
+ this.schema = schema;
+ this.sparkSchema = sparkSchema;
+ return this;
+ }
+
public ReadBuilder project(Schema schema) {
this.schema = schema;
return this;
@@ -348,6 +357,12 @@ public class Parquet {
return this;
}
+ public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
+
+ this.maxRecordsPerBatch = numRowsPerBatch;
+ return this;
+ }
+
@SuppressWarnings("unchecked")
public <D> CloseableIterable<D> build() {
if (readerFunc != null) {
@@ -374,7 +389,7 @@ public class Parquet {
ParquetReadOptions options = optionsBuilder.build();
return new org.apache.iceberg.parquet.ParquetReader<>(
- file, schema, options, readerFunc, filter, reuseContainers,
caseSensitive);
+ file, schema, options, readerFunc, filter, reuseContainers,
caseSensitive, sparkSchema, maxRecordsPerBatch);
}
ParquetReadBuilder<D> builder = new
ParquetReadBuilder<>(ParquetIO.file(file));
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
index 4c315c3..afcfb78 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
@@ -36,8 +36,8 @@ import org.apache.iceberg.avro.AvroSchemaVisitor;
import org.apache.iceberg.avro.UUIDConversion;
import org.apache.iceberg.types.TypeUtil;
-class ParquetAvro {
- static Schema parquetAvroSchema(Schema avroSchema) {
+public class ParquetAvro {
+ public static Schema parquetAvroSchema(Schema avroSchema) {
return AvroSchemaVisitor.visit(avroSchema, new
ParquetDecimalSchemaConverter());
}
@@ -173,7 +173,7 @@ class ParquetAvro {
}
}
- static GenericData DEFAULT_MODEL = new SpecificData() {
+ public static GenericData DEFAULT_MODEL = new SpecificData() {
private final Conversion<?> fixedDecimalConversion = new
FixedDecimalConversion();
private final Conversion<?> intDecimalConversion = new
IntDecimalConversion();
private final Conversion<?> longDecimalConversion = new
LongDecimalConversion();
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
index b4a675e..cbf2d1c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -37,9 +37,9 @@ import org.apache.parquet.io.api.Binary;
import static org.apache.iceberg.expressions.ExpressionVisitors.visit;
-class ParquetFilters {
+public class ParquetFilters {
- static FilterCompat.Filter convert(Schema schema, Expression expr, boolean
caseSensitive) {
+ public static FilterCompat.Filter convert(Schema schema, Expression expr,
boolean caseSensitive) {
FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema,
caseSensitive));
// TODO: handle AlwaysFalse.INSTANCE
if (pred != null && pred != AlwaysTrue.INSTANCE) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
index 360a055..6432483 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
@@ -44,11 +44,11 @@ import static
org.apache.parquet.hadoop.util.HadoopOutputFile.fromPath;
/**
* Methods in this class translate from the IO API to Parquet's IO API.
*/
-class ParquetIO {
+public class ParquetIO {
private ParquetIO() {
}
- static InputFile file(org.apache.iceberg.io.InputFile file) {
+ public static InputFile file(org.apache.iceberg.io.InputFile file) {
// TODO: use reflection to avoid depending on classes from iceberg-hadoop
// TODO: use reflection to avoid depending on classes from hadoop
if (file instanceof HadoopInputFile) {
@@ -62,7 +62,7 @@ class ParquetIO {
return new ParquetInputFile(file);
}
- static OutputFile file(org.apache.iceberg.io.OutputFile file) {
+ public static OutputFile file(org.apache.iceberg.io.OutputFile file) {
if (file instanceof HadoopOutputFile) {
HadoopOutputFile hfile = (HadoopOutputFile) file;
try {
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
index bc43448..7d6d72c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
@@ -31,7 +31,7 @@ import org.apache.parquet.hadoop.ParquetReader;
public class ParquetIterable<T> extends CloseableGroup implements
CloseableIterable<T> {
private final ParquetReader.Builder<T> builder;
- ParquetIterable(ParquetReader.Builder<T> builder) {
+ public ParquetIterable(ParquetReader.Builder<T> builder) {
this.builder = builder;
}
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
index 8a0b44c..b0c8a31 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
@@ -41,12 +41,12 @@ import static
org.apache.iceberg.parquet.ParquetSchemaUtil.pruneColumnsFallback;
*
* @param <T> Java type produced by this read support instance
*/
-class ParquetReadSupport<T> extends ReadSupport<T> {
+public class ParquetReadSupport<T> extends ReadSupport<T> {
private final Schema expectedSchema;
private final ReadSupport<T> wrapped;
private final boolean callInit;
- ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport,
boolean callInit) {
+ public ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport,
boolean callInit) {
this.expectedSchema = expectedSchema;
this.wrapped = readSupport;
this.callInit = callInit;
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 2ff2bdb..9f1e5fb 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -23,8 +23,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.TimeZone;
import java.util.function.Function;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.reader.ArrowReader;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -36,6 +38,11 @@ import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.addFallbackIds;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.hasIds;
@@ -50,18 +57,24 @@ public class ParquetReader<T> extends CloseableGroup
implements CloseableIterabl
private final Expression filter;
private final boolean reuseContainers;
private final boolean caseSensitive;
+ private final StructType sparkSchema;
+ private final int maxRecordsPerBatch;
+ private static final Logger LOG =
LoggerFactory.getLogger(ParquetReader.class);
public ParquetReader(InputFile input, Schema expectedSchema,
ParquetReadOptions options,
Function<MessageType, ParquetValueReader<?>> readerFunc,
- Expression filter, boolean reuseContainers, boolean
caseSensitive) {
+ Expression filter, boolean reuseContainers, boolean
caseSensitive,
+ StructType sparkSchema, int maxRecordsPerBatch) {
this.input = input;
this.expectedSchema = expectedSchema;
+ this.sparkSchema = sparkSchema;
this.options = options;
this.readerFunc = readerFunc;
// replace alwaysTrue with null to avoid extra work evaluating a trivial
filter
this.filter = filter == Expressions.alwaysTrue() ? null : filter;
this.reuseContainers = reuseContainers;
this.caseSensitive = caseSensitive;
+ this.maxRecordsPerBatch = maxRecordsPerBatch;
}
private static class ReadConf<T> {
@@ -185,11 +198,31 @@ public class ParquetReader<T> extends CloseableGroup
implements CloseableIterabl
@Override
public Iterator<T> iterator() {
+ // create iterator over file
FileIterator<T> iter = new FileIterator<>(init());
addCloseable(iter);
+
return iter;
}
+ private Iterator<T> arrowBatchAsInternalRow(Iterator<InternalRow> iter) {
+ // Convert InternalRow iterator to ArrowRecordBatch Iterator
+ Iterator<InternalRow> rowIterator = iter;
+ ArrowReader.ArrowRecordBatchIterator arrowBatchIter =
ArrowReader.toBatchIterator(rowIterator,
+ sparkSchema, maxRecordsPerBatch,
+ TimeZone.getDefault().getID());
+ addCloseable(arrowBatchIter);
+
+ // Overlay InternalRow iterator over ArrowRecordBatches
+ ArrowReader.InternalRowOverArrowBatchIterator
+ rowOverbatchIter = ArrowReader.fromBatchIterator(arrowBatchIter,
+ sparkSchema, TimeZone.getDefault().getID());
+
+ addCloseable(rowOverbatchIter);
+
+ return (Iterator)rowOverbatchIter;
+ }
+
private static class FileIterator<T> implements Iterator<T>, Closeable {
private final ParquetFileReader reader;
private final boolean[] shouldSkip;
@@ -226,7 +259,11 @@ public class ParquetReader<T> extends CloseableGroup
implements CloseableIterabl
} else {
this.last = model.read(null);
}
- valuesRead += 1;
+ if (last instanceof ColumnarBatch) {
+ valuesRead += ((ColumnarBatch)last).numRows();
+ } else {
+ valuesRead += 1;
+ }
return last;
}
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index ac61983..4529367 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -26,13 +26,18 @@ import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
import static java.util.Collections.emptyIterator;
@@ -576,6 +581,85 @@ public class ParquetValueReaders {
}
}
+
+ public static class ColumnarBatchReader implements
ParquetValueReader<ColumnarBatch> {
+
+ private final int numFields;
+ private final Types.StructType iceExpectedFields;
+ private final ParquetValueReader<FieldVector>[] readers;
+ private final TripleIterator<?> column;
+ private final TripleIterator<?>[] columns;
+ private final List<TripleIterator<?>> children;
+
+ @SuppressWarnings("unchecked")
+ public ColumnarBatchReader(List<Type> types,
+ Types.StructType icebergExpectedFields,
+ List<ParquetValueReader<FieldVector>> readers) {
+
+ this.numFields = readers.size();
+ this.iceExpectedFields = icebergExpectedFields;
+ this.readers = (ParquetValueReader<FieldVector>[]) Array.newInstance(
+ ParquetValueReader.class, readers.size());
+ this.columns = (TripleIterator<?>[])
Array.newInstance(TripleIterator.class, readers.size());
+
+
+ ImmutableList.Builder<TripleIterator<?>> columnsBuilder =
ImmutableList.builder();
+ for (int i = 0; i < readers.size(); i += 1) {
+ ParquetValueReader<FieldVector> reader = readers.get(i);
+ this.readers[i] = readers.get(i);
+ this.columns[i] = reader.column();
+ columnsBuilder.addAll(reader.columns());
+ }
+
+ this.children = columnsBuilder.build();
+ if (children.size() > 0) {
+ this.column = children.get(0);
+ } else {
+ this.column = NullReader.NULL_COLUMN;
+ }
+
+ }
+
+ @Override
+ public final void setPageSource(PageReadStore pageStore) {
+ for (int i = 0; i < readers.length; i += 1) {
+ readers[i].setPageSource(pageStore);
+ }
+ }
+
+ @Override
+ public final TripleIterator<?> column() {
+ return column;
+ }
+
+ @Override
+ public List<TripleIterator<?>> columns() {
+ return children;
+ }
+
+
+ @Override
+ public final ColumnarBatch read(ColumnarBatch ignore) {
+
+ ArrowColumnVector[] arrowVectorArr =
(ArrowColumnVector[])Array.newInstance(ArrowColumnVector.class,
+ readers.length);
+
+ int numRows=0;
+ for (int i = 0; i < readers.length; i += 1) {
+
+ FieldVector vec = readers[i].read(null);
+ arrowVectorArr[i] = new ArrowColumnVector(vec);
+ numRows = vec.getValueCount();
+ }
+
+ ColumnarBatch batch = new ColumnarBatch(arrowVectorArr);
+ batch.setNumRows(numRows);
+
+ return batch;
+ }
+
+ }
+
public abstract static class StructReader<T, I> implements
ParquetValueReader<T> {
private interface Setter<R> {
void set(R record, int pos, Object reuse);
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
index 633f9f8..d097c67 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
@@ -26,12 +26,12 @@ import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
-class ParquetWriteSupport<T> extends WriteSupport<T> {
+public class ParquetWriteSupport<T> extends WriteSupport<T> {
private final MessageType type;
private final Map<String, String> keyValueMetadata;
private final WriteSupport<T> wrapped;
- ParquetWriteSupport(MessageType type, Map<String, String> keyValueMetadata,
WriteSupport<T> writeSupport) {
+ public ParquetWriteSupport(MessageType type, Map<String, String>
keyValueMetadata, WriteSupport<T> writeSupport) {
this.type = type;
this.keyValueMetadata = keyValueMetadata;
this.wrapped = writeSupport;
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index c0956a1..e8f95ea 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -47,7 +47,7 @@ import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
-class ParquetWriter<T> implements FileAppender<T>, Closeable {
+public class ParquetWriter<T> implements FileAppender<T>, Closeable {
private static final DynConstructors.Ctor<PageWriteStore> pageStoreCtor =
DynConstructors
.builder(PageWriteStore.class)
.hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
diff --git a/settings.gradle b/settings.gradle
index 4145cfe..e5df47b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -25,6 +25,7 @@ include 'data'
include 'orc'
include 'parquet'
include 'spark'
+include 'arrow'
include 'pig'
include 'runtime'
include 'hive'
@@ -36,6 +37,7 @@ project(':data').name = 'iceberg-data'
project(':orc').name = 'iceberg-orc'
project(':parquet').name = 'iceberg-parquet'
project(':spark').name = 'iceberg-spark'
+project(':arrow').name = 'iceberg-arrow'
project(':pig').name = 'iceberg-pig'
project(':runtime').name = 'iceberg-spark-runtime'
project(':hive').name = 'iceberg-hive'
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 9a36266..e8cc083 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -118,6 +118,10 @@ public class SparkParquetReaders {
this.type = type;
}
+ protected MessageType getType() {
+ return type;
+ }
+
@Override
public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
List<ParquetValueReader<?>>
fieldReaders) {
@@ -360,7 +364,7 @@ public class SparkParquetReaders {
}
}
- private static class StringReader extends PrimitiveReader<UTF8String> {
+ protected static class StringReader extends PrimitiveReader<UTF8String> {
StringReader(ColumnDescriptor desc) {
super(desc);
}
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java
b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java
new file mode 100644
index 0000000..3b67c96
--- /dev/null
+++
b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vector;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.iceberg.arrow.ArrowSchemaUtil;
+import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.types.Decimal;
+
+/***
+ * Parquet Value Reader implementations for Vectorization.
+ * Contains type-wise readers to read parquet data as vectors.
+ * - Returns Arrow's Field Vector for each type.
+ * - Null values are explicitly handled.
+ * - Type serialization is done based on types in Arrow.
+ * - Creates One Vector per RowGroup. So a Batch would have as many rows as
there are in the underlying RowGroup.
+ * - Mapping of Iceberg type to Arrow type is done in ArrowSchemaUtil.convert()
+ * - Iceberg to Arrow Type mapping :
+ * icebergType : LONG - Field Vector Type :
org.apache.arrow.vector.BigIntVector
+ * icebergType : STRING - Field Vector Type :
org.apache.arrow.vector.VarCharVector
+ * icebergType : BOOLEAN - Field Vector Type :
org.apache.arrow.vector.BitVector
+ * icebergType : INTEGER - Field Vector Type :
org.apache.arrow.vector.IntVector
+ * icebergType : FLOAT - Field Vector Type :
org.apache.arrow.vector.Float4Vector
+ * icebergType : DOUBLE - Field Vector Type :
org.apache.arrow.vector.Float8Vector
+ * icebergType : DATE - Field Vector Type :
org.apache.arrow.vector.DateDayVector
+ * icebergType : TIMESTAMP - Field Vector Type :
org.apache.arrow.vector.TimeStampMicroTZVector
+ * icebergType : STRING - Field Vector Type :
org.apache.arrow.vector.VarCharVector
+ * icebergType : BINARY - Field Vector Type :
org.apache.arrow.vector.VarBinaryVector
+ * icebergField : DECIMAL - Field Vector Type :
org.apache.arrow.vector.DecimalVector
+ */
+public class VectorizedParquetValueReaders {
+
+ public abstract static class VectorReader extends
ParquetValueReaders.PrimitiveReader<FieldVector> {
+
+ private FieldVector vec;
+ private boolean isOptional;
+
+ VectorReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+
+ super(desc);
+ this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+ this.isOptional =
desc.getPrimitiveType().isRepetition(Type.Repetition.OPTIONAL);
+ }
+
+ protected FieldVector getVector() {
+ return vec;
+ }
+
+ protected boolean isOptional() {
+ return isOptional;
+ }
+
+ @Override
+ public FieldVector read(FieldVector ignore) {
+
+ vec.reset();
+ int ordinal = 0;
+
+ while (column.hasNext()) {
+ // Todo: this check works for flat schemas only
+ // need to get max definition level to do proper check
+ if (isOptional && column.currentDefinitionLevel() == 0) {
+ // handle null
+ column.nextNull();
+ nextNullAt(ordinal);
+ } else {
+ nextValueAt(ordinal);
+ }
+ ordinal++;
+ }
+ vec.setValueCount(ordinal);
+ return vec;
+ }
+
+
+ public int getRowCount() {
+ return vec.getValueCount();
+ }
+
+ protected abstract void nextNullAt(int ordinal);
+
+ protected abstract void nextValueAt(int ordinal);
+ }
+
+ protected static class StringReader extends VectorReader {
+
+ StringReader(ColumnDescriptor desc, Types.NestedField icebergField,
RootAllocator rootAlloc) {
+ super(desc, icebergField, rootAlloc);
+ }
+
+ @Override
+ protected void nextNullAt(int ordinal) {
+ ((VarCharVector) getVector()).setNull(ordinal);
+ }
+
+ @Override
+ protected void nextValueAt(int ordinal) {
+
+ Binary binary = column.nextBinary();
+ if (binary == null) {
+
+ ((VarCharVector) getVector()).setNull(ordinal);
+
+ } else {
+ String utf8Str = binary.toStringUsingUTF8();
+ ((VarCharVector) getVector()).setSafe(ordinal, utf8Str.getBytes());
+ }
+ }
+
+ }
+
+ protected static class IntegerReader extends VectorReader {
+
+ IntegerReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+
+ super(desc, icebergField, rootAlloc);
+ }
+
+ @Override
+ protected void nextNullAt(int ordinal) {
+ ((IntVector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ int intValue = column.nextInteger();
+ ((IntVector) getVector()).setSafe(ordinal, intValue);
+
+ }
+ }
+
+ protected static class LongReader extends VectorReader {
+
+ LongReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+
+ super(desc, icebergField, rootAlloc);
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((BigIntVector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ long longValue = column.nextLong();
+ ((BigIntVector) getVector()).setSafe(ordinal, longValue);
+
+ }
+ }
+
+ protected static class TimestampMillisReader extends LongReader {
+
+ TimestampMillisReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+ super(desc, icebergField, rootAlloc);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ long longValue = column.nextLong();
+ ((BigIntVector) getVector()).setSafe(ordinal, 1000 * longValue);
+
+ }
+ }
+
+ protected static class TimestampMicroReader extends VectorReader {
+
+ TimestampMicroReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+ super(desc, icebergField, rootAlloc);
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((TimeStampMicroTZVector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ long longValue = column.nextLong();
+ ((TimeStampMicroTZVector) getVector()).setSafe(ordinal, longValue);
+
+ }
+ }
+
+ protected static class BooleanReader extends VectorReader {
+
+ BooleanReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+ super(desc, icebergField, rootAlloc);
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((BitVector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ boolean bool = column.nextBoolean();
+ ((BitVector) getVector()).setSafe(ordinal, bool ? 1 : 0);
+
+ }
+ }
+
+
+ protected static class FloatReader extends VectorReader {
+
+ FloatReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+ super(desc, icebergField, rootAlloc);
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((Float4Vector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ float floatValue = column.nextFloat();
+ ((Float4Vector) getVector()).setSafe(ordinal, floatValue);
+
+ }
+ }
+
+ protected static class DoubleReader extends VectorReader {
+
+ DoubleReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+ super(desc, icebergField, rootAlloc);
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((Float8Vector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ double doubleValue = column.nextDouble();
+ ((Float8Vector) getVector()).setSafe(ordinal, doubleValue);
+
+ }
+ }
+
+
+ protected static class BinaryReader extends VectorReader {
+
+ BinaryReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+ super(desc, icebergField, rootAlloc);
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((VarBinaryVector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ Binary binaryValue = column.nextBinary();
+ ((VarBinaryVector) getVector()).setSafe(ordinal, binaryValue.getBytes());
+
+ }
+ }
+
+
+ protected static class DateReader extends VectorReader {
+
+ DateReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc) {
+ super(desc, icebergField, rootAlloc);
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((DateDayVector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ int dateValue = column.nextInteger();
+ ((DateDayVector) getVector()).setSafe(ordinal, dateValue);
+
+ }
+ }
+
+
+ protected static class IntegerDecimalReader extends VectorReader {
+ private final int precision;
+ private final int scale;
+
+ IntegerDecimalReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc,
+ int precision, int scale) {
+
+ super(desc, icebergField, rootAlloc);
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((DecimalVector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ int decimalIntValue = column.nextInteger();
+ Decimal decimalValue = Decimal.apply(decimalIntValue, precision, scale);
+
+ ((DecimalVector) getVector()).setSafe(ordinal,
decimalValue.toJavaBigDecimal());
+
+ }
+ }
+
+ protected static class LongDecimalReader extends VectorReader {
+ private final int precision;
+ private final int scale;
+
+ LongDecimalReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc,
+ int precision, int scale) {
+
+ super(desc, icebergField, rootAlloc);
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((DecimalVector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ long decimalLongValue = column.nextLong();
+ Decimal decimalValue = Decimal.apply(decimalLongValue, precision, scale);
+
+ ((DecimalVector) getVector()).setSafe(ordinal,
decimalValue.toJavaBigDecimal());
+
+ }
+ }
+
+ protected static class BinaryDecimalReader extends VectorReader {
+ private final int precision;
+ private final int scale;
+
+ BinaryDecimalReader(ColumnDescriptor desc,
+ Types.NestedField icebergField,
+ RootAllocator rootAlloc,
+ int precision, int scale) {
+
+ super(desc, icebergField, rootAlloc);
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ protected void nextNullAt(int ordinal) {
+ ((DecimalVector) getVector()).setNull(ordinal);
+ }
+
+ protected void nextValueAt(int ordinal) {
+
+ Binary binaryValue = column.nextBinary();
+ Decimal decimalValue = Decimal.fromDecimal(new BigDecimal(new
BigInteger(binaryValue.getBytes()), scale));
+
+ ((DecimalVector) getVector()).setSafe(ordinal,
decimalValue.toJavaBigDecimal());
+
+ }
+ }
+}
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
new file mode 100644
index 0000000..f283cda
--- /dev/null
+++
b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vector;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.ArrowSchemaUtil;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class VectorizedSparkParquetReaders {
+
+ private VectorizedSparkParquetReaders() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ParquetValueReader<ColumnarBatch> buildReader(
+ Schema tableSchema,
+ Schema expectedSchema,
+ MessageType fileSchema) {
+
+ return (ParquetValueReader<ColumnarBatch>)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReadBuilder(tableSchema, expectedSchema, fileSchema));
+ }
+
+ private static class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ private final MessageType parquetSchema;
+ private final Schema projectedIcebergSchema;
+ private final Schema tableIcebergSchema;
+ private final org.apache.arrow.vector.types.pojo.Schema arrowSchema;
+ private final RootAllocator rootAllocator;
+
+ ReadBuilder(Schema tableSchema, Schema projectedIcebergSchema, MessageType
parquetSchema) {
+ this.parquetSchema = parquetSchema;
+ this.tableIcebergSchema = tableSchema;
+ this.projectedIcebergSchema = projectedIcebergSchema;
+ this.arrowSchema = ArrowSchemaUtil.convert(projectedIcebergSchema);
+ this.rootAllocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @Override
+ public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
+ List<ParquetValueReader<?>> fieldReaders) {
+ return struct(expected, message.asGroupType(), fieldReaders);
+ }
+
+ @Override
+ public ParquetValueReader<?> struct(Types.StructType expected, GroupType
struct,
+ List<ParquetValueReader<?>> fieldReaders) {
+
+ // this works on struct fields and the root iceberg schema which itself
is a struct.
+
+ // match the expected struct's order
+ Map<Integer, ParquetValueReader<FieldVector>> readersById =
Maps.newHashMap();
+ Map<Integer, Type> typesById = Maps.newHashMap();
+ List<Type> fields = struct.getFields();
+
+ for (int i = 0; i < fields.size(); i += 1) {
+ Type fieldType = fields.get(i);
+ int fieldD =
parquetSchema.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+ int id = fieldType.getId().intValue();
+ // Todo: figure out optional field reading for vectorized reading
+ // readersById.put(id,
(ParquetValueReader<FieldVector>)ParquetValueReaders.
+ // option(fieldType, fieldD, fieldReaders.get(i)));
+
+ readersById.put(id, (ParquetValueReader<FieldVector>)
fieldReaders.get(i));
+ typesById.put(id, fieldType);
+ }
+
+ List<Types.NestedField> icebergFields = expected != null ?
+ expected.fields() : ImmutableList.of();
+
+ List<ParquetValueReader<FieldVector>> reorderedFields =
Lists.newArrayListWithExpectedSize(
+ icebergFields.size());
+
+ List<Type> types =
Lists.newArrayListWithExpectedSize(icebergFields.size());
+
+ for (Types.NestedField field : icebergFields) {
+ int id = field.fieldId();
+ ParquetValueReader<FieldVector> reader = readersById.get(id);
+ if (reader != null) {
+ reorderedFields.add(reader);
+ types.add(typesById.get(id));
+ } else {
+ reorderedFields.add(ParquetValueReaders.nulls());
+ types.add(null);
+ }
+ }
+
+ return new ParquetValueReaders.ColumnarBatchReader(types, expected,
reorderedFields);
+ }
+
+
+ @Override
+ public ParquetValueReader<?>
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+ PrimitiveType primitive) {
+
+ // Create arrow vector for this field
+ int parquetFieldId = primitive.getId().intValue();
+ ColumnDescriptor desc =
parquetSchema.getColumnDescription(currentPath());
+ Types.NestedField icebergField =
tableIcebergSchema.findField(parquetFieldId);
+ // int fieldD =
parquetSchema.getMaxDefinitionLevel(path(primitive.getName())) - 1;
+ // Field field =
ArrowSchemaUtil.convert(projectedIcebergSchema.findField(parquetFieldId));
+ // FieldVector vec = field.createVector(rootAllocator);
+
+ if (primitive.getOriginalType() != null) {
+ switch (primitive.getOriginalType()) {
+ case ENUM:
+ case JSON:
+ case UTF8:
+ return new VectorizedParquetValueReaders.StringReader(desc,
icebergField, rootAllocator);
+ case INT_8:
+ case INT_16:
+ case INT_32:
+ return new VectorizedParquetValueReaders.IntegerReader(desc,
icebergField, rootAllocator);
+ // if (expected != null && expected.typeId() ==
Types.LongType.get().typeId()) {
+ // return new ParquetValueReaders.IntAsLongReader(desc);
+ // } else {
+ // return new ParquetValueReaders.UnboxedReader(desc);
+ // }
+ case DATE:
+ return new VectorizedParquetValueReaders.DateReader(desc,
icebergField, rootAllocator);
+ case INT_64:
+ return new VectorizedParquetValueReaders.LongReader(desc,
icebergField, rootAllocator);
+ case TIMESTAMP_MICROS:
+ return new
VectorizedParquetValueReaders.TimestampMicroReader(desc, icebergField,
rootAllocator);
+ case TIMESTAMP_MILLIS:
+ return new
VectorizedParquetValueReaders.TimestampMillisReader(desc, icebergField,
rootAllocator);
+ case DECIMAL:
+ DecimalMetadata decimal = primitive.getDecimalMetadata();
+ switch (primitive.getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return new
VectorizedParquetValueReaders.BinaryDecimalReader(desc, icebergField,
rootAllocator,
+ decimal.getPrecision(),
+ decimal.getScale());
+ case INT64:
+ return new
VectorizedParquetValueReaders.LongDecimalReader(desc, icebergField,
rootAllocator,
+ decimal.getPrecision(),
+ decimal.getScale());
+ case INT32:
+ return new
VectorizedParquetValueReaders.IntegerDecimalReader(desc, icebergField,
rootAllocator,
+ decimal.getPrecision(),
+ decimal.getScale());
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported base type for decimal: " +
primitive.getPrimitiveTypeName());
+ }
+ case BSON:
+ return new VectorizedParquetValueReaders.BinaryReader(desc,
icebergField, rootAllocator);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported logical type: " + primitive.getOriginalType());
+ }
+ }
+
+ switch (primitive.getPrimitiveTypeName()) {
+ case FIXED_LEN_BYTE_ARRAY:
+ case BINARY:
+ return new VectorizedParquetValueReaders.BinaryReader(desc,
icebergField, rootAllocator);
+ case INT32:
+ return new VectorizedParquetValueReaders.IntegerReader(desc,
icebergField, rootAllocator);
+ case FLOAT:
+ return new VectorizedParquetValueReaders.FloatReader(desc,
icebergField, rootAllocator);
+ // if (expected != null && expected.typeId() ==
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+ // return new ParquetValueReaders.FloatAsDoubleReader(desc);
+ // } else {
+ // return new ParquetValueReaders.UnboxedReader<>(desc);
+ // }
+ case BOOLEAN:
+ return new VectorizedParquetValueReaders.BooleanReader(desc,
icebergField, rootAllocator);
+ case INT64:
+ return new VectorizedParquetValueReaders.LongReader(desc,
icebergField, rootAllocator);
+ case DOUBLE:
+ return new VectorizedParquetValueReaders.DoubleReader(desc,
icebergField, rootAllocator);
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " +
primitive);
+ }
+ }
+
+ private String[] currentPath() {
+ String[] path = new String[fieldNames.size()];
+ if (!fieldNames.isEmpty()) {
+ Iterator<String> iter = fieldNames.descendingIterator();
+ for (int i = 0; iter.hasNext(); i += 1) {
+ path[i] = iter.next();
+ }
+ }
+
+ return path;
+ }
+
+ protected MessageType type() {
+ return parquetSchema;
+ }
+
+ protected String[] path(String name) {
+ String[] path = new String[fieldNames.size() + 1];
+ path[fieldNames.size()] = name;
+
+ if (!fieldNames.isEmpty()) {
+ Iterator<String> iter = fieldNames.descendingIterator();
+ for (int i = 0; iter.hasNext(); i += 1) {
+ path[i] = iter.next();
+ }
+ }
+
+ return path;
+ }
+ }
+}
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index b6dfade..8229b12 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -26,10 +26,7 @@ import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.CheckCompatibility;
import org.apache.spark.sql.SaveMode;
@@ -46,11 +43,14 @@ import
org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport,
DataSourceRegister, StreamWriteSupport {
private SparkSession lazySpark = null;
private Configuration lazyConf = null;
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergSource.class);
@Override
public String shortName() {
@@ -98,31 +98,31 @@ public class IcebergSource implements DataSourceV2,
ReadSupport, WriteSupport, D
Optional<String> path = options.get("path");
Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is
not set");
- if (path.get().contains("/")) {
- HadoopTables tables = new HadoopTables(conf);
- return tables.load(path.get());
- } else {
- HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
- TableIdentifier tableIdentifier = TableIdentifier.parse(path.get());
- return hiveCatalog.loadTable(tableIdentifier);
- }
+ // if (path.get().contains("/")) {
+ HadoopTables tables = new HadoopTables(conf);
+ return tables.load(path.get());
+ // } else {
+ // HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
+ // TableIdentifier tableIdentifier = TableIdentifier.parse(path.get());
+ // return hiveCatalog.loadTable(tableIdentifier);
+ // }
}
- private SparkSession lazySparkSession() {
+ protected SparkSession lazySparkSession() {
if (lazySpark == null) {
this.lazySpark = SparkSession.builder().getOrCreate();
}
return lazySpark;
}
- private Configuration lazyBaseConf() {
+ protected Configuration lazyBaseConf() {
if (lazyConf == null) {
this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
}
return lazyConf;
}
- private Table getTableAndResolveHadoopConfiguration(
+ protected Table getTableAndResolveHadoopConfiguration(
DataSourceOptions options, Configuration conf) {
// Overwrite configurations from the Spark Context with configurations
from the options.
mergeIcebergHadoopConfs(conf, options.asMap());
@@ -134,7 +134,7 @@ public class IcebergSource implements DataSourceV2,
ReadSupport, WriteSupport, D
return table;
}
- private static void mergeIcebergHadoopConfs(
+ protected static void mergeIcebergHadoopConfs(
Configuration baseConf, Map<String, String> options) {
options.keySet().stream()
.filter(key -> key.startsWith("hadoop."))
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 91359ff..75d72f1 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.spark.source;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
@@ -31,6 +30,7 @@ import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.CombinedScanTask;
@@ -59,15 +59,13 @@ import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroReader;
import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.spark.data.vector.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -78,6 +76,7 @@ import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
+import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
@@ -85,14 +84,15 @@ import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.unsafe.types.UTF8String;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
-class Reader implements DataSourceReader, SupportsPushDownFilters,
SupportsPushDownRequiredColumns,
+class Reader implements DataSourceReader,
+ SupportsScanColumnarBatch,
+ SupportsPushDownFilters,
+ SupportsPushDownRequiredColumns,
SupportsReportStatistics {
- private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
private static final Filter[] NO_FILTERS = new Filter[0];
@@ -102,23 +102,33 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
private final FileIO fileIo;
private final EncryptionManager encryptionManager;
private final boolean caseSensitive;
+ private int numRecordsPerBatch;
private StructType requestedSchema = null;
private List<Expression> filterExpressions = null;
private Filter[] pushedFilters = NO_FILTERS;
// lazy variables
- private Schema schema = null;
+ private Schema schema;
private StructType type = null; // cached because Spark accesses it multiple
times
private List<CombinedScanTask> tasks = null; // lazy cache of tasks
+ private static final int DEFAULT_NUM_RECORDS_PER_BATCH = 1000;
Reader(Table table, boolean caseSensitive, DataSourceOptions options) {
this.table = table;
this.snapshotId =
options.get("snapshot-id").map(Long::parseLong).orElse(null);
this.asOfTimestamp =
options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
+ Optional<String> numRecordsPerBatchOpt =
options.get("iceberg.read.numrecordsperbatch");
+ this.numRecordsPerBatch = DEFAULT_NUM_RECORDS_PER_BATCH;
+ if (numRecordsPerBatchOpt.isPresent()) {
+ this.numRecordsPerBatch = Integer.parseInt(numRecordsPerBatchOpt.get());
+ }
+ // LOG.info("[IcebergSource] => Reading numRecordsPerBatch =
"+numRecordsPerBatch);
+
if (snapshotId != null && asOfTimestamp != null) {
throw new IllegalArgumentException(
"Cannot scan using both snapshot-id and as-of-timestamp to select
the table snapshot");
}
+
this.schema = table.schema();
this.fileIo = table.io();
this.encryptionManager = table.encryption();
@@ -149,20 +159,37 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
}
@Override
- public List<InputPartition<InternalRow>> planInputPartitions() {
+ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
- List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
+ List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(
- new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo,
encryptionManager, caseSensitive));
+ new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo,
encryptionManager,
+ caseSensitive, numRecordsPerBatch));
}
return readTasks;
}
@Override
+ public List<InputPartition<InternalRow>> planInputPartitions() {
+ return null;
+ // String tableSchemaString = SchemaParser.toJson(table.schema());
+ // String expectedSchemaString = SchemaParser.toJson(lazySchema());
+ //
+ // List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
+ // for (CombinedScanTask task : tasks()) {
+ // readTasks.add(
+ // new ReadTask(task, tableSchemaString, expectedSchemaString,
fileIo, encryptionManager,
+ // caseSensitive, numRecordsPerBatch));
+ // }
+ //
+ // return readTasks;
+ }
+
+ @Override
public Filter[] pushFilters(Filter[] filters) {
this.tasks = null; // invalidate cached tasks, if present
@@ -256,32 +283,34 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
table, lazySchema().asStruct(), filterExpressions, caseSensitive);
}
- private static class ReadTask implements InputPartition<InternalRow>,
Serializable {
+ private static class ReadTask implements InputPartition<ColumnarBatch>,
Serializable {
private final CombinedScanTask task;
private final String tableSchemaString;
private final String expectedSchemaString;
private final FileIO fileIo;
private final EncryptionManager encryptionManager;
private final boolean caseSensitive;
+ private final int numRecordsPerBatch;
private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;
private ReadTask(
CombinedScanTask task, String tableSchemaString, String
expectedSchemaString, FileIO fileIo,
- EncryptionManager encryptionManager, boolean caseSensitive) {
+ EncryptionManager encryptionManager, boolean caseSensitive, int
numRecordsPerBatch) {
this.task = task;
this.tableSchemaString = tableSchemaString;
this.expectedSchemaString = expectedSchemaString;
this.fileIo = fileIo;
this.encryptionManager = encryptionManager;
this.caseSensitive = caseSensitive;
+ this.numRecordsPerBatch = numRecordsPerBatch;
}
@Override
- public InputPartitionReader<InternalRow> createPartitionReader() {
+ public InputPartitionReader<ColumnarBatch> createPartitionReader() {
return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(),
fileIo,
- encryptionManager, caseSensitive);
+ encryptionManager, caseSensitive, numRecordsPerBatch);
}
private Schema lazyTableSchema() {
@@ -299,7 +328,7 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
}
}
- private static class TaskDataReader implements
InputPartitionReader<InternalRow> {
+ private static class TaskDataReader implements
InputPartitionReader<ColumnarBatch> {
// for some reason, the apply method can't be called from Java without
reflection
private static final DynMethods.UnboundMethod APPLY_PROJECTION =
DynMethods.builder("apply")
.impl(UnsafeProjection.class, InternalRow.class)
@@ -311,13 +340,14 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
private final FileIO fileIo;
private final Map<String, InputFile> inputFiles;
private final boolean caseSensitive;
+ private final int numRecordsPerBatch;
- private Iterator<InternalRow> currentIterator = null;
+ private Iterator<ColumnarBatch> currentIterator;
private Closeable currentCloseable = null;
- private InternalRow current = null;
+ private ColumnarBatch current = null;
TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema
expectedSchema, FileIO fileIo,
- EncryptionManager encryptionManager, boolean caseSensitive)
{
+ EncryptionManager encryptionManager, boolean
caseSensitive, int numRecordsPerBatch) {
this.fileIo = fileIo;
this.tasks = task.files().iterator();
this.tableSchema = tableSchema;
@@ -333,6 +363,7 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
// open last because the schemas and fileIo must be set
this.currentIterator = open(tasks.next());
this.caseSensitive = caseSensitive;
+ this.numRecordsPerBatch = numRecordsPerBatch;
}
@Override
@@ -353,7 +384,7 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
}
@Override
- public InternalRow get() {
+ public ColumnarBatch get() {
return current;
}
@@ -368,7 +399,7 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
}
}
- private Iterator<InternalRow> open(FileScanTask task) {
+ private Iterator<ColumnarBatch> open(FileScanTask task) {
DataFile file = task.file();
// schema or rows returned by readers
@@ -383,23 +414,24 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
boolean hasExtraFilterColumns = requiredSchema.columns().size() !=
finalSchema.columns().size();
Schema iterSchema;
- Iterator<InternalRow> iter;
-
- if (hasJoinedPartitionColumns) {
- // schema used to read data files
- Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
- Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
- PartitionRowConverter convertToRow = new
PartitionRowConverter(partitionSchema, spec);
- JoinedRow joined = new JoinedRow();
-
- InternalRow partition = convertToRow.apply(file.partition());
- joined.withRight(partition);
-
- // create joined rows and project from the joined schema to the final
schema
- iterSchema = TypeUtil.join(readSchema, partitionSchema);
- iter = Iterators.transform(open(task, readSchema), joined::withLeft);
-
- } else if (hasExtraFilterColumns) {
+ Iterator<ColumnarBatch> iter;
+
+ // if (hasJoinedPartitionColumns) {
+ // // schema used to read data files
+ // Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
+ // Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
+ // PartitionRowConverter convertToRow = new
PartitionRowConverter(partitionSchema, spec);
+ // JoinedRow joined = new JoinedRow();
+ //
+ // InternalRow partition = convertToRow.apply(file.partition());
+ // joined.withRight(partition);
+ //
+ // // create joined rows and project from the joined schema to the
final schema
+ // iterSchema = TypeUtil.join(readSchema, partitionSchema);
+ // iter = Iterators.transform(open(task, readSchema),
joined::withLeft);
+ //
+ // } else if (hasExtraFilterColumns) {
+ if (hasExtraFilterColumns) {
// add projection to the final schema
iterSchema = requiredSchema;
iter = open(task, requiredSchema);
@@ -411,36 +443,37 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
}
// TODO: remove the projection by reporting the iterator's schema back
to Spark
- return Iterators.transform(iter,
- APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
+ // return Iterators.transform(iter,
+ // APPLY_PROJECTION.bind(projection(finalSchema,
iterSchema))::invoke);
+ return iter;
}
- private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
- CloseableIterable<InternalRow> iter;
- if (task.isDataTask()) {
- iter = newDataIterable(task.asDataTask(), readSchema);
-
- } else {
- InputFile location = inputFiles.get(task.file().path().toString());
- Preconditions.checkNotNull(location, "Could not find InputFile
associated with FileScanTask");
-
- switch (task.file().format()) {
- case PARQUET:
- iter = newParquetIterable(location, task, readSchema);
- break;
-
- case AVRO:
- iter = newAvroIterable(location, task, readSchema);
- break;
-
- case ORC:
- iter = newOrcIterable(location, task, readSchema);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Cannot read unknown format: " + task.file().format());
- }
+ private Iterator<ColumnarBatch> open(FileScanTask task, Schema readSchema)
{
+ CloseableIterable<ColumnarBatch> iter;
+ // if (task.isDataTask()) {
+ // iter = newDataIterable(task.asDataTask(), readSchema);
+ //
+ // } else {
+ InputFile location = inputFiles.get(task.file().path().toString());
+ Preconditions.checkNotNull(location, "Could not find InputFile
associated with FileScanTask");
+
+ switch (task.file().format()) {
+ case PARQUET:
+ iter = newParquetIterable(location, task, readSchema);
+ break;
+ //
+ // case AVRO:
+ // iter = newAvroIterable(location, task, readSchema);
+ // break;
+ //
+ // case ORC:
+ // iter = newOrcIterable(location, task, readSchema);
+ // break;
+ //
+ default:
+ throw new UnsupportedOperationException(
+ "Cannot read unknown format: " + task.file().format());
+ // }
}
this.currentCloseable = iter;
@@ -481,15 +514,17 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
.build();
}
- private CloseableIterable<InternalRow> newParquetIterable(InputFile
location,
+ private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile
location,
FileScanTask task,
Schema readSchema)
{
return Parquet.read(location)
- .project(readSchema)
+ .project(readSchema, SparkSchemaUtil.convert(readSchema))
.split(task.start(), task.length())
- .createReaderFunc(fileSchema ->
SparkParquetReaders.buildReader(readSchema, fileSchema))
+ .createReaderFunc(fileSchema ->
VectorizedSparkParquetReaders.buildReader(tableSchema, readSchema,
+ fileSchema))
.filter(task.residual())
.caseSensitive(caseSensitive)
+ .recordsPerBatch(numRecordsPerBatch)
.build();
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
index 4b89526..080c0d1 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
@@ -55,7 +55,9 @@ public class RandomData {
RandomDataGenerator generator = new RandomDataGenerator(schema, seed);
List<Record> records = Lists.newArrayListWithExpectedSize(numRecords);
for (int i = 0; i < numRecords; i += 1) {
- records.add((Record) TypeUtil.visit(schema, generator));
+ Record rec = (Record) TypeUtil.visit(schema, generator);
+ // System.out.println("Add record "+rec);
+ records.add(rec);
}
return records;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index c20bc67..f5ee124 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -53,6 +53,7 @@ import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.unsafe.types.UTF8String;
import org.junit.Assert;
import scala.collection.Seq;
@@ -64,7 +65,8 @@ import static
scala.collection.JavaConverters.seqAsJavaListConverter;
@SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
public class TestHelpers {
- private TestHelpers() {}
+ private TestHelpers() {
+ }
public static void assertEqualsSafe(Types.StructType struct, Record rec, Row
row) {
List<Types.NestedField> fields = struct.fields();
@@ -205,6 +207,33 @@ public class TestHelpers {
}
}
+ public static void assertEqualsUnsafe(Types.StructType struct, List<Record>
expected, ColumnarBatch batch) {
+ List<Types.NestedField> fields = struct.fields();
+ for (int r = 0; r < batch.numRows(); r++) {
+
+ Record expRec = expected.get(r);
+ InternalRow actualRow = batch.getRow(r);
+
+ for (int i = 0; i < fields.size(); i += 1) {
+
+ Type fieldType = fields.get(i).type();
+ Object expectedValue = expRec.get(i);
+ // System.out.println(" -> Checking Row "+r+", field #"+i
+ // + " , Field:"+ fields.get(i).name()
+ // + " , optional:"+fields.get(i).isOptional()
+ // + " , type:"+fieldType.typeId()
+ // + " , expected:"+expectedValue);
+ if (actualRow.isNullAt(i)) {
+
+ Assert.assertTrue("Expect null", expectedValue == null);
+ } else {
+ Object actualValue = actualRow.get(i, convert(fieldType));
+ assertEqualsUnsafe(fieldType, expectedValue, actualValue);
+ }
+ }
+ }
+ }
+
private static void assertEqualsUnsafe(Types.ListType list, Collection<?>
expected, ArrayData actual) {
Type elementType = list.elementType();
List<?> expectedElements = Lists.newArrayList(expected);
diff --git
a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
new file mode 100644
index 0000000..da9a466
--- /dev/null
+++
b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.data.vector.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
+
+public class TestSparkParquetVectorizedReader extends AvroDataTest {
+
+ @Override
+ protected void writeAndValidate(Schema schema) throws IOException {
+
+ // Write test data
+ Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null ==
TypeUtil.find(schema,
+ type -> type.isMapType() && type.asMapType().keyType() !=
Types.StringType.get()));
+
+ List<GenericData.Record> expected = RandomData.generateList(schema, 100,
0L);
+
+ // write a test parquet file using iceberg writer
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<GenericData.Record> writer =
Parquet.write(Files.localOutput(testFile))
+ .schema(schema)
+ .named("test")
+ .build()) {
+ writer.addAll(expected);
+ }
+
+
+ try (CloseableIterable<ColumnarBatch> batchReader =
Parquet.read(Files.localInput(testFile))
+ .project(schema)
+ .createReaderFunc(type ->
VectorizedSparkParquetReaders.buildReader(schema, schema, type))
+ .build()) {
+
+ Iterator<ColumnarBatch> batches = batchReader.iterator();
+ int numRowsRead = 0;
+ int numExpectedRead = 0;
+ while (batches.hasNext()) {
+
+ ColumnarBatch batch = batches.next();
+ numRowsRead += batch.numRows();
+
+ List<GenericData.Record> expectedBatch = new
ArrayList<>(batch.numRows());
+ for (int i = numExpectedRead; i < numExpectedRead + batch.numRows();
i++) {
+ expectedBatch.add(expected.get(i));
+ }
+
+ // System.out.println("-> Check "+numExpectedRead+" - "+
(numExpectedRead+batch.numRows()));
+ assertEqualsUnsafe(schema.asStruct(), expectedBatch, batch);
+
+ System.out.println("Batch read with " + batch.numRows() + " rows. Read
" + numRowsRead + " till now. " +
+ "Expected batch " + expectedBatch.size());
+
+ numExpectedRead += batch.numRows();
+ }
+
+ Assert.assertEquals(expected.size(), numRowsRead);
+
+ }
+ }
+
+ @Test
+ public void testArray() throws IOException {
+ System.out.println("Not Supported");
+ }
+
+ @Test
+ public void testArrayOfStructs() throws IOException {
+ System.out.println("Not Supported");
+ }
+
+ @Test
+ public void testMap() throws IOException {
+ System.out.println("Not Supported");
+ }
+
+ @Test
+ public void testNumericMapKey() throws IOException {
+ System.out.println("Not Supported");
+ }
+
+ @Test
+ public void testComplexMapKey() throws IOException {
+ System.out.println("Not Supported");
+ }
+
+ @Test
+ public void testMapOfStructs() throws IOException {
+ System.out.println("Not Supported");
+ }
+
+ @Test
+ public void testMixedTypes() throws IOException {
+ System.out.println("Not Supported");
+ }
+}
diff --git a/versions.lock b/versions.lock
index b7e574d..2f314a2 100644
--- a/versions.lock
+++ b/versions.lock
@@ -7,16 +7,17 @@ com.carrotsearch:hppc:0.7.2 (1 constraints: f70cda14)
com.clearspring.analytics:stream:2.7.0 (1 constraints: 1a0dd136)
com.esotericsoftware:kryo-shaded:4.0.2 (2 constraints: b71345a6)
com.esotericsoftware:minlog:1.3.0 (1 constraints: 670e7c4f)
-com.fasterxml.jackson.core:jackson-annotations:2.7.9 (4 constraints: f24786bf)
+com.fasterxml.jackson.core:jackson-annotations:2.7.9 (5 constraints: f154e19f)
com.fasterxml.jackson.core:jackson-core:2.7.9 (5 constraints: d748db55)
-com.fasterxml.jackson.core:jackson-databind:2.7.9 (8 constraints: a77bca51)
+com.fasterxml.jackson.core:jackson-databind:2.7.9 (9 constraints: a688bc53)
com.fasterxml.jackson.module:jackson-module-paranamer:2.7.9 (1 constraints:
e0154200)
com.fasterxml.jackson.module:jackson-module-scala_2.11:2.7.9 (1 constraints:
7f0da251)
com.github.ben-manes.caffeine:caffeine:2.7.0 (1 constraints: 0b050a36)
com.github.luben:zstd-jni:1.3.2-2 (1 constraints: 760d7c51)
-com.google.code.findbugs:jsr305:3.0.2 (10 constraints: c483db75)
+com.google.code.findbugs:jsr305:3.0.2 (9 constraints: d276cf3c)
com.google.code.gson:gson:2.2.4 (1 constraints: 8c0d3f2f)
com.google.errorprone:error_prone_annotations:2.3.3 (2 constraints: 161a2544)
+com.google.flatbuffers:flatbuffers-java:1.9.0 (2 constraints: e5199714)
com.google.guava:failureaccess:1.0.1 (1 constraints: 140ae1b4)
com.google.guava:guava:28.0-jre (23 constraints: cc5c2ea0)
com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava (1
constraints: bd17c918)
@@ -40,7 +41,6 @@ com.twitter:chill-java:0.9.3 (2 constraints: a716716f)
com.twitter:chill_2.11:0.9.3 (2 constraints: 121b92c3)
com.twitter:parquet-hadoop-bundle:1.6.0 (3 constraints: 7c262424)
com.univocity:univocity-parsers:2.7.3 (1 constraints: c40ccb27)
-com.vlkan:flatbuffers:1.2.0-3f79e055 (2 constraints: 411e1dee)
commons-beanutils:commons-beanutils:1.7.0 (1 constraints: da0e635f)
commons-beanutils:commons-beanutils-core:1.8.0 (1 constraints: 1d134124)
commons-cli:commons-cli:1.2 (8 constraints: 9467c282)
@@ -66,6 +66,7 @@ io.dropwizard.metrics:metrics-json:3.1.5 (1 constraints:
1a0dc936)
io.dropwizard.metrics:metrics-jvm:3.1.5 (1 constraints: 1a0dc936)
io.netty:netty:3.9.9.Final (9 constraints: 9eb0396d)
io.netty:netty-all:4.1.17.Final (3 constraints: d2312526)
+it.unimi.dsi:fastutil:7.0.13 (1 constraints: fc0d4043)
javax.activation:activation:1.1.1 (1 constraints: 140dbb36)
javax.annotation:javax.annotation-api:1.2 (2 constraints: 2d21193d)
javax.inject:javax.inject:1 (4 constraints: 852d0c1a)
@@ -94,10 +95,10 @@ org.antlr:antlr4-runtime:4.7 (1 constraints: 7a0e125f)
org.antlr:stringtemplate:3.2.1 (1 constraints: c10a3bc6)
org.apache.ant:ant:1.9.1 (3 constraints: a721ed14)
org.apache.ant:ant-launcher:1.9.1 (1 constraints: 69082485)
-org.apache.arrow:arrow-format:0.10.0 (1 constraints: 1f0de721)
-org.apache.arrow:arrow-memory:0.10.0 (1 constraints: 1f0de721)
-org.apache.arrow:arrow-vector:0.10.0 (1 constraints: e90c9734)
-org.apache.avro:avro:1.8.2 (4 constraints: 3d2eebf3)
+org.apache.arrow:arrow-format:0.12.0 (1 constraints: 210ded21)
+org.apache.arrow:arrow-memory:0.12.0 (1 constraints: 210ded21)
+org.apache.arrow:arrow-vector:0.12.0 (2 constraints: 1d122345)
+org.apache.avro:avro:1.8.2 (5 constraints: 083cf387)
org.apache.avro:avro-ipc:1.8.2 (1 constraints: f90b5bf4)
org.apache.avro:avro-mapred:1.8.2 (2 constraints: 3a1a4787)
org.apache.calcite:calcite-avatica:1.2.0-incubating (4 constraints: a044b922)
diff --git a/versions.props b/versions.props
index 7f71842..80c334e 100644
--- a/versions.props
+++ b/versions.props
@@ -1,6 +1,7 @@
org.slf4j:slf4j-api = 1.7.5
com.google.guava:guava = 28.0-jre
org.apache.avro:avro = 1.8.2
+org.apache.arrow:arrow-vector = 0.12.0
org.apache.hadoop:* = 2.7.3
org.apache.hive:hive-standalone-metastore = 1.2.1
org.apache.orc:orc-core = 1.5.5