This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 71a7ab9 MR: Add InputFormat (#843)
71a7ab9 is described below
commit 71a7ab986a003fab614d076cffb5a4580062806e
Author: Ratandeep Ratti <[email protected]>
AuthorDate: Mon Apr 6 16:02:56 2020 -0700
MR: Add InputFormat (#843)
---
.gitignore | 3 +
build.gradle | 18 +
.../main/java/org/apache/iceberg/hadoop/Util.java | 29 +-
.../org/apache/iceberg/mr/SerializationUtil.java | 74 +++
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 570 +++++++++++++++++++++
.../mr/mapreduce/TestIcebergInputFormat.java | 478 +++++++++++++++++
settings.gradle | 2 +
.../org/apache/iceberg/spark/source/Reader.java | 20 +-
8 files changed, 1175 insertions(+), 19 deletions(-)
diff --git a/.gitignore b/.gitignore
index 511baa5..13767d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,3 +35,6 @@ spark/tmp/
.project
.settings
bin/
+
+# Hive/metastore files
+metastore_db/
diff --git a/build.gradle b/build.gradle
index ed8a73d..3820e16 100644
--- a/build.gradle
+++ b/build.gradle
@@ -220,6 +220,24 @@ project(':iceberg-hive') {
}
}
+project(':iceberg-mr') {
+ dependencies {
+ compile project(':iceberg-api')
+ compile project(':iceberg-core')
+ compile project(':iceberg-orc')
+ compile project(':iceberg-parquet')
+ compile project(':iceberg-data')
+
+ compileOnly("org.apache.hadoop:hadoop-client") {
+ exclude group: 'org.apache.avro', module: 'avro'
+ }
+
+ testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
+ testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
+ testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
+ }
+}
+
project(':iceberg-orc') {
dependencies {
compile project(':iceberg-api')
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/Util.java
b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
index a04e5a1..d61fa4d 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/Util.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
@@ -19,13 +19,23 @@
package org.apache.iceberg.hadoop;
+import com.google.common.collect.Sets;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Util {
+ private static final Logger LOG = LoggerFactory.getLogger(Util.class);
-class Util {
private Util() {
}
@@ -36,4 +46,21 @@ class Util {
throw new RuntimeIOException(e, "Failed to get file system for path:
%s", path);
}
}
+
+ public static String[] blockLocations(CombinedScanTask task, Configuration
conf) {
+ Set<String> locationSets = Sets.newHashSet();
+ for (FileScanTask f : task.files()) {
+ Path path = new Path(f.file().path().toString());
+ try {
+ FileSystem fs = path.getFileSystem(conf);
+ for (BlockLocation b : fs.getFileBlockLocations(path, f.start(),
f.length())) {
+ locationSets.addAll(Arrays.asList(b.getHosts()));
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Failed to get block locations for path {}", path, ioe);
+ }
+ }
+
+ return locationSets.toArray(new String[0]);
+ }
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/SerializationUtil.java
b/mr/src/main/java/org/apache/iceberg/mr/SerializationUtil.java
new file mode 100644
index 0000000..fcd8276
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/SerializationUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+
+public class SerializationUtil {
+
+ private SerializationUtil() {
+ }
+
+ public static byte[] serializeToBytes(Object obj) {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(obj);
+ return baos.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeIOException("Failed to serialize object", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> T deserializeFromBytes(byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return (T) ois.readObject();
+ } catch (IOException e) {
+ throw new RuntimeIOException("Failed to deserialize object", e);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Could not read object ", e);
+ }
+ }
+
+ public static String serializeToBase64(Object obj) {
+ byte[] bytes = serializeToBytes(obj);
+ return new String(Base64.getMimeEncoder().encode(bytes),
StandardCharsets.UTF_8);
+ }
+
+ public static <T> T deserializeFromBase64(String base64) {
+ if (base64 == null) {
+ return null;
+ }
+ byte[] bytes =
Base64.getMimeDecoder().decode(base64.getBytes(StandardCharsets.UTF_8));
+ return deserializeFromBytes(bytes);
+ }
+}
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
new file mode 100644
index 0000000..4f31abc
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic MRv2 InputFormat API for Iceberg.
+ * @param <T> T is the in-memory data model, which can either be Pig tuples
or Hive rows. Default is Iceberg records
+ */
+public class IcebergInputFormat<T> extends InputFormat<Void, T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergInputFormat.class);
+
+ static final String AS_OF_TIMESTAMP = "iceberg.mr.as.of.time";
+ static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive";
+ static final String FILTER_EXPRESSION = "iceberg.mr.filter.expression";
+ static final String IN_MEMORY_DATA_MODEL = "iceberg.mr.in.memory.data.model";
+ static final String READ_SCHEMA = "iceberg.mr.read.schema";
+ static final String REUSE_CONTAINERS = "iceberg.mr.reuse.containers";
+ static final String SNAPSHOT_ID = "iceberg.mr.snapshot.id";
+ static final String SPLIT_SIZE = "iceberg.mr.split.size";
+ static final String TABLE_PATH = "iceberg.mr.table.path";
+ static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
+ static final String LOCALITY = "iceberg.mr.locality";
+ static final String CATALOG = "iceberg.mr.catalog";
+ static final String SKIP_RESIDUAL_FILTERING = "skip.residual.filtering";
+
+ private transient List<InputSplit> splits;
+
+ private enum InMemoryDataModel {
+ PIG,
+ HIVE,
+ GENERIC // Default data model is of Iceberg Generics
+ }
+
+ /**
+ * Configures the {@code Job} to use the {@code IcebergInputFormat} and
+ * returns a helper to add further configuration.
+ *
+ * @param job the {@code Job} to configure
+ */
+ public static ConfigBuilder configure(Job job) {
+ job.setInputFormatClass(IcebergInputFormat.class);
+ return new ConfigBuilder(job.getConfiguration());
+ }
+
+ public static class ConfigBuilder {
+ private final Configuration conf;
+
+ public ConfigBuilder(Configuration conf) {
+ this.conf = conf;
+ // defaults
+ conf.setEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.GENERIC);
+ conf.setBoolean(SKIP_RESIDUAL_FILTERING, false);
+ conf.setBoolean(CASE_SENSITIVE, true);
+ conf.setBoolean(REUSE_CONTAINERS, false);
+ conf.setBoolean(LOCALITY, false);
+ }
+
+ public ConfigBuilder readFrom(String path) {
+ conf.set(TABLE_PATH, path);
+ Table table = findTable(conf);
+ conf.set(TABLE_SCHEMA, SchemaParser.toJson(table.schema()));
+ return this;
+ }
+
+ public ConfigBuilder filter(Expression expression) {
+ conf.set(FILTER_EXPRESSION,
SerializationUtil.serializeToBase64(expression));
+ return this;
+ }
+
+ public ConfigBuilder project(Schema schema) {
+ conf.set(READ_SCHEMA, SchemaParser.toJson(schema));
+ return this;
+ }
+
+ public ConfigBuilder reuseContainers(boolean reuse) {
+ conf.setBoolean(REUSE_CONTAINERS, reuse);
+ return this;
+ }
+
+ public ConfigBuilder caseSensitive(boolean caseSensitive) {
+ conf.setBoolean(CASE_SENSITIVE, caseSensitive);
+ return this;
+ }
+
+ public ConfigBuilder snapshotId(long snapshotId) {
+ conf.setLong(SNAPSHOT_ID, snapshotId);
+ return this;
+ }
+
+ public ConfigBuilder asOfTime(long asOfTime) {
+ conf.setLong(AS_OF_TIMESTAMP, asOfTime);
+ return this;
+ }
+
+ public ConfigBuilder splitSize(long splitSize) {
+ conf.setLong(SPLIT_SIZE, splitSize);
+ return this;
+ }
+
+ /**
+     * If this API is called, the input splits
+ * constructed will have host location information
+ */
+ public ConfigBuilder preferLocality() {
+ conf.setBoolean(LOCALITY, true);
+ return this;
+ }
+
+ public ConfigBuilder catalogFunc(Class<? extends Function<Configuration,
Catalog>> catalogFuncClass) {
+ conf.setClass(CATALOG, catalogFuncClass, Function.class);
+ return this;
+ }
+
+ public ConfigBuilder useHiveRows() {
+ conf.set(IN_MEMORY_DATA_MODEL, InMemoryDataModel.HIVE.name());
+ return this;
+ }
+
+ public ConfigBuilder usePigTuples() {
+ conf.set(IN_MEMORY_DATA_MODEL, InMemoryDataModel.PIG.name());
+ return this;
+ }
+
+ /**
+ * Compute platforms pass down filters to data sources. If the data source
cannot apply some filters, or only
+ * partially applies the filter, it will return the residual filter back.
If the platform can correctly apply
+   * the residual filters, then it should call this API. Otherwise the
current API will throw an exception if the
+   * passed-in filter is not completely satisfied.
+ */
+ public ConfigBuilder skipResidualFiltering() {
+ conf.setBoolean(SKIP_RESIDUAL_FILTERING, true);
+ return this;
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) {
+ if (splits != null) {
+ LOG.info("Returning cached splits: {}", splits.size());
+ return splits;
+ }
+
+ Configuration conf = context.getConfiguration();
+ Table table = findTable(conf);
+ TableScan scan = table.newScan()
+ .caseSensitive(conf.getBoolean(CASE_SENSITIVE,
true));
+ long snapshotId = conf.getLong(SNAPSHOT_ID, -1);
+ if (snapshotId != -1) {
+ scan = scan.useSnapshot(snapshotId);
+ }
+ long asOfTime = conf.getLong(AS_OF_TIMESTAMP, -1);
+ if (asOfTime != -1) {
+ scan = scan.asOfTime(asOfTime);
+ }
+ long splitSize = conf.getLong(SPLIT_SIZE, 0);
+ if (splitSize > 0) {
+ scan = scan.option(TableProperties.SPLIT_SIZE,
String.valueOf(splitSize));
+ }
+ String schemaStr = conf.get(READ_SCHEMA);
+ if (schemaStr != null) {
+ scan.project(SchemaParser.fromJson(schemaStr));
+ }
+
+ // TODO add a filter parser to get rid of Serialization
+ Expression filter =
SerializationUtil.deserializeFromBase64(conf.get(FILTER_EXPRESSION));
+ if (filter != null) {
+ scan = scan.filter(filter);
+ }
+
+ splits = Lists.newArrayList();
+ boolean applyResidualFiltering = !conf.getBoolean(SKIP_RESIDUAL_FILTERING,
false);
+ try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks())
{
+ tasksIterable.forEach(task -> {
+ if (applyResidualFiltering) {
+ //TODO: We do not support residual evaluation yet
+ checkResiduals(task);
+ }
+ splits.add(new IcebergSplit(conf, task));
+ });
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
+ }
+
+ return splits;
+ }
+
+ private static void checkResiduals(CombinedScanTask task) {
+ task.files().forEach(fileScanTask -> {
+ Expression residual = fileScanTask.residual();
+ if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Filter expression %s is not completely satisfied. Additional
rows " +
+ "can be returned not satisfied by the filter expression",
residual));
+ }
+ });
+ }
+
+ @Override
+ public RecordReader<Void, T> createRecordReader(InputSplit split,
TaskAttemptContext context) {
+ return new IcebergRecordReader<>();
+ }
+
+ private static final class IcebergRecordReader<T> extends RecordReader<Void,
T> {
+ private TaskAttemptContext context;
+ private Schema tableSchema;
+ private Schema expectedSchema;
+ private boolean reuseContainers;
+ private boolean caseSensitive;
+ private InMemoryDataModel inMemoryDataModel;
+ private Map<String, Integer> namesToPos;
+ private Iterator<FileScanTask> tasks;
+ private T currentRow;
+ private Iterator<T> currentIterator;
+ private Closeable currentCloseable;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext newContext) {
+ Configuration conf = newContext.getConfiguration();
+ // For now IcebergInputFormat does its own split planning and does not
accept FileSplit instances
+ CombinedScanTask task = ((IcebergSplit) split).task;
+ this.context = newContext;
+ this.tasks = task.files().iterator();
+ this.tableSchema = SchemaParser.fromJson(conf.get(TABLE_SCHEMA));
+ String readSchemaStr = conf.get(READ_SCHEMA);
+ this.expectedSchema = readSchemaStr != null ?
SchemaParser.fromJson(readSchemaStr) : tableSchema;
+ this.namesToPos = buildNameToPos(expectedSchema);
+ this.reuseContainers = conf.getBoolean(REUSE_CONTAINERS, false);
+ this.caseSensitive = conf.getBoolean(CASE_SENSITIVE, true);
+ this.inMemoryDataModel = conf.getEnum(IN_MEMORY_DATA_MODEL,
InMemoryDataModel.GENERIC);
+ this.currentIterator = open(tasks.next());
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ while (true) {
+ if (currentIterator.hasNext()) {
+ currentRow = currentIterator.next();
+ return true;
+ } else if (tasks.hasNext()) {
+ currentCloseable.close();
+ currentIterator = open(tasks.next());
+ } else {
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public Void getCurrentKey() {
+ return null;
+ }
+
+ @Override
+ public T getCurrentValue() {
+ return currentRow;
+ }
+
+ @Override
+ public float getProgress() {
+ // TODO: We could give a more accurate progress based on records read
from the file. Context.getProgress does not
+ // have enough information to give an accurate progress value. This
isn't that easy, since we don't know how much
+ // of the input split has been processed and we are pushing filters into
Parquet and ORC. But we do know when a
+ // file is opened and could count the number of rows returned, so we can
estimate. And we could also add a row
+ // count to the readers so that we can get an accurate count of rows
that have been either returned or filtered
+ // out.
+ return context.getProgress();
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentCloseable.close();
+ }
+
+ private static Map<String, Integer> buildNameToPos(Schema expectedSchema) {
+ Map<String, Integer> nameToPos = Maps.newHashMap();
+ for (int pos = 0; pos < expectedSchema.asStruct().fields().size();
pos++) {
+ Types.NestedField field = expectedSchema.asStruct().fields().get(pos);
+ nameToPos.put(field.name(), pos);
+ }
+ return nameToPos;
+ }
+
+ private Iterator<T> open(FileScanTask currentTask) {
+ DataFile file = currentTask.file();
+ // schema of rows returned by readers
+ PartitionSpec spec = currentTask.spec();
+ Set<Integer> idColumns = Sets.intersection(spec.identitySourceIds(),
TypeUtil.getProjectedIds(expectedSchema));
+ boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
+
+ if (hasJoinedPartitionColumns) {
+ Schema readDataSchema = TypeUtil.selectNot(expectedSchema, idColumns);
+ Schema identityPartitionSchema = TypeUtil.select(expectedSchema,
idColumns);
+ return Iterators.transform(
+ open(currentTask, readDataSchema),
+ row -> withIdentityPartitionColumns(row, identityPartitionSchema,
spec, file.partition()));
+ } else {
+ return open(currentTask, expectedSchema);
+ }
+ }
+
+ private Iterator<T> open(FileScanTask currentTask, Schema readSchema) {
+ DataFile file = currentTask.file();
+ // TODO we should make use of FileIO to create inputFile
+ InputFile inputFile = HadoopInputFile.fromLocation(file.path(),
context.getConfiguration());
+ CloseableIterable<T> iterable;
+ switch (file.format()) {
+ case AVRO:
+ iterable = newAvroIterable(inputFile, currentTask, readSchema);
+ break;
+ case ORC:
+ iterable = newOrcIterable(inputFile, currentTask, readSchema);
+ break;
+ case PARQUET:
+ iterable = newParquetIterable(inputFile, currentTask, readSchema);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Cannot read %s file: %s", file.format().name(),
file.path()));
+ }
+ currentCloseable = iterable;
+ //TODO: Apply residual filtering before returning the iterator
+ return iterable.iterator();
+ }
+
+ @SuppressWarnings("unchecked")
+ private T withIdentityPartitionColumns(
+ T row, Schema identityPartitionSchema, PartitionSpec spec, StructLike
partition) {
+ switch (inMemoryDataModel) {
+ case PIG:
+ case HIVE:
+ throw new UnsupportedOperationException(
+ "Adding partition columns to Pig and Hive data model are not
supported yet");
+ case GENERIC:
+ return (T) withIdentityPartitionColumns((Record) row,
identityPartitionSchema, spec, partition);
+ }
+ return row;
+ }
+
+ private Record withIdentityPartitionColumns(
+ Record record, Schema identityPartitionSchema, PartitionSpec spec,
StructLike partitionTuple) {
+ List<PartitionField> partitionFields = spec.fields();
+ List<Types.NestedField> identityColumns =
identityPartitionSchema.columns();
+ GenericRecord row = GenericRecord.create(expectedSchema.asStruct());
+ namesToPos.forEach((name, pos) -> {
+ Object field = record.getField(name);
+ if (field != null) {
+ row.set(pos, field);
+ }
+
+ // if the current name, pos points to an identity partition column, we
set the
+      // column at pos correctly by reading the corresponding value from
partitionTuple
+ for (int i = 0; i < identityColumns.size(); i++) {
+ Types.NestedField identityColumn = identityColumns.get(i);
+ for (int j = 0; j < partitionFields.size(); j++) {
+ PartitionField partitionField = partitionFields.get(j);
+ if (name.equals(identityColumn.name()) &&
+ identityColumn.fieldId() == partitionField.sourceId() &&
+ "identity".equals(partitionField.transform().toString())) {
+ row.set(pos, partitionTuple.get(j, spec.javaClasses()[j]));
+ }
+ }
+ }
+ });
+
+ return row;
+ }
+
+ private CloseableIterable<T> newAvroIterable(InputFile inputFile,
FileScanTask task, Schema readSchema) {
+ Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
+ .project(readSchema)
+ .split(task.start(), task.length());
+ if (reuseContainers) {
+ avroReadBuilder.reuseContainers();
+ }
+
+ switch (inMemoryDataModel) {
+ case PIG:
+ case HIVE:
+ //TODO implement value readers for Pig and Hive
+ throw new UnsupportedOperationException("Avro support not yet
supported for Pig and Hive");
+ case GENERIC:
+ avroReadBuilder.createReaderFunc(DataReader::create);
+ }
+ return avroReadBuilder.build();
+ }
+
+ private CloseableIterable<T> newParquetIterable(InputFile inputFile,
FileScanTask task, Schema readSchema) {
+ Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
+ .project(readSchema)
+ .filter(task.residual())
+ .caseSensitive(caseSensitive)
+ .split(task.start(), task.length());
+ if (reuseContainers) {
+ parquetReadBuilder.reuseContainers();
+ }
+
+ switch (inMemoryDataModel) {
+ case PIG:
+ case HIVE:
+ //TODO implement value readers for Pig and Hive
+ throw new UnsupportedOperationException("Parquet support not yet
supported for Pig and Hive");
+ case GENERIC:
+ parquetReadBuilder.createReaderFunc(
+ fileSchema -> GenericParquetReaders.buildReader(readSchema,
fileSchema));
+ }
+ return parquetReadBuilder.build();
+ }
+
+ private CloseableIterable<T> newOrcIterable(InputFile inputFile,
FileScanTask task, Schema readSchema) {
+ ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+ .project(readSchema)
+ .caseSensitive(caseSensitive)
+ .split(task.start(), task.length());
+ // ORC does not support reuse containers yet
+ switch (inMemoryDataModel) {
+ case PIG:
+ case HIVE:
+ //TODO: implement value readers for Pig and Hive
+ throw new UnsupportedOperationException("ORC support not yet
supported for Pig and Hive");
+ case GENERIC:
+ orcReadBuilder.createReaderFunc(fileSchema ->
GenericOrcReader.buildReader(readSchema, fileSchema));
+ }
+
+ return orcReadBuilder.build();
+ }
+ }
+
+ private static Table findTable(Configuration conf) {
+ String path = conf.get(TABLE_PATH);
+ Preconditions.checkArgument(path != null, "Table path should not be null");
+ if (path.contains("/")) {
+ HadoopTables tables = new HadoopTables(conf);
+ return tables.load(path);
+ }
+
+ String catalogFuncClass = conf.get(CATALOG);
+ if (catalogFuncClass != null) {
+ Function<Configuration, Catalog> catalogFunc = (Function<Configuration,
Catalog>)
+ DynConstructors.builder(Function.class)
+ .impl(catalogFuncClass)
+ .build()
+ .newInstance();
+ Catalog catalog = catalogFunc.apply(conf);
+ TableIdentifier tableIdentifier = TableIdentifier.parse(path);
+ return catalog.loadTable(tableIdentifier);
+ } else {
+ throw new IllegalArgumentException("No custom catalog specified to load
table " + path);
+ }
+ }
+
+ static class IcebergSplit extends InputSplit implements Writable {
+ static final String[] ANYWHERE = new String[]{"*"};
+ private CombinedScanTask task;
+ private transient String[] locations;
+ private transient Configuration conf;
+
+ IcebergSplit(Configuration conf, CombinedScanTask task) {
+ this.task = task;
+ this.conf = conf;
+ }
+
+ @Override
+ public long getLength() {
+ return task.files().stream().mapToLong(FileScanTask::length).sum();
+ }
+
+ @Override
+ public String[] getLocations() {
+ boolean localityPreferred = conf.getBoolean(LOCALITY, false);
+ if (!localityPreferred) {
+ return ANYWHERE;
+ }
+ if (locations != null) {
+ return locations;
+ }
+ locations = Util.blockLocations(task, conf);
+ return locations;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ byte[] data = SerializationUtil.serializeToBytes(this.task);
+ out.writeInt(data.length);
+ out.write(data);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ byte[] data = new byte[in.readInt()];
+ in.readFully(data);
+ this.task = SerializationUtil.deserializeFromBytes(data);
+ }
+ }
+}
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
new file mode 100644
index 0000000..1f84890
--- /dev/null
+++
b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestIcebergInputFormat {
+ static final Schema SCHEMA = new Schema(
+ required(1, "data", Types.StringType.get()),
+ required(2, "id", Types.LongType.get()),
+ required(3, "date", Types.StringType.get()));
+
+ static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+ .identity("date")
+ .bucket("id", 1)
+ .build();
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ private HadoopTables tables;
+ private Configuration conf;
+
+ @Parameterized.Parameters
+ public static Object[][] parameters() {
+ return new Object[][]{
+ new Object[]{"parquet"},
+ new Object[]{"avro"},
+ new Object[]{"orc"}
+ };
+ }
+
+ private final FileFormat format;
+
+ public TestIcebergInputFormat(String format) {
+ this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+ }
+
+ @Before
+ public void before() {
+ conf = new Configuration();
+ tables = new HadoopTables(conf);
+ }
+
+ @Test
+ public void testUnpartitionedTable() throws Exception {
+ File location = temp.newFolder(format.name());
+ Assert.assertTrue(location.delete());
+ Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ location.toString());
+ List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
+ DataFile dataFile = writeFile(table, null, format, expectedRecords);
+ table.newAppend()
+ .appendFile(dataFile)
+ .commit();
+ Job job = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
+ configBuilder.readFrom(location.toString());
+ validate(job, expectedRecords);
+ }
+
+ @Test
+ public void testPartitionedTable() throws Exception {
+ File location = temp.newFolder(format.name());
+ Assert.assertTrue(location.delete());
+ Table table = tables.create(SCHEMA, SPEC,
+
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ location.toString());
+ List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
+ expectedRecords.get(0).set(2, "2020-03-20");
+ DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format,
expectedRecords);
+ table.newAppend()
+ .appendFile(dataFile)
+ .commit();
+
+ Job job = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
+ configBuilder.readFrom(location.toString());
+ validate(job, expectedRecords);
+ }
+
+ @Test
+ public void testFilterExp() throws Exception {
+ File location = temp.newFolder(format.name());
+ Assert.assertTrue(location.delete());
+ Table table = tables.create(SCHEMA, SPEC,
+
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ location.toString());
+ List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
2, 0L);
+ expectedRecords.get(0).set(2, "2020-03-20");
+ expectedRecords.get(1).set(2, "2020-03-20");
+ DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format,
expectedRecords);
+ DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format,
+ RandomGenericData.generate(table.schema(),
2, 0L));
+ table.newAppend()
+ .appendFile(dataFile1)
+ .appendFile(dataFile2)
+ .commit();
+ Job job = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
+ configBuilder.readFrom(location.toString())
+ .filter(Expressions.equal("date", "2020-03-20"));
+ validate(job, expectedRecords);
+ }
+
+ @Test
+ public void testResiduals() throws Exception {
+ File location = temp.newFolder(format.name());
+ Assert.assertTrue(location.delete());
+ Table table = tables.create(SCHEMA, SPEC,
+
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ location.toString());
+ List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
2, 0L);
+ expectedRecords.get(0).set(2, "2020-03-20");
+ expectedRecords.get(1).set(2, "2020-03-20");
+ DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format,
expectedRecords);
+ table.newAppend()
+ .appendFile(dataFile)
+ .commit();
+ Job job = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
+ configBuilder.readFrom(location.toString())
+ .filter(Expressions.and(
+ Expressions.equal("date", "2020-03-20"),
+ Expressions.equal("id", 0)));
+
+ AssertHelpers.assertThrows(
+ "Residuals are not evaluated today for Iceberg Generics In memory
model",
+ UnsupportedOperationException.class, "Filter expression
ref(name=\"id\") == 0 is not completely satisfied.",
+ () -> validate(job, expectedRecords));
+ }
+
+ @Test
+ public void testProjection() throws Exception {
+ File location = temp.newFolder(format.name());
+ Assert.assertTrue(location.delete());
+ Schema projectedSchema = TypeUtil.select(SCHEMA, ImmutableSet.of(1));
+ Table table = tables.create(SCHEMA, SPEC,
+
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ location.toString());
+ List<Record> inputRecords = RandomGenericData.generate(table.schema(), 1,
0L);
+ DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format,
inputRecords);
+ table.newAppend()
+ .appendFile(dataFile)
+ .commit();
+
+ Job job = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
+ configBuilder
+ .readFrom(location.toString())
+ .project(projectedSchema);
+ List<Record> outputRecords = readRecords(job.getConfiguration());
+ Assert.assertEquals(inputRecords.size(), outputRecords.size());
+ Assert.assertEquals(projectedSchema.asStruct(),
outputRecords.get(0).struct());
+ }
+
+ private static final Schema LOG_SCHEMA = new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "date", Types.StringType.get()),
+ Types.NestedField.optional(3, "level", Types.StringType.get()),
+ Types.NestedField.optional(4, "message", Types.StringType.get())
+ );
+
+ private static final PartitionSpec IDENTITY_PARTITION_SPEC =
+
PartitionSpec.builderFor(LOG_SCHEMA).identity("date").identity("level").build();
+
+ @Test
+ public void testIdentityPartitionProjections() throws Exception {
+ File location = temp.newFolder(format.name());
+ Assert.assertTrue(location.delete());
+ Table table = tables.create(LOG_SCHEMA, IDENTITY_PARTITION_SPEC,
+
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ location.toString());
+
+ List<Record> inputRecords = RandomGenericData.generate(LOG_SCHEMA, 10, 0);
+ Integer idx = 0;
+ AppendFiles append = table.newAppend();
+ for (Record record : inputRecords) {
+ record.set(1, "2020-03-2" + idx);
+ record.set(2, idx.toString());
+ append.appendFile(writeFile(table, Row.of("2020-03-2" + idx,
idx.toString()), format, ImmutableList.of(record)));
+ idx += 1;
+ }
+ append.commit();
+
+ // individual fields
+ validateIdentityPartitionProjections(location.toString(),
withColumns("date"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("level"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("message"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("id"), inputRecords);
+ // field pairs
+ validateIdentityPartitionProjections(location.toString(),
withColumns("date", "message"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("level", "message"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("date", "level"), inputRecords);
+ // out-of-order pairs
+ validateIdentityPartitionProjections(location.toString(),
withColumns("message", "date"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("message", "level"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("level", "date"), inputRecords);
+ // full projection
+ validateIdentityPartitionProjections(location.toString(), LOG_SCHEMA,
inputRecords);
+ // out-of-order triplets
+ validateIdentityPartitionProjections(location.toString(),
withColumns("date", "level", "message"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("level", "date", "message"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("date", "message", "level"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("level", "message", "date"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("message", "date", "level"), inputRecords);
+ validateIdentityPartitionProjections(location.toString(),
withColumns("message", "level", "date"), inputRecords);
+ }
+
+ private static Schema withColumns(String... names) {
+ Map<String, Integer> indexByName =
TypeUtil.indexByName(LOG_SCHEMA.asStruct());
+ Set<Integer> projectedIds = Sets.newHashSet();
+ for (String name : names) {
+ projectedIds.add(indexByName.get(name));
+ }
+ return TypeUtil.select(LOG_SCHEMA, projectedIds);
+ }
+
+ private void validateIdentityPartitionProjections(
+ String tablePath, Schema projectedSchema, List<Record> inputRecords)
throws Exception {
+ Job job = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
+ configBuilder
+ .readFrom(tablePath)
+ .project(projectedSchema);
+ List<Record> actualRecords = readRecords(job.getConfiguration());
+
+ Set<String> fieldNames =
TypeUtil.indexByName(projectedSchema.asStruct()).keySet();
+ for (int pos = 0; pos < inputRecords.size(); pos++) {
+ Record inputRecord = inputRecords.get(pos);
+ Record actualRecord = actualRecords.get(pos);
+ Assert.assertEquals("Projected schema should match",
projectedSchema.asStruct(), actualRecord.struct());
+ for (String name : fieldNames) {
+ Assert.assertEquals(
+ "Projected field " + name + " should match",
inputRecord.getField(name), actualRecord.getField(name));
+ }
+ }
+ }
+
+ @Test
+ public void testSnapshotReads() throws Exception {
+ File location = temp.newFolder(format.name());
+ Assert.assertTrue(location.delete());
+ Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ location.toString());
+ List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
+ table.newAppend()
+ .appendFile(writeFile(table, null, format, expectedRecords))
+ .commit();
+ long snapshotId = table.currentSnapshot().snapshotId();
+ table.newAppend()
+ .appendFile(writeFile(table, null, format,
RandomGenericData.generate(table.schema(), 1, 0L)))
+ .commit();
+
+ Job job = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
+ configBuilder
+ .readFrom(location.toString())
+ .snapshotId(snapshotId);
+
+ validate(job, expectedRecords);
+ }
+
+ @Test
+ public void testLocality() throws Exception {
+ File location = temp.newFolder(format.name());
+ Assert.assertTrue(location.delete());
+ Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ location.toString());
+ List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
+ table.newAppend()
+ .appendFile(writeFile(table, null, format, expectedRecords))
+ .commit();
+ Job job = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
+ configBuilder.readFrom(location.toString());
+
+ for (InputSplit split : splits(job.getConfiguration())) {
+ Assert.assertArrayEquals(IcebergInputFormat.IcebergSplit.ANYWHERE,
split.getLocations());
+ }
+
+ configBuilder.preferLocality();
+ for (InputSplit split : splits(job.getConfiguration())) {
+ Assert.assertArrayEquals(new String[]{"localhost"},
split.getLocations());
+ }
+ }
+
+ public static class HadoopCatalogFunc implements Function<Configuration,
Catalog> {
+ @Override
+ public Catalog apply(Configuration conf) {
+ return new HadoopCatalog(conf, conf.get("warehouse.location"));
+ }
+ }
+
+ @Test
+ public void testCustomCatalog() throws Exception {
+ conf = new Configuration();
+ conf.set("warehouse.location",
temp.newFolder("hadoop_catalog").getAbsolutePath());
+
+ Catalog catalog = new HadoopCatalogFunc().apply(conf);
+ TableIdentifier tableIdentifier = TableIdentifier.of("db", "t");
+ Table table = catalog.createTable(tableIdentifier, SCHEMA, SPEC,
+
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+ List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
+ expectedRecords.get(0).set(2, "2020-03-20");
+ DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format,
expectedRecords);
+ table.newAppend()
+ .appendFile(dataFile)
+ .commit();
+
+ Job job = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
+ configBuilder
+ .catalogFunc(HadoopCatalogFunc.class)
+ .readFrom(tableIdentifier.toString());
+ validate(job, expectedRecords);
+ }
+
+ private static void validate(Job job, List<Record> expectedRecords) {
+ List<Record> actualRecords = readRecords(job.getConfiguration());
+ Assert.assertEquals(expectedRecords, actualRecords);
+ }
+
+ private static <T> List<InputSplit> splits(Configuration conf) {
+ TaskAttemptContext context = new TaskAttemptContextImpl(conf, new
TaskAttemptID());
+ IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
+ return icebergInputFormat.getSplits(context);
+ }
+
+ private static <T> List<T> readRecords(Configuration conf) {
+ TaskAttemptContext context = new TaskAttemptContextImpl(conf, new
TaskAttemptID());
+ IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
+ List<InputSplit> splits = icebergInputFormat.getSplits(context);
+ return
+ FluentIterable
+ .from(splits)
+ .transformAndConcat(split -> readRecords(icebergInputFormat,
split, context))
+ .toList();
+ }
+
+ private static <T> Iterable<T> readRecords(
+ IcebergInputFormat<T> inputFormat, InputSplit split, TaskAttemptContext
context) {
+ RecordReader<Void, T> recordReader = inputFormat.createRecordReader(split,
context);
+ List<T> records = new ArrayList<>();
+ try {
+ recordReader.initialize(split, context);
+ while (recordReader.nextKeyValue()) {
+ records.add(recordReader.getCurrentValue());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return records;
+ }
+
+ private DataFile writeFile(
+ Table table, StructLike partitionData, FileFormat fileFormat,
List<Record> records) throws IOException {
+ File file = temp.newFile();
+ Assert.assertTrue(file.delete());
+ FileAppender<Record> appender;
+ switch (fileFormat) {
+ case AVRO:
+ appender = Avro.write(Files.localOutput(file))
+ .schema(table.schema())
+ .createWriterFunc(DataWriter::create)
+ .named(fileFormat.name())
+ .build();
+ break;
+ case PARQUET:
+ appender = Parquet.write(Files.localOutput(file))
+ .schema(table.schema())
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .named(fileFormat.name())
+ .build();
+ break;
+ case ORC:
+ appender = ORC.write(Files.localOutput(file))
+ .schema(table.schema())
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .build();
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot write format: " +
fileFormat);
+ }
+
+ try {
+ appender.addAll(records);
+ } finally {
+ appender.close();
+ }
+
+ DataFiles.Builder builder = DataFiles.builder(table.spec())
+ .withPath(file.toString())
+ .withFormat(format)
+ .withFileSizeInBytes(file.length())
+ .withMetrics(appender.metrics());
+ if (partitionData != null) {
+ builder.withPartition(partitionData);
+ }
+ return builder.build();
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index 854b3b2..0c9e592 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -22,6 +22,7 @@ include 'api'
include 'common'
include 'core'
include 'data'
+include 'mr'
include 'orc'
include 'parquet'
include 'spark'
@@ -34,6 +35,7 @@ project(':api').name = 'iceberg-api'
project(':common').name = 'iceberg-common'
project(':core').name = 'iceberg-core'
project(':data').name = 'iceberg-data'
+project(':mr').name = 'iceberg-mr'
project(':orc').name = 'iceberg-orc'
project(':arrow').name = 'iceberg-arrow'
project(':parquet').name = 'iceberg-parquet'
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index ea189d3..0d25fb9 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -21,15 +21,11 @@ package org.apache.iceberg.spark.source;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.List;
import java.util.Locale;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CombinedScanTask;
@@ -44,6 +40,7 @@ import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.spark.SparkFilters;
@@ -353,20 +350,7 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
}
Configuration conf =
SparkSession.active().sparkContext().hadoopConfiguration();
- Set<String> locations = Sets.newHashSet();
- for (FileScanTask f : task.files()) {
- Path path = new Path(f.file().path().toString());
- try {
- FileSystem fs = path.getFileSystem(conf);
- for (BlockLocation b : fs.getFileBlockLocations(path, f.start(),
f.length())) {
- locations.addAll(Arrays.asList(b.getHosts()));
- }
- } catch (IOException ioe) {
- LOG.warn("Failed to get block locations for path {}", path, ioe);
- }
- }
-
- return locations.toArray(new String[0]);
+ return Util.blockLocations(task, conf);
}
}