rdblue commented on a change in pull request #199:
URL: https://github.com/apache/incubator-iceberg/pull/199#discussion_r430545525



##########
File path: 
orc/src/main/java/org/apache/iceberg/orc/avro/GenericDataOrcWriter.java
##########
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc.avro;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+public class GenericDataOrcWriter implements 
OrcValueWriter<GenericData.Record> {
+
+  private final Converter[] converters;
+
+  private GenericDataOrcWriter(TypeDescription schema) {
+    this.converters = buildConverters(schema);
+  }
+
+  public static OrcValueWriter<GenericData.Record> buildWriter(TypeDescription 
fileSchema) {
+    return new GenericDataOrcWriter(fileSchema);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void write(GenericData.Record value, VectorizedRowBatch output) 
throws IOException {
+    int row = output.size++;
+    for (int c = 0; c < converters.length; ++c) {
+      converters[c].addValue(row, value.get(c), output.cols[c]);
+    }
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  interface Converter<T> {
+
+    Class<T> getJavaClass();
+
+    /**
+     * Take a value from the Spark data value and add it to the ORC output.
+     * @param rowId the row in the ColumnVector
+     * @param data either an InternalRow or ArrayData
+     * @param output the ColumnVector to put the value into
+     */
+    void addValue(int rowId, T data, ColumnVector output);
+  }
+
+  static class BooleanConverter implements Converter<Boolean> {
+    @Override
+    public Class<Boolean> getJavaClass() {
+      return Boolean.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Boolean data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data ? 1 : 0;
+      }
+    }
+  }
+
+  static class ByteConverter implements Converter<Byte> {
+    @Override
+    public Class<Byte> getJavaClass() {
+      return Byte.class;
+    }
+
+    public void addValue(int rowId, Byte data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class ShortConverter implements Converter<Short> {
+    @Override
+    public Class<Short> getJavaClass() {
+      return Short.class;
+    }
+
+    public void addValue(int rowId, Short data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class IntConverter implements Converter<Integer> {
+    @Override
+    public Class<Integer> getJavaClass() {
+      return Integer.class;
+    }
+
+    public void addValue(int rowId, Integer data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class LongConverter implements Converter<Long> {
+    @Override
+    public Class<Long> getJavaClass() {
+      return Long.class;
+    }
+
+    public void addValue(int rowId, Long data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class FloatConverter implements Converter<Float> {
+    @Override
+    public Class<Float> getJavaClass() {
+      return Float.class;
+    }
+
+    public void addValue(int rowId, Float data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class DoubleConverter implements Converter<Double> {
+    @Override
+    public Class<Double> getJavaClass() {
+      return Double.class;
+    }
+
+    public void addValue(int rowId, Double data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class StringConverter implements Converter<String> {
+    @Override
+    public Class<String> getJavaClass() {
+      return String.class;
+    }
+
+    public void addValue(int rowId, String data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        byte[] value = data.getBytes(StandardCharsets.UTF_8);
+        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+      }
+    }
+  }
+
+  static class UUIDConverter implements Converter<UUID> {
+    private final BytesConverter bytesConverter;
+
+    UUIDConverter() {
+      bytesConverter = new BytesConverter();
+    }
+
+    @Override
+    public Class<UUID> getJavaClass() {
+      return UUID.class;
+    }
+
+    private static byte[] asBytes(UUID uuid) {
+      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
+      bb.putLong(uuid.getMostSignificantBits());
+      bb.putLong(uuid.getLeastSignificantBits());
+      return bb.array();
+    }
+
+    @Override
+    public void addValue(int rowId, UUID data, ColumnVector output) {
+      bytesConverter.addValue(rowId, data != null ? asBytes(data) : null, 
output);
+    }
+  }
+
+  static class FixedConverter implements Converter<GenericData.Fixed> {
+    private final BytesConverter bytesConverter;
+
+    FixedConverter() {
+      bytesConverter = new BytesConverter();
+    }
+
+    @Override
+    public Class<GenericData.Fixed> getJavaClass() {
+      return GenericData.Fixed.class;
+    }
+
+    @Override
+    public void addValue(int rowId, GenericData.Fixed data, ColumnVector 
output) {
+      bytesConverter.addValue(rowId, data != null ? data.bytes() : null, 
output);
+    }
+  }
+
+  static class BytesConverter implements Converter<byte[]> {
+    @Override
+    public Class<byte[]> getJavaClass() {
+      return byte[].class;
+    }
+
+    public void addValue(int rowId, byte[] data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        // getBinary always makes a copy, so we don't need to worry about it
+        // being changed behind our back.
+        ((BytesColumnVector) output).setRef(rowId, data, 0, data.length);
+      }
+    }
+  }
+
+  static class TimestampConverter implements Converter<Long> {
+    @Override
+    public Class<Long> getJavaClass() {
+      return Long.class;
+    }
+
+    public void addValue(int rowId, Long data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        TimestampColumnVector cv = (TimestampColumnVector) output;
+        long micros = data;
+        cv.time[rowId] = micros / 1_000; // millis
+        cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
+      }
+    }
+  }
+
+  static class Decimal18Converter implements Converter<BigDecimal> {
+    private final int precision;
+    private final int scale;
+
+    Decimal18Converter(TypeDescription schema) {
+      this.precision = schema.getPrecision();
+      this.scale = schema.getScale();
+    }
+
+    @Override
+    public Class<BigDecimal> getJavaClass() {
+      return BigDecimal.class;
+    }
+
+    public void addValue(int rowId, BigDecimal data, ColumnVector output) {
+      // TODO: validate precision and scale from schema
+      if (data == null) {

Review comment:
       This class is only used for a test. I think we should convert the test 
over to using Iceberg generics instead of Avro generics. I'll work on this 
later today.

##########
File path: 
orc/src/main/java/org/apache/iceberg/orc/avro/GenericDataOrcWriter.java
##########
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc.avro;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+public class GenericDataOrcWriter implements 
OrcValueWriter<GenericData.Record> {
+
+  private final Converter[] converters;
+
+  private GenericDataOrcWriter(TypeDescription schema) {
+    this.converters = buildConverters(schema);
+  }
+
+  public static OrcValueWriter<GenericData.Record> buildWriter(TypeDescription 
fileSchema) {
+    return new GenericDataOrcWriter(fileSchema);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void write(GenericData.Record value, VectorizedRowBatch output) 
throws IOException {
+    int row = output.size++;
+    for (int c = 0; c < converters.length; ++c) {
+      converters[c].addValue(row, value.get(c), output.cols[c]);
+    }
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  interface Converter<T> {
+
+    Class<T> getJavaClass();
+
+    /**
+     * Take a value from the Spark data value and add it to the ORC output.
+     * @param rowId the row in the ColumnVector
+     * @param data either an InternalRow or ArrayData
+     * @param output the ColumnVector to put the value into
+     */
+    void addValue(int rowId, T data, ColumnVector output);
+  }
+
+  static class BooleanConverter implements Converter<Boolean> {
+    @Override
+    public Class<Boolean> getJavaClass() {
+      return Boolean.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Boolean data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data ? 1 : 0;
+      }
+    }
+  }
+
+  static class ByteConverter implements Converter<Byte> {
+    @Override
+    public Class<Byte> getJavaClass() {
+      return Byte.class;
+    }
+
+    public void addValue(int rowId, Byte data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class ShortConverter implements Converter<Short> {
+    @Override
+    public Class<Short> getJavaClass() {
+      return Short.class;
+    }
+
+    public void addValue(int rowId, Short data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class IntConverter implements Converter<Integer> {
+    @Override
+    public Class<Integer> getJavaClass() {
+      return Integer.class;
+    }
+
+    public void addValue(int rowId, Integer data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class LongConverter implements Converter<Long> {
+    @Override
+    public Class<Long> getJavaClass() {
+      return Long.class;
+    }
+
+    public void addValue(int rowId, Long data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class FloatConverter implements Converter<Float> {
+    @Override
+    public Class<Float> getJavaClass() {
+      return Float.class;
+    }
+
+    public void addValue(int rowId, Float data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class DoubleConverter implements Converter<Double> {
+    @Override
+    public Class<Double> getJavaClass() {
+      return Double.class;
+    }
+
+    public void addValue(int rowId, Double data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class StringConverter implements Converter<String> {
+    @Override
+    public Class<String> getJavaClass() {
+      return String.class;
+    }
+
+    public void addValue(int rowId, String data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        byte[] value = data.getBytes(StandardCharsets.UTF_8);
+        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+      }
+    }
+  }
+
+  static class UUIDConverter implements Converter<UUID> {
+    private final BytesConverter bytesConverter;
+
+    UUIDConverter() {
+      bytesConverter = new BytesConverter();
+    }
+
+    @Override
+    public Class<UUID> getJavaClass() {
+      return UUID.class;
+    }
+
+    private static byte[] asBytes(UUID uuid) {
+      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
+      bb.putLong(uuid.getMostSignificantBits());
+      bb.putLong(uuid.getLeastSignificantBits());
+      return bb.array();
+    }
+
+    @Override
+    public void addValue(int rowId, UUID data, ColumnVector output) {
+      bytesConverter.addValue(rowId, data != null ? asBytes(data) : null, 
output);
+    }
+  }
+
+  static class FixedConverter implements Converter<GenericData.Fixed> {
+    private final BytesConverter bytesConverter;
+
+    FixedConverter() {
+      bytesConverter = new BytesConverter();
+    }
+
+    @Override
+    public Class<GenericData.Fixed> getJavaClass() {
+      return GenericData.Fixed.class;
+    }
+
+    @Override
+    public void addValue(int rowId, GenericData.Fixed data, ColumnVector 
output) {
+      bytesConverter.addValue(rowId, data != null ? data.bytes() : null, 
output);
+    }
+  }
+
+  static class BytesConverter implements Converter<byte[]> {
+    @Override
+    public Class<byte[]> getJavaClass() {
+      return byte[].class;
+    }
+
+    public void addValue(int rowId, byte[] data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        // getBinary always makes a copy, so we don't need to worry about it
+        // being changed behind our back.
+        ((BytesColumnVector) output).setRef(rowId, data, 0, data.length);
+      }
+    }
+  }
+
+  static class TimestampConverter implements Converter<Long> {
+    @Override
+    public Class<Long> getJavaClass() {
+      return Long.class;
+    }
+
+    public void addValue(int rowId, Long data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        TimestampColumnVector cv = (TimestampColumnVector) output;
+        long micros = data;
+        cv.time[rowId] = micros / 1_000; // millis
+        cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
+      }
+    }
+  }
+
+  static class Decimal18Converter implements Converter<BigDecimal> {
+    private final int precision;
+    private final int scale;
+
+    Decimal18Converter(TypeDescription schema) {
+      this.precision = schema.getPrecision();
+      this.scale = schema.getScale();
+    }
+
+    @Override
+    public Class<BigDecimal> getJavaClass() {
+      return BigDecimal.class;
+    }
+
+    public void addValue(int rowId, BigDecimal data, ColumnVector output) {
+      // TODO: validate precision and scale from schema
+      if (data == null) {

Review comment:
       I just opened #1070 with the refactor.

##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
##########
@@ -19,51 +19,211 @@
 
 package org.apache.iceberg.orc;
 
+import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collections;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
 import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 
 public class OrcMetrics {
 
   private OrcMetrics() {
   }
 
+  static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
   public static Metrics fromInputFile(InputFile file) {
     final Configuration config = (file instanceof HadoopInputFile) ?
         ((HadoopInputFile) file).getConf() : new Configuration();
     return fromInputFile(file, config);
   }
 
-  public static Metrics fromInputFile(InputFile file, Configuration config) {
+  static Metrics fromInputFile(InputFile file, Configuration config) {
     try (Reader orcReader = ORC.newFileReader(file, config)) {
-
-      // TODO: implement rest of the methods for ORC metrics
-      // https://github.com/apache/incubator-iceberg/pull/199
-      return new Metrics(orcReader.getNumberOfRows(),
-          null,
-          null,
-          Collections.emptyMap(),
-          null,
-          null);
+      return buildOrcMetrics(orcReader.getNumberOfRows(),
+          orcReader.getSchema(), orcReader.getStatistics());
     } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Failed to read footer of file: %s", 
file);
+      throw new RuntimeIOException(ioe, "Failed to open file: %s", 
file.location());
     }
   }
 
+  private static Metrics buildOrcMetrics(final long numOfRows, final 
TypeDescription orcSchema,
+                                         final ColumnStatistics[] colStats) {
+    final Schema schema = ORCSchemaUtil.convert(orcSchema);
+    Map<Integer, Long> columSizes = 
Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, Long> valueCounts = 
Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, Long> nullCounts = 
Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+
+    for (int i = 0; i < colStats.length; i++) {
+      final ColumnStatistics colStat = colStats[i];
+      final TypeDescription orcCol = orcSchema.findSubtype(i);
+      final Optional<Types.NestedField> icebergColOpt = 
ORCSchemaUtil.icebergID(orcCol)
+          .map(schema::findField);
+
+      if (icebergColOpt.isPresent()) {
+        final Types.NestedField icebergCol = icebergColOpt.get();
+        final int fieldId = icebergCol.fieldId();
+
+        if (colStat.hasNull()) {
+          nullCounts.put(fieldId, numOfRows - colStat.getNumberOfValues());
+        } else {
+          nullCounts.put(fieldId, 0L);
+        }
+        columSizes.put(fieldId, colStat.getBytesOnDisk());
+        valueCounts.put(fieldId, colStat.getNumberOfValues() + 
nullCounts.getOrDefault(fieldId, 0L));
+
+        Optional<ByteBuffer> orcMin = (colStat.getNumberOfValues() > 0) ?
+            fromOrcMin(icebergCol, colStat) : Optional.empty();
+        orcMin.ifPresent(byteBuffer -> lowerBounds.put(icebergCol.fieldId(), 
byteBuffer));
+        Optional<ByteBuffer> orcMax = (colStat.getNumberOfValues() > 0) ?
+            fromOrcMax(icebergCol, colStat) : Optional.empty();
+        orcMax.ifPresent(byteBuffer -> upperBounds.put(icebergCol.fieldId(), 
byteBuffer));
+      }
+    }
+
+    return new Metrics(numOfRows,
+        columSizes,
+        valueCounts,
+        nullCounts,
+        lowerBounds,
+        upperBounds);
+  }
+
   static Metrics fromWriter(Writer writer) {
-    // TODO: implement rest of the methods for ORC metrics in
-    // https://github.com/apache/incubator-iceberg/pull/199
-    return new Metrics(writer.getNumberOfRows(),
-        null,
-        null,
-        Collections.emptyMap(),
-        null,
-        null);
+    try {
+      return buildOrcMetrics(writer.getNumberOfRows(),
+          writer.getSchema(), writer.getStatistics());
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Failed to get statistics from 
writer");
+    }
+  }
+
+  private static long toMicros(Timestamp ts) {
+    return ts.getTime() * 1000;
+  }
+
+  private static Optional<ByteBuffer> fromOrcMin(Types.NestedField column,
+                                                 ColumnStatistics columnStats) 
{
+    ByteBuffer min = null;
+    if (columnStats instanceof IntegerColumnStatistics) {
+      IntegerColumnStatistics intColStats = (IntegerColumnStatistics) 
columnStats;
+      min = column.type().typeId() == Type.TypeID.INTEGER ?
+          Conversions.toByteBuffer(column.type(), (int) 
intColStats.getMinimum()) :
+          Conversions.toByteBuffer(column.type(), intColStats.getMinimum());
+    } else if (columnStats instanceof DoubleColumnStatistics) {
+      double minVal = ((DoubleColumnStatistics) columnStats).getMinimum();
+      min = column.type().typeId() == Type.TypeID.DOUBLE ?
+          Conversions.toByteBuffer(column.type(), minVal) :
+          Conversions.toByteBuffer(column.type(), (float) minVal);
+    } else if (columnStats instanceof StringColumnStatistics) {
+      min = Conversions.toByteBuffer(column.type(),
+          ((StringColumnStatistics) columnStats).getMinimum());
+    } else if (columnStats instanceof DecimalColumnStatistics) {
+      min = Optional
+          .ofNullable(((DecimalColumnStatistics) columnStats).getMinimum())
+          .map(minStats -> {
+            BigDecimal minValue = minStats.bigDecimalValue()
+                .setScale(((Types.DecimalType) column.type()).scale());
+            return Conversions.toByteBuffer(column.type(), minValue);
+          })
+          .orElse(null);
+    } else if (columnStats instanceof DateColumnStatistics) {
+      min = Optional.ofNullable(((DateColumnStatistics) 
columnStats).getMinimum())
+          .map(minStats -> Conversions.toByteBuffer(column.type(),
+              (int) ChronoUnit.DAYS.between(EPOCH_DAY,
+                  EPOCH.plus(minStats.getTime(), 
ChronoUnit.MILLIS).toLocalDate())))
+          .orElse(null);
+    } else if (columnStats instanceof TimestampColumnStatistics) {
+      TimestampColumnStatistics tColStats = (TimestampColumnStatistics) 
columnStats;
+      Timestamp minValue = ((Types.TimestampType) 
column.type()).shouldAdjustToUTC() ?
+          tColStats.getMinimum() : tColStats.getMinimumUTC();
+      min = Optional.ofNullable(minValue)
+          .map(v -> Conversions.toByteBuffer(column.type(), toMicros(v)))
+          .orElse(null);
+    } else if (columnStats instanceof BooleanColumnStatistics) {
+      BooleanColumnStatistics booleanStats = (BooleanColumnStatistics) 
columnStats;
+      min = booleanStats.getFalseCount() > 0 ?
+          Conversions.toByteBuffer(column.type(), false) :
+          Conversions.toByteBuffer(column.type(), true);
+    }
+    return Optional.ofNullable(min);
   }
+
+  private static Optional<ByteBuffer> fromOrcMax(Types.NestedField column,
+                                                 ColumnStatistics columnStats) 
{
+    ByteBuffer max = null;
+    if (columnStats instanceof IntegerColumnStatistics) {
+      IntegerColumnStatistics intColStats = (IntegerColumnStatistics) 
columnStats;
+      max = column.type().typeId() == Type.TypeID.INTEGER ?
+          Conversions.toByteBuffer(column.type(), (int) 
intColStats.getMaximum()) :
+          Conversions.toByteBuffer(column.type(), intColStats.getMaximum());
+    } else if (columnStats instanceof DoubleColumnStatistics) {
+      double maxVal = ((DoubleColumnStatistics) columnStats).getMaximum();
+      max = column.type().typeId() == Type.TypeID.DOUBLE ?
+          Conversions.toByteBuffer(column.type(), maxVal) :
+          Conversions.toByteBuffer(column.type(), (float) maxVal);
+    } else if (columnStats instanceof StringColumnStatistics) {
+      max = Conversions.toByteBuffer(column.type(),
+          ((StringColumnStatistics) columnStats).getMaximum());
+    } else if (columnStats instanceof DecimalColumnStatistics) {
+      max = Optional
+          .ofNullable(((DecimalColumnStatistics) columnStats).getMaximum())
+          .map(maxStats -> {
+            BigDecimal maxValue = maxStats.bigDecimalValue()
+                .setScale(((Types.DecimalType) column.type()).scale());
+            return Conversions.toByteBuffer(column.type(), maxValue);
+          })
+          .orElse(null);
+    } else if (columnStats instanceof DateColumnStatistics) {
+      max = Optional.ofNullable(((DateColumnStatistics) 
columnStats).getMaximum())
+          .map(maxStats -> Conversions.toByteBuffer(column.type(),
+              (int) ChronoUnit.DAYS.between(EPOCH_DAY,
+                  EPOCH.plus(maxStats.getTime(), 
ChronoUnit.MILLIS).toLocalDate())))
+          .orElse(null);
+    } else if (columnStats instanceof TimestampColumnStatistics) {
+      TimestampColumnStatistics tColStats = (TimestampColumnStatistics) 
columnStats;

Review comment:
       Can we increment milliseconds for the upper bound? It should be fine for 
the lower bound.

##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
##########
@@ -19,51 +19,211 @@
 
 package org.apache.iceberg.orc;
 
+import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collections;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
 import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 
 public class OrcMetrics {
 
   private OrcMetrics() {
   }
 
+  static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
   public static Metrics fromInputFile(InputFile file) {
     final Configuration config = (file instanceof HadoopInputFile) ?
         ((HadoopInputFile) file).getConf() : new Configuration();
     return fromInputFile(file, config);
   }
 
-  public static Metrics fromInputFile(InputFile file, Configuration config) {
+  static Metrics fromInputFile(InputFile file, Configuration config) {
     try (Reader orcReader = ORC.newFileReader(file, config)) {
-
-      // TODO: implement rest of the methods for ORC metrics
-      // https://github.com/apache/incubator-iceberg/pull/199
-      return new Metrics(orcReader.getNumberOfRows(),
-          null,
-          null,
-          Collections.emptyMap(),
-          null,
-          null);
+      return buildOrcMetrics(orcReader.getNumberOfRows(),
+          orcReader.getSchema(), orcReader.getStatistics());
     } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Failed to read footer of file: %s", 
file);
+      throw new RuntimeIOException(ioe, "Failed to open file: %s", 
file.location());
     }
   }
 
+  private static Metrics buildOrcMetrics(final long numOfRows, final 
TypeDescription orcSchema,
+                                         final ColumnStatistics[] colStats) {
+    final Schema schema = ORCSchemaUtil.convert(orcSchema);
+    Map<Integer, Long> columSizes = 
Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, Long> valueCounts = 
Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, Long> nullCounts = 
Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+
+    for (int i = 0; i < colStats.length; i++) {
+      final ColumnStatistics colStat = colStats[i];
+      final TypeDescription orcCol = orcSchema.findSubtype(i);
+      final Optional<Types.NestedField> icebergColOpt = 
ORCSchemaUtil.icebergID(orcCol)
+          .map(schema::findField);
+
+      if (icebergColOpt.isPresent()) {
+        final Types.NestedField icebergCol = icebergColOpt.get();
+        final int fieldId = icebergCol.fieldId();
+
+        if (colStat.hasNull()) {
+          nullCounts.put(fieldId, numOfRows - colStat.getNumberOfValues());
+        } else {
+          nullCounts.put(fieldId, 0L);
+        }
+        columSizes.put(fieldId, colStat.getBytesOnDisk());
+        valueCounts.put(fieldId, colStat.getNumberOfValues() + 
nullCounts.getOrDefault(fieldId, 0L));
+
+        Optional<ByteBuffer> orcMin = (colStat.getNumberOfValues() > 0) ?
+            fromOrcMin(icebergCol, colStat) : Optional.empty();
+        orcMin.ifPresent(byteBuffer -> lowerBounds.put(icebergCol.fieldId(), 
byteBuffer));
+        Optional<ByteBuffer> orcMax = (colStat.getNumberOfValues() > 0) ?
+            fromOrcMax(icebergCol, colStat) : Optional.empty();
+        orcMax.ifPresent(byteBuffer -> upperBounds.put(icebergCol.fieldId(), 
byteBuffer));
+      }
+    }
+
+    return new Metrics(numOfRows,
+        columSizes,
+        valueCounts,
+        nullCounts,
+        lowerBounds,
+        upperBounds);
+  }
+
   static Metrics fromWriter(Writer writer) {
-    // TODO: implement rest of the methods for ORC metrics in
-    // https://github.com/apache/incubator-iceberg/pull/199
-    return new Metrics(writer.getNumberOfRows(),
-        null,
-        null,
-        Collections.emptyMap(),
-        null,
-        null);
+    try {
+      return buildOrcMetrics(writer.getNumberOfRows(),
+          writer.getSchema(), writer.getStatistics());
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Failed to get statistics from 
writer");
+    }
+  }
+
+  private static long toMicros(Timestamp ts) {
+    return ts.getTime() * 1000;
+  }
+
+  private static Optional<ByteBuffer> fromOrcMin(Types.NestedField column,
+                                                 ColumnStatistics columnStats) 
{
+    ByteBuffer min = null;
+    if (columnStats instanceof IntegerColumnStatistics) {
+      IntegerColumnStatistics intColStats = (IntegerColumnStatistics) 
columnStats;
+      min = column.type().typeId() == Type.TypeID.INTEGER ?
+          Conversions.toByteBuffer(column.type(), (int) 
intColStats.getMinimum()) :
+          Conversions.toByteBuffer(column.type(), intColStats.getMinimum());
+    } else if (columnStats instanceof DoubleColumnStatistics) {
+      double minVal = ((DoubleColumnStatistics) columnStats).getMinimum();
+      min = column.type().typeId() == Type.TypeID.DOUBLE ?
+          Conversions.toByteBuffer(column.type(), minVal) :
+          Conversions.toByteBuffer(column.type(), (float) minVal);
+    } else if (columnStats instanceof StringColumnStatistics) {
+      min = Conversions.toByteBuffer(column.type(),
+          ((StringColumnStatistics) columnStats).getMinimum());
+    } else if (columnStats instanceof DecimalColumnStatistics) {
+      min = Optional
+          .ofNullable(((DecimalColumnStatistics) columnStats).getMinimum())
+          .map(minStats -> {
+            BigDecimal minValue = minStats.bigDecimalValue()
+                .setScale(((Types.DecimalType) column.type()).scale());
+            return Conversions.toByteBuffer(column.type(), minValue);
+          })
+          .orElse(null);
+    } else if (columnStats instanceof DateColumnStatistics) {
+      min = Optional.ofNullable(((DateColumnStatistics) 
columnStats).getMinimum())
+          .map(minStats -> Conversions.toByteBuffer(column.type(),
+              (int) ChronoUnit.DAYS.between(EPOCH_DAY,
+                  EPOCH.plus(minStats.getTime(), 
ChronoUnit.MILLIS).toLocalDate())))
+          .orElse(null);
+    } else if (columnStats instanceof TimestampColumnStatistics) {
+      TimestampColumnStatistics tColStats = (TimestampColumnStatistics) 
columnStats;
+      Timestamp minValue = ((Types.TimestampType) 
column.type()).shouldAdjustToUTC() ?
+          tColStats.getMinimum() : tColStats.getMinimumUTC();
+      min = Optional.ofNullable(minValue)
+          .map(v -> Conversions.toByteBuffer(column.type(), toMicros(v)))

Review comment:
       I think it would be better to increment the upper bound value by 1ms to 
account for ORC-611, instead of not writing the bounds for timestamps. Iceberg 
doesn't require the bound to be exact, and the file-level filtering enabled by 
this is really valuable for cutting down on data that needs to be processed.

##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
##########
@@ -19,51 +19,211 @@
 
 package org.apache.iceberg.orc;
 
+import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collections;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
 import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 
 public class OrcMetrics {
 
   private OrcMetrics() {
   }
 
+  static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
   public static Metrics fromInputFile(InputFile file) {
     final Configuration config = (file instanceof HadoopInputFile) ?
         ((HadoopInputFile) file).getConf() : new Configuration();
     return fromInputFile(file, config);
   }
 
-  public static Metrics fromInputFile(InputFile file, Configuration config) {
+  static Metrics fromInputFile(InputFile file, Configuration config) {
     try (Reader orcReader = ORC.newFileReader(file, config)) {
-
-      // TODO: implement rest of the methods for ORC metrics
-      // https://github.com/apache/incubator-iceberg/pull/199
-      return new Metrics(orcReader.getNumberOfRows(),
-          null,
-          null,
-          Collections.emptyMap(),
-          null,
-          null);
+      return buildOrcMetrics(orcReader.getNumberOfRows(),
+          orcReader.getSchema(), orcReader.getStatistics());
     } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Failed to read footer of file: %s", 
file);
+      throw new RuntimeIOException(ioe, "Failed to open file: %s", 
file.location());
     }
   }
 
+  private static Metrics buildOrcMetrics(final long numOfRows, final 
TypeDescription orcSchema,
+                                         final ColumnStatistics[] colStats) {
+    final Schema schema = ORCSchemaUtil.convert(orcSchema);
+    Map<Integer, Long> columSizes = 
Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, Long> valueCounts = 
Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, Long> nullCounts = 
Maps.newHashMapWithExpectedSize(colStats.length);
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+
+    for (int i = 0; i < colStats.length; i++) {
+      final ColumnStatistics colStat = colStats[i];
+      final TypeDescription orcCol = orcSchema.findSubtype(i);
+      final Optional<Types.NestedField> icebergColOpt = 
ORCSchemaUtil.icebergID(orcCol)
+          .map(schema::findField);
+
+      if (icebergColOpt.isPresent()) {
+        final Types.NestedField icebergCol = icebergColOpt.get();
+        final int fieldId = icebergCol.fieldId();
+
+        if (colStat.hasNull()) {
+          nullCounts.put(fieldId, numOfRows - colStat.getNumberOfValues());
+        } else {
+          nullCounts.put(fieldId, 0L);
+        }
+        columSizes.put(fieldId, colStat.getBytesOnDisk());
+        valueCounts.put(fieldId, colStat.getNumberOfValues() + 
nullCounts.getOrDefault(fieldId, 0L));
+
+        Optional<ByteBuffer> orcMin = (colStat.getNumberOfValues() > 0) ?
+            fromOrcMin(icebergCol, colStat) : Optional.empty();
+        orcMin.ifPresent(byteBuffer -> lowerBounds.put(icebergCol.fieldId(), 
byteBuffer));
+        Optional<ByteBuffer> orcMax = (colStat.getNumberOfValues() > 0) ?
+            fromOrcMax(icebergCol, colStat) : Optional.empty();
+        orcMax.ifPresent(byteBuffer -> upperBounds.put(icebergCol.fieldId(), 
byteBuffer));
+      }
+    }
+
+    return new Metrics(numOfRows,
+        columSizes,
+        valueCounts,
+        nullCounts,
+        lowerBounds,
+        upperBounds);
+  }
+
   static Metrics fromWriter(Writer writer) {
-    // TODO: implement rest of the methods for ORC metrics in
-    // https://github.com/apache/incubator-iceberg/pull/199
-    return new Metrics(writer.getNumberOfRows(),
-        null,
-        null,
-        Collections.emptyMap(),
-        null,
-        null);
+    try {
+      return buildOrcMetrics(writer.getNumberOfRows(),
+          writer.getSchema(), writer.getStatistics());
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Failed to get statistics from 
writer");
+    }
+  }
+
+  private static long toMicros(Timestamp ts) {
+    return ts.getTime() * 1000;
+  }
+
+  private static Optional<ByteBuffer> fromOrcMin(Types.NestedField column,
+                                                 ColumnStatistics columnStats) 
{
+    ByteBuffer min = null;
+    if (columnStats instanceof IntegerColumnStatistics) {
+      IntegerColumnStatistics intColStats = (IntegerColumnStatistics) 
columnStats;
+      min = column.type().typeId() == Type.TypeID.INTEGER ?
+          Conversions.toByteBuffer(column.type(), (int) 
intColStats.getMinimum()) :
+          Conversions.toByteBuffer(column.type(), intColStats.getMinimum());
+    } else if (columnStats instanceof DoubleColumnStatistics) {
+      double minVal = ((DoubleColumnStatistics) columnStats).getMinimum();
+      min = column.type().typeId() == Type.TypeID.DOUBLE ?
+          Conversions.toByteBuffer(column.type(), minVal) :
+          Conversions.toByteBuffer(column.type(), (float) minVal);
+    } else if (columnStats instanceof StringColumnStatistics) {
+      min = Conversions.toByteBuffer(column.type(),
+          ((StringColumnStatistics) columnStats).getMinimum());
+    } else if (columnStats instanceof DecimalColumnStatistics) {
+      min = Optional
+          .ofNullable(((DecimalColumnStatistics) columnStats).getMinimum())
+          .map(minStats -> {
+            BigDecimal minValue = minStats.bigDecimalValue()
+                .setScale(((Types.DecimalType) column.type()).scale());
+            return Conversions.toByteBuffer(column.type(), minValue);
+          })
+          .orElse(null);
+    } else if (columnStats instanceof DateColumnStatistics) {
+      min = Optional.ofNullable(((DateColumnStatistics) 
columnStats).getMinimum())
+          .map(minStats -> Conversions.toByteBuffer(column.type(),
+              (int) ChronoUnit.DAYS.between(EPOCH_DAY,
+                  EPOCH.plus(minStats.getTime(), 
ChronoUnit.MILLIS).toLocalDate())))
+          .orElse(null);
+    } else if (columnStats instanceof TimestampColumnStatistics) {
+      TimestampColumnStatistics tColStats = (TimestampColumnStatistics) 
columnStats;
+      Timestamp minValue = ((Types.TimestampType) 
column.type()).shouldAdjustToUTC() ?
+          tColStats.getMinimum() : tColStats.getMinimumUTC();
+      min = Optional.ofNullable(minValue)
+          .map(v -> Conversions.toByteBuffer(column.type(), toMicros(v)))
+          .orElse(null);
+    } else if (columnStats instanceof BooleanColumnStatistics) {
+      BooleanColumnStatistics booleanStats = (BooleanColumnStatistics) 
columnStats;
+      min = booleanStats.getFalseCount() > 0 ?
+          Conversions.toByteBuffer(column.type(), false) :
+          Conversions.toByteBuffer(column.type(), true);
+    }
+    return Optional.ofNullable(min);
   }
+
+  private static Optional<ByteBuffer> fromOrcMax(Types.NestedField column,
+                                                 ColumnStatistics columnStats) 
{
+    ByteBuffer max = null;
+    if (columnStats instanceof IntegerColumnStatistics) {
+      IntegerColumnStatistics intColStats = (IntegerColumnStatistics) 
columnStats;
+      max = column.type().typeId() == Type.TypeID.INTEGER ?
+          Conversions.toByteBuffer(column.type(), (int) 
intColStats.getMaximum()) :
+          Conversions.toByteBuffer(column.type(), intColStats.getMaximum());
+    } else if (columnStats instanceof DoubleColumnStatistics) {
+      double maxVal = ((DoubleColumnStatistics) columnStats).getMaximum();
+      max = column.type().typeId() == Type.TypeID.DOUBLE ?
+          Conversions.toByteBuffer(column.type(), maxVal) :
+          Conversions.toByteBuffer(column.type(), (float) maxVal);
+    } else if (columnStats instanceof StringColumnStatistics) {
+      max = Conversions.toByteBuffer(column.type(),
+          ((StringColumnStatistics) columnStats).getMaximum());
+    } else if (columnStats instanceof DecimalColumnStatistics) {
+      max = Optional
+          .ofNullable(((DecimalColumnStatistics) columnStats).getMaximum())
+          .map(maxStats -> {
+            BigDecimal maxValue = maxStats.bigDecimalValue()
+                .setScale(((Types.DecimalType) column.type()).scale());
+            return Conversions.toByteBuffer(column.type(), maxValue);
+          })
+          .orElse(null);
+    } else if (columnStats instanceof DateColumnStatistics) {
+      max = Optional.ofNullable(((DateColumnStatistics) 
columnStats).getMaximum())
+          .map(maxStats -> Conversions.toByteBuffer(column.type(),
+              (int) ChronoUnit.DAYS.between(EPOCH_DAY,
+                  EPOCH.plus(maxStats.getTime(), 
ChronoUnit.MILLIS).toLocalDate())))
+          .orElse(null);
+    } else if (columnStats instanceof TimestampColumnStatistics) {
+      TimestampColumnStatistics tColStats = (TimestampColumnStatistics) 
columnStats;

Review comment:
       We will definitely need tests for these cases to make sure we don't have 
a correctness bug because of ORC-611.

##########
File path: 
parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
##########
@@ -57,10 +78,133 @@ public File writeRecords(Schema schema, Map<String, 
String> properties, GenericD
     return ParquetWritingTestUtils.writeRecords(temp, schema, properties, 
records);
   }
 
-  @Override
-  public int splitCount(File parquetFile) throws IOException {
+  private int splitCount(File parquetFile) throws IOException {
     try (ParquetFileReader reader = 
ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
       return reader.getRowGroups().size();
     }
   }
+
+  @Test
+  public void testMetricsForTopLevelWithMultipleRowGroup() throws Exception {

Review comment:
       I added an `Assume` for this in #1070 and it defaults to not running the 
test.

##########
File path: 
orc/src/main/java/org/apache/iceberg/orc/avro/GenericDataOrcWriter.java
##########
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc.avro;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+public class GenericDataOrcWriter implements 
OrcValueWriter<GenericData.Record> {

Review comment:
       See #1070.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to