This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b47e977 Avro: Support partition values using a constants map (#896)
b47e977 is described below
commit b47e97781b82109432612a0f36ebb0d0ce270d8b
Author: Ryan Blue <[email protected]>
AuthorDate: Thu Apr 9 09:28:53 2020 -0700
Avro: Support partition values using a constants map (#896)
---
.../java/org/apache/iceberg/ManifestWriter.java | 2 +-
.../java/org/apache/iceberg/avro/ValueReaders.java | 36 ++++++++-
.../java/org/apache/iceberg/util/ByteBuffers.java | 2 +-
.../org/apache/iceberg/util/PartitionUtil.java | 48 ++++++++++++
.../org/apache/iceberg/data/TableScanIterable.java | 6 +-
.../org/apache/iceberg/data/avro/DataReader.java | 19 +++--
.../apache/iceberg/data/avro/GenericReaders.java | 9 ++-
.../apache/iceberg/spark/data/SparkAvroReader.java | 46 +++++++-----
.../iceberg/spark/data/SparkValueReaders.java | 86 +++++++++++++---------
.../apache/iceberg/spark/source/RowDataReader.java | 50 ++++++++-----
10 files changed, 219 insertions(+), 85 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index fffac1f..b7ed80d 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -151,7 +151,7 @@ public class ManifestWriter implements FileAppender<DataFile> {
addEntry(reused.wrapAppend(snapshotId, addedFile));
}
- public void add(ManifestEntry entry) {
+ void add(ManifestEntry entry) {
addEntry(reused.wrapAppend(snapshotId, entry.file()));
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 14dc869..cbbf77c 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -40,6 +40,8 @@ import org.apache.avro.io.Decoder;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import static java.util.Collections.emptyIterator;
@@ -561,12 +563,32 @@ public class ValueReaders {
public abstract static class StructReader<S> implements ValueReader<S> {
private final ValueReader<?>[] readers;
+ private final int[] positions;
+ private final Object[] constants;
protected StructReader(List<ValueReader<?>> readers) {
- this.readers = new ValueReader[readers.size()];
- for (int i = 0; i < this.readers.length; i += 1) {
- this.readers[i] = readers.get(i);
+ this.readers = readers.toArray(new ValueReader[0]);
+ this.positions = new int[0];
+ this.constants = new Object[0];
+ }
+
+ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+ this.readers = readers.toArray(new ValueReader[0]);
+
+ List<Types.NestedField> fields = struct.fields();
+ List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
+ List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
+ for (int pos = 0; pos < fields.size(); pos += 1) {
+ Types.NestedField field = fields.get(pos);
+ Object constant = idToConstant.get(field.fieldId());
+ if (constant != null) {
+ positionList.add(pos);
+ constantList.add(prepareConstant(field.type(), constant));
+ }
}
+
+ this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
+ this.constants = constantList.toArray();
}
protected abstract S reuseOrCreate(Object reuse);
@@ -575,6 +597,10 @@ public class ValueReaders {
protected abstract void set(S struct, int pos, Object value);
+ protected Object prepareConstant(Type type, Object value) {
+ return value;
+ }
+
public ValueReader<?> reader(int pos) {
return readers[pos];
}
@@ -597,6 +623,10 @@ public class ValueReaders {
}
}
+ for (int i = 0; i < positions.length; i += 1) {
+ set(struct, positions[i], constants[i]);
+ }
+
return struct;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java b/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
index da692f7..0c6b26a 100644
--- a/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
+++ b/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
@@ -41,7 +41,7 @@ public class ByteBuffers {
}
} else {
byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
+ buffer.asReadOnlyBuffer().get(bytes);
return bytes;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
new file mode 100644
index 0000000..9a2aa99
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+
+public class PartitionUtil {
+ private PartitionUtil() {
+ }
+
+ public static Map<Integer, ?> constantsMap(FileScanTask task) {
+ return constantsMap(task.spec(), task.file().partition());
+ }
+
+ private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData) {
+ // use java.util.HashMap because partition data may contain null values
+ Map<Integer, Object> idToConstant = new HashMap<>();
+ List<PartitionField> fields = spec.fields();
+ for (int pos = 0; pos < fields.size(); pos += 1) {
+ PartitionField field = fields.get(pos);
+ idToConstant.put(field.sourceId(), partitionData.get(pos, Object.class));
+ }
+ return idToConstant;
+ }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 4ce36cd..baa2320 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -26,6 +26,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
@@ -45,6 +46,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.util.PartitionUtil;
class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
private final TableOperations ops;
@@ -74,13 +76,15 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
private CloseableIterable<Record> open(FileScanTask task) {
InputFile input = ops.io().newInputFile(task.file().path().toString());
+ Map<Integer, ?> partition = PartitionUtil.constantsMap(task);
// TODO: join to partition data from the manifest file
switch (task.file().format()) {
case AVRO:
Avro.ReadBuilder avro = Avro.read(input)
.project(projection)
- .createReaderFunc(DataReader::create)
+ .createReaderFunc(
+ avroSchema -> DataReader.create(projection, avroSchema, partition))
.split(task.start(), task.length());
if (reuseContainers) {
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index f975aa8..1d226c3 100644
--- a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.data.avro;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import java.io.IOException;
import java.util.HashMap;
@@ -45,7 +46,12 @@ public class DataReader<T> implements DatumReader<T> {
ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
public static <D> DataReader<D> create(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
- return new DataReader<>(expectedSchema, readSchema);
+ return create(expectedSchema, readSchema, ImmutableMap.of());
+ }
+
+ public static <D> DataReader<D> create(org.apache.iceberg.Schema expectedSchema, Schema readSchema,
+ Map<Integer, ?> idToConstant) {
+ return new DataReader<>(expectedSchema, readSchema, idToConstant);
}
private final Schema readSchema;
@@ -53,9 +59,10 @@ public class DataReader<T> implements DatumReader<T> {
private Schema fileSchema = null;
@SuppressWarnings("unchecked")
- private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
+ private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> idToConstant) {
this.readSchema = readSchema;
- this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor.visit(expectedSchema, readSchema, new ReadBuilder());
+ this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor
+ .visit(expectedSchema, readSchema, new ReadBuilder(idToConstant));
}
@Override
@@ -96,14 +103,16 @@ public class DataReader<T> implements DatumReader<T> {
}
private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+ private final Map<Integer, ?> idToConstant;
- private ReadBuilder() {
+ private ReadBuilder(Map<Integer, ?> idToConstant) {
+ this.idToConstant = idToConstant;
}
@Override
public ValueReader<?> record(Types.StructType struct, Schema record,
List<String> names, List<ValueReader<?>> fields) {
- return GenericReaders.struct(struct, fields);
+ return GenericReaders.struct(struct, fields, idToConstant);
}
@Override
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
index 14e4269..7502d15 100644
--- a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
@@ -28,6 +28,7 @@ import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
+import java.util.Map;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
@@ -55,8 +56,8 @@ class GenericReaders {
return TimestamptzReader.INSTANCE;
}
- static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> readers) {
- return new GenericRecordReader(readers, struct);
+ static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> readers, Map<Integer, ?> idToConstant) {
+ return new GenericRecordReader(readers, struct, idToConstant);
}
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
@@ -113,8 +114,8 @@ class GenericReaders {
private static class GenericRecordReader extends ValueReaders.StructReader<Record> {
private final StructType structType;
- private GenericRecordReader(List<ValueReader<?>> readers, StructType struct) {
- super(readers);
+ private GenericRecordReader(List<ValueReader<?>> readers, StructType struct, Map<Integer, ?> idToConstant) {
+ super(readers, struct, idToConstant);
this.structType = struct;
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
index 88977bd..215a265 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.data;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import java.io.IOException;
import java.util.HashMap;
@@ -31,10 +32,12 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.ResolvingDecoder;
-import org.apache.iceberg.avro.AvroSchemaVisitor;
+import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -47,10 +50,15 @@ public class SparkAvroReader implements DatumReader<InternalRow> {
private final ValueReader<InternalRow> reader;
private Schema fileSchema = null;
+ public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
+ this(expectedSchema, readSchema, ImmutableMap.of());
+ }
+
@SuppressWarnings("unchecked")
- public SparkAvroReader(Schema readSchema) {
+ public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
this.readSchema = readSchema;
- this.reader = (ValueReader<InternalRow>) AvroSchemaVisitor.visit(readSchema, new ReadBuilder());
+ this.reader = (ValueReader<InternalRow>) AvroSchemaWithTypeVisitor
+ .visit(expectedSchema, readSchema, new ReadBuilder(constants));
}
@Override
@@ -90,38 +98,42 @@ public class SparkAvroReader implements DatumReader<InternalRow> {
}
}
- private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
- private ReadBuilder() {
+ private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+ private final Map<Integer, ?> idToConstant;
+
+ private ReadBuilder(Map<Integer, ?> idToConstant) {
+ this.idToConstant = idToConstant;
}
@Override
- public ValueReader<?> record(Schema record, List<String> names, List<ValueReader<?>> fields) {
- return SparkValueReaders.struct(fields);
+ public ValueReader<?> record(Types.StructType expected, Schema record, List<String> names,
+ List<ValueReader<?>> fields) {
+ return SparkValueReaders.struct(fields, expected, idToConstant);
}
@Override
- public ValueReader<?> union(Schema union, List<ValueReader<?>> options) {
+ public ValueReader<?> union(Type expected, Schema union, List<ValueReader<?>> options) {
return ValueReaders.union(options);
}
@Override
- public ValueReader<?> array(Schema array, ValueReader<?> elementReader) {
- LogicalType logical = array.getLogicalType();
- if (logical != null && "map".equals(logical.getName())) {
- ValueReader<?>[] keyValueReaders = ((SparkValueReaders.StructReader) elementReader).readers();
- return SparkValueReaders.arrayMap(keyValueReaders[0], keyValueReaders[1]);
- }
-
+ public ValueReader<?> array(Types.ListType expected, Schema array, ValueReader<?> elementReader) {
return SparkValueReaders.array(elementReader);
}
@Override
- public ValueReader<?> map(Schema map, ValueReader<?> valueReader) {
+ public ValueReader<?> map(Types.MapType expected, Schema map,
+ ValueReader<?> keyReader, ValueReader<?> valueReader) {
+ return SparkValueReaders.arrayMap(keyReader, valueReader);
+ }
+
+ @Override
+ public ValueReader<?> map(Types.MapType expected, Schema map, ValueReader<?> valueReader) {
return SparkValueReaders.map(SparkValueReaders.strings(), valueReader);
}
@Override
- public ValueReader<?> primitive(Schema primitive) {
+ public ValueReader<?> primitive(Type.PrimitiveType expected, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
if (logicalType != null) {
switch (logicalType.getName()) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
index 2670cd4..b799fe8 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
@@ -27,12 +27,16 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
-import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.io.Decoder;
-import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -74,8 +78,9 @@ public class SparkValueReaders {
return new MapReader(keyReader, valueReader);
}
- static ValueReader<InternalRow> struct(List<ValueReader<?>> readers) {
- return new StructReader(readers);
+ static ValueReader<InternalRow> struct(List<ValueReader<?>> readers, Types.StructType struct,
+ Map<Integer, ?> idToConstant) {
+ return new StructReader(readers, struct, idToConstant);
}
private static class StringReader implements ValueReader<UTF8String> {
@@ -253,46 +258,59 @@ public class SparkValueReaders {
}
}
- static class StructReader implements ValueReader<InternalRow> {
- private final ValueReader<?>[] readers;
+ static class StructReader extends ValueReaders.StructReader<InternalRow> {
+ private final int numFields;
- private StructReader(List<ValueReader<?>> readers) {
- this.readers = new ValueReader[readers.size()];
- for (int i = 0; i < this.readers.length; i += 1) {
- this.readers[i] = readers.get(i);
- }
+ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+ super(readers, struct, idToConstant);
+ this.numFields = readers.size();
}
- ValueReader<?>[] readers() {
- return readers;
+ @Override
+ protected InternalRow reuseOrCreate(Object reuse) {
+ if (reuse instanceof GenericInternalRow && ((GenericInternalRow) reuse).numFields() == numFields) {
+ return (InternalRow) reuse;
+ }
+ return new GenericInternalRow(numFields);
}
@Override
- public InternalRow read(Decoder decoder, Object reuse) throws IOException {
- GenericInternalRow row = new GenericInternalRow(readers.length);
- if (decoder instanceof ResolvingDecoder) {
- // this may not set all of the fields. nulls are set by default.
- for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) {
- Object value = readers[field.pos()].read(decoder, null);
- if (value != null) {
- row.update(field.pos(), value);
- } else {
- row.setNullAt(field.pos());
- }
- }
+ protected Object get(InternalRow struct, int pos) {
+ return null;
+ }
+ @Override
+ protected void set(InternalRow struct, int pos, Object value) {
+ if (value != null) {
+ struct.update(pos, value);
} else {
- for (int i = 0; i < readers.length; i += 1) {
- Object value = readers[i].read(decoder, null);
- if (value != null) {
- row.update(i, value);
- } else {
- row.setNullAt(i);
- }
- }
+ struct.setNullAt(pos);
}
+ }
- return row;
+ @Override
+ protected Object prepareConstant(Type type, Object value) {
+ switch (type.typeId()) {
+ case DECIMAL:
+ return Decimal.apply((BigDecimal) value);
+ case STRING:
+ if (value instanceof Utf8) {
+ Utf8 utf8 = (Utf8) value;
+ return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
+ }
+ return UTF8String.fromString(value.toString());
+ case FIXED:
+ if (value instanceof byte[]) {
+ return value;
+ } else if (value instanceof GenericData.Fixed) {
+ return ((GenericData.Fixed) value).bytes();
+ }
+ return ByteBuffers.toByteArray((ByteBuffer) value);
+ case BINARY:
+ return ByteBuffers.toByteArray((ByteBuffer) value);
+ default:
+ }
+ return value;
}
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 3e4c213..61c6fa2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -20,14 +20,18 @@
package org.apache.iceberg.spark.source;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -45,6 +49,7 @@ import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
@@ -55,6 +60,7 @@ import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;
class RowDataReader extends BaseDataReader<InternalRow> {
+ private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO);
// for some reason, the apply method can't be called from Java without reflection
private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
.impl(UnsafeProjection.class, InternalRow.class)
@@ -95,26 +101,31 @@ class RowDataReader extends BaseDataReader<InternalRow> {
Iterator<InternalRow> iter;
if (hasJoinedPartitionColumns) {
- // schema used to read data files
- Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
- Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
- PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
- JoinedRow joined = new JoinedRow();
-
- InternalRow partition = convertToRow.apply(file.partition());
- joined.withRight(partition);
-
- // create joined rows and project from the joined schema to the final schema
- iterSchema = TypeUtil.join(readSchema, partitionSchema);
- iter = Iterators.transform(open(task, readSchema), joined::withLeft);
+ if (SUPPORTS_CONSTANTS.contains(file.format())) {
+ iterSchema = requiredSchema;
+ iter = open(task, requiredSchema, PartitionUtil.constantsMap(task));
+ } else {
+ // schema used to read data files
+ Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
+ Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
+ PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
+ JoinedRow joined = new JoinedRow();
+
+ InternalRow partition = convertToRow.apply(file.partition());
+ joined.withRight(partition);
+
+ // create joined rows and project from the joined schema to the final schema
+ iterSchema = TypeUtil.join(readSchema, partitionSchema);
+ iter = Iterators.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft);
+ }
} else if (hasExtraFilterColumns) {
// add projection to the final schema
iterSchema = requiredSchema;
- iter = open(task, requiredSchema);
+ iter = open(task, requiredSchema, ImmutableMap.of());
} else {
// return the base iterator
iterSchema = finalSchema;
- iter = open(task, finalSchema);
+ iter = open(task, finalSchema, ImmutableMap.of());
}
// TODO: remove the projection by reporting the iterator's schema back to Spark
@@ -123,7 +134,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
}
- private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
+ private Iterator<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
CloseableIterable<InternalRow> iter;
if (task.isDataTask()) {
iter = newDataIterable(task.asDataTask(), readSchema);
@@ -137,7 +148,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
break;
case AVRO:
- iter = newAvroIterable(location, task, readSchema);
+ iter = newAvroIterable(location, task, readSchema, idToConstant);
break;
case ORC:
@@ -158,12 +169,13 @@ class RowDataReader extends BaseDataReader<InternalRow> {
private CloseableIterable<InternalRow> newAvroIterable(
InputFile location,
FileScanTask task,
- Schema readSchema) {
+ Schema projection,
+ Map<Integer, ?> idToConstant) {
return Avro.read(location)
.reuseContainers()
- .project(readSchema)
+ .project(projection)
.split(task.start(), task.length())
- .createReaderFunc(SparkAvroReader::new)
+ .createReaderFunc(readSchema -> new SparkAvroReader(projection, readSchema, idToConstant))
.build();
}