edgarRd commented on a change in pull request #199:
URL: https://github.com/apache/iceberg/pull/199#discussion_r433585356
##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
##########
@@ -20,50 +20,271 @@
package org.apache.iceberg.orc;
import java.io.IOException;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
public class OrcMetrics {
private OrcMetrics() {
}
+ private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
public static Metrics fromInputFile(InputFile file) {
final Configuration config = (file instanceof HadoopInputFile) ?
((HadoopInputFile) file).getConf() : new Configuration();
return fromInputFile(file, config);
}
- public static Metrics fromInputFile(InputFile file, Configuration config) {
+ static Metrics fromInputFile(InputFile file, Configuration config) {
try (Reader orcReader = ORC.newFileReader(file, config)) {
-
- // TODO: implement rest of the methods for ORC metrics
- // https://github.com/apache/incubator-iceberg/pull/199
- return new Metrics(orcReader.getNumberOfRows(),
- null,
- null,
- Collections.emptyMap(),
- null,
- null);
+ return buildOrcMetrics(orcReader.getNumberOfRows(),
+ orcReader.getSchema(), orcReader.getStatistics());
} catch (IOException ioe) {
- throw new RuntimeIOException(ioe, "Failed to read footer of file: %s",
file);
+ throw new RuntimeIOException(ioe, "Failed to open file: %s",
file.location());
}
}
static Metrics fromWriter(Writer writer) {
- // TODO: implement rest of the methods for ORC metrics in
- // https://github.com/apache/incubator-iceberg/pull/199
- return new Metrics(writer.getNumberOfRows(),
- null,
- null,
- Collections.emptyMap(),
- null,
- null);
+ try {
+ return buildOrcMetrics(writer.getNumberOfRows(),
+ writer.getSchema(), writer.getStatistics());
+ } catch (IOException ioe) {
+ throw new RuntimeIOException(ioe, "Failed to get statistics from
writer");
+ }
+ }
+
+ private static Metrics buildOrcMetrics(final long numOfRows, final
TypeDescription orcSchema,
+ final ColumnStatistics[] colStats) {
+ final Schema schema = ORCSchemaUtil.convert(orcSchema);
+ final Set<TypeDescription> columnsInContainers =
findColumnsInContainers(schema, orcSchema);
+ Map<Integer, Long> columnSizes =
Maps.newHashMapWithExpectedSize(colStats.length);
+ Map<Integer, Long> valueCounts =
Maps.newHashMapWithExpectedSize(colStats.length);
+ Map<Integer, Long> nullCounts =
Maps.newHashMapWithExpectedSize(colStats.length);
+ Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+ Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+
+ for (int i = 0; i < colStats.length; i++) {
+ final ColumnStatistics colStat = colStats[i];
+ final TypeDescription orcCol = orcSchema.findSubtype(i);
+ final Optional<Types.NestedField> icebergColOpt =
ORCSchemaUtil.icebergID(orcCol)
+ .map(schema::findField);
+
+ if (icebergColOpt.isPresent()) {
+ final Types.NestedField icebergCol = icebergColOpt.get();
+ final int fieldId = icebergCol.fieldId();
+
+ columnSizes.put(fieldId, colStat.getBytesOnDisk());
+
+ if (!columnsInContainers.contains(orcCol)) {
+ if (colStat.hasNull()) {
+ nullCounts.put(fieldId, numOfRows - colStat.getNumberOfValues());
+ } else {
+ nullCounts.put(fieldId, 0L);
+ }
+
+ Optional<ByteBuffer> orcMin = (colStat.getNumberOfValues() > 0) ?
+ fromOrcMin(icebergCol, colStat) : Optional.empty();
+ orcMin.ifPresent(byteBuffer -> lowerBounds.put(icebergCol.fieldId(),
byteBuffer));
+ Optional<ByteBuffer> orcMax = (colStat.getNumberOfValues() > 0) ?
+ fromOrcMax(icebergCol, colStat) : Optional.empty();
+ orcMax.ifPresent(byteBuffer -> upperBounds.put(icebergCol.fieldId(),
byteBuffer));
+ }
+
+ // Since ORC does not track null values nor repeated ones, the value
count for columns in
+ // containers (maps, list) may be larger than what it actually is,
however these are not
+ // used in experssions right now. For such cases, we use the value
number of values
+ // directly stored in ORC.
+ valueCounts.put(fieldId, colStat.getNumberOfValues() +
nullCounts.getOrDefault(fieldId, 0L));
+ }
+ }
+
+ return new Metrics(numOfRows,
+ columnSizes,
+ valueCounts,
+ nullCounts,
+ lowerBounds,
+ upperBounds);
+ }
+
+
+ private static Optional<ByteBuffer> fromOrcMin(Types.NestedField column,
+ ColumnStatistics columnStats)
{
+ Object min = null;
+ if (columnStats instanceof IntegerColumnStatistics) {
+ min = ((IntegerColumnStatistics) columnStats).getMinimum();
+ if (column.type().typeId() == Type.TypeID.INTEGER) {
+ min = Math.toIntExact((long) min);
+ }
+ } else if (columnStats instanceof DoubleColumnStatistics) {
+ min = ((DoubleColumnStatistics) columnStats).getMinimum();
+ if (column.type().typeId() == Type.TypeID.FLOAT) {
+ min = ((Double) min).floatValue();
+ }
+ } else if (columnStats instanceof StringColumnStatistics) {
+ min = ((StringColumnStatistics) columnStats).getMinimum();
+ } else if (columnStats instanceof DecimalColumnStatistics) {
+ min = Optional
+ .ofNullable(((DecimalColumnStatistics) columnStats).getMinimum())
+ .map(minStats -> minStats.bigDecimalValue()
+ .setScale(((Types.DecimalType) column.type()).scale()))
+ .orElse(null);
+ } else if (columnStats instanceof DateColumnStatistics) {
+ min = Optional.ofNullable(((DateColumnStatistics)
columnStats).getMinimum())
+ .map(minStats ->
+ Math.toIntExact(ChronoUnit.DAYS.between(EPOCH_DAY,
+ EPOCH.plus(minStats.getTime(),
ChronoUnit.MILLIS).toLocalDate())))
+ .orElse(null);
+ } else if (columnStats instanceof TimestampColumnStatistics) {
+ TimestampColumnStatistics tColStats = (TimestampColumnStatistics)
columnStats;
+ Timestamp minValue = tColStats.getMinimumUTC();
+ min = Optional.ofNullable(minValue)
+ .map(v -> TimeUnit.MILLISECONDS.toMicros(v.getTime()))
+ .map(v -> v - 1_000) // Subtract 1 millisecond to handle precision
issue due to ORC-611
+ .orElse(null);
+ } else if (columnStats instanceof BooleanColumnStatistics) {
+ BooleanColumnStatistics booleanStats = (BooleanColumnStatistics)
columnStats;
+ min = booleanStats.getFalseCount() <= 0;
+ }
+ return Optional.ofNullable(Conversions.toByteBuffer(column.type(), min));
+ }
+
+ private static Optional<ByteBuffer> fromOrcMax(Types.NestedField column,
+ ColumnStatistics columnStats)
{
+ Object max = null;
+ if (columnStats instanceof IntegerColumnStatistics) {
+ max = ((IntegerColumnStatistics) columnStats).getMaximum();
+ if (column.type().typeId() == Type.TypeID.INTEGER) {
+ max = Math.toIntExact((long) max);
+ }
+ } else if (columnStats instanceof DoubleColumnStatistics) {
+ max = ((DoubleColumnStatistics) columnStats).getMaximum();
+ if (column.type().typeId() == Type.TypeID.FLOAT) {
+ max = ((Double) max).floatValue();
+ }
+ } else if (columnStats instanceof StringColumnStatistics) {
+ max = ((StringColumnStatistics) columnStats).getMaximum();
+ } else if (columnStats instanceof DecimalColumnStatistics) {
+ max = Optional
+ .ofNullable(((DecimalColumnStatistics) columnStats).getMaximum())
+ .map(maxStats -> maxStats.bigDecimalValue()
+ .setScale(((Types.DecimalType) column.type()).scale()))
+ .orElse(null);
+ } else if (columnStats instanceof DateColumnStatistics) {
+ max = Optional.ofNullable(((DateColumnStatistics)
columnStats).getMaximum())
+ .map(maxStats ->
+ (int) ChronoUnit.DAYS.between(EPOCH_DAY,
+ EPOCH.plus(maxStats.getTime(),
ChronoUnit.MILLIS).toLocalDate()))
+ .orElse(null);
+ } else if (columnStats instanceof TimestampColumnStatistics) {
+ TimestampColumnStatistics tColStats = (TimestampColumnStatistics)
columnStats;
+ Timestamp maxValue = tColStats.getMaximumUTC();
+ max = Optional.ofNullable(maxValue)
+ .map(v -> TimeUnit.MILLISECONDS.toMicros(v.getTime()))
+ .map(v -> v + 1_000) // Add 1 millisecond to handle precision issue
due to ORC-611
+ .orElse(null);
+ } else if (columnStats instanceof BooleanColumnStatistics) {
+ BooleanColumnStatistics booleanStats = (BooleanColumnStatistics)
columnStats;
+ max = booleanStats.getTrueCount() > 0;
+ }
+ return Optional.ofNullable(Conversions.toByteBuffer(column.type(), max));
+ }
+
+ private static Set<TypeDescription> findColumnsInContainers(Schema schema,
+ TypeDescription
orcSchema) {
+ ColumnsInContainersVisitor visitor = new ColumnsInContainersVisitor();
+ OrcSchemaWithTypeVisitor.visit(schema, orcSchema, visitor);
+ return visitor.getColumnsInContainers();
+ }
+
+ private static class ColumnsInContainersVisitor extends
OrcSchemaWithTypeVisitor<TypeDescription> {
+
+ private final Set<TypeDescription> columnsInContainers;
+
+ private ColumnsInContainersVisitor() {
+ columnsInContainers = Sets.newHashSet();
+ }
+
+ public Set<TypeDescription> getColumnsInContainers() {
+ return columnsInContainers;
+ }
+
+ private Set<TypeDescription> flatten(TypeDescription rootType) {
+ if (rootType == null) {
+ return ImmutableSet.of();
+ }
+
+ final Set<TypeDescription> flatTypes =
Sets.newHashSetWithExpectedSize(rootType.getMaximumId());
+ final Queue<TypeDescription> queue = Queues.newLinkedBlockingQueue();
+ queue.add(rootType);
+ while (!queue.isEmpty()) {
+ TypeDescription type = queue.remove();
+ flatTypes.add(type);
+
queue.addAll(Optional.ofNullable(type.getChildren()).orElse(ImmutableList.of()));
+ }
+ return flatTypes;
+ }
+
+ @Override
+ public TypeDescription record(Types.StructType iStruct, TypeDescription
record,
+ List<String> names, List<TypeDescription>
fields) {
+ return record;
+ }
+
+ @Override
+ public TypeDescription list(Types.ListType iList, TypeDescription array,
TypeDescription element) {
+ columnsInContainers.addAll(flatten(element));
+ return null;
+ }
+
+ @Override
+ public TypeDescription map(Types.MapType iMap, TypeDescription map,
+ TypeDescription key, TypeDescription value) {
+ columnsInContainers.addAll(flatten(key));
+ columnsInContainers.addAll(flatten(value));
+ return null;
Review comment:
I had it returning `map` but changed it to `null` to avoid traversals in
`flatten` for elements that we'd already have since the top level container
elements, even though Set already dedups the added columns. I have no strong
opinion on this, I'll change back to map.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]