edgarRd commented on a change in pull request #199:
URL: https://github.com/apache/iceberg/pull/199#discussion_r433660065
##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
##########
@@ -19,51 +19,211 @@
package org.apache.iceberg.orc;
+import com.google.common.collect.Maps;
import java.io.IOException;
-import java.util.Collections;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
public class OrcMetrics {
  // Utility class: static metric-extraction helpers only, never instantiated.
  private OrcMetrics() {
  }

  // Reference instants for converting ORC date/timestamp statistics into
  // Iceberg's epoch-relative representations (days / microseconds since epoch).
  static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
  static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
public static Metrics fromInputFile(InputFile file) {
final Configuration config = (file instanceof HadoopInputFile) ?
((HadoopInputFile) file).getConf() : new Configuration();
return fromInputFile(file, config);
}
- public static Metrics fromInputFile(InputFile file, Configuration config) {
+ static Metrics fromInputFile(InputFile file, Configuration config) {
try (Reader orcReader = ORC.newFileReader(file, config)) {
-
- // TODO: implement rest of the methods for ORC metrics
- // https://github.com/apache/incubator-iceberg/pull/199
- return new Metrics(orcReader.getNumberOfRows(),
- null,
- null,
- Collections.emptyMap(),
- null,
- null);
+ return buildOrcMetrics(orcReader.getNumberOfRows(),
+ orcReader.getSchema(), orcReader.getStatistics());
} catch (IOException ioe) {
- throw new RuntimeIOException(ioe, "Failed to read footer of file: %s",
file);
+ throw new RuntimeIOException(ioe, "Failed to open file: %s",
file.location());
}
}
+ private static Metrics buildOrcMetrics(final long numOfRows, final
TypeDescription orcSchema,
+ final ColumnStatistics[] colStats) {
+ final Schema schema = ORCSchemaUtil.convert(orcSchema);
+ Map<Integer, Long> columSizes =
Maps.newHashMapWithExpectedSize(colStats.length);
+ Map<Integer, Long> valueCounts =
Maps.newHashMapWithExpectedSize(colStats.length);
+ Map<Integer, Long> nullCounts =
Maps.newHashMapWithExpectedSize(colStats.length);
+ Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+ Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+
+ for (int i = 0; i < colStats.length; i++) {
+ final ColumnStatistics colStat = colStats[i];
+ final TypeDescription orcCol = orcSchema.findSubtype(i);
+ final Optional<Types.NestedField> icebergColOpt =
ORCSchemaUtil.icebergID(orcCol)
+ .map(schema::findField);
+
+ if (icebergColOpt.isPresent()) {
+ final Types.NestedField icebergCol = icebergColOpt.get();
+ final int fieldId = icebergCol.fieldId();
+
+ if (colStat.hasNull()) {
+ nullCounts.put(fieldId, numOfRows - colStat.getNumberOfValues());
+ } else {
+ nullCounts.put(fieldId, 0L);
+ }
+ columSizes.put(fieldId, colStat.getBytesOnDisk());
+ valueCounts.put(fieldId, colStat.getNumberOfValues() +
nullCounts.getOrDefault(fieldId, 0L));
+
+ Optional<ByteBuffer> orcMin = (colStat.getNumberOfValues() > 0) ?
+ fromOrcMin(icebergCol, colStat) : Optional.empty();
+ orcMin.ifPresent(byteBuffer -> lowerBounds.put(icebergCol.fieldId(),
byteBuffer));
+ Optional<ByteBuffer> orcMax = (colStat.getNumberOfValues() > 0) ?
+ fromOrcMax(icebergCol, colStat) : Optional.empty();
+ orcMax.ifPresent(byteBuffer -> upperBounds.put(icebergCol.fieldId(),
byteBuffer));
+ }
+ }
+
+ return new Metrics(numOfRows,
+ columSizes,
+ valueCounts,
+ nullCounts,
+ lowerBounds,
+ upperBounds);
+ }
+
static Metrics fromWriter(Writer writer) {
- // TODO: implement rest of the methods for ORC metrics in
- // https://github.com/apache/incubator-iceberg/pull/199
- return new Metrics(writer.getNumberOfRows(),
- null,
- null,
- Collections.emptyMap(),
- null,
- null);
+ try {
+ return buildOrcMetrics(writer.getNumberOfRows(),
+ writer.getSchema(), writer.getStatistics());
+ } catch (IOException ioe) {
+ throw new RuntimeIOException(ioe, "Failed to get statistics from
writer");
+ }
+ }
+
+ private static long toMicros(Timestamp ts) {
+ return ts.getTime() * 1000;
+ }
+
  // Computes the serialized Iceberg lower bound for a column from its ORC
  // statistics. Returns an empty Optional when the statistics type is not
  // handled or the underlying minimum is null (e.g. no non-null values).
  private static Optional<ByteBuffer> fromOrcMin(Types.NestedField column,
                                                 ColumnStatistics columnStats) {
    ByteBuffer min = null;
    if (columnStats instanceof IntegerColumnStatistics) {
      IntegerColumnStatistics intColStats = (IntegerColumnStatistics) columnStats;
      // ORC reports all integral stats as long; narrow to int for Iceberg INTEGER.
      min = column.type().typeId() == Type.TypeID.INTEGER ?
          Conversions.toByteBuffer(column.type(), (int) intColStats.getMinimum()) :
          Conversions.toByteBuffer(column.type(), intColStats.getMinimum());
    } else if (columnStats instanceof DoubleColumnStatistics) {
      double minVal = ((DoubleColumnStatistics) columnStats).getMinimum();
      // Narrow to float when the Iceberg type is FLOAT rather than DOUBLE.
      min = column.type().typeId() == Type.TypeID.DOUBLE ?
          Conversions.toByteBuffer(column.type(), minVal) :
          Conversions.toByteBuffer(column.type(), (float) minVal);
    } else if (columnStats instanceof StringColumnStatistics) {
      // NOTE(review): getMinimum() may be null when no non-null values were
      // recorded — confirm Conversions.toByteBuffer tolerates a null value,
      // or wrap with Optional.ofNullable like the branches below.
      min = Conversions.toByteBuffer(column.type(),
          ((StringColumnStatistics) columnStats).getMinimum());
    } else if (columnStats instanceof DecimalColumnStatistics) {
      min = Optional
          .ofNullable(((DecimalColumnStatistics) columnStats).getMinimum())
          .map(minStats -> {
            // Rescale to the declared Iceberg decimal scale before serializing.
            BigDecimal minValue = minStats.bigDecimalValue()
                .setScale(((Types.DecimalType) column.type()).scale());
            return Conversions.toByteBuffer(column.type(), minValue);
          })
          .orElse(null);
    } else if (columnStats instanceof DateColumnStatistics) {
      // Iceberg dates are days since 1970-01-01; ORC reports epoch millis.
      min = Optional.ofNullable(((DateColumnStatistics) columnStats).getMinimum())
          .map(minStats -> Conversions.toByteBuffer(column.type(),
              (int) ChronoUnit.DAYS.between(EPOCH_DAY,
                  EPOCH.plus(minStats.getTime(), ChronoUnit.MILLIS).toLocalDate())))
          .orElse(null);
    } else if (columnStats instanceof TimestampColumnStatistics) {
      TimestampColumnStatistics tColStats = (TimestampColumnStatistics) columnStats;
      // NOTE(review): verify this ternary is not inverted — a timestamptz
      // column (shouldAdjustToUTC() == true) would normally read the UTC
      // statistic (getMinimumUTC), not the local-time one.
      Timestamp minValue = ((Types.TimestampType) column.type()).shouldAdjustToUTC() ?
          tColStats.getMinimum() : tColStats.getMinimumUTC();
      min = Optional.ofNullable(minValue)
          .map(v -> Conversions.toByteBuffer(column.type(), toMicros(v)))
          .orElse(null);
    } else if (columnStats instanceof BooleanColumnStatistics) {
      BooleanColumnStatistics booleanStats = (BooleanColumnStatistics) columnStats;
      // Minimum is false if any false value was observed, otherwise true.
      min = booleanStats.getFalseCount() > 0 ?
          Conversions.toByteBuffer(column.type(), false) :
          Conversions.toByteBuffer(column.type(), true);
    }
    return Optional.ofNullable(min);
  }
+
+ private static Optional<ByteBuffer> fromOrcMax(Types.NestedField column,
+ ColumnStatistics columnStats)
{
+ ByteBuffer max = null;
+ if (columnStats instanceof IntegerColumnStatistics) {
+ IntegerColumnStatistics intColStats = (IntegerColumnStatistics)
columnStats;
+ max = column.type().typeId() == Type.TypeID.INTEGER ?
+ Conversions.toByteBuffer(column.type(), (int)
intColStats.getMaximum()) :
+ Conversions.toByteBuffer(column.type(), intColStats.getMaximum());
+ } else if (columnStats instanceof DoubleColumnStatistics) {
+ double maxVal = ((DoubleColumnStatistics) columnStats).getMaximum();
+ max = column.type().typeId() == Type.TypeID.DOUBLE ?
+ Conversions.toByteBuffer(column.type(), maxVal) :
+ Conversions.toByteBuffer(column.type(), (float) maxVal);
+ } else if (columnStats instanceof StringColumnStatistics) {
+ max = Conversions.toByteBuffer(column.type(),
+ ((StringColumnStatistics) columnStats).getMaximum());
+ } else if (columnStats instanceof DecimalColumnStatistics) {
+ max = Optional
+ .ofNullable(((DecimalColumnStatistics) columnStats).getMaximum())
+ .map(maxStats -> {
+ BigDecimal maxValue = maxStats.bigDecimalValue()
+ .setScale(((Types.DecimalType) column.type()).scale());
+ return Conversions.toByteBuffer(column.type(), maxValue);
+ })
+ .orElse(null);
+ } else if (columnStats instanceof DateColumnStatistics) {
+ max = Optional.ofNullable(((DateColumnStatistics)
columnStats).getMaximum())
+ .map(maxStats -> Conversions.toByteBuffer(column.type(),
+ (int) ChronoUnit.DAYS.between(EPOCH_DAY,
+ EPOCH.plus(maxStats.getTime(),
ChronoUnit.MILLIS).toLocalDate())))
+ .orElse(null);
+ } else if (columnStats instanceof TimestampColumnStatistics) {
+ TimestampColumnStatistics tColStats = (TimestampColumnStatistics)
columnStats;
Review comment:
> I found that we do need to adjust for pre-epoch bounds
After further testing, I found that ORC does adjust pre-epoch values by 1
millisecond. However, there is a range in which the actual timestamp value is
invalid, as described in ORC-342. I've added another comment in this PR
related to it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]