This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/vectorized-read by this push:
     new 9578d06  [WIP] Vectorized read using Arrow  (#319)
9578d06 is described below

commit 9578d06719b0d0498dd2b64d94e3baed6f376808
Author: Gautam <[email protected]>
AuthorDate: Fri Jul 26 14:32:10 2019 -0700

    [WIP] Vectorized read using Arrow  (#319)
    
    * First cut impl of reading Parquet FileIterator into ArrowRecordBatch 
based reader
    
    * made num records per arrow batch configurable
    
    * addressed comments
    
    * Added docs for public methods and ArrowReader class
    
    * Fixed javadoc
    
    * WIP first stab at reading into Arrow and returning as InternalRow iterator
    
    * Add publish to snapshot repository by replacing version to 
`1.0-adobe-2.0-SNAPSHOT` (snapshot prefix is required by snapshot repo)
    
    * Adding arrow schema conversion utility
    
    * adding arrow-vector dep to tests
    
    * [WIP] Working vectorization for primitive types. Added test for 
VectorizedSparkParquetReaders.
    
    * [WIP] Added Decimal types to vectorization
    
    * [WIP] added remaining primitive type vectorization and tests
    
    * [WIP] unused imports fixed
    
    * Add argument validation to HadoopTables#create (#298)
    
    * Install source JAR when running install target (#310)
    
    * Bump version to 1.0-adobe-3.0-vectorized-SNAPSHOT
    
    * Temporarily ignore applying style check
    
    * Fixing javadoc error
    
    * Updating versions.lock
    
    * fixed checkstyle errors
    
    * Revert "Bump version to 1.0-adobe-3.0-vectorized-SNAPSHOT"
    
    This reverts commit ceae2fd7c79ce1eaaaa2eed265481cfce6fdce16.
    
    * cleanup
---
 .baseline/checkstyle/.checkstyle.xml.swp           | Bin 45056 -> 0 bytes
 .../apache/iceberg/arrow/reader/ArrowReader.java   | 242 ++++++++++++
 build.gradle                                       |  35 +-
 .../org/apache/iceberg/arrow/ArrowSchemaUtil.java  | 148 ++++++++
 .../org/apache/iceberg/hadoop/HadoopTables.java    |  11 +-
 .../apache/iceberg/arrow/ArrowSchemaUtilTest.java  | 123 ++++++
 .../org/apache/iceberg/data/TableScanIterable.java |   3 +-
 gradle.properties                                  |   4 +
 .../java/org/apache/iceberg/parquet/Parquet.java   |  17 +-
 .../org/apache/iceberg/parquet/ParquetAvro.java    |   6 +-
 .../org/apache/iceberg/parquet/ParquetFilters.java |   4 +-
 .../java/org/apache/iceberg/parquet/ParquetIO.java |   6 +-
 .../apache/iceberg/parquet/ParquetIterable.java    |   2 +-
 .../apache/iceberg/parquet/ParquetReadSupport.java |   4 +-
 .../org/apache/iceberg/parquet/ParquetReader.java  |  41 +-
 .../iceberg/parquet/ParquetValueReaders.java       |  84 +++++
 .../iceberg/parquet/ParquetWriteSupport.java       |   4 +-
 .../org/apache/iceberg/parquet/ParquetWriter.java  |   2 +-
 settings.gradle                                    |   2 +
 .../iceberg/spark/data/SparkParquetReaders.java    |   6 +-
 .../data/vector/VectorizedParquetValueReaders.java | 414 +++++++++++++++++++++
 .../data/vector/VectorizedSparkParquetReaders.java | 245 ++++++++++++
 .../apache/iceberg/spark/source/IcebergSource.java |  30 +-
 .../org/apache/iceberg/spark/source/Reader.java    | 175 +++++----
 .../org/apache/iceberg/spark/data/RandomData.java  |   4 +-
 .../org/apache/iceberg/spark/data/TestHelpers.java |  31 +-
 .../data/TestSparkParquetVectorizedReader.java     | 132 +++++++
 versions.lock                                      |  17 +-
 versions.props                                     |   1 +
 29 files changed, 1672 insertions(+), 121 deletions(-)

diff --git a/.baseline/checkstyle/.checkstyle.xml.swp 
b/.baseline/checkstyle/.checkstyle.xml.swp
deleted file mode 100644
index dac6cf2..0000000
Binary files a/.baseline/checkstyle/.checkstyle.xml.swp and /dev/null differ
diff --git 
a/arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java 
b/arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java
new file mode 100644
index 0000000..f7e1f6d
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java
@@ -0,0 +1,242 @@
+package org.apache.iceberg.arrow.reader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.spark.TaskContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.arrow.ArrowUtils;
+import org.apache.spark.sql.execution.arrow.ArrowWriter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.util.TaskCompletionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/***
+ * This is a helper class for Arrow reading. It provides two main converter 
methods.
+ * These converter methods are currently used to first convert a Parquet 
FileIterator
+ * into Iterator over ArrowRecordBatches. Second, the ArrowRecordBatch is made
+ * into Columnar Batch and exposed as an Iterator over InternalRow. The second 
step is
+ * done to conform to Spark's current interface. When Spark adds Arrow support 
we will
+ * take the second iterator out and just return the first one.
+ */
+public class ArrowReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);
+
+  /***
+   * Accepts an iterator over ArrowRecordBatches and copies into 
ColumnarBatches.
+   * Since Spark uses Iterator over InternalRow we return this over 
ColumnarBatch.
+   * @param arrowBatchIter
+   * @param sparkSchema
+   * @param timeZoneId
+   * @return
+   */
+  public static InternalRowOverArrowBatchIterator fromBatchIterator(
+      Iterator<ArrowRecordBatch> arrowBatchIter,
+      StructType sparkSchema,
+      String timeZoneId) {
+
+    // timeZoneId required for TimestampType in StructType
+    Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timeZoneId);
+    BufferAllocator allocator =
+        ArrowUtils.rootAllocator().newChildAllocator("fromBatchIterator", 0, 
Long.MAX_VALUE);
+
+    VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);
+
+    return new InternalRowOverArrowBatchIterator(arrowBatchIter, allocator, 
root);
+  }
+
+  @NotThreadSafe
+  public static class InternalRowOverArrowBatchIterator implements 
Iterator<InternalRow>, Closeable {
+
+    private final Iterator<ArrowRecordBatch> arrowBatchIter;
+    private final BufferAllocator allocator;
+    private final VectorSchemaRoot root;
+
+    private Iterator<InternalRow> rowIter;
+
+    InternalRowOverArrowBatchIterator(Iterator<ArrowRecordBatch> 
arrowBatchIter,
+        BufferAllocator allocator,
+        VectorSchemaRoot root) {
+
+      this.arrowBatchIter = arrowBatchIter;
+      this.allocator = allocator;
+      this.root = root;
+
+    }
+
+
+
+    @Override
+    public boolean hasNext() {
+      if (rowIter != null && rowIter.hasNext()) {
+        return true;
+      }
+      if (arrowBatchIter.hasNext()) {
+        rowIter = nextBatch();
+        return true;
+      } else {
+        try {
+          close();
+        } catch (IOException ioe) {
+          throw new RuntimeException("Encountered an error while closing 
iterator. "+ioe.getMessage(), ioe);
+        }
+        return false;
+      }
+    }
+
+    @Override
+    public InternalRow next() {
+      return rowIter.next();
+    }
+
+    private Iterator<InternalRow> nextBatch() {
+      ArrowRecordBatch arrowRecordBatch = arrowBatchIter.next();
+      long start = System.currentTimeMillis();
+      root.setRowCount(0);
+      VectorLoader vectorLoader = new VectorLoader(root);
+      vectorLoader.load(arrowRecordBatch);
+      arrowRecordBatch.close();
+
+      List<FieldVector> fieldVectors = root.getFieldVectors();
+      ColumnVector[] columns = new ColumnVector[fieldVectors.size()];
+      for(int i=0; i<fieldVectors.size(); i++) {
+        columns[i] = new ArrowColumnVector(fieldVectors.get(i));
+      }
+
+      ColumnarBatch batch = new ColumnarBatch(columns);
+      batch.setNumRows(root.getRowCount());
+
+      LOG.info("[InternalRowOverArrowIterator] => Created Columnar Batch with 
"+root.getRowCount()+ " rows" +
+          ". Took " + (System.currentTimeMillis() - start) + " milliseconds.");
+      return batch.rowIterator();
+    }
+
+
+    @Override
+    public void close() throws IOException {
+      // arrowWriter.finish();
+      root.close();
+      allocator.close();
+    }
+
+  }
+
+  /**
+   * Accepts Iterator over InternalRow coming in from ParquetReader's 
FileIterator
+   * and creates ArrowRecordBatches over that by collecting rows from the 
input iter.
+   * Each next() call over this iterator will collect up to maxRecordsPerBatch 
rows
+   * at a time and create an Arrow batch with it and returns an iterator over 
that.
+   * @param rowIter
+   * @param sparkSchema
+   * @param maxRecordsPerBatch
+   * @param timezonId
+   * @return
+   */
+  public static ArrowRecordBatchIterator toBatchIterator(
+      Iterator<InternalRow> rowIter,
+      StructType sparkSchema, int maxRecordsPerBatch,
+      String timezonId) {
+
+    // StructType sparkSchema = SparkSchemaUtil.convert(icebergSchema);
+    TaskContext context = TaskContext.get();
+
+    Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timezonId);
+    BufferAllocator allocator = ArrowUtils.rootAllocator().newChildAllocator(
+        "toBatchIterator",
+        0,
+        Long.MAX_VALUE);
+    VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);
+
+    if (context!=null) {
+      context.addTaskCompletionListener(new TaskCompletionListener() {
+        @Override
+        public void onTaskCompletion(TaskContext context) {
+          root.close();
+          allocator.close();
+        }
+      });
+    }
+
+    return new ArrowRecordBatchIterator(rowIter, root, allocator, 
maxRecordsPerBatch);
+  }
+
+
+  public static class ArrowRecordBatchIterator implements 
Iterator<ArrowRecordBatch>, Closeable {
+
+    final Iterator<InternalRow> rowIterator;
+    final VectorSchemaRoot root;
+    final BufferAllocator allocator;
+    final int maxRecordsPerBatch;
+    final ArrowWriter arrowWriter;
+    final VectorUnloader unloader;
+
+    ArrowRecordBatchIterator(Iterator<InternalRow> rowIterator,
+        VectorSchemaRoot root,
+        BufferAllocator allocator,
+        int maxRecordsPerBatch) {
+
+      this.unloader = new VectorUnloader(root);
+      this.arrowWriter = ArrowWriter.create(root);
+      this.rowIterator = rowIterator;
+      this.root = root;
+      this.allocator = allocator;
+      this.maxRecordsPerBatch = maxRecordsPerBatch;
+    }
+
+    @Override
+    public boolean hasNext() {
+
+      if (!rowIterator.hasNext()) {
+
+        try {
+          close();
+        } catch (IOException ioe) {
+          throw new RuntimeException("Encountered an error while closing 
iterator. "+ioe.getMessage(), ioe);
+        }
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public ArrowRecordBatch next() {
+
+      int rowCount = 0;
+
+      long start = System.currentTimeMillis();
+      while (rowIterator.hasNext() && (maxRecordsPerBatch <= 0 || rowCount < 
maxRecordsPerBatch)) {
+        InternalRow row = rowIterator.next();
+        arrowWriter.write(row);
+        rowCount += 1;
+      }
+      arrowWriter.finish();
+      LOG.info("[ArrowRecordBatchIterator] => Created batch with "+rowCount+ " 
rows. " +
+          "Took "+(System.currentTimeMillis() - start) + " milliseconds.");
+      ArrowRecordBatch batch = unloader.getRecordBatch();
+      return batch;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // arrowWriter.finish();
+      root.close();
+      allocator.close();
+    }
+  }
+}
diff --git a/build.gradle b/build.gradle
index 282ce61..71e8065 100644
--- a/build.gradle
+++ b/build.gradle
@@ -64,6 +64,10 @@ subprojects {
   apply plugin: 'maven' // make pom files for deployment
   apply plugin: 'nebula.maven-base-publish'
 
+  artifacts {
+    archives sourceJar
+  }
+
   compileJava {
     options.encoding = "UTF-8"
   }
@@ -222,6 +226,14 @@ project(':iceberg-core') {
     compile("org.apache.avro:avro") {
       exclude group: 'org.tukaani' // xz compression is not supported
     }
+    compile("org.apache.arrow:arrow-vector") {
+      exclude group: 'io.netty', module: 'netty-buffer'
+      exclude group: 'io.netty', module: 'netty-common'
+    }
+    compileOnly("org.apache.spark:spark-hive_2.11") {
+      exclude group: 'org.apache.avro', module: 'avro'
+    }
+
 
     compile "com.fasterxml.jackson.core:jackson-databind"
     compile "com.fasterxml.jackson.core:jackson-core"
@@ -237,7 +249,11 @@ project(':iceberg-data') {
   dependencies {
     compile project(':iceberg-api')
     compile project(':iceberg-core')
+    compileOnly project(':iceberg-spark')
     compileOnly project(':iceberg-parquet')
+    compileOnly("org.apache.spark:spark-hive_2.11") {
+      exclude group: 'org.apache.avro', module: 'avro'
+    }
 
     testCompile("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
@@ -323,14 +339,12 @@ project(':iceberg-parquet') {
   dependencies {
     compile project(':iceberg-api')
     compile project(':iceberg-core')
+    compile project(':iceberg-arrow')
 
-    compile("org.apache.parquet:parquet-avro") {
+    compileOnly("org.apache.spark:spark-hive_2.11") {
       exclude group: 'org.apache.avro', module: 'avro'
-      // already shaded by Parquet
-      exclude group: 'it.unimi.dsi'
-      exclude group: 'org.codehaus.jackson'
     }
-
+    compile "org.apache.parquet:parquet-avro"
     compileOnly "org.apache.avro:avro"
     compileOnly("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
@@ -363,6 +377,17 @@ project(':iceberg-spark') {
   }
 }
 
+project(':iceberg-arrow') {
+  dependencies {
+//    compile project(':iceberg-spark')
+    compile project(':iceberg-api')
+
+    compileOnly("org.apache.spark:spark-hive_2.11") {
+      exclude group: 'org.apache.avro', module: 'avro'
+    }
+  }
+}
+
 project(':iceberg-pig') {
   dependencies {
     compile project(':iceberg-api')
diff --git a/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java 
b/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
new file mode 100644
index 0000000..492af61
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+
+public class ArrowSchemaUtil {
+  static final String ORIGINAL_TYPE = "originalType";
+  static final String MAP_TYPE = "mapType";
+  static final String MAP_KEY = "key";
+  static final String MAP_VALUE = "value";
+
+  private ArrowSchemaUtil() { }
+
+  /**
+   * Convert Iceberg schema to Arrow Schema.
+   *
+   * @param schema iceberg schema
+   * @return arrow schema
+   */
+  public static Schema convert(final org.apache.iceberg.Schema schema) {
+    final ImmutableList.Builder<Field> fields = ImmutableList.builder();
+
+    for (NestedField f : schema.columns()) {
+      fields.add(convert(f));
+    }
+
+    return new Schema(fields.build());
+  }
+
+  public static Field convert(final NestedField field) {
+    final ArrowType arrowType;
+
+    final List<Field> children = Lists.newArrayList();
+    Map<String, String> metadata = null;
+
+    switch (field.type().typeId()) {
+      case BINARY:
+        arrowType = ArrowType.Binary.INSTANCE;
+        break;
+      case FIXED:
+        arrowType = new ArrowType.FixedSizeBinary(((Types.FixedType) 
field.type()).length());
+        break;
+      case BOOLEAN:
+        arrowType = ArrowType.Bool.INSTANCE;
+        break;
+      case INTEGER:
+        arrowType = new ArrowType.Int(Integer.SIZE, true);
+        break;
+      case LONG:
+        arrowType = new ArrowType.Int(Long.SIZE, true);
+        break;
+      case FLOAT:
+        arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+        break;
+      case DOUBLE:
+        arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+        break;
+      case DECIMAL:
+        final Types.DecimalType decimalType = (Types.DecimalType) field.type();
+        arrowType = new ArrowType.Decimal(decimalType.precision(), 
decimalType.scale());
+        break;
+      case STRING:
+        arrowType = ArrowType.Utf8.INSTANCE;
+        break;
+      case TIME:
+        arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
+        break;
+      case TIMESTAMP:
+        arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
+        break;
+      case DATE:
+        arrowType = new ArrowType.Date(DateUnit.DAY);
+        break;
+      case STRUCT:
+        final StructType struct = field.type().asStructType();
+        arrowType = ArrowType.Struct.INSTANCE;
+
+        for (NestedField nested : struct.fields()) {
+          children.add(convert(nested));
+        }
+        break;
+      case LIST:
+        final ListType listType = field.type().asListType();
+        arrowType = ArrowType.List.INSTANCE;
+
+        for (NestedField nested : listType.fields()) {
+          children.add(convert(nested));
+        }
+        break;
+      case MAP:
+        //Maps are represented as List<Struct<key, value>>
+        metadata = ImmutableMap.of(ORIGINAL_TYPE, MAP_TYPE);
+        final MapType mapType = field.type().asMapType();
+        arrowType = ArrowType.List.INSTANCE;
+
+        final List<Field> entryFields = Lists.newArrayList(
+            convert(required(0, MAP_KEY, mapType.keyType())),
+            convert(optional(0, MAP_VALUE, mapType.valueType()))
+        );
+
+        final Field entry = new Field("",
+            new FieldType(true, new ArrowType.Struct(), null), entryFields);
+        children.add(entry);
+        break;
+      default: throw new UnsupportedOperationException("Unsupported field 
type: " + field);
+    }
+
+    return new Field(field.name(), new FieldType(field.isOptional(), 
arrowType, null, metadata), children);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java 
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
index 553faef..e0de97d 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.hadoop;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -69,19 +71,24 @@ public class HadoopTables implements Tables, Configurable {
    * location.
    *
    * @param schema iceberg schema used to create the table
-   * @param spec partition specification
+   * @param spec partitioning spec, if null the table will be unpartitioned
+   * @param properties a string map of table properties, initialized to empty 
if null
    * @param location a path URI (e.g. hdfs:///warehouse/my_table)
    * @return newly created table implementation
    */
   @Override
   public Table create(Schema schema, PartitionSpec spec, Map<String, String> 
properties,
                       String location) {
+    Preconditions.checkNotNull(schema, "A table schema is required");
+
     TableOperations ops = newTableOps(location);
     if (ops.current() != null) {
       throw new AlreadyExistsException("Table already exists at location: " + 
location);
     }
 
-    TableMetadata metadata = TableMetadata.newTableMetadata(ops, schema, spec, 
location, properties);
+    Map<String, String> tableProps = properties == null ? ImmutableMap.of() : 
properties;
+    PartitionSpec partitionSpec = spec == null ? PartitionSpec.unpartitioned() 
: spec;
+    TableMetadata metadata = TableMetadata.newTableMetadata(ops, schema, 
partitionSpec, location, tableProps);
     ops.commit(null, metadata);
 
     return new BaseTable(ops, location);
diff --git 
a/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java 
b/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
new file mode 100644
index 0000000..f42ebde
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow;
+
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.junit.Test;
+
+import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Bool;
+import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Date;
+import static 
org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.FloatingPoint;
+import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Int;
+import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.List;
+import static 
org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Timestamp;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+public class ArrowSchemaUtilTest {
+
+  @Test
+  public void convertPrimitive() {
+    Schema iceberg = new Schema(
+        optional(0, "i", Types.IntegerType.get()),
+        optional(1, "b", BooleanType.get()),
+        required(2, "d", DoubleType.get()),
+        required(3, "s", StringType.get()),
+        optional(4, "d2", DateType.get()),
+        optional(5, "ts", TimestampType.withoutZone())
+    );
+
+    org.apache.arrow.vector.types.pojo.Schema arrow = 
ArrowSchemaUtil.convert(iceberg);
+
+    System.out.println(iceberg);
+    System.out.println(arrow);
+
+    validate(iceberg, arrow);
+  }
+
+  @Test
+  public void convertComplex() {
+    Schema iceberg = new Schema(
+        optional(0, "m", MapType.ofOptional(
+            1, 2, StringType.get(),
+            LongType.get())
+        ),
+        required(3, "m2", MapType.ofOptional(
+            4, 5, StringType.get(),
+            ListType.ofOptional(6, TimestampType.withoutZone()))
+        )
+    );
+
+    org.apache.arrow.vector.types.pojo.Schema arrow = 
ArrowSchemaUtil.convert(iceberg);
+
+    System.out.println(iceberg);
+    System.out.println(arrow);
+
+    assertEquals(iceberg.columns().size(), arrow.getFields().size());
+  }
+
+  private void validate(Schema iceberg, 
org.apache.arrow.vector.types.pojo.Schema arrow) {
+    assertEquals(iceberg.columns().size(), arrow.getFields().size());
+
+    for (Types.NestedField nf : iceberg.columns()) {
+      Field field = arrow.findField(nf.name());
+      assertNotNull("Missing filed: " + nf, field);
+
+      validate(nf.type(), field.getType());
+    }
+  }
+
+  private void validate(Type iceberg, ArrowType arrow) {
+    switch (iceberg.typeId()) {
+      case BOOLEAN: assertEquals(Bool, arrow.getTypeID());
+        break;
+      case INTEGER: assertEquals(Int, arrow.getTypeID());
+        break;
+      case LONG: assertEquals(Int, arrow.getTypeID());
+        break;
+      case DOUBLE: assertEquals(FloatingPoint, arrow.getTypeID());
+        break;
+      case STRING: assertEquals(ArrowType.Utf8.INSTANCE.getTypeID(), 
arrow.getTypeID());
+        break;
+      case DATE: assertEquals(Date, arrow.getTypeID());
+        break;
+      case TIMESTAMP: assertEquals(Timestamp, arrow.getTypeID());
+        break;
+      case MAP: assertEquals(List, arrow.getTypeID());
+        break;
+      default: throw new UnsupportedOperationException("Check not implemented 
for type: " + iceberg);
+    }
+  }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java 
b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 27a625d..b8d59c9 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 
 class TableScanIterable extends CloseableGroup implements 
CloseableIterable<Record> {
   private final TableOperations ops;
@@ -89,7 +90,7 @@ class TableScanIterable extends CloseableGroup implements 
CloseableIterable<Reco
 
       case PARQUET:
         Parquet.ReadBuilder parquet = Parquet.read(input)
-            .project(projection)
+            .project(projection, SparkSchemaUtil.convert(projection))
             .createReaderFunc(fileSchema -> 
GenericParquetReaders.buildReader(projection, fileSchema))
             .split(task.start(), task.length());
 
diff --git a/gradle.properties b/gradle.properties
index f2ff982..90a909a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,2 +1,6 @@
 jmhOutputPath=build/reports/jmh/human-readable-output.txt
 jmhIncludeRegex=.*
+
+artifactory_contextUrl=https://artifactory.corp.adobe.com/artifactory
+artifactory_user_p=
+artifactory_key_p=
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 494f2c5..9ce8dff 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,6 +55,7 @@ import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.types.StructType;
 
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
@@ -273,6 +274,7 @@ public class Parquet {
     private Long start = null;
     private Long length = null;
     private Schema schema = null;
+    private StructType sparkSchema = null;
     private Expression filter = null;
     private ReadSupport<?> readSupport = null;
     private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
@@ -281,6 +283,7 @@ public class Parquet {
     private Map<String, String> properties = Maps.newHashMap();
     private boolean callInit = false;
     private boolean reuseContainers = false;
+    private int maxRecordsPerBatch = 1000;
 
     private ReadBuilder(InputFile file) {
       this.file = file;
@@ -299,6 +302,12 @@ public class Parquet {
       return this;
     }
 
+    public ReadBuilder project(Schema schema, StructType sparkSchema) {
+      this.schema = schema;
+      this.sparkSchema = sparkSchema;
+      return this;
+    }
+
     public ReadBuilder project(Schema schema) {
       this.schema = schema;
       return this;
@@ -348,6 +357,12 @@ public class Parquet {
       return this;
     }
 
+    public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
+
+      this.maxRecordsPerBatch = numRowsPerBatch;
+      return this;
+    }
+
     @SuppressWarnings("unchecked")
     public <D> CloseableIterable<D> build() {
       if (readerFunc != null) {
@@ -374,7 +389,7 @@ public class Parquet {
         ParquetReadOptions options = optionsBuilder.build();
 
         return new org.apache.iceberg.parquet.ParquetReader<>(
-            file, schema, options, readerFunc, filter, reuseContainers, 
caseSensitive);
+            file, schema, options, readerFunc, filter, reuseContainers, 
caseSensitive, sparkSchema, maxRecordsPerBatch);
       }
 
       ParquetReadBuilder<D> builder = new 
ParquetReadBuilder<>(ParquetIO.file(file));
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
index 4c315c3..afcfb78 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
@@ -36,8 +36,8 @@ import org.apache.iceberg.avro.AvroSchemaVisitor;
 import org.apache.iceberg.avro.UUIDConversion;
 import org.apache.iceberg.types.TypeUtil;
 
-class ParquetAvro {
-  static Schema parquetAvroSchema(Schema avroSchema) {
+public class ParquetAvro {
+  public static Schema parquetAvroSchema(Schema avroSchema) {
     return AvroSchemaVisitor.visit(avroSchema, new 
ParquetDecimalSchemaConverter());
   }
 
@@ -173,7 +173,7 @@ class ParquetAvro {
     }
   }
 
-  static GenericData DEFAULT_MODEL = new SpecificData() {
+  public static GenericData DEFAULT_MODEL = new SpecificData() {
     private final Conversion<?> fixedDecimalConversion = new 
FixedDecimalConversion();
     private final Conversion<?> intDecimalConversion = new 
IntDecimalConversion();
     private final Conversion<?> longDecimalConversion = new 
LongDecimalConversion();
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
index b4a675e..cbf2d1c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -37,9 +37,9 @@ import org.apache.parquet.io.api.Binary;
 
 import static org.apache.iceberg.expressions.ExpressionVisitors.visit;
 
-class ParquetFilters {
+public class ParquetFilters {
 
-  static FilterCompat.Filter convert(Schema schema, Expression expr, boolean 
caseSensitive) {
+  public static FilterCompat.Filter convert(Schema schema, Expression expr, 
boolean caseSensitive) {
     FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema, 
caseSensitive));
     // TODO: handle AlwaysFalse.INSTANCE
     if (pred != null && pred != AlwaysTrue.INSTANCE) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
index 360a055..6432483 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
@@ -44,11 +44,11 @@ import static 
org.apache.parquet.hadoop.util.HadoopOutputFile.fromPath;
 /**
  * Methods in this class translate from the IO API to Parquet's IO API.
  */
-class ParquetIO {
+public class ParquetIO {
   private ParquetIO() {
   }
 
-  static InputFile file(org.apache.iceberg.io.InputFile file) {
+  public static InputFile file(org.apache.iceberg.io.InputFile file) {
     // TODO: use reflection to avoid depending on classes from iceberg-hadoop
     // TODO: use reflection to avoid depending on classes from hadoop
     if (file instanceof HadoopInputFile) {
@@ -62,7 +62,7 @@ class ParquetIO {
     return new ParquetInputFile(file);
   }
 
-  static OutputFile file(org.apache.iceberg.io.OutputFile file) {
+  public static OutputFile file(org.apache.iceberg.io.OutputFile file) {
     if (file instanceof HadoopOutputFile) {
       HadoopOutputFile hfile = (HadoopOutputFile) file;
       try {
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
index bc43448..7d6d72c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
@@ -31,7 +31,7 @@ import org.apache.parquet.hadoop.ParquetReader;
 public class ParquetIterable<T> extends CloseableGroup implements 
CloseableIterable<T> {
   private final ParquetReader.Builder<T> builder;
 
-  ParquetIterable(ParquetReader.Builder<T> builder) {
+  public ParquetIterable(ParquetReader.Builder<T> builder) {
     this.builder = builder;
   }
 
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
index 8a0b44c..b0c8a31 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
@@ -41,12 +41,12 @@ import static 
org.apache.iceberg.parquet.ParquetSchemaUtil.pruneColumnsFallback;
  *
  * @param <T> Java type produced by this read support instance
  */
-class ParquetReadSupport<T> extends ReadSupport<T> {
+public class ParquetReadSupport<T> extends ReadSupport<T> {
   private final Schema expectedSchema;
   private final ReadSupport<T> wrapped;
   private final boolean callInit;
 
-  ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, 
boolean callInit) {
+  public ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, 
boolean callInit) {
     this.expectedSchema = expectedSchema;
     this.wrapped = readSupport;
     this.callInit = callInit;
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 2ff2bdb..9f1e5fb 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -23,8 +23,10 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.TimeZone;
 import java.util.function.Function;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.reader.ArrowReader;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -36,6 +38,11 @@ import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.parquet.ParquetSchemaUtil.addFallbackIds;
 import static org.apache.iceberg.parquet.ParquetSchemaUtil.hasIds;
@@ -50,18 +57,24 @@ public class ParquetReader<T> extends CloseableGroup 
implements CloseableIterabl
   private final Expression filter;
   private final boolean reuseContainers;
   private final boolean caseSensitive;
+  private final StructType sparkSchema;
+  private final int maxRecordsPerBatch;
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParquetReader.class);
 
   public ParquetReader(InputFile input, Schema expectedSchema, 
ParquetReadOptions options,
                        Function<MessageType, ParquetValueReader<?>> readerFunc,
-                       Expression filter, boolean reuseContainers, boolean 
caseSensitive) {
+                       Expression filter, boolean reuseContainers, boolean 
caseSensitive,
+                       StructType sparkSchema, int maxRecordsPerBatch) {
     this.input = input;
     this.expectedSchema = expectedSchema;
+    this.sparkSchema = sparkSchema;
     this.options = options;
     this.readerFunc = readerFunc;
     // replace alwaysTrue with null to avoid extra work evaluating a trivial 
filter
     this.filter = filter == Expressions.alwaysTrue() ? null : filter;
     this.reuseContainers = reuseContainers;
     this.caseSensitive = caseSensitive;
+    this.maxRecordsPerBatch = maxRecordsPerBatch;
   }
 
   private static class ReadConf<T> {
@@ -185,11 +198,31 @@ public class ParquetReader<T> extends CloseableGroup 
implements CloseableIterabl
 
   @Override
   public Iterator<T> iterator() {
+    // create iterator over file
     FileIterator<T> iter = new FileIterator<>(init());
     addCloseable(iter);
+
     return iter;
   }
 
+  private Iterator<T> arrowBatchAsInternalRow(Iterator<InternalRow> iter) {
+    // Convert InternalRow iterator to ArrowRecordBatch iterator
+    Iterator<InternalRow> rowIterator = iter;
+    ArrowReader.ArrowRecordBatchIterator arrowBatchIter = 
ArrowReader.toBatchIterator(rowIterator,
+        sparkSchema, maxRecordsPerBatch,
+        TimeZone.getDefault().getID());
+    addCloseable(arrowBatchIter);
+
+    // Overlay InternalRow iterator over ArrowRecordBatches
+    ArrowReader.InternalRowOverArrowBatchIterator
+        rowOverbatchIter = ArrowReader.fromBatchIterator(arrowBatchIter,
+        sparkSchema, TimeZone.getDefault().getID());
+
+    addCloseable(rowOverbatchIter);
+
+    return (Iterator)rowOverbatchIter;
+  }
+
   private static class FileIterator<T> implements Iterator<T>, Closeable {
     private final ParquetFileReader reader;
     private final boolean[] shouldSkip;
@@ -226,7 +259,11 @@ public class ParquetReader<T> extends CloseableGroup 
implements CloseableIterabl
       } else {
         this.last = model.read(null);
       }
-      valuesRead += 1;
+      if (last instanceof ColumnarBatch) {
+        valuesRead += ((ColumnarBatch)last).numRows();
+      } else {
+        valuesRead += 1;
+      }
 
       return last;
     }
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index ac61983..4529367 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -26,13 +26,18 @@ import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 import static java.util.Collections.emptyIterator;
 
@@ -576,6 +581,85 @@ public class ParquetValueReaders {
     }
   }
 
+
+  public static class ColumnarBatchReader implements 
ParquetValueReader<ColumnarBatch>  {
+
+    private final int numFields;
+    private final Types.StructType iceExpectedFields;
+    private final ParquetValueReader<FieldVector>[] readers;
+    private final TripleIterator<?> column;
+    private final TripleIterator<?>[] columns;
+    private final List<TripleIterator<?>> children;
+
+    @SuppressWarnings("unchecked")
+    public ColumnarBatchReader(List<Type> types,
+        Types.StructType icebergExpectedFields,
+        List<ParquetValueReader<FieldVector>> readers) {
+
+      this.numFields = readers.size();
+      this.iceExpectedFields = icebergExpectedFields;
+      this.readers = (ParquetValueReader<FieldVector>[]) Array.newInstance(
+          ParquetValueReader.class, readers.size());
+      this.columns = (TripleIterator<?>[]) 
Array.newInstance(TripleIterator.class, readers.size());
+
+
+      ImmutableList.Builder<TripleIterator<?>> columnsBuilder = 
ImmutableList.builder();
+      for (int i = 0; i < readers.size(); i += 1) {
+        ParquetValueReader<FieldVector> reader = readers.get(i);
+        this.readers[i] = readers.get(i);
+        this.columns[i] = reader.column();
+        columnsBuilder.addAll(reader.columns());
+      }
+
+      this.children = columnsBuilder.build();
+      if (children.size() > 0) {
+        this.column = children.get(0);
+      } else {
+        this.column = NullReader.NULL_COLUMN;
+      }
+
+    }
+
+    @Override
+    public final void setPageSource(PageReadStore pageStore) {
+      for (int i = 0; i < readers.length; i += 1) {
+        readers[i].setPageSource(pageStore);
+      }
+    }
+
+    @Override
+    public final TripleIterator<?> column() {
+      return column;
+    }
+
+    @Override
+    public List<TripleIterator<?>> columns() {
+      return children;
+    }
+
+
+    @Override
+    public final ColumnarBatch read(ColumnarBatch ignore) {
+
+      ArrowColumnVector[] arrowVectorArr = 
(ArrowColumnVector[])Array.newInstance(ArrowColumnVector.class,
+          readers.length);
+
+      int numRows=0;
+      for (int i = 0; i < readers.length; i += 1) {
+
+        FieldVector vec = readers[i].read(null);
+        arrowVectorArr[i] = new ArrowColumnVector(vec);
+        numRows = vec.getValueCount();
+      }
+
+      ColumnarBatch batch = new ColumnarBatch(arrowVectorArr);
+      batch.setNumRows(numRows);
+
+      return  batch;
+    }
+
+  }
+
   public abstract static class StructReader<T, I> implements 
ParquetValueReader<T> {
     private interface Setter<R> {
       void set(R record, int pos, Object reuse);
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
index 633f9f8..d097c67 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
@@ -26,12 +26,12 @@ import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 
-class ParquetWriteSupport<T> extends WriteSupport<T> {
+public class ParquetWriteSupport<T> extends WriteSupport<T> {
   private final MessageType type;
   private final Map<String, String> keyValueMetadata;
   private final WriteSupport<T> wrapped;
 
-  ParquetWriteSupport(MessageType type, Map<String, String> keyValueMetadata, 
WriteSupport<T> writeSupport) {
+  public ParquetWriteSupport(MessageType type, Map<String, String> 
keyValueMetadata, WriteSupport<T> writeSupport) {
     this.type = type;
     this.keyValueMetadata = keyValueMetadata;
     this.wrapped = writeSupport;
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index c0956a1..e8f95ea 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -47,7 +47,7 @@ import static java.lang.Math.max;
 import static java.lang.Math.min;
 import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
 
-class ParquetWriter<T> implements FileAppender<T>, Closeable {
+public class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private static final DynConstructors.Ctor<PageWriteStore> pageStoreCtor = 
DynConstructors
       .builder(PageWriteStore.class)
       .hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
diff --git a/settings.gradle b/settings.gradle
index 4145cfe..e5df47b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -25,6 +25,7 @@ include 'data'
 include 'orc'
 include 'parquet'
 include 'spark'
+include 'arrow'
 include 'pig'
 include 'runtime'
 include 'hive'
@@ -36,6 +37,7 @@ project(':data').name = 'iceberg-data'
 project(':orc').name = 'iceberg-orc'
 project(':parquet').name = 'iceberg-parquet'
 project(':spark').name = 'iceberg-spark'
+project(':arrow').name = 'iceberg-arrow'
 project(':pig').name = 'iceberg-pig'
 project(':runtime').name = 'iceberg-spark-runtime'
 project(':hive').name = 'iceberg-hive'
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 9a36266..e8cc083 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -118,6 +118,10 @@ public class SparkParquetReaders {
       this.type = type;
     }
 
+    protected MessageType getType() {
+      return type;
+    }
+
     @Override
     public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
                                          List<ParquetValueReader<?>> 
fieldReaders) {
@@ -360,7 +364,7 @@ public class SparkParquetReaders {
     }
   }
 
-  private static class StringReader extends PrimitiveReader<UTF8String> {
+  protected static class StringReader extends PrimitiveReader<UTF8String> {
     StringReader(ColumnDescriptor desc) {
       super(desc);
     }
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java
 
b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java
new file mode 100644
index 0000000..3b67c96
--- /dev/null
+++ 
b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vector;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.iceberg.arrow.ArrowSchemaUtil;
+import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.types.Decimal;
+
+/**
+ * Parquet Value Reader implementations for Vectorization.
+ * Contains type-wise readers to read parquet data as vectors.
+ * - Returns Arrow's Field Vector for each type.
+ * - Null values are explicitly handled.
+ * - Type serialization is done based on types in Arrow.
+ * - Creates One Vector per RowGroup. So a Batch would have as many rows as 
there are in the underlying RowGroup.
+ * - Mapping of Iceberg type to Arrow type is done in ArrowSchemaUtil.convert()
+ * - Iceberg to Arrow Type mapping :
+ *   icebergType : LONG       -   Field Vector Type : 
org.apache.arrow.vector.BigIntVector
+ *   icebergType : STRING     -   Field Vector Type : 
org.apache.arrow.vector.VarCharVector
+ *   icebergType : BOOLEAN    -   Field Vector Type : 
org.apache.arrow.vector.BitVector
+ *   icebergType : INTEGER    -   Field Vector Type : 
org.apache.arrow.vector.IntVector
+ *   icebergType : FLOAT      -   Field Vector Type : 
org.apache.arrow.vector.Float4Vector
+ *   icebergType : DOUBLE     -   Field Vector Type : 
org.apache.arrow.vector.Float8Vector
+ *   icebergType : DATE       -   Field Vector Type : 
org.apache.arrow.vector.DateDayVector
+ *   icebergType : TIMESTAMP  -   Field Vector Type : 
org.apache.arrow.vector.TimeStampMicroTZVector
+ *   icebergType : STRING     -   Field Vector Type : 
org.apache.arrow.vector.VarCharVector
+ *   icebergType : BINARY     -   Field Vector Type : 
org.apache.arrow.vector.VarBinaryVector
+ *   icebergField : DECIMAL   -   Field Vector Type : 
org.apache.arrow.vector.DecimalVector
+ */
+public class VectorizedParquetValueReaders {
+
+  public abstract static class VectorReader extends 
ParquetValueReaders.PrimitiveReader<FieldVector> {
+
+    private FieldVector vec;
+    private boolean isOptional;
+
+    VectorReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+
+      super(desc);
+      this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
+      this.isOptional = 
desc.getPrimitiveType().isRepetition(Type.Repetition.OPTIONAL);
+    }
+
+    protected FieldVector getVector() {
+      return vec;
+    }
+
+    protected boolean isOptional() {
+      return isOptional;
+    }
+
+    @Override
+    public FieldVector read(FieldVector ignore) {
+
+      vec.reset();
+      int ordinal = 0;
+
+      while (column.hasNext()) {
+        // TODO: this check works for flat schemas only
+        // need to get max definition level to do proper check
+        if (isOptional && column.currentDefinitionLevel() == 0) {
+          // handle null
+          column.nextNull();
+          nextNullAt(ordinal);
+        } else {
+          nextValueAt(ordinal);
+        }
+        ordinal++;
+      }
+      vec.setValueCount(ordinal);
+      return vec;
+    }
+
+
+    public int getRowCount() {
+      return vec.getValueCount();
+    }
+
+    protected abstract void nextNullAt(int ordinal);
+
+    protected abstract void nextValueAt(int ordinal);
+  }
+
+  protected static class StringReader extends VectorReader {
+
+    StringReader(ColumnDescriptor desc, Types.NestedField icebergField, 
RootAllocator rootAlloc) {
+      super(desc, icebergField, rootAlloc);
+    }
+
+    @Override
+    protected void nextNullAt(int ordinal) {
+      ((VarCharVector) getVector()).setNull(ordinal);
+    }
+
+    @Override
+    protected void nextValueAt(int ordinal) {
+
+      Binary binary = column.nextBinary();
+      if (binary == null) {
+
+        ((VarCharVector) getVector()).setNull(ordinal);
+
+      } else {
+        String utf8Str = binary.toStringUsingUTF8();
+        ((VarCharVector) getVector()).setSafe(ordinal, utf8Str.getBytes());
+      }
+    }
+
+  }
+
+  protected static class IntegerReader extends VectorReader {
+
+    IntegerReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+
+      super(desc, icebergField, rootAlloc);
+    }
+
+    @Override
+    protected void nextNullAt(int ordinal) {
+      ((IntVector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      int intValue = column.nextInteger();
+      ((IntVector) getVector()).setSafe(ordinal, intValue);
+
+    }
+  }
+
+  protected static class LongReader extends VectorReader {
+
+    LongReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+
+      super(desc, icebergField, rootAlloc);
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((BigIntVector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      long longValue = column.nextLong();
+      ((BigIntVector) getVector()).setSafe(ordinal, longValue);
+
+    }
+  }
+
+  protected static class TimestampMillisReader extends LongReader {
+
+    TimestampMillisReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+      super(desc, icebergField, rootAlloc);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      long longValue = column.nextLong();
+      ((BigIntVector) getVector()).setSafe(ordinal, 1000 * longValue);
+
+    }
+  }
+
+  protected static class TimestampMicroReader extends VectorReader {
+
+    TimestampMicroReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+      super(desc, icebergField, rootAlloc);
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((TimeStampMicroTZVector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      long longValue = column.nextLong();
+      ((TimeStampMicroTZVector) getVector()).setSafe(ordinal, longValue);
+
+    }
+  }
+
+  protected static class BooleanReader extends VectorReader {
+
+    BooleanReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+      super(desc, icebergField, rootAlloc);
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((BitVector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      boolean bool = column.nextBoolean();
+      ((BitVector) getVector()).setSafe(ordinal, bool ? 1 : 0);
+
+    }
+  }
+
+
+  protected static class FloatReader extends VectorReader {
+
+    FloatReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+      super(desc, icebergField, rootAlloc);
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((Float4Vector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      float floatValue = column.nextFloat();
+      ((Float4Vector) getVector()).setSafe(ordinal, floatValue);
+
+    }
+  }
+
+  protected static class DoubleReader extends VectorReader {
+
+    DoubleReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+      super(desc, icebergField, rootAlloc);
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((Float8Vector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      double doubleValue = column.nextDouble();
+      ((Float8Vector) getVector()).setSafe(ordinal, doubleValue);
+
+    }
+  }
+
+
+  protected static class BinaryReader extends VectorReader {
+
+    BinaryReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+      super(desc, icebergField, rootAlloc);
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((VarBinaryVector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      Binary binaryValue = column.nextBinary();
+      ((VarBinaryVector) getVector()).setSafe(ordinal, binaryValue.getBytes());
+
+    }
+  }
+
+
+  protected static class DateReader extends VectorReader {
+
+    DateReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc) {
+      super(desc, icebergField, rootAlloc);
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((DateDayVector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      int dateValue = column.nextInteger();
+      ((DateDayVector) getVector()).setSafe(ordinal, dateValue);
+
+    }
+  }
+
+
+  protected static class IntegerDecimalReader extends VectorReader {
+    private final int precision;
+    private final int scale;
+
+    IntegerDecimalReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc,
+        int precision, int scale) {
+
+      super(desc, icebergField, rootAlloc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((DecimalVector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      int decimalIntValue = column.nextInteger();
+      Decimal decimalValue = Decimal.apply(decimalIntValue, precision, scale);
+
+      ((DecimalVector) getVector()).setSafe(ordinal, 
decimalValue.toJavaBigDecimal());
+
+    }
+  }
+
+  protected static class LongDecimalReader extends VectorReader {
+    private final int precision;
+    private final int scale;
+
+    LongDecimalReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc,
+        int precision, int scale) {
+
+      super(desc, icebergField, rootAlloc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((DecimalVector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      long decimalLongValue = column.nextLong();
+      Decimal decimalValue = Decimal.apply(decimalLongValue, precision, scale);
+
+      ((DecimalVector) getVector()).setSafe(ordinal, 
decimalValue.toJavaBigDecimal());
+
+    }
+  }
+
+  protected static class BinaryDecimalReader extends VectorReader {
+    private final int precision;
+    private final int scale;
+
+    BinaryDecimalReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        RootAllocator rootAlloc,
+        int precision, int scale) {
+
+      super(desc, icebergField, rootAlloc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    protected void nextNullAt(int ordinal) {
+      ((DecimalVector) getVector()).setNull(ordinal);
+    }
+
+    protected void nextValueAt(int ordinal) {
+
+      Binary binaryValue = column.nextBinary();
+      Decimal decimalValue = Decimal.fromDecimal(new BigDecimal(new 
BigInteger(binaryValue.getBytes()), scale));
+
+      ((DecimalVector) getVector()).setSafe(ordinal, 
decimalValue.toJavaBigDecimal());
+
+    }
+  }
+}
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
 
b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
new file mode 100644
index 0000000..f283cda
--- /dev/null
+++ 
b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vector;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.ArrowSchemaUtil;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * Builds vectorized Parquet value readers that produce Spark
+ * {@link ColumnarBatch}es backed by Arrow vectors, instead of row-by-row
+ * {@code InternalRow} readers.
+ */
+public class VectorizedSparkParquetReaders {
+
+  private VectorizedSparkParquetReaders() {
+  }
+
+  /**
+   * Builds a reader for the given Parquet file schema that returns
+   * ColumnarBatches matching the expected (projected) Iceberg schema.
+   *
+   * @param tableSchema the full Iceberg table schema
+   * @param expectedSchema the projected Iceberg schema to read
+   * @param fileSchema the Parquet schema of the file being read
+   */
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<ColumnarBatch> buildReader(
+      Schema tableSchema,
+      Schema expectedSchema,
+      MessageType fileSchema) {
+
+    return (ParquetValueReader<ColumnarBatch>)
+        TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+            new ReadBuilder(tableSchema, expectedSchema, fileSchema));
+  }
+
+  // Visitor that maps each Parquet primitive column to a vectorized reader
+  // and assembles struct/message levels into a ColumnarBatchReader.
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType parquetSchema;
+    private final Schema projectedIcebergSchema;
+    private final Schema tableIcebergSchema;
+    // NOTE(review): arrowSchema and rootAllocator are initialized here but only
+    // rootAllocator is used below (passed to the per-column readers).
+    private final org.apache.arrow.vector.types.pojo.Schema arrowSchema;
+    private final RootAllocator rootAllocator;
+
+    ReadBuilder(Schema tableSchema, Schema projectedIcebergSchema, MessageType 
parquetSchema) {
+      this.parquetSchema = parquetSchema;
+      this.tableIcebergSchema = tableSchema;
+      this.projectedIcebergSchema = projectedIcebergSchema;
+      this.arrowSchema = ArrowSchemaUtil.convert(projectedIcebergSchema);
+      this.rootAllocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
+        List<ParquetValueReader<?>> fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType expected, GroupType 
struct,
+        List<ParquetValueReader<?>> fieldReaders) {
+
+      // this works on struct fields and the root iceberg schema which itself 
is a struct.
+
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<FieldVector>> readersById = 
Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = 
parquetSchema.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        int id = fieldType.getId().intValue();
+        // Todo: figure out optional field reading for vectorized reading
+        // readersById.put(id, 
(ParquetValueReader<FieldVector>)ParquetValueReaders.
+        //     option(fieldType, fieldD, fieldReaders.get(i)));
+
+        readersById.put(id, (ParquetValueReader<FieldVector>) 
fieldReaders.get(i));
+        typesById.put(id, fieldType);
+      }
+
+      List<Types.NestedField> icebergFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+
+      // Reorder readers to match the expected schema's field order; missing
+      // columns are filled with null readers.
+      List<ParquetValueReader<FieldVector>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          icebergFields.size());
+
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(icebergFields.size());
+
+      for (Types.NestedField field : icebergFields) {
+        int id = field.fieldId();
+        ParquetValueReader<FieldVector> reader = readersById.get(id);
+        if (reader != null) {
+          reorderedFields.add(reader);
+          types.add(typesById.get(id));
+        } else {
+          reorderedFields.add(ParquetValueReaders.nulls());
+          types.add(null);
+        }
+      }
+
+      return new ParquetValueReaders.ColumnarBatchReader(types, expected, 
reorderedFields);
+    }
+
+
+    // Selects a vectorized reader for a primitive column, dispatching first on
+    // the Parquet logical (original) type, then on the physical type.
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+        PrimitiveType primitive) {
+
+      // Create arrow vector for this field
+      int parquetFieldId = primitive.getId().intValue();
+      ColumnDescriptor desc = 
parquetSchema.getColumnDescription(currentPath());
+      Types.NestedField icebergField = 
tableIcebergSchema.findField(parquetFieldId);
+      // int fieldD = 
parquetSchema.getMaxDefinitionLevel(path(primitive.getName())) - 1;
+      // Field field = 
ArrowSchemaUtil.convert(projectedIcebergSchema.findField(parquetFieldId));
+      // FieldVector vec = field.createVector(rootAllocator);
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new VectorizedParquetValueReaders.StringReader(desc, 
icebergField, rootAllocator);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+            return new VectorizedParquetValueReaders.IntegerReader(desc, 
icebergField, rootAllocator);
+            // if (expected != null && expected.typeId() == 
Types.LongType.get().typeId()) {
+            //   return new ParquetValueReaders.IntAsLongReader(desc);
+            // } else {
+            //   return new ParquetValueReaders.UnboxedReader(desc);
+            // }
+          case DATE:
+            return new VectorizedParquetValueReaders.DateReader(desc, 
icebergField, rootAllocator);
+          case INT_64:
+            return new VectorizedParquetValueReaders.LongReader(desc, 
icebergField, rootAllocator);
+          case TIMESTAMP_MICROS:
+            return new 
VectorizedParquetValueReaders.TimestampMicroReader(desc, icebergField, 
rootAllocator);
+          case TIMESTAMP_MILLIS:
+            return new 
VectorizedParquetValueReaders.TimestampMillisReader(desc, icebergField, 
rootAllocator);
+          case DECIMAL:
+            // Decimal reader choice depends on the physical storage type.
+            DecimalMetadata decimal = primitive.getDecimalMetadata();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new 
VectorizedParquetValueReaders.BinaryDecimalReader(desc, icebergField, 
rootAllocator,
+                    decimal.getPrecision(),
+                    decimal.getScale());
+              case INT64:
+                return new 
VectorizedParquetValueReaders.LongDecimalReader(desc, icebergField, 
rootAllocator,
+                    decimal.getPrecision(),
+                    decimal.getScale());
+              case INT32:
+                return new 
VectorizedParquetValueReaders.IntegerDecimalReader(desc, icebergField, 
rootAllocator,
+                    decimal.getPrecision(),
+                    decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new VectorizedParquetValueReaders.BinaryReader(desc, 
icebergField, rootAllocator);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new VectorizedParquetValueReaders.BinaryReader(desc, 
icebergField, rootAllocator);
+        case INT32:
+          return new VectorizedParquetValueReaders.IntegerReader(desc, 
icebergField, rootAllocator);
+        case FLOAT:
+          return new VectorizedParquetValueReaders.FloatReader(desc, 
icebergField, rootAllocator);
+          // if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+          //   return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          // } else {
+          //   return new ParquetValueReaders.UnboxedReader<>(desc);
+          // }
+        case BOOLEAN:
+          return new VectorizedParquetValueReaders.BooleanReader(desc, 
icebergField, rootAllocator);
+        case INT64:
+          return new VectorizedParquetValueReaders.LongReader(desc, 
icebergField, rootAllocator);
+        case DOUBLE:
+          return new VectorizedParquetValueReaders.DoubleReader(desc, 
icebergField, rootAllocator);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
+      }
+    }
+
+    // Builds the dotted column path for the visitor's current position.
+    // fieldNames is maintained by TypeWithSchemaVisitor during traversal.
+    private String[] currentPath() {
+      String[] path = new String[fieldNames.size()];
+      if (!fieldNames.isEmpty()) {
+        Iterator<String> iter = fieldNames.descendingIterator();
+        for (int i = 0; iter.hasNext(); i += 1) {
+          path[i] = iter.next();
+        }
+      }
+
+      return path;
+    }
+
+    protected MessageType type() {
+      return parquetSchema;
+    }
+
+    // Same as currentPath(), with 'name' appended as the leaf element.
+    protected String[] path(String name) {
+      String[] path = new String[fieldNames.size() + 1];
+      path[fieldNames.size()] = name;
+
+      if (!fieldNames.isEmpty()) {
+        Iterator<String> iter = fieldNames.descendingIterator();
+        for (int i = 0; iter.hasNext(); i += 1) {
+          path[i] = iter.next();
+        }
+      }
+
+      return path;
+    }
+  }
+}
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java 
b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index b6dfade..8229b12 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -26,10 +26,7 @@ import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.hive.HiveCatalogs;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.CheckCompatibility;
 import org.apache.spark.sql.SaveMode;
@@ -46,11 +43,14 @@ import 
org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, 
DataSourceRegister, StreamWriteSupport {
 
   private SparkSession lazySpark = null;
   private Configuration lazyConf = null;
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergSource.class);
 
   @Override
   public String shortName() {
@@ -98,31 +98,31 @@ public class IcebergSource implements DataSourceV2, 
ReadSupport, WriteSupport, D
     Optional<String> path = options.get("path");
     Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is 
not set");
 
-    if (path.get().contains("/")) {
-      HadoopTables tables = new HadoopTables(conf);
-      return tables.load(path.get());
-    } else {
-      HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
-      TableIdentifier tableIdentifier = TableIdentifier.parse(path.get());
-      return hiveCatalog.loadTable(tableIdentifier);
-    }
+    // if (path.get().contains("/")) {
+    HadoopTables tables = new HadoopTables(conf);
+    return tables.load(path.get());
+    // } else {
+    //   HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
+    //   TableIdentifier tableIdentifier = TableIdentifier.parse(path.get());
+    //   return hiveCatalog.loadTable(tableIdentifier);
+    // }
   }
 
-  private SparkSession lazySparkSession() {
+  protected SparkSession lazySparkSession() {
     if (lazySpark == null) {
       this.lazySpark = SparkSession.builder().getOrCreate();
     }
     return lazySpark;
   }
 
-  private Configuration lazyBaseConf() {
+  protected Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
     }
     return lazyConf;
   }
 
-  private Table getTableAndResolveHadoopConfiguration(
+  protected Table getTableAndResolveHadoopConfiguration(
       DataSourceOptions options, Configuration conf) {
     // Overwrite configurations from the Spark Context with configurations 
from the options.
     mergeIcebergHadoopConfs(conf, options.asMap());
@@ -134,7 +134,7 @@ public class IcebergSource implements DataSourceV2, 
ReadSupport, WriteSupport, D
     return table;
   }
 
-  private static void mergeIcebergHadoopConfs(
+  protected static void mergeIcebergHadoopConfs(
       Configuration baseConf, Map<String, String> options) {
     options.keySet().stream()
         .filter(key -> key.startsWith("hadoop."))
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java 
b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 91359ff..75d72f1 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.spark.source;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import java.io.Closeable;
 import java.io.IOException;
@@ -31,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import org.apache.iceberg.CombinedScanTask;
@@ -59,15 +59,13 @@ import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.spark.data.vector.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -78,6 +76,7 @@ import org.apache.spark.sql.sources.v2.reader.Statistics;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
+import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -85,14 +84,15 @@ import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.StringType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.unsafe.types.UTF8String;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
 
-class Reader implements DataSourceReader, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns,
+class Reader implements DataSourceReader,
+    SupportsScanColumnarBatch,
+    SupportsPushDownFilters,
+    SupportsPushDownRequiredColumns,
     SupportsReportStatistics {
-  private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
 
   private static final Filter[] NO_FILTERS = new Filter[0];
 
@@ -102,23 +102,33 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
   private final FileIO fileIo;
   private final EncryptionManager encryptionManager;
   private final boolean caseSensitive;
+  private int numRecordsPerBatch;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
 
   // lazy variables
-  private Schema schema = null;
+  private Schema schema;
   private StructType type = null; // cached because Spark accesses it multiple 
times
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
+  private static final int DEFAULT_NUM_RECORDS_PER_BATCH = 1000;
 
   Reader(Table table, boolean caseSensitive, DataSourceOptions options) {
     this.table = table;
     this.snapshotId = 
options.get("snapshot-id").map(Long::parseLong).orElse(null);
     this.asOfTimestamp = 
options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
+    Optional<String> numRecordsPerBatchOpt = 
options.get("iceberg.read.numrecordsperbatch");
+    this.numRecordsPerBatch = DEFAULT_NUM_RECORDS_PER_BATCH;
+    if (numRecordsPerBatchOpt.isPresent()) {
+      this.numRecordsPerBatch = Integer.parseInt(numRecordsPerBatchOpt.get());
+    }
+    // LOG.info("[IcebergSource] => Reading numRecordsPerBatch = 
"+numRecordsPerBatch);
+
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
           "Cannot scan using both snapshot-id and as-of-timestamp to select 
the table snapshot");
     }
+
     this.schema = table.schema();
     this.fileIo = table.io();
     this.encryptionManager = table.encryption();
@@ -149,20 +159,37 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
   }
 
   @Override
-  public List<InputPartition<InternalRow>> planInputPartitions() {
+  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
     String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
 
-    List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
+    List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
       readTasks.add(
-          new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, 
encryptionManager, caseSensitive));
+          new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, 
encryptionManager,
+              caseSensitive, numRecordsPerBatch));
     }
 
     return readTasks;
   }
 
   @Override
+  public List<InputPartition<InternalRow>> planInputPartitions() {
+    // Row-based scans are not used by this vectorized reader: because the
+    // Reader implements SupportsScanColumnarBatch, Spark plans the scan via
+    // planBatchInputPartitions() instead. Fail fast with a clear message
+    // rather than returning null, which would cause an NPE deep inside Spark
+    // if this path were ever taken.
+    throw new UnsupportedOperationException(
+        "Row-based reads are not supported; use planBatchInputPartitions()");
+  }
+
+  @Override
   public Filter[] pushFilters(Filter[] filters) {
     this.tasks = null; // invalidate cached tasks, if present
 
@@ -256,32 +283,34 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
         table, lazySchema().asStruct(), filterExpressions, caseSensitive);
   }
 
-  private static class ReadTask implements InputPartition<InternalRow>, 
Serializable {
+  private static class ReadTask implements InputPartition<ColumnarBatch>, 
Serializable {
     private final CombinedScanTask task;
     private final String tableSchemaString;
     private final String expectedSchemaString;
     private final FileIO fileIo;
     private final EncryptionManager encryptionManager;
     private final boolean caseSensitive;
+    private final int numRecordsPerBatch;
 
     private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
 
     private ReadTask(
         CombinedScanTask task, String tableSchemaString, String 
expectedSchemaString, FileIO fileIo,
-        EncryptionManager encryptionManager, boolean caseSensitive) {
+        EncryptionManager encryptionManager, boolean caseSensitive, int 
numRecordsPerBatch) {
       this.task = task;
       this.tableSchemaString = tableSchemaString;
       this.expectedSchemaString = expectedSchemaString;
       this.fileIo = fileIo;
       this.encryptionManager = encryptionManager;
       this.caseSensitive = caseSensitive;
+      this.numRecordsPerBatch = numRecordsPerBatch;
     }
 
     @Override
-    public InputPartitionReader<InternalRow> createPartitionReader() {
+    public InputPartitionReader<ColumnarBatch> createPartitionReader() {
       return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), 
fileIo,
-        encryptionManager, caseSensitive);
+        encryptionManager, caseSensitive, numRecordsPerBatch);
     }
 
     private Schema lazyTableSchema() {
@@ -299,7 +328,7 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
     }
   }
 
-  private static class TaskDataReader implements 
InputPartitionReader<InternalRow> {
+  private static class TaskDataReader implements 
InputPartitionReader<ColumnarBatch> {
     // for some reason, the apply method can't be called from Java without 
reflection
     private static final DynMethods.UnboundMethod APPLY_PROJECTION = 
DynMethods.builder("apply")
         .impl(UnsafeProjection.class, InternalRow.class)
@@ -311,13 +340,14 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
     private final FileIO fileIo;
     private final Map<String, InputFile> inputFiles;
     private final boolean caseSensitive;
+    private final int numRecordsPerBatch;
 
-    private Iterator<InternalRow> currentIterator = null;
+    private Iterator<ColumnarBatch> currentIterator;
     private Closeable currentCloseable = null;
-    private InternalRow current = null;
+    private ColumnarBatch current = null;
 
     TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema 
expectedSchema, FileIO fileIo,
-                   EncryptionManager encryptionManager, boolean caseSensitive) 
{
+                          EncryptionManager encryptionManager, boolean 
caseSensitive, int numRecordsPerBatch) {
       this.fileIo = fileIo;
       this.tasks = task.files().iterator();
       this.tableSchema = tableSchema;
@@ -333,6 +363,7 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
       // open last because the schemas and fileIo must be set
       this.currentIterator = open(tasks.next());
       this.caseSensitive = caseSensitive;
+      this.numRecordsPerBatch = numRecordsPerBatch;
     }
 
     @Override
@@ -353,7 +384,7 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
     }
 
     @Override
-    public InternalRow get() {
+    public ColumnarBatch get() {
       return current;
     }
 
@@ -368,7 +399,7 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
       }
     }
 
-    private Iterator<InternalRow> open(FileScanTask task) {
+    private Iterator<ColumnarBatch> open(FileScanTask task) {
       DataFile file = task.file();
 
       // schema or rows returned by readers
@@ -383,23 +414,24 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
       boolean hasExtraFilterColumns = requiredSchema.columns().size() != 
finalSchema.columns().size();
 
       Schema iterSchema;
-      Iterator<InternalRow> iter;
-
-      if (hasJoinedPartitionColumns) {
-        // schema used to read data files
-        Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
-        Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
-        PartitionRowConverter convertToRow = new 
PartitionRowConverter(partitionSchema, spec);
-        JoinedRow joined = new JoinedRow();
-
-        InternalRow partition = convertToRow.apply(file.partition());
-        joined.withRight(partition);
-
-        // create joined rows and project from the joined schema to the final 
schema
-        iterSchema = TypeUtil.join(readSchema, partitionSchema);
-        iter = Iterators.transform(open(task, readSchema), joined::withLeft);
-
-      } else if (hasExtraFilterColumns) {
+      Iterator<ColumnarBatch> iter;
+
+      // if (hasJoinedPartitionColumns) {
+        // // schema used to read data files
+        // Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
+        // Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
+        // PartitionRowConverter convertToRow = new 
PartitionRowConverter(partitionSchema, spec);
+        // JoinedRow joined = new JoinedRow();
+        //
+        // InternalRow partition = convertToRow.apply(file.partition());
+        // joined.withRight(partition);
+        //
+        // // create joined rows and project from the joined schema to the 
final schema
+        // iterSchema = TypeUtil.join(readSchema, partitionSchema);
+        // iter = Iterators.transform(open(task, readSchema), 
joined::withLeft);
+      //
+      // } else if (hasExtraFilterColumns) {
+      if (hasExtraFilterColumns) {
         // add projection to the final schema
         iterSchema = requiredSchema;
         iter = open(task, requiredSchema);
@@ -411,36 +443,37 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
       }
 
       // TODO: remove the projection by reporting the iterator's schema back 
to Spark
-      return Iterators.transform(iter,
-          APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
+      // return Iterators.transform(iter,
+      //     APPLY_PROJECTION.bind(projection(finalSchema, 
iterSchema))::invoke);
+      return iter;
     }
 
-    private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
-      CloseableIterable<InternalRow> iter;
-      if (task.isDataTask()) {
-        iter = newDataIterable(task.asDataTask(), readSchema);
-
-      } else {
-        InputFile location = inputFiles.get(task.file().path().toString());
-        Preconditions.checkNotNull(location, "Could not find InputFile 
associated with FileScanTask");
-
-        switch (task.file().format()) {
-          case PARQUET:
-            iter = newParquetIterable(location, task, readSchema);
-            break;
-
-          case AVRO:
-            iter = newAvroIterable(location, task, readSchema);
-            break;
-
-          case ORC:
-            iter = newOrcIterable(location, task, readSchema);
-            break;
-
-          default:
-            throw new UnsupportedOperationException(
-                "Cannot read unknown format: " + task.file().format());
-        }
+    private Iterator<ColumnarBatch> open(FileScanTask task, Schema readSchema) 
{
+      CloseableIterable<ColumnarBatch> iter;
+      // if (task.isDataTask()) {
+      //   iter = newDataIterable(task.asDataTask(), readSchema);
+      //
+      // } else {
+      InputFile location = inputFiles.get(task.file().path().toString());
+      Preconditions.checkNotNull(location, "Could not find InputFile 
associated with FileScanTask");
+
+      switch (task.file().format()) {
+        case PARQUET:
+          iter = newParquetIterable(location, task, readSchema);
+          break;
+        //
+        // case AVRO:
+        //   iter = newAvroIterable(location, task, readSchema);
+        //   break;
+        //
+        // case ORC:
+        //   iter = newOrcIterable(location, task, readSchema);
+        //   break;
+        //
+        default:
+          throw new UnsupportedOperationException(
+              "Cannot read unknown format: " + task.file().format());
+        // }
       }
 
       this.currentCloseable = iter;
@@ -481,15 +514,17 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
           .build();
     }
 
-    private CloseableIterable<InternalRow> newParquetIterable(InputFile 
location,
+    private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile 
location,
                                                             FileScanTask task,
                                                             Schema readSchema) 
{
       return Parquet.read(location)
-          .project(readSchema)
+          .project(readSchema, SparkSchemaUtil.convert(readSchema))
           .split(task.start(), task.length())
-          .createReaderFunc(fileSchema -> 
SparkParquetReaders.buildReader(readSchema, fileSchema))
+          .createReaderFunc(fileSchema -> 
VectorizedSparkParquetReaders.buildReader(tableSchema, readSchema,
+              fileSchema))
           .filter(task.residual())
           .caseSensitive(caseSensitive)
+          .recordsPerBatch(numRecordsPerBatch)
           .build();
     }
 
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java 
b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
index 4b89526..080c0d1 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
@@ -55,7 +55,9 @@ public class RandomData {
     RandomDataGenerator generator = new RandomDataGenerator(schema, seed);
     List<Record> records = Lists.newArrayListWithExpectedSize(numRecords);
     for (int i = 0; i < numRecords; i += 1) {
-      records.add((Record) TypeUtil.visit(schema, generator));
+      Record rec = (Record) TypeUtil.visit(schema, generator);
+      // System.out.println("Add record "+rec);
+      records.add(rec);
     }
 
     return records;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java 
b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index c20bc67..f5ee124 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -53,6 +53,7 @@ import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Assert;
 import scala.collection.Seq;
@@ -64,7 +65,8 @@ import static 
scala.collection.JavaConverters.seqAsJavaListConverter;
 @SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
 public class TestHelpers {
 
-  private TestHelpers() {}
+  private TestHelpers() {
+  }
 
   public static void assertEqualsSafe(Types.StructType struct, Record rec, Row 
row) {
     List<Types.NestedField> fields = struct.fields();
@@ -205,6 +207,33 @@ public class TestHelpers {
     }
   }
 
+  public static void assertEqualsUnsafe(Types.StructType struct, List<Record> 
expected, ColumnarBatch batch) {
+    List<Types.NestedField> fields = struct.fields();
+    for (int r = 0; r < batch.numRows(); r++) {
+
+      Record expRec = expected.get(r);
+      InternalRow actualRow = batch.getRow(r);
+
+      for (int i = 0; i < fields.size(); i += 1) {
+
+        Type fieldType = fields.get(i).type();
+        Object expectedValue = expRec.get(i);
+        // System.out.println("   -> Checking Row "+r+", field #"+i
+        //     + " , Field:"+ fields.get(i).name()
+        //     + " , optional:"+fields.get(i).isOptional()
+        //     + " , type:"+fieldType.typeId()
+        //     + " , expected:"+expectedValue);
+        if (actualRow.isNullAt(i)) {
+
+          Assert.assertTrue("Expect null", expectedValue == null);
+        } else {
+          Object actualValue = actualRow.get(i, convert(fieldType));
+          assertEqualsUnsafe(fieldType, expectedValue, actualValue);
+        }
+      }
+    }
+  }
+
   private static void assertEqualsUnsafe(Types.ListType list, Collection<?> 
expected, ArrayData actual) {
     Type elementType = list.elementType();
     List<?> expectedElements = Lists.newArrayList(expected);
diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
 
b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
new file mode 100644
index 0000000..da9a466
--- /dev/null
+++ 
b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.data.vector.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
+
+/**
+ * Tests the Arrow-backed vectorized Parquet reader by writing random records
+ * with the Iceberg Parquet writer and reading them back as
+ * {@link ColumnarBatch}es, comparing every row against the expected data.
+ * <p>
+ * Complex types (arrays, maps, structs-in-collections, mixed nesting) are not
+ * supported by the vectorized reader yet; those inherited cases are skipped
+ * via {@link Assume} instead of silently passing.
+ */
+public class TestSparkParquetVectorizedReader extends AvroDataTest {
+
+  @Override
+  protected void writeAndValidate(Schema schema) throws IOException {
+    // Parquet's Avro writer cannot handle non-string map keys; skip such schemas.
+    Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == 
+TypeUtil.find(schema,
+        type -> type.isMapType() && type.asMapType().keyType() != 
+Types.StringType.get()));
+
+    List<GenericData.Record> expected = RandomData.generateList(schema, 100, 
+0L);
+
+    // write a test parquet file using the iceberg writer
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<GenericData.Record> writer = 
+Parquet.write(Files.localOutput(testFile))
+        .schema(schema)
+        .named("test")
+        .build()) {
+      writer.addAll(expected);
+    }
+
+    // read it back through the vectorized reader, one ColumnarBatch at a time
+    try (CloseableIterable<ColumnarBatch> batchReader = 
+Parquet.read(Files.localInput(testFile))
+        .project(schema)
+        .createReaderFunc(type -> 
+VectorizedSparkParquetReaders.buildReader(schema, schema, type))
+        .build()) {
+
+      Iterator<ColumnarBatch> batches = batchReader.iterator();
+      int numRowsRead = 0;
+      while (batches.hasNext()) {
+        ColumnarBatch batch = batches.next();
+        // slice out exactly the records this batch should contain
+        List<GenericData.Record> expectedBatch = new ArrayList<>(
+            expected.subList(numRowsRead, numRowsRead + batch.numRows()));
+        assertEqualsUnsafe(schema.asStruct(), expectedBatch, batch);
+        numRowsRead += batch.numRows();
+      }
+
+      // every expected record must have been read exactly once
+      Assert.assertEquals(expected.size(), numRowsRead);
+    }
+  }
+
+  @Test
+  public void testArray() throws IOException {
+    Assume.assumeTrue("Vectorized reads do not support arrays yet", false);
+  }
+
+  @Test
+  public void testArrayOfStructs() throws IOException {
+    Assume.assumeTrue("Vectorized reads do not support arrays of structs yet", false);
+  }
+
+  @Test
+  public void testMap() throws IOException {
+    Assume.assumeTrue("Vectorized reads do not support maps yet", false);
+  }
+
+  @Test
+  public void testNumericMapKey() throws IOException {
+    Assume.assumeTrue("Vectorized reads do not support maps yet", false);
+  }
+
+  @Test
+  public void testComplexMapKey() throws IOException {
+    Assume.assumeTrue("Vectorized reads do not support maps yet", false);
+  }
+
+  @Test
+  public void testMapOfStructs() throws IOException {
+    Assume.assumeTrue("Vectorized reads do not support maps yet", false);
+  }
+
+  @Test
+  public void testMixedTypes() throws IOException {
+    Assume.assumeTrue("Vectorized reads do not support mixed nested types yet", false);
+  }
+}
diff --git a/versions.lock b/versions.lock
index b7e574d..2f314a2 100644
--- a/versions.lock
+++ b/versions.lock
@@ -7,16 +7,17 @@ com.carrotsearch:hppc:0.7.2 (1 constraints: f70cda14)
 com.clearspring.analytics:stream:2.7.0 (1 constraints: 1a0dd136)
 com.esotericsoftware:kryo-shaded:4.0.2 (2 constraints: b71345a6)
 com.esotericsoftware:minlog:1.3.0 (1 constraints: 670e7c4f)
-com.fasterxml.jackson.core:jackson-annotations:2.7.9 (4 constraints: f24786bf)
+com.fasterxml.jackson.core:jackson-annotations:2.7.9 (5 constraints: f154e19f)
 com.fasterxml.jackson.core:jackson-core:2.7.9 (5 constraints: d748db55)
-com.fasterxml.jackson.core:jackson-databind:2.7.9 (8 constraints: a77bca51)
+com.fasterxml.jackson.core:jackson-databind:2.7.9 (9 constraints: a688bc53)
 com.fasterxml.jackson.module:jackson-module-paranamer:2.7.9 (1 constraints: 
e0154200)
 com.fasterxml.jackson.module:jackson-module-scala_2.11:2.7.9 (1 constraints: 
7f0da251)
 com.github.ben-manes.caffeine:caffeine:2.7.0 (1 constraints: 0b050a36)
 com.github.luben:zstd-jni:1.3.2-2 (1 constraints: 760d7c51)
-com.google.code.findbugs:jsr305:3.0.2 (10 constraints: c483db75)
+com.google.code.findbugs:jsr305:3.0.2 (9 constraints: d276cf3c)
 com.google.code.gson:gson:2.2.4 (1 constraints: 8c0d3f2f)
 com.google.errorprone:error_prone_annotations:2.3.3 (2 constraints: 161a2544)
+com.google.flatbuffers:flatbuffers-java:1.9.0 (2 constraints: e5199714)
 com.google.guava:failureaccess:1.0.1 (1 constraints: 140ae1b4)
 com.google.guava:guava:28.0-jre (23 constraints: cc5c2ea0)
 com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava (1 
constraints: bd17c918)
@@ -40,7 +41,6 @@ com.twitter:chill-java:0.9.3 (2 constraints: a716716f)
 com.twitter:chill_2.11:0.9.3 (2 constraints: 121b92c3)
 com.twitter:parquet-hadoop-bundle:1.6.0 (3 constraints: 7c262424)
 com.univocity:univocity-parsers:2.7.3 (1 constraints: c40ccb27)
-com.vlkan:flatbuffers:1.2.0-3f79e055 (2 constraints: 411e1dee)
 commons-beanutils:commons-beanutils:1.7.0 (1 constraints: da0e635f)
 commons-beanutils:commons-beanutils-core:1.8.0 (1 constraints: 1d134124)
 commons-cli:commons-cli:1.2 (8 constraints: 9467c282)
@@ -66,6 +66,7 @@ io.dropwizard.metrics:metrics-json:3.1.5 (1 constraints: 
1a0dc936)
 io.dropwizard.metrics:metrics-jvm:3.1.5 (1 constraints: 1a0dc936)
 io.netty:netty:3.9.9.Final (9 constraints: 9eb0396d)
 io.netty:netty-all:4.1.17.Final (3 constraints: d2312526)
+it.unimi.dsi:fastutil:7.0.13 (1 constraints: fc0d4043)
 javax.activation:activation:1.1.1 (1 constraints: 140dbb36)
 javax.annotation:javax.annotation-api:1.2 (2 constraints: 2d21193d)
 javax.inject:javax.inject:1 (4 constraints: 852d0c1a)
@@ -94,10 +95,10 @@ org.antlr:antlr4-runtime:4.7 (1 constraints: 7a0e125f)
 org.antlr:stringtemplate:3.2.1 (1 constraints: c10a3bc6)
 org.apache.ant:ant:1.9.1 (3 constraints: a721ed14)
 org.apache.ant:ant-launcher:1.9.1 (1 constraints: 69082485)
-org.apache.arrow:arrow-format:0.10.0 (1 constraints: 1f0de721)
-org.apache.arrow:arrow-memory:0.10.0 (1 constraints: 1f0de721)
-org.apache.arrow:arrow-vector:0.10.0 (1 constraints: e90c9734)
-org.apache.avro:avro:1.8.2 (4 constraints: 3d2eebf3)
+org.apache.arrow:arrow-format:0.12.0 (1 constraints: 210ded21)
+org.apache.arrow:arrow-memory:0.12.0 (1 constraints: 210ded21)
+org.apache.arrow:arrow-vector:0.12.0 (2 constraints: 1d122345)
+org.apache.avro:avro:1.8.2 (5 constraints: 083cf387)
 org.apache.avro:avro-ipc:1.8.2 (1 constraints: f90b5bf4)
 org.apache.avro:avro-mapred:1.8.2 (2 constraints: 3a1a4787)
 org.apache.calcite:calcite-avatica:1.2.0-incubating (4 constraints: a044b922)
diff --git a/versions.props b/versions.props
index 7f71842..80c334e 100644
--- a/versions.props
+++ b/versions.props
@@ -1,6 +1,7 @@
 org.slf4j:slf4j-api = 1.7.5
 com.google.guava:guava = 28.0-jre
 org.apache.avro:avro = 1.8.2
+org.apache.arrow:arrow-vector = 0.12.0
 org.apache.hadoop:* = 2.7.3
 org.apache.hive:hive-standalone-metastore = 1.2.1
 org.apache.orc:orc-core = 1.5.5

Reply via email to