http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
----------------------------------------------------------------------
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java 
b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index 8056188..2d7491f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -22,11 +22,14 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.ShouldNeverHappenException;
 import org.apache.parquet.column.ColumnReader;
 import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
 
 
 /**
@@ -86,6 +89,26 @@ public final class PrimitiveType extends Type {
       public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, 
E> converter) throws E {
         return converter.convertINT64(this);
       }
+
+      @Override
+      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+        if (logicalType == null) {
+          return PrimitiveComparator.SIGNED_INT64_COMPARATOR;
+        }
+        switch (logicalType) {
+        case UINT_64:
+          return PrimitiveComparator.UNSIGNED_INT64_COMPARATOR;
+        case INT_64:
+        case DECIMAL:
+        case TIME_MICROS:
+        case TIMESTAMP_MILLIS:
+        case TIMESTAMP_MICROS:
+          return PrimitiveComparator.SIGNED_INT64_COMPARATOR;
+        default:
+          throw new ShouldNeverHappenException(
+              "No comparator logic implemented for INT64 logical type: " + 
logicalType);
+        }
+      }
     },
     INT32("getInteger", Integer.TYPE) {
       @Override
@@ -109,6 +132,29 @@ public final class PrimitiveType extends Type {
       public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, 
E> converter) throws E {
         return converter.convertINT32(this);
       }
+
+      @Override
+      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+        if (logicalType == null) {
+          return PrimitiveComparator.SIGNED_INT32_COMPARATOR;
+        }
+        switch (logicalType) {
+        case UINT_8:
+        case UINT_16:
+        case UINT_32:
+          return PrimitiveComparator.UNSIGNED_INT32_COMPARATOR;
+        case INT_8:
+        case INT_16:
+        case INT_32:
+        case DECIMAL:
+        case DATE:
+        case TIME_MILLIS:
+          return PrimitiveComparator.SIGNED_INT32_COMPARATOR;
+        default:
+          throw new ShouldNeverHappenException(
+              "No comparator logic implemented for INT32 logical type: " + 
logicalType);
+        }
+      }
     },
     BOOLEAN("getBoolean", Boolean.TYPE) {
       @Override
@@ -132,6 +178,11 @@ public final class PrimitiveType extends Type {
       public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, 
E> converter) throws E {
         return converter.convertBOOLEAN(this);
       }
+
+      @Override
+      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+        return PrimitiveComparator.BOOLEAN_COMPARATOR;
+      }
     },
     BINARY("getBinary", Binary.class) {
       @Override
@@ -155,6 +206,25 @@ public final class PrimitiveType extends Type {
       public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, 
E> converter) throws E {
         return converter.convertBINARY(this);
       }
+
+      @Override
+      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+        if (logicalType == null) {
+          return 
PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
+        }
+        switch (logicalType) {
+        case DECIMAL:
+          return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
+        case UTF8:
+        case ENUM:
+        case JSON:
+        case BSON:
+          return 
PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
+        default:
+          throw new ShouldNeverHappenException(
+              "No comparator logic implemented for BINARY logical type: " + 
logicalType);
+        }
+      }
     },
     FLOAT("getFloat", Float.TYPE) {
       @Override
@@ -178,6 +248,11 @@ public final class PrimitiveType extends Type {
       public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, 
E> converter) throws E {
         return converter.convertFLOAT(this);
       }
+
+      @Override
+      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+        return PrimitiveComparator.FLOAT_COMPARATOR;
+      }
     },
     DOUBLE("getDouble", Double.TYPE) {
       @Override
@@ -201,6 +276,11 @@ public final class PrimitiveType extends Type {
       public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, 
E> converter) throws E {
         return converter.convertDOUBLE(this);
       }
+
+      @Override
+      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+        return PrimitiveComparator.DOUBLE_COMPARATOR;
+      }
     },
     INT96("getBinary", Binary.class) {
       @Override
@@ -222,6 +302,11 @@ public final class PrimitiveType extends Type {
       public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, 
E> converter) throws E {
         return converter.convertINT96(this);
       }
+
+      @Override
+      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+        return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
+      }
     },
     FIXED_LEN_BYTE_ARRAY("getBinary", Binary.class) {
       @Override
@@ -245,6 +330,22 @@ public final class PrimitiveType extends Type {
       public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, 
E> converter) throws E {
         return converter.convertFIXED_LEN_BYTE_ARRAY(this);
       }
+
+      @Override
+      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+        if (logicalType == null) {
+          return 
PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
+        }
+        switch (logicalType) {
+        case DECIMAL:
+          return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
+        case INTERVAL:
+          return 
PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
+        default:
+          throw new ShouldNeverHappenException(
+              "No comparator logic implemented for FIXED_LEN_BYTE_ARRAY 
logical type: " + logicalType);
+        }
+      }
     };
 
     public final String getMethod;
@@ -275,11 +376,14 @@ public final class PrimitiveType extends Type {
 
     abstract public <T, E extends Exception> T 
convert(PrimitiveTypeNameConverter<T, E> converter) throws E;
 
+    abstract PrimitiveComparator<?> comparator(OriginalType logicalType);
+
   }
 
   private final PrimitiveTypeName primitive;
   private final int length;
   private final DecimalMetadata decimalMeta;
+  private final ColumnOrder columnOrder;
 
   /**
    * @param repetition OPTIONAL, REPEATED, REQUIRED
@@ -337,10 +441,61 @@ public final class PrimitiveType extends Type {
   public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
                        int length, String name, OriginalType originalType,
                        DecimalMetadata decimalMeta, ID id) {
+    this(repetition, primitive, length, name, originalType, decimalMeta, id, 
null);
+  }
+
+  PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
+      int length, String name, OriginalType originalType,
+      DecimalMetadata decimalMeta, ID id, ColumnOrder columnOrder) {
     super(name, repetition, originalType, id);
     this.primitive = primitive;
     this.length = length;
     this.decimalMeta = decimalMeta;
+
+    if (columnOrder == null) {
+      columnOrder = primitive == PrimitiveTypeName.INT96 || originalType == 
OriginalType.INTERVAL
+          ? ColumnOrder.undefined()
+          : ColumnOrder.typeDefined();
+    }
+    this.columnOrder = requireValidColumnOrder(columnOrder);
+  }
+
+  private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) {
+    if (primitive == PrimitiveTypeName.INT96) {
+      Preconditions.checkArgument(columnOrder.getColumnOrderName() == 
ColumnOrderName.UNDEFINED,
+          "The column order %s is not supported by INT96", columnOrder);
+    }
+    if (getOriginalType() != null) {
+      // Explicitly list every logical type so that a newly added type cannot 
accidentally get an unsupported column order
+      switch (getOriginalType()) {
+        case INT_8:
+        case INT_16:
+        case INT_32:
+        case INT_64:
+        case UINT_8:
+        case UINT_16:
+        case UINT_32:
+        case UINT_64:
+        case UTF8:
+        case DECIMAL:
+        case DATE:
+        case TIME_MILLIS:
+        case TIME_MICROS:
+        case TIMESTAMP_MILLIS:
+        case TIMESTAMP_MICROS:
+        case ENUM:
+        case JSON:
+        case BSON:
+          // Currently any available column order is valid
+          break;
+        case INTERVAL:
+        default:
+          Preconditions.checkArgument(columnOrder.getColumnOrderName() == 
ColumnOrderName.UNDEFINED,
+              "The column order %s is not supported by %s (%s)", columnOrder, 
primitive, getOriginalType());
+          break;
+      }
+    }
+    return columnOrder;
   }
 
   /**
@@ -349,7 +504,8 @@ public final class PrimitiveType extends Type {
    */
   @Override
   public PrimitiveType withId(int id) {
-    return new PrimitiveType(getRepetition(), primitive, length, getName(), 
getOriginalType(), decimalMeta, new ID(id));
+    return new PrimitiveType(getRepetition(), primitive, length, getName(), 
getOriginalType(), decimalMeta, new ID(id),
+        columnOrder);
   }
 
   /**
@@ -441,6 +597,7 @@ public final class PrimitiveType extends Type {
     return super.equals(other)
         && primitive == otherPrimitive.getPrimitiveTypeName()
         && length == otherPrimitive.length
+        && columnOrder.equals(otherPrimitive.columnOrder)
         && eqOrBothNull(decimalMeta, otherPrimitive.decimalMeta);
   }
 
@@ -452,6 +609,7 @@ public final class PrimitiveType extends Type {
     int hash = super.hashCode();
     hash = hash * 31 + primitive.hashCode();
     hash = hash * 31 + length;
+    hash = hash * 31 + columnOrder.hashCode();
     if (decimalMeta != null) {
       hash = hash * 31 + decimalMeta.hashCode();
     }
@@ -519,6 +677,11 @@ public final class PrimitiveType extends Type {
     throw new IncompatibleSchemaModificationException("can not merge type " + 
toMerge + " into " + this);
   }
 
+  private void reportSchemaMergeErrorWithColumnOrder(Type toMerge) {
+    throw new IncompatibleSchemaModificationException("can not merge type " + 
toMerge + " with column order "
+        + toMerge.asPrimitiveType().columnOrder() + " into " + this + " with 
column order " + columnOrder());
+  }
+
   @Override
   protected Type union(Type toMerge, boolean strict) {
     if (!toMerge.isPrimitive()) {
@@ -537,6 +700,11 @@ public final class PrimitiveType extends Type {
       if (primitive == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && length != 
toMergeLength) {
         reportSchemaMergeError(toMerge);
       }
+
+      // Can't merge primitive fields with different column orders
+      if (!columnOrder().equals(toMerge.asPrimitiveType().columnOrder())) {
+        reportSchemaMergeErrorWithColumnOrder(toMerge);
+      }
     }
 
     Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(primitive, 
toMerge.getRepetition());
@@ -547,4 +715,21 @@ public final class PrimitiveType extends Type {
 
     return builder.as(getOriginalType()).named(getName());
   }
+
+  /**
+   * Returns the {@link Type} specific comparator for properly comparing 
values. The natural ordering of the values
+   * might not be proper in certain cases (e.g. {@code UINT_32} requires unsigned 
comparison of {@code int} values while
+   * the natural ordering is signed.)
+   */
+  @SuppressWarnings("unchecked")
+  public <T> PrimitiveComparator<T> comparator() {
+    return (PrimitiveComparator<T>) 
getPrimitiveTypeName().comparator(getOriginalType());
+  }
+
+  /**
+   * @return the column order for this type
+   */
+  public ColumnOrder columnOrder() {
+    return columnOrder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java 
b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
index e81daae..0422a9d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type.ID;
 import org.slf4j.Logger;
@@ -316,6 +317,7 @@ public class Types {
     private int length = NOT_SET;
     private int precision = NOT_SET;
     private int scale = NOT_SET;
+    private ColumnOrder columnOrder;
 
     private BasePrimitiveBuilder(P parent, PrimitiveTypeName type) {
       super(parent);
@@ -374,6 +376,22 @@ public class Types {
       return self();
     }
 
+    /**
+     * Adds the column order for the primitive type.
+     * <p>
+     * If it is not set, the default column order is {@link 
ColumnOrderName#TYPE_DEFINED_ORDER}, except for the type
+     * {@link PrimitiveTypeName#INT96} and the types annotated by {@link 
OriginalType#INTERVAL}, where the default column
+     * order is {@link ColumnOrderName#UNDEFINED}.
+     *
+     * @param columnOrder
+     *          the column order for the primitive type
+     * @return this builder for method chaining
+     */
+    public THIS columnOrder(ColumnOrder columnOrder) {
+      this.columnOrder = columnOrder;
+      return self();
+    }
+
     @Override
     protected PrimitiveType build(String name) {
       if (PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == primitiveType) {
@@ -457,7 +475,7 @@ public class Types {
         }
       }
 
-      return new PrimitiveType(repetition, primitiveType, length, name, 
originalType, meta, id);
+      return new PrimitiveType(repetition, primitiveType, length, name, 
originalType, meta, id, columnOrder);
     }
 
     private static long maxPrecision(int numBytes) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
----------------------------------------------------------------------
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
 
b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
index 690c7e1..476fbb3 100644
--- 
a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
+++ 
b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
@@ -74,6 +74,13 @@ public class TestStatistics {
     assertEquals(statsNeg.getMax(), 54);
     assertEquals(statsNeg.getMin(), -66);
 
+    assertTrue(statsNeg.compareMaxToValue(55) < 0);
+    assertTrue(statsNeg.compareMaxToValue(54) == 0);
+    assertTrue(statsNeg.compareMaxToValue(5) > 0);
+    assertTrue(statsNeg.compareMinToValue(0) < 0);
+    assertTrue(statsNeg.compareMinToValue(-66) == 0);
+    assertTrue(statsNeg.compareMinToValue(-67) > 0);
+
     // Test converting to and from byte[]
     byte[] intMaxBytes = statsNeg.getMaxBytes();
     byte[] intMinBytes = statsNeg.getMinBytes();
@@ -135,6 +142,13 @@ public class TestStatistics {
     assertEquals(statsNeg.getMax(), 993);
     assertEquals(statsNeg.getMin(), -9914);
 
+    assertTrue(statsNeg.compareMaxToValue(994) < 0);
+    assertTrue(statsNeg.compareMaxToValue(993) == 0);
+    assertTrue(statsNeg.compareMaxToValue(-1000) > 0);
+    assertTrue(statsNeg.compareMinToValue(10000) < 0);
+    assertTrue(statsNeg.compareMinToValue(-9914) == 0);
+    assertTrue(statsNeg.compareMinToValue(-9915) > 0);
+
     // Test converting to and from byte[]
     byte[] longMaxBytes = statsNeg.getMaxBytes();
     byte[] longMinBytes = statsNeg.getMinBytes();
@@ -196,6 +210,13 @@ public class TestStatistics {
     assertEquals(statsNeg.getMax(), 0.65f, 1e-10);
     assertEquals(statsNeg.getMin(), -412.99f, 1e-10);
 
+    assertTrue(statsNeg.compareMaxToValue(1) < 0);
+    assertTrue(statsNeg.compareMaxToValue(0.65F) == 0);
+    assertTrue(statsNeg.compareMaxToValue(0.649F) > 0);
+    assertTrue(statsNeg.compareMinToValue(-412.98F) < 0);
+    assertTrue(statsNeg.compareMinToValue(-412.99F) == 0);
+    assertTrue(statsNeg.compareMinToValue(-450) > 0);
+
     // Test converting to and from byte[]
     byte[] floatMaxBytes = statsNeg.getMaxBytes();
     byte[] floatMinBytes = statsNeg.getMinBytes();
@@ -257,6 +278,13 @@ public class TestStatistics {
     assertEquals(statsNeg.getMax(), 23.0d, 1e-10);
     assertEquals(statsNeg.getMin(), -944.5d, 1e-10);
 
+    assertTrue(statsNeg.compareMaxToValue(23.0001D) < 0);
+    assertTrue(statsNeg.compareMaxToValue(23D) == 0);
+    assertTrue(statsNeg.compareMaxToValue(0D) > 0);
+    assertTrue(statsNeg.compareMinToValue(-400D) < 0);
+    assertTrue(statsNeg.compareMinToValue(-944.5D) == 0);
+    assertTrue(statsNeg.compareMinToValue(-944.500001D) > 0);
+
     // Test converting to and from byte[]
     byte[] doubleMaxBytes = statsNeg.getMaxBytes();
     byte[] doubleMinBytes = statsNeg.getMinBytes();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
----------------------------------------------------------------------
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java 
b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
index a541e1b..0815597 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
@@ -248,4 +248,24 @@ public class TestBinary {
 
     testSerializable(bf, reused);
   }
+
+  @Test
+  public void testCompare() {
+    Binary b1 = Binary.fromCharSequence("aaaaaaaa");
+    Binary b2 = Binary.fromString("aaaaaaab");
+    Binary b3 = Binary.fromReusedByteArray("aaaaaaaaaaa".getBytes(), 1, 8);
+    Binary b4 = 
Binary.fromConstantByteBuffer(ByteBuffer.wrap("aaaaaaac".getBytes()));
+
+    assertTrue(b1.compareTo(b2) < 0);
+    assertTrue(b2.compareTo(b1) > 0);
+    assertTrue(b3.compareTo(b4) < 0);
+    assertTrue(b4.compareTo(b3) > 0);
+    assertTrue(b1.compareTo(b4) < 0);
+    assertTrue(b4.compareTo(b1) > 0);
+    assertTrue(b2.compareTo(b4) < 0);
+    assertTrue(b4.compareTo(b2) > 0);
+
+    assertTrue(b1.compareTo(b3) == 0);
+    assertTrue(b3.compareTo(b1) == 0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
----------------------------------------------------------------------
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java 
b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
index 4add174..0561938 100644
--- 
a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
+++ 
b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
@@ -21,9 +21,12 @@ package org.apache.parquet.schema;
 import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static org.apache.parquet.schema.OriginalType.LIST;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
 import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
 import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
@@ -189,6 +192,48 @@ public class TestMessageType {
   }
 
   @Test
+  public void testMergeSchemaWithColumnOrder() {
+    MessageType m1 = Types.buildMessage().addFields(
+        Types.requiredList().element(
+            
Types.optional(BINARY).columnOrder(ColumnOrder.undefined()).named("a")
+            ).named("g"),
+        Types.optional(INT96).named("b")
+        ).named("root");
+    MessageType m2 = Types.buildMessage().addFields(
+        Types.requiredList().element(
+            
Types.optional(BINARY).columnOrder(ColumnOrder.undefined()).named("a")
+            ).named("g"),
+        Types.optional(BINARY).named("c")
+        ).named("root");
+    MessageType m3 = Types.buildMessage().addFields(
+        Types.requiredList().element(
+            Types.optional(BINARY).named("a")
+            ).named("g")
+        ).named("root");
+
+    assertEquals(
+        Types.buildMessage().addFields(
+            Types.requiredList().element(
+                Types.optional(BINARY).named("a")
+                ).named("g"),
+            Types.optional(INT96).named("b"),
+            Types.optional(BINARY).named("c")
+            ).named("root"),
+        m1.union(m2));
+    try {
+      m1.union(m3);
+      fail("An IncompatibleSchemaModificationException should have been 
thrown");
+    } catch (Exception e) {
+      assertTrue(
+          "The thrown exception should have been 
IncompatibleSchemaModificationException but was " + e.getClass(),
+          e instanceof IncompatibleSchemaModificationException);
+      assertEquals(
+          "can not merge type optional binary a with column order 
TYPE_DEFINED_ORDER into optional binary a with column order UNDEFINED",
+          e.getMessage());
+    }
+  }
+
+  @Test
   public void testIDs() throws Exception {
     MessageType schema = new MessageType("test",
         new PrimitiveType(REQUIRED, BINARY, "foo").withId(4),

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
----------------------------------------------------------------------
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
 
b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
new file mode 100644
index 0000000..3f9d643
--- /dev/null
+++ 
b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.apache.parquet.io.api.Binary;
+import org.junit.Test;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import static org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR;
+import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR;
+import static org.apache.parquet.schema.PrimitiveComparator.FLOAT_COMPARATOR;
+import static 
org.apache.parquet.schema.PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
+import static 
org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
+import static 
org.apache.parquet.schema.PrimitiveComparator.SIGNED_INT32_COMPARATOR;
+import static 
org.apache.parquet.schema.PrimitiveComparator.SIGNED_INT64_COMPARATOR;
+import static 
org.apache.parquet.schema.PrimitiveComparator.UNSIGNED_INT32_COMPARATOR;
+import static 
org.apache.parquet.schema.PrimitiveComparator.UNSIGNED_INT64_COMPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/*
+ * This test verifies all the PrimitiveComparator implementations. The logic 
of all tests is the same: list the
+ * elements to be tested in ascending order, then compare every element with 
every element (including the element
+ * itself) and expect the result implied by the defined order.
+ */
+public class TestPrimitiveComparator {
+
+  @Test
+  public void testBooleanComparator() {
+    Boolean[] valuesInAscendingOrder = { null, false, true };
+
+    for (int i = 0; i < valuesInAscendingOrder.length; ++i) {
+      for (int j = 0; j < valuesInAscendingOrder.length; ++j) {
+        Boolean vi = valuesInAscendingOrder[i];
+        Boolean vj = valuesInAscendingOrder[j];
+        int exp = i - j;
+        assertSignumEquals(vi, vj, exp, BOOLEAN_COMPARATOR.compare(vi, vj));
+        if (vi != null && vj != null) {
+          assertSignumEquals(vi, vj, exp, 
BOOLEAN_COMPARATOR.compare(vi.booleanValue(), vj.booleanValue()));
+        }
+      }
+    }
+
+    checkThrowingUnsupportedException(BOOLEAN_COMPARATOR, Boolean.TYPE);
+  }
+
+  @Test
+  public void testSignedInt32Comparator() {
+    testInt32Comparator(SIGNED_INT32_COMPARATOR,
+        null,
+        Integer.MIN_VALUE,
+        -12345,
+        -1,
+        0,
+        1,
+        12345,
+        Integer.MAX_VALUE);
+  }
+
+  @Test
+  public void testUnsignedInt32Comparator() {
+    testInt32Comparator(UNSIGNED_INT32_COMPARATOR,
+        null,
+        0,                  // 0x00000000
+        1,                  // 0x00000001
+        12345,              // 0x00003039
+        Integer.MAX_VALUE,  // 0x7FFFFFFF
+        Integer.MIN_VALUE,  // 0x80000000
+        -12345,             // 0xFFFFCFC7
+        -1);                // 0xFFFFFFFF
+  }
+
+  private void testInt32Comparator(PrimitiveComparator<Integer> comparator, 
Integer... valuesInAscendingOrder) {
+    for (int i = 0; i < valuesInAscendingOrder.length; ++i) {
+      for (int j = 0; j < valuesInAscendingOrder.length; ++j) {
+        Integer vi = valuesInAscendingOrder[i];
+        Integer vj = valuesInAscendingOrder[j];
+        int exp = i - j;
+        assertSignumEquals(vi, vj, exp, comparator.compare(vi, vj));
+        if (vi != null && vj != null) {
+          assertSignumEquals(vi, vj, exp, comparator.compare(vi.intValue(), 
vj.intValue()));
+        }
+      }
+    }
+
+    checkThrowingUnsupportedException(comparator, Integer.TYPE);
+  }
+
+  @Test
+  public void testSignedInt64Comparator() {
+    testInt64Comparator(SIGNED_INT64_COMPARATOR,
+        null,
+        Long.MIN_VALUE,
+        -12345678901L,
+        -1L,
+        0L,
+        1L,
+        12345678901L,
+        Long.MAX_VALUE);
+  }
+
+  @Test
+  public void testUnsignedInt64Comparator() {
+    testInt64Comparator(UNSIGNED_INT64_COMPARATOR,
+        null,
+        0L,              // 0x0000000000000000
+        1L,              // 0x0000000000000001
+        12345678901L,    // 0x00000002DFDC1C35
+        Long.MAX_VALUE,  // 0x7FFFFFFFFFFFFFFF
+        Long.MIN_VALUE,  // 0x8000000000000000
+        -12345678901L,   // 0xFFFFFFFD2023E3CB
+        -1L);            // 0xFFFFFFFFFFFFFFFF
+  }
+
+  private void testInt64Comparator(PrimitiveComparator<Long> comparator, 
Long... valuesInAscendingOrder) {
+    for (int i = 0; i < valuesInAscendingOrder.length; ++i) {
+      for (int j = 0; j < valuesInAscendingOrder.length; ++j) {
+        Long vi = valuesInAscendingOrder[i];
+        Long vj = valuesInAscendingOrder[j];
+        int exp = i - j;
+        assertSignumEquals(vi, vj, exp, comparator.compare(vi, vj));
+        if (vi != null && vj != null) {
+          assertSignumEquals(vi, vj, exp, comparator.compare(vi.longValue(), 
vj.longValue()));
+        }
+      }
+    }
+
+    checkThrowingUnsupportedException(comparator, Long.TYPE);
+  }
+
+  @Test
+  public void testFloatComparator() {
+    Float[] valuesInAscendingOrder = {
+        null,
+        Float.NEGATIVE_INFINITY,
+        -Float.MAX_VALUE,
+        -1234.5678F,
+        -Float.MIN_VALUE,
+        0.0F,
+        Float.MIN_VALUE,
+        1234.5678F,
+        Float.MAX_VALUE,
+        Float.POSITIVE_INFINITY };
+
+    for (int i = 0; i < valuesInAscendingOrder.length; ++i) {
+      for (int j = 0; j < valuesInAscendingOrder.length; ++j) {
+        Float vi = valuesInAscendingOrder[i];
+        Float vj = valuesInAscendingOrder[j];
+        int exp = i - j;
+        assertSignumEquals(vi, vj, exp, FLOAT_COMPARATOR.compare(vi, vj));
+        if (vi != null && vj != null) {
+          assertSignumEquals(vi, vj, exp, 
FLOAT_COMPARATOR.compare(vi.floatValue(), vj.floatValue()));
+        }
+      }
+    }
+
+    checkThrowingUnsupportedException(FLOAT_COMPARATOR, Float.TYPE);
+  }
+
+  @Test
+  public void testDoubleComparator() {
+    Double[] valuesInAscendingOrder = {
+        null,
+        Double.NEGATIVE_INFINITY,
+        -Double.MAX_VALUE,
+        -123456.7890123456789,
+        -Double.MIN_VALUE,
+        0.0,
+        Double.MIN_VALUE,
+        123456.7890123456789,
+        Double.MAX_VALUE,
+        Double.POSITIVE_INFINITY };
+
+    for (int i = 0; i < valuesInAscendingOrder.length; ++i) {
+      for (int j = 0; j < valuesInAscendingOrder.length; ++j) {
+        Double vi = valuesInAscendingOrder[i];
+        Double vj = valuesInAscendingOrder[j];
+        int exp = i - j;
+        assertSignumEquals(vi, vj, exp, DOUBLE_COMPARATOR.compare(vi, vj));
+        if (vi != null && vj != null) {
+          assertSignumEquals(vi, vj, exp, 
DOUBLE_COMPARATOR.compare(vi.doubleValue(), vj.doubleValue()));
+        }
+      }
+    }
+
+    checkThrowingUnsupportedException(DOUBLE_COMPARATOR, Double.TYPE);
+  }
+
+  @Test
+  public void testLexicographicalBinaryComparator() { // null first, then Binary values listed in the ascending order the comparator is expected to produce
+    testObjectComparator(UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR,
+        null,
+        Binary.fromConstantByteArray(new byte[0]),                             
              // ||
+        Binary.fromConstantByteArray(new byte[] { 127, 127, 0, 127 }, 2, 1),   
              // |00|
+        Binary.fromCharSequence("aaa"),                                        
              // |61|61|61|
+        Binary.fromString("aaaa"),                                             
              // |61|61|61|61|
+        Binary.fromReusedByteArray("aaab".getBytes()),                         
              // |61|61|61|62|
+        Binary.fromReusedByteArray("azzza".getBytes(), 1, 3),                  
              // |7A|7A|7A|
+        Binary.fromReusedByteBuffer(ByteBuffer.wrap("zzzzzz".getBytes())),     
              // |7A|7A|7A|7A|7A|7A|
+        Binary.fromReusedByteBuffer(ByteBuffer.wrap("aazzzzzzaa".getBytes(), 
2, 7)),         // |7A|7A|7A|7A|7A|7A|61|
+        Binary.fromConstantByteBuffer(ByteBuffer.wrap(new byte[] { -128, -128, 
-128 })),     // |80|80|80|
+        Binary.fromConstantByteBuffer(ByteBuffer.wrap(new byte[] { -128, -128, 
-1 }, 1, 2))  // |80|FF|
+    );
+  }
+
+  @Test
+  public void testBinaryAsSignedIntegerComparator() { // null first, then Binary-wrapped integer byte arrays listed in the ascending order the comparator is expected to produce
+    testObjectComparator(BINARY_AS_SIGNED_INTEGER_COMPARATOR,
+        null,
+        Binary.fromConstantByteArray(new 
BigInteger("-9999999999999999999999999999999999999999").toByteArray()),
+        Binary.fromReusedByteArray(new 
BigInteger("-9999999999999999999999999999999999999998").toByteArray()),
+        
Binary.fromConstantByteArray(BigInteger.valueOf(Long.MIN_VALUE).subtract(BigInteger.ONE).toByteArray()),
+        
Binary.fromConstantByteArray(BigInteger.valueOf(Long.MIN_VALUE).toByteArray()),
+        
Binary.fromConstantByteArray(BigInteger.valueOf(Long.MIN_VALUE).add(BigInteger.ONE).toByteArray()),
+        Binary.fromReusedByteArray(new byte[] { (byte) 0xFF, (byte) 0xFF, 
(byte) 0xFF, -2 }, 1, 3),
+        Binary.fromReusedByteArray(new BigInteger("-1").toByteArray()),
+        Binary.fromConstantByteBuffer(ByteBuffer.wrap(new 
BigInteger("0").toByteArray())),
+        Binary.fromReusedByteBuffer(ByteBuffer.wrap(new byte[] { 0, 0, 0, 1 
})),
+        Binary.fromConstantByteBuffer(ByteBuffer.wrap(new byte[] { 0, 0, 0, 2 
}), 2, 2),
+        Binary.fromConstantByteBuffer(
+            
ByteBuffer.wrap(BigInteger.valueOf(Long.MAX_VALUE).subtract(BigInteger.ONE).toByteArray())),
+        
Binary.fromConstantByteBuffer(ByteBuffer.wrap(BigInteger.valueOf(Long.MAX_VALUE).toByteArray())),
+        Binary
+            .fromConstantByteBuffer(
+                
ByteBuffer.wrap(BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE).toByteArray())),
+        Binary.fromConstantByteBuffer(
+            ByteBuffer.wrap(new 
BigInteger("999999999999999999999999999999999999999").toByteArray())),
+        Binary.fromReusedByteBuffer(
+            ByteBuffer.wrap(new 
BigInteger("9999999999999999999999999999999999999998").toByteArray())),
+        Binary.fromConstantByteBuffer(
+            ByteBuffer.wrap(new 
BigInteger("9999999999999999999999999999999999999999").toByteArray())));
+  }
+
+  private <T> void testObjectComparator(PrimitiveComparator<T> comparator, 
T... valuesInAscendingOrder) { // Compares every (i, j) pair of the given values and expects the result to have the sign of i - j
+    for (int i = 0; i < valuesInAscendingOrder.length; ++i) {
+      for (int j = 0; j < valuesInAscendingOrder.length; ++j) {
+        T vi = valuesInAscendingOrder[i];
+        T vj = valuesInAscendingOrder[j];
+        int exp = i - j; // values are passed in ascending order, so the index difference carries the expected sign
+        assertSignumEquals(vi, vj, exp, comparator.compare(vi, vj));
+      }
+    }
+
+    checkThrowingUnsupportedException(comparator, null); // null excludes nothing: all argument types tried by the helper must be rejected
+  }
+
+  private <T> void assertSignumEquals(T v1, T v2, int expected, int actual) { // Asserts that actual has the same sign as expected; v1 and v2 are used only in the failure message
+    String sign = expected < 0 ? " < " : expected > 0 ? " > " : " = "; // relation symbol shown in the failure message
+    assertEquals("expected: " + v1 + sign + v2, signum(expected), 
signum(actual));
+  }
+
+  private int signum(int i) { // Reduces i to -1, 0 or 1 according to its sign
+    return i < 0 ? -1 : i > 0 ? 1 : 0;
+  }
+
+  private void checkThrowingUnsupportedException(PrimitiveComparator<?> 
comparator, Class<?> exclude) { // Calls compare with int, long, float, double and boolean arguments (skipping the primitive type given in exclude) and fails unless each call throws UnsupportedOperationException
+    if (Integer.TYPE != exclude) {
+      try {
+        comparator.compare(0, 0);
+        fail("An UnsupportedOperationException should have been thrown");
+      } catch (UnsupportedOperationException e) { // expected
+      }
+    }
+    if (Long.TYPE != exclude) {
+      try {
+        comparator.compare(0L, 0L);
+        fail("An UnsupportedOperationException should have been thrown");
+      } catch (UnsupportedOperationException e) { // expected
+      }
+    }
+    if (Float.TYPE != exclude) {
+      try {
+        comparator.compare(0.0F, 0.0F);
+        fail("An UnsupportedOperationException should have been thrown");
+      } catch (UnsupportedOperationException e) { // expected
+      }
+    }
+    if (Double.TYPE != exclude) {
+      try {
+        comparator.compare(0.0, 0.0);
+        fail("An UnsupportedOperationException should have been thrown");
+      } catch (UnsupportedOperationException e) { // expected
+      }
+    }
+    if (Boolean.TYPE != exclude) {
+      try {
+        comparator.compare(false, false);
+        fail("An UnsupportedOperationException should have been thrown");
+      } catch (UnsupportedOperationException e) { // expected
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
----------------------------------------------------------------------
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java 
b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
index 0c39ef2..0b1f41a 100644
--- 
a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
+++ 
b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Callable;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
 
 import static org.apache.parquet.schema.OriginalType.*;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
@@ -1348,6 +1349,52 @@ public class TestTypeBuilders {
     Assert.assertEquals(expected, actual);
   }
 
+  @Test
+  public void testTypeConstructionWithUndefinedColumnOrder() { // For each listed primitive type, the builder result with ColumnOrder.undefined() must equal the directly constructed PrimitiveType
+    PrimitiveTypeName[] types = new PrimitiveTypeName[] {
+        BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY, 
FIXED_LEN_BYTE_ARRAY
+    };
+    for (PrimitiveTypeName type : types) {
+      String name = type.toString() + "_";
+      int len = type == FIXED_LEN_BYTE_ARRAY ? 42 : 0; // length 42 is used for FIXED_LEN_BYTE_ARRAY only; every other type gets 0
+      PrimitiveType expected = new PrimitiveType(Repetition.OPTIONAL, type, 
len, name, null, null, null,
+          ColumnOrder.undefined());
+      PrimitiveType built = 
Types.optional(type).length(len).columnOrder(ColumnOrder.undefined()).named(name);
+      Assert.assertEquals(expected, built);
+    }
+  }
+
+  @Test
+  public void testTypeConstructionWithTypeDefinedColumnOrder() { // For each listed primitive type (INT96 is not in the list), the builder result with ColumnOrder.typeDefined() must equal the directly constructed PrimitiveType
+    PrimitiveTypeName[] types = new PrimitiveTypeName[] {
+        BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BINARY, FIXED_LEN_BYTE_ARRAY
+    };
+    for (PrimitiveTypeName type : types) {
+      String name = type.toString() + "_";
+      int len = type == FIXED_LEN_BYTE_ARRAY ? 42 : 0; // length 42 is used for FIXED_LEN_BYTE_ARRAY only; every other type gets 0
+      PrimitiveType expected = new PrimitiveType(Repetition.OPTIONAL, type, 
len, name, null, null, null,
+          ColumnOrder.typeDefined());
+      PrimitiveType built = 
Types.optional(type).length(len).columnOrder(ColumnOrder.typeDefined()).named(name);
+      Assert.assertEquals(expected, built);
+    }
+  }
+
+  @Test
+  public void testTypeConstructionWithUnsupportedColumnOrder() { // Expects IllegalArgumentException when a type-defined column order is requested for INT96 and for a 12-byte FIXED_LEN_BYTE_ARRAY annotated as INTERVAL
+    assertThrows(null, IllegalArgumentException.class, new 
Callable<PrimitiveType>() {
+      @Override
+      public PrimitiveType call() {
+        return 
Types.optional(INT96).columnOrder(ColumnOrder.typeDefined()).named("int96_unsupported");
+      }
+    });
+    assertThrows(null, IllegalArgumentException.class, new 
Callable<PrimitiveType>() {
+      @Override
+      public PrimitiveType call() {
+        return 
Types.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL)
+            
.columnOrder(ColumnOrder.typeDefined()).named("interval_unsupported");
+      }
+    });
+  }
 
   /**
    * A convenience method to avoid a large number of @Test(expected=...) tests

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
----------------------------------------------------------------------
diff --git 
a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
 
b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
index 1dfaf6f..fc5413e 100644
--- 
a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
+++ 
b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
@@ -45,29 +45,29 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
   private static class TypeInfo {
     public final String className;
     public final String primitiveName;
-    public final boolean useComparable;
     public final boolean supportsInequality;
 
-    private TypeInfo(String className, String primitiveName, boolean 
useComparable, boolean supportsInequality) {
+    private TypeInfo(String className, String primitiveName, boolean 
supportsInequality) {
       this.className = className;
       this.primitiveName = primitiveName;
-      this.useComparable = useComparable;
       this.supportsInequality = supportsInequality;
     }
   }
 
   private static final TypeInfo[] TYPES = new TypeInfo[]{
-    new TypeInfo("Integer", "int", false, true),
-    new TypeInfo("Long", "long", false, true),
-    new TypeInfo("Boolean", "boolean", false, false),
-    new TypeInfo("Float", "float", false, true),
-    new TypeInfo("Double", "double", false, true),
-    new TypeInfo("Binary", "Binary", true, true),
+    new TypeInfo("Integer", "int",  true),
+    new TypeInfo("Long", "long", true),
+    new TypeInfo("Boolean", "boolean", false),
+    new TypeInfo("Float", "float", true),
+    new TypeInfo("Double", "double", true),
+    new TypeInfo("Binary", "Binary", true),
   };
 
   public void run() throws IOException {
     add("package org.apache.parquet.filter2.recordlevel;\n" +
         "\n" +
+        "import java.util.List;\n" +
+        "\n" +
         "import org.apache.parquet.hadoop.metadata.ColumnPath;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.Eq;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.Gt;\n" +
@@ -79,7 +79,9 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
         "import org.apache.parquet.filter2.predicate.Operators.UserDefined;\n" 
+
         "import org.apache.parquet.filter2.predicate.UserDefinedPredicate;\n" +
         "import 
org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;\n"
 +
-        "import org.apache.parquet.io.api.Binary;\n\n" +
+        "import org.apache.parquet.io.api.Binary;\n" +
+        "import org.apache.parquet.io.PrimitiveColumnIO;\n" +
+        "import org.apache.parquet.schema.PrimitiveComparator;\n\n" +
         "/**\n" +
         " * This class is auto-generated by {@link 
parquet.filter2.IncrementallyUpdatedFilterPredicateGenerator}\n" +
         " * Do not manually edit!\n" +
@@ -88,6 +90,10 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
 
     add("public class IncrementallyUpdatedFilterPredicateBuilder extends 
IncrementallyUpdatedFilterPredicateBuilderBase {\n\n");
 
+    add("  public 
IncrementallyUpdatedFilterPredicateBuilder(List<PrimitiveColumnIO> leaves) {\n" 
+
+      "    super(leaves);\n" +
+      "  }\n\n");
+
     addVisitBegin("Eq");
     for (TypeInfo info : TYPES) {
       addEqNotEqCase(info, true);
@@ -180,6 +186,7 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
         "        };\n" +
         "      } else {\n" +
         "        final " + info.primitiveName + " target = (" + info.className 
+ ") (Object) pred.getValue();\n" +
+        "        final PrimitiveComparator<" + info.className + "> comparator 
= getComparator(columnPath);\n" +
         "\n" +
         "        valueInspector = new ValueInspector() {\n" +
         "          @Override\n" +
@@ -190,11 +197,7 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
         "          @Override\n" +
         "          public void update(" + info.primitiveName + " value) {\n");
 
-    if (info.useComparable) {
-      add("            setResult(" + compareEquality("value", "target", isEq) 
+ ");\n");
-    } else {
-      add("            setResult(" + (isEq ? "value == target" : "value != 
target" )  + ");\n");
-    }
+    add("            setResult(" + compareEquality("value", "target", isEq) + 
");\n");
 
     add("          }\n" +
         "        };\n" +
@@ -212,6 +215,7 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
 
     add("    if (clazz.equals(" + info.className + ".class)) {\n" +
         "      final " + info.primitiveName + " target = (" + info.className + 
") (Object) pred.getValue();\n" +
+        "      final PrimitiveComparator<" + info.className + "> comparator = 
getComparator(columnPath);\n" +
         "\n" +
         "      valueInspector = new ValueInspector() {\n" +
         "        @Override\n" +
@@ -222,11 +226,8 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
         "        @Override\n" +
         "        public void update(" + info.primitiveName + " value) {\n");
 
-    if (info.useComparable) {
-      add("          setResult(value.compareTo(target) " + op + " 0);\n");
-    } else {
-      add("          setResult(value " + op + " target);\n");
-    }
+    add("          setResult(comparator.compare(value, target) " + op + " 
0);\n");
+
     add("        }\n" +
         "      };\n" +
         "    }\n\n");
@@ -260,7 +261,7 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
   }
 
   private String compareEquality(String var, String target, boolean eq) {
-    return var + ".compareTo(" + target + ")" + (eq ? " == 0 " : " != 0");
+    return "comparator.compare(" + var + ", " + target + ")" + (eq ? " == 0 " 
: " != 0");
   }
 
   private void add(String s) throws IOException {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index 19604ec..eaba2c1 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -75,7 +76,7 @@ public class DictionaryFilter implements 
FilterPredicate.Visitor<Boolean> {
 
   @SuppressWarnings("unchecked")
   private <T extends Comparable<T>> Set<T> 
expandDictionary(ColumnChunkMetaData meta) throws IOException {
-    ColumnDescriptor col = new ColumnDescriptor(meta.getPath().toArray(), 
meta.getType(), -1, -1);
+    ColumnDescriptor col = new ColumnDescriptor(meta.getPath().toArray(), 
meta.getPrimitiveType(), -1, -1);
     DictionaryPage page = dictionaries.readDictionaryPage(col);
 
     // the chunk may not be dictionary-encoded
@@ -212,8 +213,9 @@ public class DictionaryFilter implements 
FilterPredicate.Visitor<Boolean> {
         return BLOCK_MIGHT_MATCH;
       }
 
+      Comparator<T> comparator = meta.getPrimitiveType().comparator();
       for (T entry : dictSet) {
-        if (value.compareTo(entry) > 0) {
+        if (comparator.compare(value, entry) > 0) {
           return BLOCK_MIGHT_MATCH;
         }
       }
@@ -253,8 +255,9 @@ public class DictionaryFilter implements 
FilterPredicate.Visitor<Boolean> {
         return BLOCK_MIGHT_MATCH;
       }
 
+      Comparator<T> comparator = meta.getPrimitiveType().comparator();
       for (T entry : dictSet) {
-        if (value.compareTo(entry) >= 0) {
+        if (comparator.compare(value, entry) >= 0) {
           return BLOCK_MIGHT_MATCH;
         }
       }
@@ -292,8 +295,9 @@ public class DictionaryFilter implements 
FilterPredicate.Visitor<Boolean> {
         return BLOCK_MIGHT_MATCH;
       }
 
+      Comparator<T> comparator = meta.getPrimitiveType().comparator();
       for (T entry : dictSet) {
-        if (value.compareTo(entry) < 0) {
+        if (comparator.compare(value, entry) < 0) {
           return BLOCK_MIGHT_MATCH;
         }
       }
@@ -333,8 +337,9 @@ public class DictionaryFilter implements 
FilterPredicate.Visitor<Boolean> {
         return BLOCK_MIGHT_MATCH;
       }
 
+      Comparator<T> comparator = meta.getPrimitiveType().comparator();
       for (T entry : dictSet) {
-        if (value.compareTo(entry) <= 0) {
+        if (comparator.compare(value, entry) <= 0) {
           return BLOCK_MIGHT_MATCH;
         }
       }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
index ac7132e..f168a60 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
@@ -134,7 +134,7 @@ public class StatisticsFilter implements 
FilterPredicate.Visitor<Boolean> {
     }
 
     // drop if value < min || value > max
-    return value.compareTo(stats.genericGetMin()) < 0 || 
value.compareTo(stats.genericGetMax()) > 0;
+    return stats.compareMinToValue(value) > 0 || 
stats.compareMaxToValue(value) < 0;
   }
 
   @Override
@@ -173,7 +173,7 @@ public class StatisticsFilter implements 
FilterPredicate.Visitor<Boolean> {
     }
 
     // drop if this is a column where min = max = value
-    return value.compareTo(stats.genericGetMin()) == 0 && 
value.compareTo(stats.genericGetMax()) == 0;
+    return stats.compareMinToValue(value) == 0 && 
stats.compareMaxToValue(value) == 0;
   }
 
   @Override
@@ -204,7 +204,7 @@ public class StatisticsFilter implements 
FilterPredicate.Visitor<Boolean> {
     T value = lt.getValue();
 
     // drop if value <= min
-    return  value.compareTo(stats.genericGetMin()) <= 0;
+    return stats.compareMinToValue(value) >= 0;
   }
 
   @Override
@@ -235,7 +235,7 @@ public class StatisticsFilter implements 
FilterPredicate.Visitor<Boolean> {
     T value = ltEq.getValue();
 
     // drop if value < min
-    return value.compareTo(stats.genericGetMin()) < 0;
+    return stats.compareMinToValue(value) > 0;
   }
 
   @Override
@@ -266,7 +266,7 @@ public class StatisticsFilter implements 
FilterPredicate.Visitor<Boolean> {
     T value = gt.getValue();
 
     // drop if value >= max
-    return value.compareTo(stats.genericGetMax()) >= 0;
+    return stats.compareMaxToValue(value) <= 0;
   }
 
   @Override
@@ -296,8 +296,8 @@ public class StatisticsFilter implements 
FilterPredicate.Visitor<Boolean> {
 
     T value = gtEq.getValue();
 
-    // drop if value >= max
-    return value.compareTo(stats.genericGetMax()) > 0;
+    // drop if value > max
+    return stats.compareMaxToValue(value) < 0;
   }
 
   @Override
@@ -356,7 +356,8 @@ public class StatisticsFilter implements 
FilterPredicate.Visitor<Boolean> {
     }
 
     org.apache.parquet.filter2.predicate.Statistics<T> udpStats =
-        new 
org.apache.parquet.filter2.predicate.Statistics<T>(stats.genericGetMin(), 
stats.genericGetMax());
+      new 
org.apache.parquet.filter2.predicate.Statistics<T>(stats.genericGetMin(), 
stats.genericGetMax(),
+        stats.comparator());
 
     if (inverted) {
       return udp.inverseCanDrop(udpStats);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 163056c..ef59760 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -44,6 +44,7 @@ import org.apache.parquet.format.PageEncodingStats;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.ColumnChunk;
 import org.apache.parquet.format.ColumnMetaData;
+import org.apache.parquet.format.ColumnOrder;
 import org.apache.parquet.format.ConvertedType;
 import org.apache.parquet.format.DataPageHeader;
 import org.apache.parquet.format.DataPageHeaderV2;
@@ -58,12 +59,14 @@ import org.apache.parquet.format.RowGroup;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.format.Statistics;
 import org.apache.parquet.format.Type;
+import org.apache.parquet.format.TypeDefinedOrder;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
@@ -79,6 +82,7 @@ import org.slf4j.LoggerFactory;
 // TODO: Lets split it up: https://issues.apache.org/jira/browse/PARQUET-310
 public class ParquetMetadataConverter {
 
+  private static final TypeDefinedOrder TYPE_DEFINED_ORDER = new 
TypeDefinedOrder();
   public static final MetadataFilter NO_FILTER = new NoFilter();
   public static final MetadataFilter SKIP_ROW_GROUPS = new 
SkipMetadataFilter();
   public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
@@ -135,9 +139,24 @@ public class ParquetMetadataConverter {
     }
 
     
fileMetaData.setCreated_by(parquetMetadata.getFileMetaData().getCreatedBy());
+
+    
fileMetaData.setColumn_orders(getColumnOrders(parquetMetadata.getFileMetaData().getSchema()));
+
     return fileMetaData;
   }
 
+  private List<ColumnOrder> getColumnOrders(MessageType schema) { // Returns one format-level ColumnOrder with TYPE_ORDER set for each entry of schema.getPaths()
+    List<ColumnOrder> columnOrders = new ArrayList<>();
+    // Currently, only TypeDefinedOrder is supported, so we create a column 
order for each columns with
+    // TypeDefinedOrder even if some types (e.g. INT96) have undefined column 
orders.
+    for (int i = 0, n = schema.getPaths().size(); i < n; ++i) {
+      ColumnOrder columnOrder = new ColumnOrder();
+      columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER); // every entry reuses the single TYPE_DEFINED_ORDER constant
+      columnOrders.add(columnOrder);
+    }
+    return columnOrders;
+  }
+
   // Visible for testing
   List<SchemaElement> toParquetSchema(MessageType schema) {
     List<SchemaElement> result = new ArrayList<SchemaElement>();
@@ -326,20 +345,37 @@ public class ParquetMetadataConverter {
   }
 
   public static Statistics toParquetStatistics(
-      org.apache.parquet.column.statistics.Statistics statistics) {
-    Statistics stats = new Statistics();
+      org.apache.parquet.column.statistics.Statistics stats) {
+    Statistics formatStats = new Statistics();
     // Don't write stats larger than the max size rather than truncating. The
     // rationale is that some engines may use the minimum value in the page as
     // the true minimum for aggregations and there is no way to mark that a
     // value has been truncated and is a lower bound and not in the page.
-    if (!statistics.isEmpty() && statistics.isSmallerThan(MAX_STATS_SIZE)) {
-      stats.setNull_count(statistics.getNumNulls());
-      if (statistics.hasNonNullValue()) {
-        stats.setMax(statistics.getMaxBytes());
-        stats.setMin(statistics.getMinBytes());
+    if (!stats.isEmpty() && stats.isSmallerThan(MAX_STATS_SIZE)) {
+      formatStats.setNull_count(stats.getNumNulls());
+      if (stats.hasNonNullValue()) {
+        byte[] min = stats.getMinBytes();
+        byte[] max = stats.getMaxBytes();
+
+        // Fill the former min-max statistics only if the comparison logic is
+        // signed so the logic of V1 and V2 stats are the same (which is
+        // trivially true for equal min-max values)
+        if (sortOrder(stats.type()) == SortOrder.SIGNED || Arrays.equals(min, 
max)) {
+          formatStats.setMin(min);
+          formatStats.setMax(max);
+        }
+
+        if (isMinMaxStatsSupported(stats.type()) || Arrays.equals(min, max)) {
+          formatStats.setMin_value(min);
+          formatStats.setMax_value(max);
+        }
       }
     }
-    return stats;
+    return formatStats;
+  }
+
+  private static boolean isMinMaxStatsSupported(PrimitiveType type) { // True only when the column order name of the type is TYPE_DEFINED_ORDER
+    return type.columnOrder().getColumnOrderName() == 
ColumnOrderName.TYPE_DEFINED_ORDER;
   }
 
   /**
@@ -357,29 +393,42 @@ public class ParquetMetadataConverter {
   @Deprecated
   public static org.apache.parquet.column.statistics.Statistics 
fromParquetStatistics
       (String createdBy, Statistics statistics, PrimitiveTypeName type) {
-    return fromParquetStatisticsInternal(createdBy, statistics, type, 
defaultSortOrder(type));
+    return fromParquetStatisticsInternal(createdBy, statistics,
+        new PrimitiveType(Repetition.OPTIONAL, type, "fake_type"), 
defaultSortOrder(type));
   }
 
   // Visible for testing
   static org.apache.parquet.column.statistics.Statistics 
fromParquetStatisticsInternal
-      (String createdBy, Statistics statistics, PrimitiveTypeName type, 
SortOrder typeSortOrder) {
+      (String createdBy, Statistics formatStats, PrimitiveType type, SortOrder 
typeSortOrder) {
     // create stats object based on the column type
-    org.apache.parquet.column.statistics.Statistics stats = 
org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType(type);
-    // If there was no statistics written to the footer, create an empty 
Statistics object and return
-
-    boolean isSet = statistics != null && statistics.isSetMax() && 
statistics.isSetMin();
-    boolean maxEqualsMin = isSet ? Arrays.equals(statistics.getMin(), 
statistics.getMax()) : false;
-    boolean sortOrdersMatch = SortOrder.SIGNED == typeSortOrder;
-    // NOTE: See docs in CorruptStatistics for explanation of why this check 
is needed
-    // The sort order is checked to avoid returning min/max stats that are not
-    // valid with the type's sort order. Currently, all stats are aggregated
-    // using a signed ordering, which isn't valid for strings or unsigned ints.
-    if (statistics != null && 
!CorruptStatistics.shouldIgnoreStatistics(createdBy, type) &&
-        ( sortOrdersMatch || maxEqualsMin)) {
-      if (isSet) {
-        stats.setMinMaxFromBytes(statistics.min.array(), 
statistics.max.array());
+    org.apache.parquet.column.statistics.Statistics stats = 
org.apache.parquet.column.statistics.Statistics.createStats(type);
+
+    if (formatStats != null) {
+      // Use the new V2 min-max statistics over the former one if it is filled
+      if (formatStats.isSetMin_value() && formatStats.isSetMax_value()) {
+        byte[] min = formatStats.min_value.array();
+        byte[] max = formatStats.max_value.array();
+        if (isMinMaxStatsSupported(type) || Arrays.equals(min, max)) {
+          stats.setMinMaxFromBytes(min, max);
+        }
+        stats.setNumNulls(formatStats.null_count);
+      } else {
+        boolean isSet = formatStats.isSetMax() && formatStats.isSetMin();
+        boolean maxEqualsMin = isSet ? Arrays.equals(formatStats.getMin(), 
formatStats.getMax()) : false;
+        boolean sortOrdersMatch = SortOrder.SIGNED == typeSortOrder;
+        // NOTE: See docs in CorruptStatistics for explanation of why this 
check is needed
+        // The sort order is checked to avoid returning min/max stats that are 
not
+        // valid with the type's sort order. In previous releases, all stats 
were
+        // aggregated using a signed byte-wise ordering, which isn't valid for 
all the
+        // types (e.g. strings, decimals etc.).
+        if (!CorruptStatistics.shouldIgnoreStatistics(createdBy, 
type.getPrimitiveTypeName()) &&
+            (sortOrdersMatch || maxEqualsMin)) {
+          if (isSet) {
+            stats.setMinMaxFromBytes(formatStats.min.array(), 
formatStats.max.array());
+          }
+          stats.setNumNulls(formatStats.null_count);
+        }
       }
-      stats.setNumNulls(statistics.null_count);
     }
     return stats;
   }
@@ -389,7 +438,7 @@ public class ParquetMetadataConverter {
     SortOrder expectedOrder = overrideSortOrderToSigned(type) ?
         SortOrder.SIGNED : sortOrder(type);
     return fromParquetStatisticsInternal(
-        createdBy, statistics, type.getPrimitiveTypeName(), expectedOrder);
+        createdBy, statistics, type, expectedOrder);
   }
 
   /**
@@ -827,7 +876,7 @@ public class ParquetMetadataConverter {
   }
 
   public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) 
throws IOException {
-    MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
+    MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), 
parquetMetadata.getColumn_orders());
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
     List<RowGroup> row_groups = parquetMetadata.getRow_groups();
     if (row_groups != null) {
@@ -846,7 +895,7 @@ public class ParquetMetadataConverter {
           ColumnPath path = getPath(metaData);
           ColumnChunkMetaData column = ColumnChunkMetaData.get(
               path,
-              
messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
+              messageType.getType(path.toArray()).asPrimitiveType(),
               fromFormatCodec(metaData.codec),
               convertEncodingStats(metaData.getEncoding_stats()),
               fromFormatEncodings(metaData.encodings),
@@ -886,20 +935,22 @@ public class ParquetMetadataConverter {
   }
 
   // Visible for testing
-  MessageType fromParquetSchema(List<SchemaElement> schema) {
+  MessageType fromParquetSchema(List<SchemaElement> schema, List<ColumnOrder> 
columnOrders) {
     Iterator<SchemaElement> iterator = schema.iterator();
     SchemaElement root = iterator.next();
     Types.MessageTypeBuilder builder = Types.buildMessage();
     if (root.isSetField_id()) {
       builder.id(root.field_id);
     }
-    buildChildren(builder, iterator, root.getNum_children());
+    buildChildren(builder, iterator, root.getNum_children(), columnOrders, 0);
     return builder.named(root.name);
   }
 
   private void buildChildren(Types.GroupBuilder builder,
                              Iterator<SchemaElement> schema,
-                             int childrenCount) {
+                             int childrenCount,
+                             List<ColumnOrder> columnOrders,
+                             int columnCount) {
     for (int i = 0; i < childrenCount; i++) {
       SchemaElement schemaElement = schema.next();
 
@@ -918,11 +969,21 @@ public class ParquetMetadataConverter {
         if (schemaElement.isSetScale()) {
           primitiveBuilder.scale(schemaElement.scale);
         }
+        if (columnOrders != null) {
+          org.apache.parquet.schema.ColumnOrder columnOrder = 
fromParquetColumnOrder(columnOrders.get(columnCount));
+          // As per parquet format 2.4.0 no UNDEFINED order is supported. So, 
set undefined column order for the types
+          // where ordering is not supported.
+          if (columnOrder.getColumnOrderName() == 
ColumnOrderName.TYPE_DEFINED_ORDER
+              && (schemaElement.type == Type.INT96 || 
schemaElement.converted_type == ConvertedType.INTERVAL)) {
+            columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
+          }
+          primitiveBuilder.columnOrder(columnOrder);
+        }
         childBuilder = primitiveBuilder;
 
       } else {
         childBuilder = 
builder.group(fromParquetRepetition(schemaElement.repetition_type));
-        buildChildren((Types.GroupBuilder) childBuilder, schema, 
schemaElement.num_children);
+        buildChildren((Types.GroupBuilder) childBuilder, schema, 
schemaElement.num_children, columnOrders, columnCount);
       }
 
       if (schemaElement.isSetConverted_type()) {
@@ -933,6 +994,7 @@ public class ParquetMetadataConverter {
       }
 
       childBuilder.named(schemaElement.name);
+      ++columnCount;
     }
   }
 
@@ -946,6 +1008,14 @@ public class ParquetMetadataConverter {
     return Repetition.valueOf(repetition.name());
   }
 
+  private static org.apache.parquet.schema.ColumnOrder 
fromParquetColumnOrder(ColumnOrder columnOrder) {
+    if (columnOrder.isSetTYPE_ORDER()) {
+      return org.apache.parquet.schema.ColumnOrder.typeDefined();
+    }
+    // The column order is not yet supported by this API
+    return org.apache.parquet.schema.ColumnOrder.undefined();
+  }
+
   @Deprecated
   public void writeDataPageHeader(
       int uncompressedSize,
@@ -994,8 +1064,7 @@ public class ParquetMetadataConverter {
         getEncoding(dlEncoding),
         getEncoding(rlEncoding)));
     if (!statistics.isEmpty()) {
-      pageHeader.getData_page_header().setStatistics(
-          toParquetStatistics(statistics));
+      
pageHeader.getData_page_header().setStatistics(toParquetStatistics(statistics));
     }
     return pageHeader;
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index ac3cd3b..82c288f 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -18,8 +18,6 @@
  */
 package org.apache.parquet.hadoop;
 
-import static 
org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -79,7 +77,6 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       this.compressor = compressor;
       this.allocator = allocator;
       this.buf = new ConcatenatingByteArrayCollector();
-      this.totalStatistics = getStatsBasedOnType(this.path.getType());
     }
 
     @Override
@@ -116,7 +113,14 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
-      this.totalStatistics.mergeStatistics(statistics);
+
+      // Copy the statistics if they are not initialized yet, so that we have the correctly typed one
+      if (totalStatistics == null) {
+        totalStatistics = statistics.copy();
+      } else {
+        totalStatistics.mergeStatistics(statistics);
+      }
+
       // by concatenating before collecting instead of collecting twice,
       // we only allocate one buffer to copy into instead of multiple.
       buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), 
compressedBytes));
@@ -154,7 +158,13 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
-      this.totalStatistics.mergeStatistics(statistics);
+
+      // Copy the statistics if they are not initialized yet, so that we have the correctly typed one
+      if (totalStatistics == null) {
+        totalStatistics = statistics.copy();
+      } else {
+        totalStatistics.mergeStatistics(statistics);
+      }
 
       // by concatenating before collecting instead of collecting twice,
       // we only allocate one buffer to copy into instead of multiple.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index da8635d..285c2db 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -65,7 +65,7 @@ import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.TypeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,7 +116,7 @@ public class ParquetFileWriter {
   // column chunk data set at the start of a column
   private CompressionCodecName currentChunkCodec; // set in startColumn
   private ColumnPath currentChunkPath;            // set in startColumn
-  private PrimitiveTypeName currentChunkType;     // set in startColumn
+  private PrimitiveType currentChunkType;         // set in startColumn
   private long currentChunkValueCount;            // set in startColumn
   private long currentChunkFirstDataPage;         // set in startColumn 
(out.pos())
   private long currentChunkDictionaryPageOffset;  // set in writeDictionaryPage
@@ -317,15 +317,14 @@ public class ParquetFileWriter {
     encodingStatsBuilder.clear();
     currentEncodings = new HashSet<Encoding>();
     currentChunkPath = ColumnPath.get(descriptor.getPath());
-    currentChunkType = descriptor.getType();
+    currentChunkType = descriptor.getPrimitiveType();
     currentChunkCodec = compressionCodecName;
     currentChunkValueCount = valueCount;
     currentChunkFirstDataPage = out.getPos();
     compressedLength = 0;
     uncompressedLength = 0;
-    // need to know what type of stats to initialize to
-    // better way to do this?
-    currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
+    // The statistics will be copied from the first one added in writeDataPage(s), so that we have the correctly typed one
+    currentStatistics = null;
   }
 
   /**
@@ -425,7 +424,14 @@ public class ParquetFileWriter {
     this.compressedLength += compressedPageSize + headerSize;
     LOG.debug("{}: write data page content {}", out.getPos(), 
compressedPageSize);
     bytes.writeAllTo(out);
-    currentStatistics.mergeStatistics(statistics);
+
+    // Copy the statistics if they are not initialized yet, so that we have the correctly typed one
+    if (currentStatistics == null) {
+      currentStatistics = statistics.copy();
+    } else {
+      currentStatistics.mergeStatistics(statistics);
+    }
+
     encodingStatsBuilder.addDataEncoding(valuesEncoding);
     currentEncodings.add(rlEncoding);
     currentEncodings.add(dlEncoding);
@@ -599,7 +605,7 @@ public class ParquetFileWriter {
 
       currentBlock.addColumn(ColumnChunkMetaData.get(
           chunk.getPath(),
-          chunk.getType(),
+          chunk.getPrimitiveType(),
           chunk.getCodec(),
           chunk.getEncodingStats(),
           chunk.getEncodings(),

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 720bd77..e198698 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -24,7 +24,9 @@ import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 
 /**
  * Column meta data for a block stored in the file footer and passed in the InputSplit
@@ -65,6 +67,12 @@ abstract public class ColumnChunkMetaData {
         valueCount, totalSize, totalUncompressedSize);
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use
+   *             {@link #get(ColumnPath, PrimitiveType, CompressionCodecName, EncodingStats, Set, Statistics, long, long, long, long, long)}
+   *             instead.
+   */
+  @Deprecated
   public static ColumnChunkMetaData get(
       ColumnPath path,
       PrimitiveTypeName type,
@@ -77,6 +85,22 @@ abstract public class ColumnChunkMetaData {
       long valueCount,
       long totalSize,
       long totalUncompressedSize) {
+    return get(path, Types.optional(type).named("fake_type"), codec, 
encodingStats, encodings, statistics,
+        firstDataPage, dictionaryPageOffset, valueCount, totalSize, 
totalUncompressedSize);
+  }
+
+  public static ColumnChunkMetaData get(
+      ColumnPath path,
+      PrimitiveType type,
+      CompressionCodecName codec,
+      EncodingStats encodingStats,
+      Set<Encoding> encodings,
+      Statistics statistics,
+      long firstDataPage,
+      long dictionaryPageOffset,
+      long valueCount,
+      long totalSize,
+      long totalUncompressedSize) {
     // to save space we store those always positive longs in ints when they 
fit.
     if (positiveLongFitsInAnInt(firstDataPage)
         && positiveLongFitsInAnInt(dictionaryPageOffset)
@@ -149,19 +173,30 @@ abstract public class ColumnChunkMetaData {
   /**
    *
    * @return column identifier
+   * @deprecated will be removed in 2.0.0. Use {@link #getPrimitiveType()} 
instead.
    */
+  @Deprecated
   public ColumnPath getPath() {
     return properties.getPath();
   }
 
   /**
    * @return type of the column
+   * @deprecated will be removed in 2.0.0. Use {@link #getPrimitiveType()} 
instead.
    */
+  @Deprecated
   public PrimitiveTypeName getType() {
     return properties.getType();
   }
 
   /**
+   * @return the primitive type object of the column
+   */
+  public PrimitiveType getPrimitiveType() {
+    return properties.getPrimitiveType();
+  }
+
+  /**
    * @return start of the column data offset
    */
   abstract public long getFirstDataPageOffset();
@@ -231,7 +266,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
    */
   IntColumnChunkMetaData(
       ColumnPath path,
-      PrimitiveTypeName type,
+      PrimitiveType type,
       CompressionCodecName codec,
       EncodingStats encodingStats,
       Set<Encoding> encodings,
@@ -336,7 +371,7 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
    */
   LongColumnChunkMetaData(
       ColumnPath path,
-      PrimitiveTypeName type,
+      PrimitiveType type,
       CompressionCodecName codec,
       EncodingStats encodingStats,
       Set<Encoding> encodings,

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
index 5e26675..233cf94 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
@@ -22,24 +22,36 @@ import java.util.Arrays;
 import java.util.Set;
 
 import org.apache.parquet.column.Encoding;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
 
 public class ColumnChunkProperties {
 
   private static Canonicalizer<ColumnChunkProperties> properties = new 
Canonicalizer<ColumnChunkProperties>();
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link #get(ColumnPath, 
PrimitiveType, CompressionCodecName, Set)}
+   *             instead.
+   */
+  @Deprecated
   public static ColumnChunkProperties get(ColumnPath path, PrimitiveTypeName 
type, CompressionCodecName codec, Set<Encoding> encodings) {
+    return get(path, new PrimitiveType(Type.Repetition.OPTIONAL, type, ""), 
codec, encodings);
+  }
+
+  public static ColumnChunkProperties get(ColumnPath path, PrimitiveType type, 
CompressionCodecName codec,
+      Set<Encoding> encodings) {
     return properties.canonicalize(new ColumnChunkProperties(codec, path, 
type, encodings));
   }
 
   private final CompressionCodecName codec;
   private final ColumnPath path;
-  private final PrimitiveTypeName type;
+  private final PrimitiveType type;
   private final Set<Encoding> encodings;
 
   private ColumnChunkProperties(CompressionCodecName codec,
                                 ColumnPath path,
-                                PrimitiveTypeName type,
+                                PrimitiveType type,
                                 Set<Encoding> encodings) {
     super();
     this.codec = codec;
@@ -56,7 +68,19 @@ public class ColumnChunkProperties {
     return path;
   }
 
+  /**
+   * @return the primitive type name for the column
+   * @deprecated will be removed in 2.0.0. Use {@link #getPrimitiveType()} 
instead.
+   */
+  @Deprecated
   public PrimitiveTypeName getType() {
+    return type.getPrimitiveTypeName();
+  }
+
+  /**
+   * @return the primitive type object for the column
+   */
+  public PrimitiveType getPrimitiveType() {
     return type;
   }
 
@@ -68,7 +92,7 @@ public class ColumnChunkProperties {
   public boolean equals(Object obj) {
     if (obj instanceof ColumnChunkProperties) {
       ColumnChunkProperties other = (ColumnChunkProperties)obj;
-      return other.codec == codec && other.path.equals(path) && other.type == 
type && equals(other.encodings, encodings);
+      return other.codec == codec && other.path.equals(path) && 
other.type.equals(type) && equals(other.encodings, encodings);
     }
     return false;
   }

Reply via email to