This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b47e977  Avro: Support partition values using a constants map (#896)
b47e977 is described below

commit b47e97781b82109432612a0f36ebb0d0ce270d8b
Author: Ryan Blue <[email protected]>
AuthorDate: Thu Apr 9 09:28:53 2020 -0700

    Avro: Support partition values using a constants map (#896)
---
 .../java/org/apache/iceberg/ManifestWriter.java    |  2 +-
 .../java/org/apache/iceberg/avro/ValueReaders.java | 36 ++++++++-
 .../java/org/apache/iceberg/util/ByteBuffers.java  |  2 +-
 .../org/apache/iceberg/util/PartitionUtil.java     | 48 ++++++++++++
 .../org/apache/iceberg/data/TableScanIterable.java |  6 +-
 .../org/apache/iceberg/data/avro/DataReader.java   | 19 +++--
 .../apache/iceberg/data/avro/GenericReaders.java   |  9 ++-
 .../apache/iceberg/spark/data/SparkAvroReader.java | 46 +++++++-----
 .../iceberg/spark/data/SparkValueReaders.java      | 86 +++++++++++++---------
 .../apache/iceberg/spark/source/RowDataReader.java | 50 ++++++++-----
 10 files changed, 219 insertions(+), 85 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index fffac1f..b7ed80d 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -151,7 +151,7 @@ public class ManifestWriter implements FileAppender<DataFile> {
     addEntry(reused.wrapAppend(snapshotId, addedFile));
   }
 
-  public void add(ManifestEntry entry) {
+  void add(ManifestEntry entry) {
     addEntry(reused.wrapAppend(snapshotId, entry.file()));
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 14dc869..cbbf77c 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -40,6 +40,8 @@ import org.apache.avro.io.Decoder;
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 
 import static java.util.Collections.emptyIterator;
 
@@ -561,12 +563,32 @@ public class ValueReaders {
 
   public abstract static class StructReader<S> implements ValueReader<S> {
     private final ValueReader<?>[] readers;
+    private final int[] positions;
+    private final Object[] constants;
 
     protected StructReader(List<ValueReader<?>> readers) {
-      this.readers = new ValueReader[readers.size()];
-      for (int i = 0; i < this.readers.length; i += 1) {
-        this.readers[i] = readers.get(i);
+      this.readers = readers.toArray(new ValueReader[0]);
+      this.positions = new int[0];
+      this.constants = new Object[0];
+    }
+
+    protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+      this.readers = readers.toArray(new ValueReader[0]);
+
+      List<Types.NestedField> fields = struct.fields();
+      List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
+      List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
+      for (int pos = 0; pos < fields.size(); pos += 1) {
+        Types.NestedField field = fields.get(pos);
+        Object constant = idToConstant.get(field.fieldId());
+        if (constant != null) {
+          positionList.add(pos);
+          constantList.add(prepareConstant(field.type(), constant));
+        }
       }
+
+      this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
+      this.constants = constantList.toArray();
     }
 
     protected abstract S reuseOrCreate(Object reuse);
@@ -575,6 +597,10 @@ public class ValueReaders {
 
     protected abstract void set(S struct, int pos, Object value);
 
+    protected Object prepareConstant(Type type, Object value) {
+      return value;
+    }
+
     public ValueReader<?> reader(int pos) {
       return readers[pos];
     }
@@ -597,6 +623,10 @@ public class ValueReaders {
         }
       }
 
+      for (int i = 0; i < positions.length; i += 1) {
+        set(struct, positions[i], constants[i]);
+      }
+
       return struct;
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java b/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
index da692f7..0c6b26a 100644
--- a/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
+++ b/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
@@ -41,7 +41,7 @@ public class ByteBuffers {
       }
     } else {
       byte[] bytes = new byte[buffer.remaining()];
-      buffer.get(bytes);
+      buffer.asReadOnlyBuffer().get(bytes);
       return bytes;
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
new file mode 100644
index 0000000..9a2aa99
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+
+public class PartitionUtil {
+  private PartitionUtil() {
+  }
+
+  public static Map<Integer, ?> constantsMap(FileScanTask task) {
+    return constantsMap(task.spec(), task.file().partition());
+  }
+
+  private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData) {
+    // use java.util.HashMap because partition data may contain null values
+    Map<Integer, Object> idToConstant = new HashMap<>();
+    List<PartitionField> fields = spec.fields();
+    for (int pos = 0; pos < fields.size(); pos += 1) {
+      PartitionField field = fields.get(pos);
+      idToConstant.put(field.sourceId(), partitionData.get(pos, Object.class));
+    }
+    return idToConstant;
+  }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 4ce36cd..baa2320 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -26,6 +26,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
@@ -45,6 +46,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.util.PartitionUtil;
 
 class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
   private final TableOperations ops;
@@ -74,13 +76,15 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
 
   private CloseableIterable<Record> open(FileScanTask task) {
     InputFile input = ops.io().newInputFile(task.file().path().toString());
+    Map<Integer, ?> partition = PartitionUtil.constantsMap(task);
 
     // TODO: join to partition data from the manifest file
     switch (task.file().format()) {
       case AVRO:
         Avro.ReadBuilder avro = Avro.read(input)
             .project(projection)
-            .createReaderFunc(DataReader::create)
+            .createReaderFunc(
+                avroSchema -> DataReader.create(projection, avroSchema, partition))
             .split(task.start(), task.length());
 
         if (reuseContainers) {
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index f975aa8..1d226c3 100644
--- a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.data.avro;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapMaker;
 import java.io.IOException;
 import java.util.HashMap;
@@ -45,7 +46,12 @@ public class DataReader<T> implements DatumReader<T> {
       ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
 
   public static <D> DataReader<D> create(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
-    return new DataReader<>(expectedSchema, readSchema);
+    return create(expectedSchema, readSchema, ImmutableMap.of());
+  }
+
+  public static <D> DataReader<D> create(org.apache.iceberg.Schema expectedSchema, Schema readSchema,
+                                         Map<Integer, ?> idToConstant) {
+    return new DataReader<>(expectedSchema, readSchema, idToConstant);
   }
 
   private final Schema readSchema;
@@ -53,9 +59,10 @@ public class DataReader<T> implements DatumReader<T> {
   private Schema fileSchema = null;
 
   @SuppressWarnings("unchecked")
-  private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
+  private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> idToConstant) {
     this.readSchema = readSchema;
-    this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor.visit(expectedSchema, readSchema, new ReadBuilder());
+    this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor
+        .visit(expectedSchema, readSchema, new ReadBuilder(idToConstant));
   }
 
   @Override
@@ -96,14 +103,16 @@ public class DataReader<T> implements DatumReader<T> {
   }
 
   private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+    private final Map<Integer, ?> idToConstant;
 
-    private ReadBuilder() {
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
     }
 
     @Override
     public ValueReader<?> record(Types.StructType struct, Schema record,
                                  List<String> names, List<ValueReader<?>> fields) {
-      return GenericReaders.struct(struct, fields);
+      return GenericReaders.struct(struct, fields, idToConstant);
     }
 
     @Override
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
index 14e4269..7502d15 100644
--- a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
@@ -28,6 +28,7 @@ import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
+import java.util.Map;
 import org.apache.avro.io.Decoder;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
@@ -55,8 +56,8 @@ class GenericReaders {
     return TimestamptzReader.INSTANCE;
   }
 
-  static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> readers) {
-    return new GenericRecordReader(readers, struct);
+  static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> readers, Map<Integer, ?> idToConstant) {
+    return new GenericRecordReader(readers, struct, idToConstant);
   }
 
   private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
@@ -113,8 +114,8 @@ class GenericReaders {
   private static class GenericRecordReader extends ValueReaders.StructReader<Record> {
     private final StructType structType;
 
-    private GenericRecordReader(List<ValueReader<?>> readers, StructType struct) {
-      super(readers);
+    private GenericRecordReader(List<ValueReader<?>> readers, StructType struct, Map<Integer, ?> idToConstant) {
+      super(readers, struct, idToConstant);
       this.structType = struct;
     }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
index 88977bd..215a265 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.spark.data;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapMaker;
 import java.io.IOException;
 import java.util.HashMap;
@@ -31,10 +32,12 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.ResolvingDecoder;
-import org.apache.iceberg.avro.AvroSchemaVisitor;
+import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 
@@ -47,10 +50,15 @@ public class SparkAvroReader implements DatumReader<InternalRow> {
   private final ValueReader<InternalRow> reader;
   private Schema fileSchema = null;
 
+  public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
+    this(expectedSchema, readSchema, ImmutableMap.of());
+  }
+
   @SuppressWarnings("unchecked")
-  public SparkAvroReader(Schema readSchema) {
+  public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
     this.readSchema = readSchema;
-    this.reader = (ValueReader<InternalRow>) AvroSchemaVisitor.visit(readSchema, new ReadBuilder());
+    this.reader = (ValueReader<InternalRow>) AvroSchemaWithTypeVisitor
+        .visit(expectedSchema, readSchema, new ReadBuilder(constants));
   }
 
   @Override
@@ -90,38 +98,42 @@ public class SparkAvroReader implements DatumReader<InternalRow> {
     }
   }
 
-  private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
-    private ReadBuilder() {
+  private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+    private final Map<Integer, ?> idToConstant;
+
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
     }
 
     @Override
-    public ValueReader<?> record(Schema record, List<String> names, List<ValueReader<?>> fields) {
-      return SparkValueReaders.struct(fields);
+    public ValueReader<?> record(Types.StructType expected, Schema record, List<String> names,
+                                 List<ValueReader<?>> fields) {
+      return SparkValueReaders.struct(fields, expected, idToConstant);
     }
 
     @Override
-    public ValueReader<?> union(Schema union, List<ValueReader<?>> options) {
+    public ValueReader<?> union(Type expected, Schema union, List<ValueReader<?>> options) {
       return ValueReaders.union(options);
     }
 
     @Override
-    public ValueReader<?> array(Schema array, ValueReader<?> elementReader) {
-      LogicalType logical = array.getLogicalType();
-      if (logical != null && "map".equals(logical.getName())) {
-        ValueReader<?>[] keyValueReaders = ((SparkValueReaders.StructReader) elementReader).readers();
-        return SparkValueReaders.arrayMap(keyValueReaders[0], keyValueReaders[1]);
-      }
-
+    public ValueReader<?> array(Types.ListType expected, Schema array, ValueReader<?> elementReader) {
       return SparkValueReaders.array(elementReader);
     }
 
     @Override
-    public ValueReader<?> map(Schema map, ValueReader<?> valueReader) {
+    public ValueReader<?> map(Types.MapType expected, Schema map,
+                              ValueReader<?> keyReader, ValueReader<?> valueReader) {
+      return SparkValueReaders.arrayMap(keyReader, valueReader);
+    }
+
+    @Override
+    public ValueReader<?> map(Types.MapType expected, Schema map, ValueReader<?> valueReader) {
       return SparkValueReaders.map(SparkValueReaders.strings(), valueReader);
     }
 
     @Override
-    public ValueReader<?> primitive(Schema primitive) {
+    public ValueReader<?> primitive(Type.PrimitiveType expected, Schema primitive) {
       LogicalType logicalType = primitive.getLogicalType();
       if (logicalType != null) {
         switch (logicalType.getName()) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
index 2670cd4..b799fe8 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
@@ -27,12 +27,16 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
-import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.Decoder;
-import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -74,8 +78,9 @@ public class SparkValueReaders {
     return new MapReader(keyReader, valueReader);
   }
 
-  static ValueReader<InternalRow> struct(List<ValueReader<?>> readers) {
-    return new StructReader(readers);
+  static ValueReader<InternalRow> struct(List<ValueReader<?>> readers, Types.StructType struct,
+                                         Map<Integer, ?> idToConstant) {
+    return new StructReader(readers, struct, idToConstant);
   }
 
   private static class StringReader implements ValueReader<UTF8String> {
@@ -253,46 +258,59 @@ public class SparkValueReaders {
     }
   }
 
-  static class StructReader implements ValueReader<InternalRow> {
-    private final ValueReader<?>[] readers;
+  static class StructReader extends ValueReaders.StructReader<InternalRow> {
+    private final int numFields;
 
-    private StructReader(List<ValueReader<?>> readers) {
-      this.readers = new ValueReader[readers.size()];
-      for (int i = 0; i < this.readers.length; i += 1) {
-        this.readers[i] = readers.get(i);
-      }
+    protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+      super(readers, struct, idToConstant);
+      this.numFields = readers.size();
     }
 
-    ValueReader<?>[] readers() {
-      return readers;
+    @Override
+    protected InternalRow reuseOrCreate(Object reuse) {
+      if (reuse instanceof GenericInternalRow && ((GenericInternalRow) reuse).numFields() == numFields) {
+        return (InternalRow) reuse;
+      }
+      return new GenericInternalRow(numFields);
     }
 
     @Override
-    public InternalRow read(Decoder decoder, Object reuse) throws IOException {
-      GenericInternalRow row = new GenericInternalRow(readers.length);
-      if (decoder instanceof ResolvingDecoder) {
-        // this may not set all of the fields. nulls are set by default.
-        for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) {
-          Object value = readers[field.pos()].read(decoder, null);
-          if (value != null) {
-            row.update(field.pos(), value);
-          } else {
-            row.setNullAt(field.pos());
-          }
-        }
+    protected Object get(InternalRow struct, int pos) {
+      return null;
+    }
 
+    @Override
+    protected void set(InternalRow struct, int pos, Object value) {
+      if (value != null) {
+        struct.update(pos, value);
       } else {
-        for (int i = 0; i < readers.length; i += 1) {
-          Object value = readers[i].read(decoder, null);
-          if (value != null) {
-            row.update(i, value);
-          } else {
-            row.setNullAt(i);
-          }
-        }
+        struct.setNullAt(pos);
       }
+    }
 
-      return row;
+    @Override
+    protected Object prepareConstant(Type type, Object value) {
+      switch (type.typeId()) {
+        case DECIMAL:
+          return Decimal.apply((BigDecimal) value);
+        case STRING:
+          if (value instanceof Utf8) {
+            Utf8 utf8 = (Utf8) value;
+            return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
+          }
+          return UTF8String.fromString(value.toString());
+        case FIXED:
+          if (value instanceof byte[]) {
+            return value;
+          } else if (value instanceof GenericData.Fixed) {
+            return ((GenericData.Fixed) value).bytes();
+          }
+          return ByteBuffers.toByteArray((ByteBuffer) value);
+        case BINARY:
+          return ByteBuffers.toByteArray((ByteBuffer) value);
+        default:
+      }
+      return value;
     }
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 3e4c213..61c6fa2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -20,14 +20,18 @@
 package org.apache.iceberg.spark.source;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -45,6 +49,7 @@ import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.iceberg.spark.data.SparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
@@ -55,6 +60,7 @@ import org.apache.spark.sql.types.StructType;
 import scala.collection.JavaConverters;
 
 class RowDataReader extends BaseDataReader<InternalRow> {
+  private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO);
   // for some reason, the apply method can't be called from Java without reflection
   private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
       .impl(UnsafeProjection.class, InternalRow.class)
@@ -95,26 +101,31 @@ class RowDataReader extends BaseDataReader<InternalRow> {
     Iterator<InternalRow> iter;
 
     if (hasJoinedPartitionColumns) {
-      // schema used to read data files
-      Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
-      Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
-      PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-      JoinedRow joined = new JoinedRow();
-
-      InternalRow partition = convertToRow.apply(file.partition());
-      joined.withRight(partition);
-
-      // create joined rows and project from the joined schema to the final schema
-      iterSchema = TypeUtil.join(readSchema, partitionSchema);
-      iter = Iterators.transform(open(task, readSchema), joined::withLeft);
+      if (SUPPORTS_CONSTANTS.contains(file.format())) {
+        iterSchema = requiredSchema;
+        iter = open(task, requiredSchema, PartitionUtil.constantsMap(task));
+      } else {
+        // schema used to read data files
+        Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
+        Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
+        PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
+        JoinedRow joined = new JoinedRow();
+
+        InternalRow partition = convertToRow.apply(file.partition());
+        joined.withRight(partition);
+
+        // create joined rows and project from the joined schema to the final schema
+        iterSchema = TypeUtil.join(readSchema, partitionSchema);
+        iter = Iterators.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft);
+      }
     } else if (hasExtraFilterColumns) {
       // add projection to the final schema
       iterSchema = requiredSchema;
-      iter = open(task, requiredSchema);
+      iter = open(task, requiredSchema, ImmutableMap.of());
     } else {
       // return the base iterator
       iterSchema = finalSchema;
-      iter = open(task, finalSchema);
+      iter = open(task, finalSchema, ImmutableMap.of());
     }
 
     // TODO: remove the projection by reporting the iterator's schema back to Spark
@@ -123,7 +134,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
         APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
   }
 
-  private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
+  private Iterator<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
     CloseableIterable<InternalRow> iter;
     if (task.isDataTask()) {
       iter = newDataIterable(task.asDataTask(), readSchema);
@@ -137,7 +148,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
           break;
 
         case AVRO:
-          iter = newAvroIterable(location, task, readSchema);
+          iter = newAvroIterable(location, task, readSchema, idToConstant);
           break;
 
         case ORC:
@@ -158,12 +169,13 @@ class RowDataReader extends BaseDataReader<InternalRow> {
   private CloseableIterable<InternalRow> newAvroIterable(
       InputFile location,
       FileScanTask task,
-      Schema readSchema) {
+      Schema projection,
+      Map<Integer, ?> idToConstant) {
     return Avro.read(location)
         .reuseContainers()
-        .project(readSchema)
+        .project(projection)
         .split(task.start(), task.length())
-        .createReaderFunc(SparkAvroReader::new)
+        .createReaderFunc(readSchema -> new SparkAvroReader(projection, readSchema, idToConstant))
         .build();
   }
 

Reply via email to