This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 71a7ab9  MR: Add InputFormat (#843)
71a7ab9 is described below

commit 71a7ab986a003fab614d076cffb5a4580062806e
Author: Ratandeep Ratti <[email protected]>
AuthorDate: Mon Apr 6 16:02:56 2020 -0700

    MR: Add InputFormat (#843)
---
 .gitignore                                         |   3 +
 build.gradle                                       |  18 +
 .../main/java/org/apache/iceberg/hadoop/Util.java  |  29 +-
 .../org/apache/iceberg/mr/SerializationUtil.java   |  74 +++
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   | 570 +++++++++++++++++++++
 .../mr/mapreduce/TestIcebergInputFormat.java       | 478 +++++++++++++++++
 settings.gradle                                    |   2 +
 .../org/apache/iceberg/spark/source/Reader.java    |  20 +-
 8 files changed, 1175 insertions(+), 19 deletions(-)

diff --git a/.gitignore b/.gitignore
index 511baa5..13767d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,3 +35,6 @@ spark/tmp/
 .project
 .settings
 bin/
+
+# Hive/metastore files
+metastore_db/
diff --git a/build.gradle b/build.gradle
index ed8a73d..3820e16 100644
--- a/build.gradle
+++ b/build.gradle
@@ -220,6 +220,24 @@ project(':iceberg-hive') {
   }
 }
 
+project(':iceberg-mr') {
+  dependencies {
+    compile project(':iceberg-api')
+    compile project(':iceberg-core')
+    compile project(':iceberg-orc')
+    compile project(':iceberg-parquet')
+    compile project(':iceberg-data')
+
+    compileOnly("org.apache.hadoop:hadoop-client") {
+      exclude group: 'org.apache.avro', module: 'avro'
+    }
+
+    testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
+    testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
+  }
+}
+
 project(':iceberg-orc') {
   dependencies {
     compile project(':iceberg-api')
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/Util.java 
b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
index a04e5a1..d61fa4d 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/Util.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
@@ -19,13 +19,23 @@
 
 package org.apache.iceberg.hadoop;
 
+import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Util {
+  private static final Logger LOG = LoggerFactory.getLogger(Util.class);
 
-class Util {
   private Util() {
   }
 
@@ -36,4 +46,21 @@ class Util {
       throw new RuntimeIOException(e, "Failed to get file system for path: 
%s", path);
     }
   }
+
+  public static String[] blockLocations(CombinedScanTask task, Configuration 
conf) {
+    Set<String> locationSets = Sets.newHashSet();
+    for (FileScanTask f : task.files()) {
+      Path path = new Path(f.file().path().toString());
+      try {
+        FileSystem fs = path.getFileSystem(conf);
+        for (BlockLocation b : fs.getFileBlockLocations(path, f.start(), 
f.length())) {
+          locationSets.addAll(Arrays.asList(b.getHosts()));
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to get block locations for path {}", path, ioe);
+      }
+    }
+
+    return locationSets.toArray(new String[0]);
+  }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/SerializationUtil.java 
b/mr/src/main/java/org/apache/iceberg/mr/SerializationUtil.java
new file mode 100644
index 0000000..fcd8276
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/SerializationUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+
+public class SerializationUtil {
+
+  private SerializationUtil() {
+  }
+
+  public static byte[] serializeToBytes(Object obj) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(obj);
+      return baos.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeIOException("Failed to serialize object", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T deserializeFromBytes(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        ObjectInputStream ois = new ObjectInputStream(bais)) {
+      return (T) ois.readObject();
+    } catch (IOException e) {
+      throw new RuntimeIOException("Failed to deserialize object", e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Could not read object ", e);
+    }
+  }
+
+  public static String serializeToBase64(Object obj) {
+    byte[] bytes = serializeToBytes(obj);
+    return new String(Base64.getMimeEncoder().encode(bytes), 
StandardCharsets.UTF_8);
+  }
+
+  public static <T> T deserializeFromBase64(String base64) {
+    if (base64 == null) {
+      return null;
+    }
+    byte[] bytes = 
Base64.getMimeDecoder().decode(base64.getBytes(StandardCharsets.UTF_8));
+    return deserializeFromBytes(bytes);
+  }
+}
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
new file mode 100644
index 0000000..4f31abc
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic MRv2 InputFormat API for Iceberg.
+ * @param <T> T is the in-memory data model, which can be either Pig tuples or 
Hive rows. Default is Iceberg records
+ */
+public class IcebergInputFormat<T> extends InputFormat<Void, T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergInputFormat.class);
+
+  static final String AS_OF_TIMESTAMP = "iceberg.mr.as.of.time";
+  static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive";
+  static final String FILTER_EXPRESSION = "iceberg.mr.filter.expression";
+  static final String IN_MEMORY_DATA_MODEL = "iceberg.mr.in.memory.data.model";
+  static final String READ_SCHEMA = "iceberg.mr.read.schema";
+  static final String REUSE_CONTAINERS = "iceberg.mr.reuse.containers";
+  static final String SNAPSHOT_ID = "iceberg.mr.snapshot.id";
+  static final String SPLIT_SIZE = "iceberg.mr.split.size";
+  static final String TABLE_PATH = "iceberg.mr.table.path";
+  static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
+  static final String LOCALITY = "iceberg.mr.locality";
+  static final String CATALOG = "iceberg.mr.catalog";
+  static final String SKIP_RESIDUAL_FILTERING = "skip.residual.filtering";
+
+  private transient List<InputSplit> splits;
+
+  private enum InMemoryDataModel {
+    PIG,
+    HIVE,
+    GENERIC // Default data model is Iceberg generics
+
+  /**
+   * Configures the {@code Job} to use the {@code IcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code Job} to configure
+   */
+  public static ConfigBuilder configure(Job job) {
+    job.setInputFormatClass(IcebergInputFormat.class);
+    return new ConfigBuilder(job.getConfiguration());
+  }
+
+  public static class ConfigBuilder {
+    private final Configuration conf;
+
+    public ConfigBuilder(Configuration conf) {
+      this.conf = conf;
+      // defaults
+      conf.setEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.GENERIC);
+      conf.setBoolean(SKIP_RESIDUAL_FILTERING, false);
+      conf.setBoolean(CASE_SENSITIVE, true);
+      conf.setBoolean(REUSE_CONTAINERS, false);
+      conf.setBoolean(LOCALITY, false);
+    }
+
+    public ConfigBuilder readFrom(String path) {
+      conf.set(TABLE_PATH, path);
+      Table table = findTable(conf);
+      conf.set(TABLE_SCHEMA, SchemaParser.toJson(table.schema()));
+      return this;
+    }
+
+    public ConfigBuilder filter(Expression expression) {
+      conf.set(FILTER_EXPRESSION, 
SerializationUtil.serializeToBase64(expression));
+      return this;
+    }
+
+    public ConfigBuilder project(Schema schema) {
+      conf.set(READ_SCHEMA, SchemaParser.toJson(schema));
+      return this;
+    }
+
+    public ConfigBuilder reuseContainers(boolean reuse) {
+      conf.setBoolean(REUSE_CONTAINERS, reuse);
+      return this;
+    }
+
+    public ConfigBuilder caseSensitive(boolean caseSensitive) {
+      conf.setBoolean(CASE_SENSITIVE, caseSensitive);
+      return this;
+    }
+
+    public ConfigBuilder snapshotId(long snapshotId) {
+      conf.setLong(SNAPSHOT_ID, snapshotId);
+      return this;
+    }
+
+    public ConfigBuilder asOfTime(long asOfTime) {
+      conf.setLong(AS_OF_TIMESTAMP, asOfTime);
+      return this;
+    }
+
+    public ConfigBuilder splitSize(long splitSize) {
+      conf.setLong(SPLIT_SIZE, splitSize);
+      return this;
+    }
+
+    /**
+     * If this API is called, the input splits
+     * constructed will have host location information.
+     */
+    public ConfigBuilder preferLocality() {
+      conf.setBoolean(LOCALITY, true);
+      return this;
+    }
+
+    public ConfigBuilder catalogFunc(Class<? extends Function<Configuration, 
Catalog>> catalogFuncClass) {
+      conf.setClass(CATALOG, catalogFuncClass, Function.class);
+      return this;
+    }
+
+    public ConfigBuilder useHiveRows() {
+      conf.set(IN_MEMORY_DATA_MODEL, InMemoryDataModel.HIVE.name());
+      return this;
+    }
+
+    public ConfigBuilder usePigTuples() {
+      conf.set(IN_MEMORY_DATA_MODEL, InMemoryDataModel.PIG.name());
+      return this;
+    }
+
+    /**
+     * Compute platforms pass down filters to data sources. If the data source 
cannot apply some filters, or only
+     * partially applies the filter, it will return the residual filter back. 
If the platform can correctly apply
+     * the residual filters, then it should call this API. Otherwise the 
current API will throw an exception if the
+     * passed-in filter is not completely satisfied.
+     */
+    public ConfigBuilder skipResidualFiltering() {
+      conf.setBoolean(SKIP_RESIDUAL_FILTERING, true);
+      return this;
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) {
+    if (splits != null) {
+      LOG.info("Returning cached splits: {}", splits.size());
+      return splits;
+    }
+
+    Configuration conf = context.getConfiguration();
+    Table table = findTable(conf);
+    TableScan scan = table.newScan()
+                          .caseSensitive(conf.getBoolean(CASE_SENSITIVE, 
true));
+    long snapshotId = conf.getLong(SNAPSHOT_ID, -1);
+    if (snapshotId != -1) {
+      scan = scan.useSnapshot(snapshotId);
+    }
+    long asOfTime = conf.getLong(AS_OF_TIMESTAMP, -1);
+    if (asOfTime != -1) {
+      scan = scan.asOfTime(asOfTime);
+    }
+    long splitSize = conf.getLong(SPLIT_SIZE, 0);
+    if (splitSize > 0) {
+      scan = scan.option(TableProperties.SPLIT_SIZE, 
String.valueOf(splitSize));
+    }
+    String schemaStr = conf.get(READ_SCHEMA);
+    if (schemaStr != null) {
+      scan.project(SchemaParser.fromJson(schemaStr));
+    }
+
+    // TODO add a filter parser to get rid of Serialization
+    Expression filter = 
SerializationUtil.deserializeFromBase64(conf.get(FILTER_EXPRESSION));
+    if (filter != null) {
+      scan = scan.filter(filter);
+    }
+
+    splits = Lists.newArrayList();
+    boolean applyResidualFiltering = !conf.getBoolean(SKIP_RESIDUAL_FILTERING, 
false);
+    try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) 
{
+      tasksIterable.forEach(task -> {
+        if (applyResidualFiltering) {
+          //TODO: We do not support residual evaluation yet
+          checkResiduals(task);
+        }
+        splits.add(new IcebergSplit(conf, task));
+      });
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
+    }
+
+    return splits;
+  }
+
+  private static void checkResiduals(CombinedScanTask task) {
+    task.files().forEach(fileScanTask -> {
+      Expression residual = fileScanTask.residual();
+      if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Filter expression %s is not completely satisfied. Additional 
rows " +
+                    "can be returned not satisfied by the filter expression", 
residual));
+      }
+    });
+  }
+
+  @Override
+  public RecordReader<Void, T> createRecordReader(InputSplit split, 
TaskAttemptContext context) {
+    return new IcebergRecordReader<>();
+  }
+
+  private static final class IcebergRecordReader<T> extends RecordReader<Void, 
T> {
+    private TaskAttemptContext context;
+    private Schema tableSchema;
+    private Schema expectedSchema;
+    private boolean reuseContainers;
+    private boolean caseSensitive;
+    private InMemoryDataModel inMemoryDataModel;
+    private Map<String, Integer> namesToPos;
+    private Iterator<FileScanTask> tasks;
+    private T currentRow;
+    private Iterator<T> currentIterator;
+    private Closeable currentCloseable;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext newContext) {
+      Configuration conf = newContext.getConfiguration();
+      // For now IcebergInputFormat does its own split planning and does not 
accept FileSplit instances
+      CombinedScanTask task = ((IcebergSplit) split).task;
+      this.context = newContext;
+      this.tasks = task.files().iterator();
+      this.tableSchema = SchemaParser.fromJson(conf.get(TABLE_SCHEMA));
+      String readSchemaStr = conf.get(READ_SCHEMA);
+      this.expectedSchema = readSchemaStr != null ? 
SchemaParser.fromJson(readSchemaStr) : tableSchema;
+      this.namesToPos = buildNameToPos(expectedSchema);
+      this.reuseContainers = conf.getBoolean(REUSE_CONTAINERS, false);
+      this.caseSensitive = conf.getBoolean(CASE_SENSITIVE, true);
+      this.inMemoryDataModel = conf.getEnum(IN_MEMORY_DATA_MODEL, 
InMemoryDataModel.GENERIC);
+      this.currentIterator = open(tasks.next());
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException {
+      while (true) {
+        if (currentIterator.hasNext()) {
+          currentRow = currentIterator.next();
+          return true;
+        } else if (tasks.hasNext()) {
+          currentCloseable.close();
+          currentIterator = open(tasks.next());
+        } else {
+          return false;
+        }
+      }
+    }
+
+    @Override
+    public Void getCurrentKey() {
+      return null;
+    }
+
+    @Override
+    public T getCurrentValue() {
+      return currentRow;
+    }
+
+    @Override
+    public float getProgress() {
+      // TODO: We could give a more accurate progress based on records read 
from the file. Context.getProgress does not
+      // have enough information to give an accurate progress value. This 
isn't that easy, since we don't know how much
+      // of the input split has been processed and we are pushing filters into 
Parquet and ORC. But we do know when a
+      // file is opened and could count the number of rows returned, so we can 
estimate. And we could also add a row
+      // count to the readers so that we can get an accurate count of rows 
that have been either returned or filtered
+      // out.
+      return context.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+      currentCloseable.close();
+    }
+
+    private static Map<String, Integer> buildNameToPos(Schema expectedSchema) {
+      Map<String, Integer> nameToPos = Maps.newHashMap();
+      for (int pos = 0; pos < expectedSchema.asStruct().fields().size(); 
pos++) {
+        Types.NestedField field = expectedSchema.asStruct().fields().get(pos);
+        nameToPos.put(field.name(), pos);
+      }
+      return nameToPos;
+    }
+
+    private Iterator<T> open(FileScanTask currentTask) {
+      DataFile file = currentTask.file();
+      // schema of rows returned by readers
+      PartitionSpec spec = currentTask.spec();
+      Set<Integer> idColumns =  Sets.intersection(spec.identitySourceIds(), 
TypeUtil.getProjectedIds(expectedSchema));
+      boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
+
+      if (hasJoinedPartitionColumns) {
+        Schema readDataSchema = TypeUtil.selectNot(expectedSchema, idColumns);
+        Schema identityPartitionSchema = TypeUtil.select(expectedSchema, 
idColumns);
+        return Iterators.transform(
+            open(currentTask, readDataSchema),
+            row -> withIdentityPartitionColumns(row, identityPartitionSchema, 
spec, file.partition()));
+      } else {
+        return open(currentTask, expectedSchema);
+      }
+    }
+
+    private Iterator<T> open(FileScanTask currentTask, Schema readSchema) {
+      DataFile file = currentTask.file();
+      // TODO we should make use of FileIO to create inputFile
+      InputFile inputFile = HadoopInputFile.fromLocation(file.path(), 
context.getConfiguration());
+      CloseableIterable<T> iterable;
+      switch (file.format()) {
+        case AVRO:
+          iterable = newAvroIterable(inputFile, currentTask, readSchema);
+          break;
+        case ORC:
+          iterable = newOrcIterable(inputFile, currentTask, readSchema);
+          break;
+        case PARQUET:
+          iterable = newParquetIterable(inputFile, currentTask, readSchema);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              String.format("Cannot read %s file: %s", file.format().name(), 
file.path()));
+      }
+      currentCloseable = iterable;
+      //TODO: Apply residual filtering before returning the iterator
+      return iterable.iterator();
+    }
+
+    @SuppressWarnings("unchecked")
+    private T withIdentityPartitionColumns(
+        T row, Schema identityPartitionSchema, PartitionSpec spec, StructLike 
partition) {
+      switch (inMemoryDataModel) {
+        case PIG:
+        case HIVE:
+          throw new UnsupportedOperationException(
+              "Adding partition columns to Pig and Hive data model are not 
supported yet");
+        case GENERIC:
+          return (T) withIdentityPartitionColumns((Record) row, 
identityPartitionSchema, spec, partition);
+      }
+      return row;
+    }
+
+    private Record withIdentityPartitionColumns(
+        Record record, Schema identityPartitionSchema, PartitionSpec spec, 
StructLike partitionTuple) {
+      List<PartitionField> partitionFields = spec.fields();
+      List<Types.NestedField> identityColumns = 
identityPartitionSchema.columns();
+      GenericRecord row = GenericRecord.create(expectedSchema.asStruct());
+      namesToPos.forEach((name, pos) -> {
+        Object field = record.getField(name);
+        if (field != null) {
+          row.set(pos, field);
+        }
+
+        // if the current name, pos points to an identity partition column, we 
set the
+        // column at pos correctly by reading the corresponding value from 
partitionTuple
+        for (int i = 0; i < identityColumns.size(); i++) {
+          Types.NestedField identityColumn = identityColumns.get(i);
+          for (int j = 0; j < partitionFields.size(); j++) {
+            PartitionField partitionField = partitionFields.get(j);
+            if (name.equals(identityColumn.name()) &&
+                identityColumn.fieldId() == partitionField.sourceId() &&
+                "identity".equals(partitionField.transform().toString())) {
+              row.set(pos, partitionTuple.get(j, spec.javaClasses()[j]));
+            }
+          }
+        }
+      });
+
+      return row;
+    }
+
+    private CloseableIterable<T> newAvroIterable(InputFile inputFile, 
FileScanTask task, Schema readSchema) {
+      Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
+          .project(readSchema)
+          .split(task.start(), task.length());
+      if (reuseContainers) {
+        avroReadBuilder.reuseContainers();
+      }
+
+      switch (inMemoryDataModel) {
+        case PIG:
+        case HIVE:
+          //TODO implement value readers for Pig and Hive
+          throw new UnsupportedOperationException("Avro support not yet 
supported for Pig and Hive");
+        case GENERIC:
+          avroReadBuilder.createReaderFunc(DataReader::create);
+      }
+      return avroReadBuilder.build();
+    }
+
+    private CloseableIterable<T> newParquetIterable(InputFile inputFile, 
FileScanTask task, Schema readSchema) {
+      Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
+          .project(readSchema)
+          .filter(task.residual())
+          .caseSensitive(caseSensitive)
+          .split(task.start(), task.length());
+      if (reuseContainers) {
+        parquetReadBuilder.reuseContainers();
+      }
+
+      switch (inMemoryDataModel) {
+        case PIG:
+        case HIVE:
+          //TODO implement value readers for Pig and Hive
+          throw new UnsupportedOperationException("Parquet support not yet 
supported for Pig and Hive");
+        case GENERIC:
+          parquetReadBuilder.createReaderFunc(
+              fileSchema -> GenericParquetReaders.buildReader(readSchema, 
fileSchema));
+      }
+      return parquetReadBuilder.build();
+    }
+
+    private CloseableIterable<T> newOrcIterable(InputFile inputFile, 
FileScanTask task, Schema readSchema) {
+      ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+          .project(readSchema)
+          .caseSensitive(caseSensitive)
+          .split(task.start(), task.length());
+      // ORC does not support reuse containers yet
+      switch (inMemoryDataModel) {
+        case PIG:
+        case HIVE:
+          //TODO: implement value readers for Pig and Hive
+          throw new UnsupportedOperationException("ORC support not yet 
supported for Pig and Hive");
+        case GENERIC:
+          orcReadBuilder.createReaderFunc(fileSchema -> 
GenericOrcReader.buildReader(readSchema, fileSchema));
+      }
+
+      return orcReadBuilder.build();
+    }
+  }
+
+  private static Table findTable(Configuration conf) {
+    String path = conf.get(TABLE_PATH);
+    Preconditions.checkArgument(path != null, "Table path should not be null");
+    if (path.contains("/")) {
+      HadoopTables tables = new HadoopTables(conf);
+      return tables.load(path);
+    }
+
+    String catalogFuncClass = conf.get(CATALOG);
+    if (catalogFuncClass != null) {
+      Function<Configuration, Catalog> catalogFunc = (Function<Configuration, 
Catalog>)
+          DynConstructors.builder(Function.class)
+                         .impl(catalogFuncClass)
+                         .build()
+                         .newInstance();
+      Catalog catalog = catalogFunc.apply(conf);
+      TableIdentifier tableIdentifier = TableIdentifier.parse(path);
+      return catalog.loadTable(tableIdentifier);
+    } else {
+      throw new IllegalArgumentException("No custom catalog specified to load 
table " + path);
+    }
+  }
+
+  static class IcebergSplit extends InputSplit implements Writable {
+    static final String[] ANYWHERE = new String[]{"*"};
+    private CombinedScanTask task;
+    private transient String[] locations;
+    private transient Configuration conf;
+
+    IcebergSplit(Configuration conf, CombinedScanTask task) {
+      this.task = task;
+      this.conf = conf;
+    }
+
+    @Override
+    public long getLength() {
+      return task.files().stream().mapToLong(FileScanTask::length).sum();
+    }
+
+    @Override
+    public String[] getLocations() {
+      boolean localityPreferred = conf.getBoolean(LOCALITY, false);
+      if (!localityPreferred) {
+        return ANYWHERE;
+      }
+      if (locations != null) {
+        return locations;
+      }
+      locations = Util.blockLocations(task, conf);
+      return locations;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      byte[] data = SerializationUtil.serializeToBytes(this.task);
+      out.writeInt(data.length);
+      out.write(data);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      byte[] data = new byte[in.readInt()];
+      in.readFully(data);
+      this.task = SerializationUtil.deserializeFromBytes(data);
+    }
+  }
+}
diff --git 
a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java 
b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
new file mode 100644
index 0000000..1f84890
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestIcebergInputFormat {
+  static final Schema SCHEMA = new Schema(
+      required(1, "data", Types.StringType.get()),
+      required(2, "id", Types.LongType.get()),
+      required(3, "date", Types.StringType.get()));
+
+  static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+      .identity("date")
+      .bucket("id", 1)
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private HadoopTables tables;
+  private Configuration conf;
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][]{
+        new Object[]{"parquet"},
+        new Object[]{"avro"},
+        new Object[]{"orc"}
+    };
+  }
+
+  private final FileFormat format;
+
+  public TestIcebergInputFormat(String format) {
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Before
+  public void before() {
+    conf = new Configuration();
+    tables = new HadoopTables(conf);
+  }
+
+  @Test
+  public void testUnpartitionedTable() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+                                location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
+    DataFile dataFile = writeFile(table, null, format, expectedRecords);
+    table.newAppend()
+         .appendFile(dataFile)
+         .commit();
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString());
+    validate(job, expectedRecords);
+  }
+
+  @Test
+  public void testPartitionedTable() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, SPEC,
+                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+                                location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, 
expectedRecords);
+    table.newAppend()
+         .appendFile(dataFile)
+         .commit();
+
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString());
+    validate(job, expectedRecords);
+  }
+
+  @Test
+  public void testFilterExp() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, SPEC,
+                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+                                location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
2, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    expectedRecords.get(1).set(2, "2020-03-20");
+    DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, 
expectedRecords);
+    DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format,
+                                   RandomGenericData.generate(table.schema(), 
2, 0L));
+    table.newAppend()
+         .appendFile(dataFile1)
+         .appendFile(dataFile2)
+         .commit();
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString())
+                 .filter(Expressions.equal("date", "2020-03-20"));
+    validate(job, expectedRecords);
+  }
+
+  @Test
+  public void testResiduals() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, SPEC,
+                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+                                location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
2, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    expectedRecords.get(1).set(2, "2020-03-20");
+    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, 
expectedRecords);
+    table.newAppend()
+         .appendFile(dataFile)
+         .commit();
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString())
+                 .filter(Expressions.and(
+                     Expressions.equal("date", "2020-03-20"),
+                     Expressions.equal("id", 0)));
+
+    AssertHelpers.assertThrows(
+        "Residuals are not evaluated today for Iceberg Generics In memory 
model",
+        UnsupportedOperationException.class, "Filter expression 
ref(name=\"id\") == 0 is not completely satisfied.",
+        () -> validate(job, expectedRecords));
+  }
+
+  @Test
+  public void testProjection() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Schema projectedSchema = TypeUtil.select(SCHEMA, ImmutableSet.of(1));
+    Table table = tables.create(SCHEMA, SPEC,
+                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+                                location.toString());
+    List<Record> inputRecords = RandomGenericData.generate(table.schema(), 1, 
0L);
+    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, 
inputRecords);
+    table.newAppend()
+         .appendFile(dataFile)
+         .commit();
+
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
+    configBuilder
+        .readFrom(location.toString())
+        .project(projectedSchema);
+    List<Record> outputRecords = readRecords(job.getConfiguration());
+    Assert.assertEquals(inputRecords.size(), outputRecords.size());
+    Assert.assertEquals(projectedSchema.asStruct(), 
outputRecords.get(0).struct());
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get())
+  );
+
+  private static final PartitionSpec IDENTITY_PARTITION_SPEC =
+      
PartitionSpec.builderFor(LOG_SCHEMA).identity("date").identity("level").build();
+
+  @Test
+  public void testIdentityPartitionProjections() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(LOG_SCHEMA, IDENTITY_PARTITION_SPEC,
+                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+                                location.toString());
+
+    List<Record> inputRecords = RandomGenericData.generate(LOG_SCHEMA, 10, 0);
+    Integer idx = 0;
+    AppendFiles append = table.newAppend();
+    for (Record record : inputRecords) {
+      record.set(1, "2020-03-2" + idx);
+      record.set(2, idx.toString());
+      append.appendFile(writeFile(table, Row.of("2020-03-2" + idx, 
idx.toString()), format, ImmutableList.of(record)));
+      idx += 1;
+    }
+    append.commit();
+
+    // individual fields
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("date"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("level"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("message"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("id"), inputRecords);
+    // field pairs
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("date", "message"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("level", "message"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("date", "level"), inputRecords);
+    // out-of-order pairs
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("message", "date"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("message", "level"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("level", "date"), inputRecords);
+    // full projection
+    validateIdentityPartitionProjections(location.toString(), LOG_SCHEMA, 
inputRecords);
+    // out-of-order triplets
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("date", "level", "message"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("level", "date", "message"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("date", "message", "level"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("level", "message", "date"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("message", "date", "level"), inputRecords);
+    validateIdentityPartitionProjections(location.toString(), 
withColumns("message", "level", "date"), inputRecords);
+  }
+
+  private static Schema withColumns(String... names) {
+    Map<String, Integer> indexByName = 
TypeUtil.indexByName(LOG_SCHEMA.asStruct());
+    Set<Integer> projectedIds = Sets.newHashSet();
+    for (String name : names) {
+      projectedIds.add(indexByName.get(name));
+    }
+    return TypeUtil.select(LOG_SCHEMA, projectedIds);
+  }
+
+  private void validateIdentityPartitionProjections(
+      String tablePath, Schema projectedSchema, List<Record> inputRecords) 
throws Exception {
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
+    configBuilder
+        .readFrom(tablePath)
+        .project(projectedSchema);
+    List<Record> actualRecords = readRecords(job.getConfiguration());
+
+    Set<String> fieldNames = 
TypeUtil.indexByName(projectedSchema.asStruct()).keySet();
+    for (int pos = 0; pos < inputRecords.size(); pos++) {
+      Record inputRecord = inputRecords.get(pos);
+      Record actualRecord = actualRecords.get(pos);
+      Assert.assertEquals("Projected schema should match", 
projectedSchema.asStruct(), actualRecord.struct());
+      for (String name : fieldNames) {
+        Assert.assertEquals(
+            "Projected field " + name + " should match", 
inputRecord.getField(name), actualRecord.getField(name));
+      }
+    }
+  }
+
+  @Test
+  public void testSnapshotReads() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+                                location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
+    table.newAppend()
+         .appendFile(writeFile(table, null, format, expectedRecords))
+         .commit();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    table.newAppend()
+         .appendFile(writeFile(table, null, format, 
RandomGenericData.generate(table.schema(), 1, 0L)))
+         .commit();
+
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
+    configBuilder
+        .readFrom(location.toString())
+        .snapshotId(snapshotId);
+
+    validate(job, expectedRecords);
+  }
+
+  @Test
+  public void testLocality() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+                                location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
+    table.newAppend()
+         .appendFile(writeFile(table, null, format, expectedRecords))
+         .commit();
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString());
+
+    for (InputSplit split : splits(job.getConfiguration())) {
+      Assert.assertArrayEquals(IcebergInputFormat.IcebergSplit.ANYWHERE, 
split.getLocations());
+    }
+
+    configBuilder.preferLocality();
+    for (InputSplit split : splits(job.getConfiguration())) {
+      Assert.assertArrayEquals(new String[]{"localhost"}, 
split.getLocations());
+    }
+  }
+
+  public static class HadoopCatalogFunc implements Function<Configuration, 
Catalog> {
+    @Override
+    public Catalog apply(Configuration conf) {
+      return new HadoopCatalog(conf, conf.get("warehouse.location"));
+    }
+  }
+
+  @Test
+  public void testCustomCatalog() throws Exception {
+    conf = new Configuration();
+    conf.set("warehouse.location", 
temp.newFolder("hadoop_catalog").getAbsolutePath());
+
+    Catalog catalog = new HadoopCatalogFunc().apply(conf);
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "t");
+    Table table = catalog.createTable(tableIdentifier, SCHEMA, SPEC,
+                                      
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, 
expectedRecords);
+    table.newAppend()
+         .appendFile(dataFile)
+         .commit();
+
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
+    configBuilder
+        .catalogFunc(HadoopCatalogFunc.class)
+        .readFrom(tableIdentifier.toString());
+    validate(job, expectedRecords);
+  }
+
+  private static void validate(Job job, List<Record> expectedRecords) {
+    List<Record> actualRecords = readRecords(job.getConfiguration());
+    Assert.assertEquals(expectedRecords, actualRecords);
+  }
+
+  private static <T> List<InputSplit> splits(Configuration conf) {
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new 
TaskAttemptID());
+    IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
+    return icebergInputFormat.getSplits(context);
+  }
+
+  private static <T> List<T> readRecords(Configuration conf) {
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new 
TaskAttemptID());
+    IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
+    List<InputSplit> splits = icebergInputFormat.getSplits(context);
+    return
+        FluentIterable
+            .from(splits)
+            .transformAndConcat(split -> readRecords(icebergInputFormat, 
split, context))
+            .toList();
+  }
+
+  private static <T> Iterable<T> readRecords(
+      IcebergInputFormat<T> inputFormat, InputSplit split, TaskAttemptContext 
context) {
+    RecordReader<Void, T> recordReader = inputFormat.createRecordReader(split, 
context);
+    List<T> records = new ArrayList<>();
+    try {
+      recordReader.initialize(split, context);
+      while (recordReader.nextKeyValue()) {
+        records.add(recordReader.getCurrentValue());
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return records;
+  }
+
+  private DataFile writeFile(
+      Table table, StructLike partitionData, FileFormat fileFormat, 
List<Record> records) throws IOException {
+    File file = temp.newFile();
+    Assert.assertTrue(file.delete());
+    FileAppender<Record> appender;
+    switch (fileFormat) {
+      case AVRO:
+        appender = Avro.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(DataWriter::create)
+            .named(fileFormat.name())
+            .build();
+        break;
+      case PARQUET:
+        appender = Parquet.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .named(fileFormat.name())
+            .build();
+        break;
+      case ORC:
+        appender = ORC.write(Files.localOutput(file))
+            .schema(table.schema())
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .build();
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
+    }
+
+    try {
+      appender.addAll(records);
+    } finally {
+      appender.close();
+    }
+
+    DataFiles.Builder builder = DataFiles.builder(table.spec())
+        .withPath(file.toString())
+        .withFormat(format)
+        .withFileSizeInBytes(file.length())
+        .withMetrics(appender.metrics());
+    if (partitionData != null) {
+      builder.withPartition(partitionData);
+    }
+    return builder.build();
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index 854b3b2..0c9e592 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -22,6 +22,7 @@ include 'api'
 include 'common'
 include 'core'
 include 'data'
+include 'mr'
 include 'orc'
 include 'parquet'
 include 'spark'
@@ -34,6 +35,7 @@ project(':api').name = 'iceberg-api'
 project(':common').name = 'iceberg-common'
 project(':core').name = 'iceberg-core'
 project(':data').name = 'iceberg-data'
+project(':mr').name = 'iceberg-mr'
 project(':orc').name = 'iceberg-orc'
 project(':arrow').name = 'iceberg-arrow'
 project(':parquet').name = 'iceberg-parquet'
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java 
b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index ea189d3..0d25fb9 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -21,15 +21,11 @@ package org.apache.iceberg.spark.source;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CombinedScanTask;
@@ -44,6 +40,7 @@ import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.spark.SparkFilters;
@@ -353,20 +350,7 @@ class Reader implements DataSourceReader, 
SupportsPushDownFilters, SupportsPushD
       }
 
       Configuration conf = 
SparkSession.active().sparkContext().hadoopConfiguration();
-      Set<String> locations = Sets.newHashSet();
-      for (FileScanTask f : task.files()) {
-        Path path = new Path(f.file().path().toString());
-        try {
-          FileSystem fs = path.getFileSystem(conf);
-          for (BlockLocation b : fs.getFileBlockLocations(path, f.start(), 
f.length())) {
-            locations.addAll(Arrays.asList(b.getHosts()));
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Failed to get block locations for path {}", path, ioe);
-        }
-      }
-
-      return locations.toArray(new String[0]);
+      return Util.blockLocations(task, conf);
     }
   }
 

Reply via email to