This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d11edd  ORC read integration for Spark 2.4.0 (#139)
6d11edd is described below

commit 6d11edd196c6ba7813af2145035787ac2b41ffda
Author: Edgar Rodriguez <[email protected]>
AuthorDate: Thu May 16 13:15:37 2019 -0700

    ORC read integration for Spark 2.4.0 (#139)
---
 build.gradle                                       |   5 +-
 .../java/org/apache/iceberg/orc/ColumnIdMap.java   |  23 +-
 orc/src/main/java/org/apache/iceberg/orc/ORC.java  |  85 ++-
 .../org/apache/iceberg/orc/OrcFileAppender.java    | 103 ++-
 .../java/org/apache/iceberg/orc/OrcIterable.java   | 125 ++++
 .../org/apache/iceberg/orc/OrcValueReader.java     |  34 +
 .../org/apache/iceberg/orc/OrcValueWriter.java     |  37 +
 .../org/apache/iceberg/orc/TypeConversion.java     |  23 +-
 ...erator.java => VectorizedRowBatchIterator.java} |  34 +-
 .../apache/iceberg/spark/data/SparkOrcReader.java  | 782 +++++++++++++++++++++
 .../apache/iceberg/spark/data/SparkOrcWriter.java  | 409 +++++++++++
 .../org/apache/iceberg/spark/source/Reader.java    |  17 +
 .../org/apache/iceberg/spark/data/TestHelpers.java |   3 +
 .../iceberg/spark/data/TestSparkOrcReader.java     |  64 ++
 14 files changed, 1641 insertions(+), 103 deletions(-)
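
    For reference, a minimal usage sketch of the new API (not part of the diff below;
    it assumes the existing ORC.read(InputFile)/ORC.write(OutputFile) factory methods
    and an Iceberg Schema named schema):

        // Read: iterate Spark InternalRows decoded from an ORC data file.
        CloseableIterable<InternalRow> rows = ORC.read(inputFile)
            .schema(schema)
            .createReaderFunc(readSchema -> new SparkOrcReader(readSchema))
            .build();

        // Write: append Spark InternalRows to a new ORC file.
        FileAppender<InternalRow> appender = ORC.write(outputFile)
            .schema(schema)
            .createWriterFunc(SparkOrcWriter::new)
            .build();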

diff --git a/build.gradle b/build.gradle
index 5161e92..51d4382 100644
--- a/build.gradle
+++ b/build.gradle
@@ -76,7 +76,7 @@ subprojects {
   ext {
     hadoopVersion = '2.7.3'
     avroVersion = '1.8.2'
-    orcVersion = '1.4.2'
+    orcVersion = '1.5.5'
     parquetVersion = '1.10.0'
     hiveVersion = '1.2.1'
 
@@ -266,6 +266,7 @@ project(':iceberg-spark') {
     compile project(':iceberg-api')
     compile project(':iceberg-common')
     compile project(':iceberg-core')
+    compile project(':iceberg-orc')
     compile project(':iceberg-parquet')
 
     compileOnly "org.apache.avro:avro:$avroVersion"
@@ -369,11 +370,13 @@ project(':iceberg-presto-runtime') {
     dependencies {
         shadow project(':iceberg-api')
         shadow project(':iceberg-core')
+        shadow project(':iceberg-orc')
         shadow project(':iceberg-parquet')
         shadow project(':iceberg-hive')
 
         shadow "org.apache.parquet:parquet-avro:$parquetVersion"
         shadow "org.apache.avro:avro:$avroVersion"
+        shadow "org.apache.orc:orc-core:$orcVersion:nohive"
         shadow ("org.apache.hive:hive-metastore:$hiveVersion") {
             exclude group: 'org.apache.hadoop', module: 'hadoop-common'
 //            exclude group: 'org.apache.orc', module: 'orc-core'
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java b/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java
index 330554f..16dc3b0 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java
@@ -1,17 +1,20 @@
 /*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iceberg.orc;
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 157a761..c6cb036 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -1,38 +1,47 @@
 /*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iceberg.orc;
 
 import com.google.common.base.Preconditions;
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopOutputFile;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 
+import static org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch.DEFAULT_SIZE;
+
 public class ORC {
+
+  static final String VECTOR_ROW_BATCH_SIZE = "iceberg.orc.vectorbatch.size";
+
   private ORC() {
   }
 
@@ -44,6 +53,7 @@ public class ORC {
     private final OutputFile file;
     private final Configuration conf;
     private Schema schema = null;
+    private Function<TypeDescription, OrcValueWriter<?>>  createWriterFunc;
     private Map<String, byte[]> metadata = new HashMap<>();
 
     private WriteBuilder(OutputFile file) {
@@ -65,15 +75,27 @@ public class ORC {
       return this;
     }
 
+    public WriteBuilder createWriterFunc(Function<TypeDescription, OrcValueWriter<?>> writerFunction) {
+      this.createWriterFunc = writerFunction;
+      return this;
+    }
+
+    public WriteBuilder setAll(Map<String, String> properties) {
+      properties.forEach(conf::set);
+      return this;
+    }
+
     public WriteBuilder schema(Schema schema) {
       this.schema = schema;
       return this;
     }
 
-    public OrcFileAppender build() {
-      OrcFile.WriterOptions options =
-          OrcFile.writerOptions(conf);
-      return new OrcFileAppender(schema, file, options, metadata);
+    public <D> FileAppender<D> build() {
+      Preconditions.checkNotNull(schema, "Schema is required");
+      OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+      return new OrcFileAppender<>(TypeConversion.toOrc(schema, new ColumnIdMap()),
+          this.file, createWriterFunc, options, metadata,
+          conf.getInt(VECTOR_ROW_BATCH_SIZE, DEFAULT_SIZE));
     }
   }
 
@@ -88,6 +110,8 @@ public class ORC {
     private Long start = null;
     private Long length = null;
 
+    private Function<Schema, OrcValueReader<?>> readerFunction;
+
     private ReadBuilder(InputFile file) {
       Preconditions.checkNotNull(file, "Input file cannot be null");
       this.file = file;
@@ -116,27 +140,24 @@ public class ORC {
       return this;
     }
 
+    public ReadBuilder caseSensitive(boolean caseSensitive) {
+      OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(this.conf, caseSensitive);
+      return this;
+    }
+
     public ReadBuilder config(String property, String value) {
       conf.set(property, value);
       return this;
     }
 
-    public OrcIterator build() {
+    public ReadBuilder createReaderFunc(Function<Schema, OrcValueReader<?>> readerFunction) {
+      this.readerFunction = readerFunction;
+      return this;
+    }
+
+    public <D> CloseableIterable<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
-      try {
-        Path path = new Path(file.location());
-        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
-        ColumnIdMap columnIds = new ColumnIdMap();
-        TypeDescription orcSchema = TypeConversion.toOrc(schema, columnIds);
-        Reader.Options options = reader.options();
-        if (start != null) {
-          options.range(start, length);
-        }
-        options.schema(orcSchema);
-        return new OrcIterator(path, orcSchema, reader.rows(options));
-      } catch (IOException e) {
-        throw new RuntimeException("Can't open " + file.location(), e);
-      }
+      return new OrcIterable<>(file, conf, schema, start, length, readerFunction);
     }
   }
 }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 257e084..60c738c 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -1,18 +1,22 @@
 /*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
+
 package org.apache.iceberg.orc;
 
 import com.google.common.base.Preconditions;
@@ -20,9 +24,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Metrics;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.orc.ColumnStatistics;
@@ -34,36 +38,40 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 /**
  * Create a file appender for ORC.
  */
-public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
-  private final Writer writer;
+class OrcFileAppender<D> implements FileAppender<D> {
+  private final int batchSize;
   private final TypeDescription orcSchema;
   private final ColumnIdMap columnIds = new ColumnIdMap();
   private final Path path;
+  private final Writer writer;
+  private final VectorizedRowBatch batch;
+  private final OrcValueWriter<D> valueWriter;
   private boolean isClosed = false;
 
-  public static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids";
+  private static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids";
 
-  OrcFileAppender(Schema schema,
-                  OutputFile file,
-                  OrcFile.WriterOptions options,
-                  Map<String,byte[]> metadata) {
-    orcSchema = TypeConversion.toOrc(schema, columnIds);
-    options.setSchema(orcSchema);
+  OrcFileAppender(TypeDescription schema, OutputFile file,
+                  Function<TypeDescription, OrcValueWriter<?>> createWriterFunc,
+                  OrcFile.WriterOptions options, Map<String, byte[]> metadata,
+                  int batchSize) {
+    orcSchema = schema;
     path = new Path(file.location());
-    try {
-      writer = OrcFile.createWriter(path, options);
-    } catch (IOException e) {
-      throw new RuntimeException("Can't create file " + path, e);
-    }
-    writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, columnIds.serialize());
-    metadata.forEach(
-        (key,value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
+    this.batchSize = batchSize;
+    batch = orcSchema.createRowBatch(this.batchSize);
+
+    options.setSchema(orcSchema);
+    writer = newOrcWriter(file, columnIds, options, metadata);
+    valueWriter = newOrcValueWriter(orcSchema, createWriterFunc);
   }
 
   @Override
-  public void add(VectorizedRowBatch datum) {
+  public void add(D datum) {
     try {
-      writer.addRowBatch(datum);
+      valueWriter.write(datum, batch);
+      if (batch.size == this.batchSize) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
     } catch (IOException e) {
       throw new RuntimeException("Problem writing to ORC file " + path, e);
     }
@@ -108,12 +116,39 @@ public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
   @Override
   public void close() throws IOException {
     if (!isClosed) {
-      this.isClosed = true;
-      writer.close();
+      try {
+        if (batch.size > 0) {
+          writer.addRowBatch(batch);
+          batch.reset();
+        }
+      } finally {
+        writer.close();
+        this.isClosed = true;
+      }
     }
   }
 
-  public TypeDescription getSchema() {
-    return orcSchema;
+  private static Writer newOrcWriter(OutputFile file,
+                                     ColumnIdMap columnIds,
+                                     OrcFile.WriterOptions options, Map<String, byte[]> metadata) {
+    final Path locPath = new Path(file.location());
+    final Writer writer;
+
+    try {
+      writer = OrcFile.createWriter(locPath, options);
+    } catch (IOException e) {
+      throw new RuntimeException("Can't create file " + locPath, e);
+    }
+
+    writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, columnIds.serialize());
+    metadata.forEach((key,value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
+
+    return writer;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <D> OrcValueWriter<D> newOrcValueWriter(TypeDescription schema,
+                                                         Function<TypeDescription, OrcValueWriter<?>> createWriterFunc) {
+    return (OrcValueWriter<D>) createWriterFunc.apply(schema);
   }
 }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
new file mode 100644
index 0000000..b4bed83
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+/**
+ * Iterable used to read rows from ORC.
+ */
+class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final Configuration config;
+  private final Schema schema;
+  private final InputFile file;
+  private final Long start;
+  private final Long length;
+  private final Function<Schema, OrcValueReader<?>> readerFunction;
+
+  OrcIterable(InputFile file, Configuration config, Schema schema,
+                     Long start, Long length,
+                     Function<Schema, OrcValueReader<?>> readerFunction) {
+    this.schema = schema;
+    this.readerFunction = readerFunction;
+    this.file = file;
+    this.start = start;
+    this.length = length;
+    this.config = config;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Iterator<T> iterator() {
+    return new OrcIterator(
+        newOrcIterator(file, TypeConversion.toOrc(schema, new ColumnIdMap()),
+            start, length, newFileReader(file, config)),
+        readerFunction.apply(schema));
+  }
+
+  private static VectorizedRowBatchIterator newOrcIterator(InputFile file,
+                                                           TypeDescription readerSchema,
+                                                           Long start, Long length,
+                                                           Reader orcFileReader) {
+    final Reader.Options options = orcFileReader.options();
+    if (start != null) {
+      options.range(start, length);
+    }
+    options.schema(readerSchema);
+
+    try {
+      return new VectorizedRowBatchIterator(file.location(), readerSchema, orcFileReader.rows(options));
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Failed to get ORC rows for file: %s", file);
+    }
+  }
+
+  private static Reader newFileReader(InputFile file, Configuration config) {
+    try {
+      return OrcFile.createReader(new Path(file.location()),
+          OrcFile.readerOptions(config));
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Failed to open file: %s", file);
+    }
+  }
+
+  private static class OrcIterator<T> implements Iterator<T> {
+
+    private int nextRow;
+    private VectorizedRowBatch current;
+
+    final VectorizedRowBatchIterator batchIter;
+    final OrcValueReader<T> reader;
+
+    OrcIterator(VectorizedRowBatchIterator batchIter, OrcValueReader<T> reader) {
+      this.batchIter = batchIter;
+      this.reader = reader;
+      current = null;
+      nextRow = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return (current != null && nextRow < current.size) || batchIter.hasNext();
+    }
+
+    @Override
+    public T next() {
+      if (current == null || nextRow >= current.size) {
+        current = batchIter.next();
+        nextRow = 0;
+      }
+
+      return this.reader.read(current, nextRow++);
+    }
+  }
+
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java
new file mode 100644
index 0000000..cfc9ebb
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+/**
+ * Used for implementing ORC value readers.
+ */
+public interface OrcValueReader<T> {
+
+  /**
+   * Reads the value at the given row of the batch.
+   */
+  T read(VectorizedRowBatch batch, int row);
+
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
new file mode 100644
index 0000000..5f1e167
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+/**
+ * Writes data values of a schema.
+ */
+public interface OrcValueWriter<T> {
+
+  /**
+   * Writes the data.
+   * @param value the data value to write.
+   * @param output the VectorizedRowBatch to which the output will be written.
+   * @throws IOException if there's any IO error while writing the data value.
+   */
+  void write(T value, VectorizedRowBatch output) throws IOException;
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java b/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java
index bc57f8d..f9839f6 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java
@@ -1,17 +1,20 @@
 /*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iceberg.orc;
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterator.java b/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java
similarity index 55%
rename from orc/src/main/java/org/apache/iceberg/orc/OrcIterator.java
rename to orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java
index 589e5ee..ddc0bce 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterator.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java
@@ -1,17 +1,20 @@
 /*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iceberg.orc;
@@ -19,7 +22,6 @@ package org.apache.iceberg.orc;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
-import org.apache.hadoop.fs.Path;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -29,14 +31,14 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
  * Because the same VectorizedRowBatch is reused on each call to next,
  * it gets changed when hasNext or next is called.
  */
-public class OrcIterator implements Iterator<VectorizedRowBatch>, Closeable {
-  private final Path filename;
+public class VectorizedRowBatchIterator implements Iterator<VectorizedRowBatch>, Closeable {
+  private final String fileLocation;
   private final RecordReader rows;
   private final VectorizedRowBatch batch;
   private boolean advanced = false;
 
-  OrcIterator(Path filename, TypeDescription schema, RecordReader rows) {
-    this.filename = filename;
+  VectorizedRowBatchIterator(String fileLocation, TypeDescription schema, RecordReader rows) {
+    this.fileLocation = fileLocation;
     this.rows = rows;
     this.batch = schema.createRowBatch();
   }
@@ -51,7 +53,7 @@ public class OrcIterator implements Iterator<VectorizedRowBatch>, Closeable {
       try {
         rows.nextBatch(batch);
       } catch (IOException e) {
-        throw new RuntimeException("Problem reading ORC file " + filename, e);
+        throw new RuntimeException("Problem reading ORC file " + fileLocation, 
e);
       }
       advanced = true;
     }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
new file mode 100644
index 0000000..301a550
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -0,0 +1,782 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.orc.ColumnIdMap;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.TypeConversion;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter;
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.Platform;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.List;
+
+/**
+ * Converts the VectorizedRowBatchIterator's output, ORC's VectorizedRowBatch, to a
+ * set of Spark's UnsafeRows.
+ *
+ * It minimizes allocations by reusing most of the objects in the implementation.
+ */
+public class SparkOrcReader implements OrcValueReader<InternalRow> {
+  private final static int INITIAL_SIZE = 128 * 1024;
+  private final int numFields;
+  private final TypeDescription readSchema;
+
+  public SparkOrcReader(Schema readSchema) {
+    this.readSchema = TypeConversion.toOrc(readSchema, new ColumnIdMap());
+    numFields = readSchema.columns().size();
+  }
+
+  private Converter[] buildConverters(final UnsafeRowWriter writer) {
+    final Converter[] converters = new Converter[numFields];
+    for(int c = 0; c < numFields; ++c) {
+      converters[c] = buildConverter(writer, readSchema.getChildren().get(c));
+    }
+    return converters;
+  }
+
+  @Override
+  public InternalRow read(VectorizedRowBatch batch, int row) {
+    final UnsafeRowWriter rowWriter = new UnsafeRowWriter(numFields, INITIAL_SIZE);
+    final Converter[] converters = buildConverters(rowWriter);
+
+    rowWriter.reset();
+    rowWriter.zeroOutNullBytes();
+    for(int c=0; c < batch.cols.length; ++c) {
+      converters[c].convert(rowWriter, c, batch.cols[c], row);
+    }
+    return rowWriter.getRow();
+  }
+
+  private static String rowToString(SpecializedGetters row, TypeDescription schema) {
+    final List<TypeDescription> children = schema.getChildren();
+    final StringBuilder rowBuilder = new StringBuilder("{");
+
+    for(int c = 0; c < children.size(); ++c) {
+      rowBuilder.append("\"");
+      rowBuilder.append(schema.getFieldNames().get(c));
+      rowBuilder.append("\": ");
+      rowBuilder.append(rowEntryToString(row, c, children.get(c)));
+      if (c != children.size() - 1) {
+        rowBuilder.append(", ");
+      }
+    }
+    rowBuilder.append("}");
+    return rowBuilder.toString();
+  }
+
+  private static String rowEntryToString(SpecializedGetters row, int ord, TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return Boolean.toString(row.getBoolean(ord));
+      case BYTE:
+        return Byte.toString(row.getByte(ord));
+      case SHORT:
+        return Short.toString(row.getShort(ord));
+      case INT:
+        return Integer.toString(row.getInt(ord));
+      case LONG:
+        return Long.toString(row.getLong(ord));
+      case FLOAT:
+        return Float.toString(row.getFloat(ord));
+      case DOUBLE:
+        return Double.toString(row.getDouble(ord));
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return "\"" + row.getUTF8String(ord) + "\"";
+      case BINARY: {
+        byte[] bin = row.getBinary(ord);
+        final StringBuilder binStr;
+        if (bin == null) {
+          binStr = new StringBuilder("null");
+        } else {
+          binStr = new StringBuilder("[");
+          for (int i = 0; i < bin.length; ++i) {
+            if (i != 0) {
+              binStr.append(", ");
+            }
+            int v = bin[i] & 0xff;
+            if (v < 16) {
+              binStr.append("0");
+              binStr.append(Integer.toHexString(v));
+            } else {
+              binStr.append(Integer.toHexString(v));
+            }
+          }
+          binStr.append("]");
+        }
+        return binStr.toString();
+      }
+      case DECIMAL:
+        return row.getDecimal(ord, schema.getPrecision(), schema.getScale()).toString();
+      case DATE:
+        return "\"" + new DateWritable(row.getInt(ord)) + "\"";
+      case TIMESTAMP:
+        return "\"" + new Timestamp(row.getLong(ord)) + "\"";
+      case STRUCT:
+        return rowToString(row.getStruct(ord, schema.getChildren().size()), schema);
+      case LIST: {
+        TypeDescription child = schema.getChildren().get(0);
+        final StringBuilder listStr = new StringBuilder("[");
+        ArrayData list = row.getArray(ord);
+        for(int e=0; e < list.numElements(); ++e) {
+          if (e != 0) {
+            listStr.append(", ");
+          }
+          listStr.append(rowEntryToString(list, e, child));
+        }
+        listStr.append("]");
+        return listStr.toString();
+      }
+      case MAP: {
+        TypeDescription keyType = schema.getChildren().get(0);
+        TypeDescription valueType = schema.getChildren().get(1);
+        MapData map = row.getMap(ord);
+        ArrayData keys = map.keyArray();
+        ArrayData values = map.valueArray();
+        StringBuilder mapStr = new StringBuilder("[");
+        for(int e=0; e < map.numElements(); ++e) {
+          if (e != 0) {
+            mapStr.append(", ");
+          }
+          mapStr.append(rowEntryToString(keys, e, keyType));
+          mapStr.append(": ");
+          mapStr.append(rowEntryToString(values, e, valueType));
+        }
+        mapStr.append("]");
+        return mapStr.toString();
+      }
+      default:
+        throw new IllegalArgumentException("Unhandled type " + schema);
+    }
+  }
+
+  private static int getArrayElementSize(TypeDescription type) {
+    switch (type.getCategory()) {
+      case BOOLEAN:
+      case BYTE:
+        return 1;
+      case SHORT:
+        return 2;
+      case INT:
+      case FLOAT:
+        return 4;
+      default:
+        return 8;
+    }
+  }
+
+  /**
+   * The common interface for converting from an ORC ColumnVector to a Spark
+   * UnsafeRow. UnsafeRows need two different writer interfaces, so there are
+   * two methods: the first is for structs (UnsafeRowWriter) and the second is
+   * for lists and maps (UnsafeArrayWriter). If Spark adds a common interface
+   * similar to SpecializedGetters, we could use that and a single set of
+   * methods.
+   */
+  interface Converter {
+    void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row);
+    void convert(UnsafeArrayWriter writer, int element, ColumnVector vector, int row);
+  }
+
+  private static class BooleanConverter implements Converter {
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        writer.write(column, ((LongColumnVector) vector).vector[row] != 0);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        writer.write(element, ((LongColumnVector) vector).vector[row] != 0);
+      }
+    }
+  }
+
+  private static class ByteConverter implements Converter {
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        writer.write(column, (byte) ((LongColumnVector) vector).vector[row]);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        writer.write(element, (byte) ((LongColumnVector) vector).vector[row]);
+      }
+    }
+  }
+
+  private static class ShortConverter implements Converter {
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        writer.write(column, (short) ((LongColumnVector) vector).vector[row]);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        writer.write(element, (short) ((LongColumnVector) vector).vector[row]);
+      }
+    }
+  }
+
+  private static class IntConverter implements Converter {
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        writer.write(column, (int) ((LongColumnVector) vector).vector[row]);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        writer.write(element, (int) ((LongColumnVector) vector).vector[row]);
+      }
+    }
+  }
+
+  private static class LongConverter implements Converter {
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        writer.write(column, ((LongColumnVector) vector).vector[row]);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        writer.write(element, ((LongColumnVector) vector).vector[row]);
+      }
+    }
+  }
+
+  private static class FloatConverter implements Converter {
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        writer.write(column, (float) ((DoubleColumnVector) vector).vector[row]);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        writer.write(element, (float) ((DoubleColumnVector) vector).vector[row]);
+      }
+    }
+  }
+
+  private static class DoubleConverter implements Converter {
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        writer.write(column, ((DoubleColumnVector) vector).vector[row]);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        writer.write(element, ((DoubleColumnVector) vector).vector[row]);
+      }
+    }
+  }
+
+  private static class TimestampConverter implements Converter {
+
+    private long convert(TimestampColumnVector vector, int row) {
+      // compute microseconds past 1970.
+      return (vector.time[row]/1000) * 1_000_000 + vector.nanos[row] / 1000;
+    }
+
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        writer.write(column, convert((TimestampColumnVector) vector, row));
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        writer.write(element, convert((TimestampColumnVector) vector, row));
+      }
+    }
+  }
+
+  private static class BinaryConverter implements Converter {
+
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        BytesColumnVector v = (BytesColumnVector) vector;
+        writer.write(column, v.vector[row], v.start[row], v.length[row]);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element, ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        final BytesColumnVector v = (BytesColumnVector) vector;
+        writer.write(element, v.vector[row], v.start[row], v.length[row]);
+      }
+    }
+  }
+
+  private static class Decimal18Converter implements Converter {
+    final int precision;
+    final int scale;
+
+    Decimal18Converter(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
+        writer.write(column,
+            new Decimal().set(v.serialize64(v.scale()), v.precision(), v.scale()),
+            precision, scale);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
+        writer.write(element,
+            new Decimal().set(v.serialize64(v.scale()), v.precision(), 
v.scale()),
+            precision, scale);
+      }
+    }
+  }
+
+  private static class Decimal38Converter implements Converter {
+    final int precision;
+    final int scale;
+
+    Decimal38Converter(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        BigDecimal v = ((DecimalColumnVector) vector).vector[row]
+            .getHiveDecimal().bigDecimalValue();
+        writer.write(column,
+            new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
+            precision, scale);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        BigDecimal v = ((DecimalColumnVector) vector).vector[row]
+            .getHiveDecimal().bigDecimalValue();
+        writer.write(element,
+            new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
+            precision, scale);
+      }
+    }
+  }
+
+  private static class StructConverter implements Converter {
+    private final Converter[] children;
+    private final UnsafeRowWriter childWriter;
+
+    StructConverter(final UnsafeWriter parentWriter, final TypeDescription schema) {
+      children = new Converter[schema.getChildren().size()];
+      for(int c=0; c < children.length; ++c) {
+        children[c] = buildConverter(parentWriter, schema.getChildren().get(c));
+      }
+      childWriter = new UnsafeRowWriter(parentWriter, children.length);
+    }
+
+    int writeStruct(StructColumnVector vector, int row) {
+      int start = childWriter.cursor();
+      childWriter.resetRowWriter();
+      for(int c=0; c < children.length; ++c) {
+        children[c].convert(childWriter, c, vector.fields[c], row);
+      }
+      return start;
+    }
+
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        int start = writeStruct((StructColumnVector) vector, row);
+        writer.setOffsetAndSizeFromPreviousCursor(column, start);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        int start = writeStruct((StructColumnVector) vector, row);
+        writer.setOffsetAndSizeFromPreviousCursor(element, start);
+      }
+    }
+  }
+
+  private static class ListConverter implements Converter {
+    private final Converter children;
+    private final UnsafeArrayWriter childWriter;
+
+    ListConverter(final UnsafeWriter parentWriter, TypeDescription schema) {
+      TypeDescription child = schema.getChildren().get(0);
+      children = buildConverter(parentWriter, child);
+      childWriter = new UnsafeArrayWriter(parentWriter, getArrayElementSize(child));
+
+    }
+
+    int writeList(ListColumnVector v, int row) {
+      int offset = (int) v.offsets[row];
+      int length = (int) v.lengths[row];
+      int start = childWriter.cursor();
+      childWriter.initialize(length);
+      for(int c = 0; c < length; ++c) {
+        children.convert(childWriter, c, v.child, offset + c);
+      }
+      return start;
+    }
+
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
+                        int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        int start = writeList((ListColumnVector) vector, row);
+        writer.setOffsetAndSizeFromPreviousCursor(column, start);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element,
+                        ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        int start = writeList((ListColumnVector) vector, row);
+        writer.setOffsetAndSizeFromPreviousCursor(element, start);
+      }
+    }
+  }
+
+  private static class MapConverter implements Converter {
+    private final Converter keyConvert;
+    private final Converter valueConvert;
+
+    private final UnsafeArrayWriter keyWriter;
+    private final UnsafeArrayWriter valueWriter;
+
+    private final int keySize;
+    private final int valueSize;
+
+    private final int KEY_SIZE_BYTES = 8;
+
+    MapConverter(final UnsafeWriter parentWriter, TypeDescription schema) {
+      final TypeDescription keyType = schema.getChildren().get(0);
+      final TypeDescription valueType = schema.getChildren().get(1);
+      keyConvert = buildConverter(parentWriter, keyType);
+      keySize = getArrayElementSize(keyType);
+      keyWriter = new UnsafeArrayWriter(parentWriter, keySize);
+      valueConvert = buildConverter(parentWriter, valueType);
+      valueSize = getArrayElementSize(valueType);
+      valueWriter = new UnsafeArrayWriter(parentWriter, valueSize);
+    }
+
+    int writeMap(MapColumnVector v, int row) {
+      final int offset = (int) v.offsets[row];
+      final int length = (int) v.lengths[row];
+      final int start = keyWriter.cursor();
+
+      // save room for the key size
+      keyWriter.grow(KEY_SIZE_BYTES);
+      keyWriter.increaseCursor(KEY_SIZE_BYTES);
+
+      // serialize the keys
+      keyWriter.initialize(length);
+      for(int c = 0; c < length; ++c) {
+        keyConvert.convert(keyWriter, c, v.keys, offset + c);
+      }
+      // store the serialized size of the keys
+      Platform.putLong(keyWriter.getBuffer(), start,
+                keyWriter.cursor() - start - KEY_SIZE_BYTES);
+
+      // serialize the values
+      valueWriter.initialize(length);
+      for(int c = 0; c < length; ++c) {
+        valueConvert.convert(valueWriter, c, v.values, offset + c);
+      }
+      return start;
+    }
+
+    @Override
+    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNullAt(column);
+      } else {
+        int start = writeMap((MapColumnVector) vector, row);
+        writer.setOffsetAndSizeFromPreviousCursor(column, start);
+      }
+    }
+
+    @Override
+    public void convert(UnsafeArrayWriter writer, int element, ColumnVector vector, int row) {
+      if (vector.isRepeating) {
+        row = 0;
+      }
+      if (!vector.noNulls && vector.isNull[row]) {
+        writer.setNull(element);
+      } else {
+        int start = writeMap((MapColumnVector) vector, row);
+        writer.setOffsetAndSizeFromPreviousCursor(element, start);
+      }
+    }
+  }
+
+  static Converter buildConverter(final UnsafeWriter writer, final TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanConverter();
+      case BYTE:
+        return new ByteConverter();
+      case SHORT:
+        return new ShortConverter();
+      case DATE:
+      case INT:
+        return new IntConverter();
+      case LONG:
+        return new LongConverter();
+      case FLOAT:
+        return new FloatConverter();
+      case DOUBLE:
+        return new DoubleConverter();
+      case TIMESTAMP:
+        return new TimestampConverter();
+      case DECIMAL:
+        if (schema.getPrecision() <= Decimal.MAX_LONG_DIGITS()) {
+          return new Decimal18Converter(schema.getPrecision(), schema.getScale());
+        } else {
+          return new Decimal38Converter(schema.getPrecision(), schema.getScale());
+        }
+      case BINARY:
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return new BinaryConverter();
+      case STRUCT:
+        return new StructConverter(writer, schema);
+      case LIST:
+        return new ListConverter(writer, schema);
+      case MAP:
+        return new MapConverter(writer, schema);
+      default:
+        throw new IllegalArgumentException("Unhandled type " + schema);
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
new file mode 100644
index 0000000..80e2878
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+
+import java.util.List;
+
+/**
+ * This class acts as an adaptor from an OrcFileAppender to a
+ * FileAppender&lt;InternalRow&gt;.
+ */
+public class SparkOrcWriter implements OrcValueWriter<InternalRow> {
+
+  private final Converter[] converters;
+
+  public SparkOrcWriter(TypeDescription schema) {
+    converters = buildConverters(schema);
+  }
+
+  @Override
+  public void write(InternalRow value, VectorizedRowBatch output) {
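+    // Claim the next row slot in the batch, then let each column's converter copy its value.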
+    int row = output.size++;
+    for(int c=0; c < converters.length; ++c) {
+      converters[c].addValue(row, c, value, output.cols[c]);
+    }
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  interface Converter {
+    /**
+     * Take a value from the Spark data object and add it to the ORC output.
+     * @param rowId the row in the ColumnVector
+     * @param column either the column number or element number
+     * @param data either an InternalRow or ArrayData
+     * @param output the ColumnVector to put the value into
+     */
+    void addValue(int rowId, int column, SpecializedGetters data,
+                  ColumnVector output);
+  }
+
+  static class BooleanConverter implements Converter {
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
+      }
+    }
+  }
+
+  static class ByteConverter implements Converter {
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data.getByte(column);
+      }
+    }
+  }
+
+  static class ShortConverter implements Converter {
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data.getShort(column);
+      }
+    }
+  }
+
+  static class IntConverter implements Converter {
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data.getInt(column);
+      }
+    }
+  }
+
+  static class LongConverter implements Converter {
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data.getLong(column);
+      }
+    }
+  }
+
+  static class FloatConverter implements Converter {
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
+      }
+    }
+  }
+
+  static class DoubleConverter implements Converter {
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
+      }
+    }
+  }
+
+  static class StringConverter implements Converter {
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        byte[] value = data.getUTF8String(column).getBytes();
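+        // setRef points the column vector at these bytes rather than copying them into its buffer.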
+        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+      }
+    }
+  }
+
+  static class BytesConverter implements Converter {
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        // getBinary always makes a copy, so we don't need to worry about it
+        // being changed behind our back.
+        byte[] value = data.getBinary(column);
+        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+      }
+    }
+  }
+
+  static class TimestampConverter implements Converter {
+
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        TimestampColumnVector cv = (TimestampColumnVector) output;
+        long micros = data.getLong(column);
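+        // Spark supplies timestamps as microseconds since epoch; ORC stores milliseconds plus nanos-of-second.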
+        cv.time[rowId] = (micros / 1_000_000) * 1000;
+        int nanos = (int) (micros % 1_000_000) * 1000;
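+        // For pre-epoch timestamps the remainder is negative; borrow one second so nanos lands in [0, 1_000_000_000).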
+        if (nanos < 0) {
+          nanos += 1_000_000_000;
+          cv.time[rowId] -= 1000;
+        }
+        cv.nanos[rowId] = nanos;
+      }
+    }
+  }
+
+  static class Decimal18Converter implements Converter {
+    private final int precision;
+    private final int scale;
+
+    Decimal18Converter(TypeDescription schema) {
+      precision = schema.getPrecision();
+      scale = schema.getScale();
+    }
+
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
+            data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
+      }
+    }
+  }
+
+  static class Decimal38Converter implements Converter {
+    private final int precision;
+    private final int scale;
+
+    Decimal38Converter(TypeDescription schema) {
+      precision = schema.getPrecision();
+      scale = schema.getScale();
+    }
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DecimalColumnVector) output).vector[rowId].set(
+            HiveDecimal.create(data.getDecimal(column, precision, scale)
+                .toJavaBigDecimal()));
+      }
+    }
+  }
+
+  static class StructConverter implements Converter {
+    private final Converter[] children;
+
+    StructConverter(TypeDescription schema) {
+      children = new Converter[schema.getChildren().size()];
+      for(int c=0; c < children.length; ++c) {
+        children[c] = buildConverter(schema.getChildren().get(c));
+      }
+    }
+
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        InternalRow value = data.getStruct(column, children.length);
+        StructColumnVector cv = (StructColumnVector) output;
+        for(int c=0; c < children.length; ++c) {
+          children[c].addValue(rowId, c, value, cv.fields[c]);
+        }
+      }
+    }
+  }
+
+  static class ListConverter implements Converter {
+    private final Converter children;
+
+    ListConverter(TypeDescription schema) {
+      children = buildConverter(schema.getChildren().get(0));
+    }
+
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ArrayData value = data.getArray(column);
+        ListColumnVector cv = (ListColumnVector) output;
+        // record the length and start of the list elements
+        cv.lengths[rowId] = value.numElements();
+        cv.offsets[rowId] = cv.childCount;
+        cv.childCount += cv.lengths[rowId];
+        // make sure the child is big enough
+        cv.child.ensureSize(cv.childCount, true);
+        // Add each element
+        for(int e=0; e < cv.lengths[rowId]; ++e) {
+          children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
+        }
+      }
+    }
+  }
+
+  static class MapConverter implements Converter {
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+
+    MapConverter(TypeDescription schema) {
+      keyConverter = buildConverter(schema.getChildren().get(0));
+      valueConverter = buildConverter(schema.getChildren().get(1));
+    }
+
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        MapData map = data.getMap(column);
+        ArrayData key = map.keyArray();
+        ArrayData value = map.valueArray();
+        MapColumnVector cv = (MapColumnVector) output;
+        // record the length and start of the map entries
+        cv.lengths[rowId] = value.numElements();
+        cv.offsets[rowId] = cv.childCount;
+        cv.childCount += cv.lengths[rowId];
+        // make sure the child is big enough
+        cv.keys.ensureSize(cv.childCount, true);
+        cv.values.ensureSize(cv.childCount, true);
+        // Add each element
+        for(int e=0; e < cv.lengths[rowId]; ++e) {
+          int pos = (int)(e + cv.offsets[rowId]);
+          keyConverter.addValue(pos, e, key, cv.keys);
+          valueConverter.addValue(pos, e, value, cv.values);
+        }
+      }
+    }
+  }
+
+  private static Converter buildConverter(TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanConverter();
+      case BYTE:
+        return new ByteConverter();
+      case SHORT:
+        return new ShortConverter();
+      case DATE:
+      case INT:
+        return new IntConverter();
+      case LONG:
+        return new LongConverter();
+      case FLOAT:
+        return new FloatConverter();
+      case DOUBLE:
+        return new DoubleConverter();
+      case BINARY:
+        return new BytesConverter();
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return new StringConverter();
+      case DECIMAL:
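+        // Decimals with at most 18 digits fit in a long; wider values go through HiveDecimal.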
+        return schema.getPrecision() <= 18
+            ? new Decimal18Converter(schema)
+            : new Decimal38Converter(schema);
+      case TIMESTAMP:
+        return new TimestampConverter();
+      case STRUCT:
+        return new StructConverter(schema);
+      case LIST:
+        return new ListConverter(schema);
+      case MAP:
+        return new MapConverter(schema);
+    }
+    throw new IllegalArgumentException("Unhandled type " + schema);
+  }
+
+  private static Converter[] buildConverters(TypeDescription schema) {
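+    // ORC schemas are rooted at a struct; build one converter per top-level column.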
+    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
+      throw new IllegalArgumentException("Top level must be a struct " + 
schema);
+    }
+    List<TypeDescription> children = schema.getChildren();
+    Converter[] result = new Converter[children.size()];
+    for(int c=0; c < children.size(); ++c) {
+      result[c] = buildConverter(children.get(c));
+    }
+    return result;
+  }
+
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index a74d9cd..63a33f9 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -51,6 +51,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -59,6 +60,7 @@ import org.apache.iceberg.spark.data.SparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -433,6 +435,10 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
           iter = newAvroIterable(location, task, readSchema);
           break;
 
+        case ORC:
+          iter = newOrcIterable(location, task, readSchema);
+          break;
+
         default:
           throw new UnsupportedOperationException(
               "Cannot read unknown format: " + task.file().format());
@@ -465,6 +471,17 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
           .caseSensitive(caseSensitive)
           .build();
     }
+
+    private CloseableIterable<InternalRow> newOrcIterable(InputFile location,
+                                                          FileScanTask task,
+                                                          Schema readSchema) {
+      return ORC.read(location)
+          .schema(readSchema)
+          .split(task.start(), task.length())
+          .createReaderFunc(SparkOrcReader::new)
+          .caseSensitive(caseSensitive)
+          .build();
+    }
   }
 
   private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index a760455..69d0d84 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -52,6 +52,7 @@ import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Assert;
 import scala.collection.Seq;
@@ -594,6 +595,8 @@ public class TestHelpers {
           actual instanceof MapData);
       assertEquals(context, (MapType) type, (MapData) expected, (MapData) actual);
 
+    } else if (type instanceof BinaryType) {
+      assertEqualBytes(context, (byte[]) expected, (byte[]) actual);
     } else {
       Assert.assertEquals("Value should match expected: " + context, expected, 
actual);
     }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
new file mode 100644
index 0000000..1a20ff8
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import static org.apache.iceberg.spark.data.TestHelpers.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+
+public class TestSparkOrcReader extends AvroDataTest {
+  @Override
+  protected void writeAndValidate(Schema schema) throws IOException {
+    final Iterable<InternalRow> expected = RandomData
+        .generateSpark(schema, 100, 0L);
+
+    final File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<InternalRow> writer = 
ORC.write(Files.localOutput(testFile))
+        .createWriterFunc(SparkOrcWriter::new)
+        .schema(schema)
+        .build()) {
+      writer.addAll(expected);
+    }
+
+    try (CloseableIterable<InternalRow> reader = 
ORC.read(Files.localInput(testFile))
+        .schema(schema)
+        .createReaderFunc(SparkOrcReader::new)
+        .build()) {
+      final Iterator<InternalRow> actualRows = reader.iterator();
+      final Iterator<InternalRow> expectedRows = expected.iterator();
+      while (expectedRows.hasNext()) {
+        Assert.assertTrue("Should have expected number of rows", 
actualRows.hasNext());
+        assertEquals(schema, expectedRows.next(), actualRows.next());
+      }
+      Assert.assertFalse("Should not have extra rows", actualRows.hasNext());
+    }
+  }
+}
