This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ca0a09  [SPARK-34661][SQL] Clean up `OriginalType` and `DecimalMetadata` usage in Parquet related code
7ca0a09 is described below

commit 7ca0a0910f6ea42086c64ef8eba2f21988015dd2
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun May 16 09:03:26 2021 -0500

    [SPARK-34661][SQL] Clean up `OriginalType` and `DecimalMetadata` usage in Parquet related code
    
    ### What changes were proposed in this pull request?
    `OriginalType` and `DecimalMetadata` have been marked as `Deprecated` in newer Parquet code.
    
    Apache Parquet suggests replacing `OriginalType` with `LogicalTypeAnnotation` and `DecimalMetadata` with `DecimalLogicalTypeAnnotation`, so the main change of this PR is to clean up these deprecated usages in the Parquet-related code.
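    
    For example, a check that previously read the scale from `DecimalMetadata` can now pattern match on the logical type annotation. A minimal sketch of the pattern (the `decimalScale` helper below is hypothetical and for illustration only, not part of this PR):
    
    ```scala
    import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation
    import org.apache.parquet.schema.PrimitiveType
    
    // Hypothetical helper: read the decimal scale of a Parquet primitive column.
    def decimalScale(primitiveType: PrimitiveType): Option[Int] = {
      // Deprecated: primitiveType.getDecimalMetadata.getScale
      // Replacement: inspect the LogicalTypeAnnotation instead.
      primitiveType.getLogicalTypeAnnotation match {
        case d: DecimalLogicalTypeAnnotation => Some(d.getScale)
        case _ => None // not a decimal column
      }
    }
    ```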
    
    ### Why are the changes needed?
    Clean up deprecated API usage.
    
    ### Does this PR introduce _any_ user-facing change?
     No.
    
    ### How was this patch tested?
    Pass the Jenkins or GitHub Actions tests.
    
    Closes #31776 from LuciferYang/cleanup-parquet-dep-api.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../parquet/VectorizedColumnReader.java            |  69 +++++++----
 .../parquet/VectorizedParquetRecordReader.java     |   2 +-
 .../datasources/parquet/ParquetFilters.scala       | 105 +++++++++--------
 .../datasources/parquet/ParquetReadSupport.scala   |  18 +--
 .../datasources/parquet/ParquetRowConverter.scala  |  88 ++++++++------
 .../parquet/ParquetSchemaConverter.scala           | 131 ++++++++++++---------
 6 files changed, 244 insertions(+), 169 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 52620b0..8932916 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -32,8 +32,12 @@ import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.*;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.DecimalMetadata;
-import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
 import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -101,7 +105,7 @@ public class VectorizedColumnReader {
 
   private final PageReader pageReader;
   private final ColumnDescriptor descriptor;
-  private final OriginalType originalType;
+  private final LogicalTypeAnnotation logicalTypeAnnotation;
   // The timezone conversion to apply to int96 timestamps. Null if no conversion.
   private final ZoneId convertTz;
   private static final ZoneId UTC = ZoneOffset.UTC;
@@ -110,10 +114,14 @@ public class VectorizedColumnReader {
 
   private boolean isDecimalTypeMatched(DataType dt) {
     DecimalType d = (DecimalType) dt;
-    DecimalMetadata dm = descriptor.getPrimitiveType().getDecimalMetadata();
-    // It's OK if the required decimal precision is larger than or equal to the physical decimal
-    // precision in the Parquet metadata, as long as the decimal scale is the same.
-    return dm != null && dm.getPrecision() <= d.precision() && dm.getScale() == d.scale();
+    LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
+    if (typeAnnotation instanceof DecimalLogicalTypeAnnotation) {
+      DecimalLogicalTypeAnnotation decimalType = (DecimalLogicalTypeAnnotation) typeAnnotation;
+      // It's OK if the required decimal precision is larger than or equal to the physical decimal
+      // precision in the Parquet metadata, as long as the decimal scale is the same.
+      return decimalType.getPrecision() <= d.precision() && decimalType.getScale() == d.scale();
+    }
+    return false;
   }
 
   private boolean canReadAsIntDecimal(DataType dt) {
@@ -133,7 +141,7 @@ public class VectorizedColumnReader {
 
   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
-      OriginalType originalType,
+      LogicalTypeAnnotation logicalTypeAnnotation,
       PageReader pageReader,
       ZoneId convertTz,
       String datetimeRebaseMode,
@@ -141,7 +149,7 @@ public class VectorizedColumnReader {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
     this.convertTz = convertTz;
-    this.originalType = originalType;
+    this.logicalTypeAnnotation = logicalTypeAnnotation;
     this.maxDefLevel = descriptor.getMaxDefinitionLevel();
 
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -172,13 +180,14 @@ public class VectorizedColumnReader {
     boolean isSupported = false;
     switch (typeName) {
       case INT32:
-        isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode);
+        isSupported = !(logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) ||
+          "CORRECTED".equals(datetimeRebaseMode);
         break;
       case INT64:
-        if (originalType == OriginalType.TIMESTAMP_MICROS) {
+        if (isTimestampTypeMatched(TimeUnit.MICROS)) {
           isSupported = "CORRECTED".equals(datetimeRebaseMode);
         } else {
-          isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+          isSupported = !isTimestampTypeMatched(TimeUnit.MILLIS);
         }
         break;
       case FLOAT:
@@ -263,17 +272,18 @@ public class VectorizedColumnReader {
           // We need to make sure that we initialize the right type for the dictionary otherwise
           // WritableColumnVector will throw an exception when trying to decode to an Int when the
           // dictionary is in fact initialized as Long
-          boolean castLongToInt = primitiveType.getOriginalType() == OriginalType.DECIMAL &&
-            primitiveType.getDecimalMetadata().getPrecision() <= Decimal.MAX_INT_DIGITS() &&
-            primitiveType.getPrimitiveTypeName() == INT64;
+          LogicalTypeAnnotation typeAnnotation = primitiveType.getLogicalTypeAnnotation();
+          boolean castLongToInt = typeAnnotation instanceof DecimalLogicalTypeAnnotation &&
+            ((DecimalLogicalTypeAnnotation) typeAnnotation).getPrecision() <=
+            Decimal.MAX_INT_DIGITS() && primitiveType.getPrimitiveTypeName() == INT64;
 
           // We require a long value, but we need to use dictionary to decode the original
           // signed int first
-          boolean isUnsignedInt32 = primitiveType.getOriginalType() == OriginalType.UINT_32;
+          boolean isUnsignedInt32 = isUnsignedIntTypeMatched(32);
 
           // We require a decimal value, but we need to use dictionary to decode the original
           // signed long first
-          boolean isUnsignedInt64 = primitiveType.getOriginalType() == OriginalType.UINT_64;
+          boolean isUnsignedInt64 = isUnsignedIntTypeMatched(64);
 
           boolean needTransform = castLongToInt || isUnsignedInt32 || isUnsignedInt64;
           column.setDictionary(new ParquetDictionary(dictionary, needTransform));
@@ -398,14 +408,14 @@ public class VectorizedColumnReader {
       case INT64:
         if (column.dataType() == DataTypes.LongType ||
             canReadAsLongDecimal(column.dataType()) ||
-            (originalType == OriginalType.TIMESTAMP_MICROS &&
+            (isTimestampTypeMatched(TimeUnit.MICROS) &&
               "CORRECTED".equals(datetimeRebaseMode))) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
             }
           }
-        } else if (originalType == OriginalType.UINT_64) {
+        } else if (isUnsignedIntTypeMatched(64)) {
           // In `ParquetToSparkSchemaConverter`, we map parquet UINT64 to our Decimal(20, 0).
           // For unsigned int64, it stores as dictionary encoded signed int64 in Parquet
           // whenever dictionary is available.
@@ -418,7 +428,7 @@ public class VectorizedColumnReader {
               column.putByteArray(i, unsigned);
             }
           }
-        } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+        } else if (isTimestampTypeMatched(TimeUnit.MILLIS)) {
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             for (int i = rowId; i < rowId + num; ++i) {
               if (!column.isNullAt(i)) {
@@ -436,7 +446,7 @@ public class VectorizedColumnReader {
               }
             }
           }
-        } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+        } else if (isTimestampTypeMatched(TimeUnit.MICROS)) {
           final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
@@ -611,13 +621,13 @@ public class VectorizedColumnReader {
       defColumn.readLongs(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn,
         DecimalType.is32BitDecimalType(column.dataType()));
-    } else if (originalType == OriginalType.UINT_64) {
+    } else if (isUnsignedIntTypeMatched(64)) {
       // In `ParquetToSparkSchemaConverter`, we map parquet UINT64 to our Decimal(20, 0).
       // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary fallbacks.
       // We read them as decimal values.
       defColumn.readUnsignedLongs(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
-    } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+    } else if (isTimestampTypeMatched(TimeUnit.MICROS)) {
       if ("CORRECTED".equals(datetimeRebaseMode)) {
         defColumn.readLongs(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, false);
@@ -626,7 +636,7 @@ public class VectorizedColumnReader {
         defColumn.readLongsWithRebase(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
       }
-    } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+    } else if (isTimestampTypeMatched(TimeUnit.MILLIS)) {
       if ("CORRECTED".equals(datetimeRebaseMode)) {
         for (int i = 0; i < num; i++) {
           if (defColumn.readInteger() == maxDefLevel) {
@@ -871,4 +881,15 @@ public class VectorizedColumnReader {
       throw new IOException("could not read page " + page + " in col " + descriptor, e);
     }
   }
+
+  private boolean isTimestampTypeMatched(TimeUnit unit) {
+    return logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation &&
+      ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == unit;
+  }
+
+  private boolean isUnsignedIntTypeMatched(int bitWidth) {
+    return logicalTypeAnnotation instanceof IntLogicalTypeAnnotation &&
+      !((IntLogicalTypeAnnotation) logicalTypeAnnotation).isSigned() &&
+      ((IntLogicalTypeAnnotation) logicalTypeAnnotation).getBitWidth() == bitWidth;
+  }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 1b15953..3245527 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -332,7 +332,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
       if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(
         columns.get(i),
-        types.get(i).getOriginalType(),
+        types.get(i).getLogicalTypeAnnotation(),
         pages.getPageReader(columns.get(i)),
         convertTz,
         datetimeRebaseMode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 73910c3..6eb4573 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -28,8 +28,8 @@ import scala.collection.JavaConverters.asScalaBufferConverter
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.SparkFilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.{DecimalMetadata, GroupType, MessageType, OriginalType, PrimitiveComparator, PrimitiveType, Type}
-import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType, PrimitiveComparator, PrimitiveType, Type}
+import org.apache.parquet.schema.LogicalTypeAnnotation.{DecimalLogicalTypeAnnotation, TimeUnit}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
@@ -62,8 +62,8 @@ class ParquetFilters(
       fields.flatMap {
         case p: PrimitiveType =>
           Some(ParquetPrimitiveField(fieldNames = parentFieldNames :+ p.getName,
-            fieldType = ParquetSchemaType(p.getOriginalType,
-              p.getPrimitiveTypeName, p.getTypeLength, p.getDecimalMetadata)))
+            fieldType = ParquetSchemaType(p.getLogicalTypeAnnotation,
+              p.getPrimitiveTypeName, p.getTypeLength)))
         // Note that when g is a `Struct`, `g.getOriginalType` is `null`.
         // When g is a `Map`, `g.getOriginalType` is `MAP`.
         // When g is a `List`, `g.getOriginalType` is `LIST`.
@@ -105,23 +105,28 @@ class ParquetFilters(
       fieldType: ParquetSchemaType)
 
   private case class ParquetSchemaType(
-      originalType: OriginalType,
+      logicalTypeAnnotation: LogicalTypeAnnotation,
       primitiveTypeName: PrimitiveTypeName,
-      length: Int,
-      decimalMetadata: DecimalMetadata)
-
-  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, null)
-  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
-  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
-  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
-  private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null)
-  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null)
-  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null)
-  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null)
-  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null)
-  private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null)
-  private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null)
-  private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null)
+      length: Int)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0)
+  private val ParquetByteType =
+    ParquetSchemaType(LogicalTypeAnnotation.intType(8, true), INT32, 0)
+  private val ParquetShortType =
+    ParquetSchemaType(LogicalTypeAnnotation.intType(16, true), INT32, 0)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, 0)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0)
+  private val ParquetStringType =
+    ParquetSchemaType(LogicalTypeAnnotation.stringType(), BINARY, 0)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0)
+  private val ParquetDateType =
+    ParquetSchemaType(LogicalTypeAnnotation.dateType(), INT32, 0)
+  private val ParquetTimestampMicrosType =
+    ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS), INT64, 0)
+  private val ParquetTimestampMillisType =
+    ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0)
 
   private def dateToDays(date: Any): Int = date match {
     case d: Date => DateTimeUtils.fromJavaDate(d)
@@ -195,15 +200,16 @@ class ParquetFilters(
         longColumn(n),
         Option(v).map(timestampToMillis).orNull)
 
-    case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull)
-    case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
         Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
-    case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
+      if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull)
@@ -245,15 +251,16 @@ class ParquetFilters(
         longColumn(n),
         Option(v).map(timestampToMillis).orNull)
 
-    case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         intColumn(n),
         Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull)
-    case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
         Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
-    case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
+      if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull)
@@ -285,13 +292,14 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v))
 
-    case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.lt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
-    case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.lt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
-    case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
+      if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.lt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
@@ -322,13 +330,14 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v))
 
-    case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.ltEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
-    case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.ltEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
-    case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
+      if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.ltEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
@@ -359,13 +368,14 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v))
 
-    case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.gt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
-    case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.gt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
-    case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
+      if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.gt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
@@ -396,13 +406,14 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v))
 
-    case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.gtEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
-    case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.gtEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
-    case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
+      if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
@@ -469,21 +480,23 @@ class ParquetFilters(
         value.isInstanceOf[Date] || value.isInstanceOf[LocalDate]
       case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
         value.isInstanceOf[Timestamp] || value.isInstanceOf[Instant]
-      case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
-        isDecimalMatched(value, decimalMeta)
-      case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
-        isDecimalMatched(value, decimalMeta)
-      case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
-        isDecimalMatched(value, decimalMeta)
+      case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT32, _) =>
+        isDecimalMatched(value, decimalType)
+      case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT64, _) =>
+        isDecimalMatched(value, decimalType)
+      case
+        ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, _) =>
+        isDecimalMatched(value, decimalType)
       case _ => false
     })
   }
 
   // Decimal type must make sure that filter value's scale matched the file.
   // If doesn't matched, which would cause data corruption.
-  private def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
+  private def isDecimalMatched(value: Any,
+      decimalLogicalType: DecimalLogicalTypeAnnotation): Boolean = value match {
     case decimal: JBigDecimal =>
-      decimal.scale == decimalMeta.getScale
+      decimal.scale == decimalLogicalType.getScale
     case _ => false
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index ce06620..597a1bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.io.api.RecordMaterializer
 import org.apache.parquet.schema._
+import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
@@ -214,13 +215,14 @@ object ParquetReadSupport {
 
     // Unannotated repeated group should be interpreted as required list of required element, so
     // list element type is just the group itself.  Clip it.
-    if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
+    if (parquetList.getLogicalTypeAnnotation == null &&
+      parquetList.isRepetition(Repetition.REPEATED)) {
       clipParquetType(parquetList, elementType, caseSensitive)
     } else {
       assert(
-        parquetList.getOriginalType == OriginalType.LIST,
+        parquetList.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation],
         "Invalid Parquet schema. " +
-          "Original type of annotated Parquet lists must be LIST: " +
+          "Logical type annotation of annotated Parquet lists must be ListLogicalTypeAnnotation: " +
           parquetList.toString)
 
       assert(
@@ -246,7 +248,7 @@ object ParquetReadSupport {
       ) {
         Types
           .buildGroup(parquetList.getRepetition)
-          .as(OriginalType.LIST)
+          .as(LogicalTypeAnnotation.listType())
           .addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
           .named(parquetList.getName)
       } else {
@@ -254,7 +256,7 @@ object ParquetReadSupport {
         // repetition.
         Types
           .buildGroup(parquetList.getRepetition)
-          .as(OriginalType.LIST)
+          .as(LogicalTypeAnnotation.listType())
           .addField(
             Types
               .repeatedGroup()
@@ -285,14 +287,14 @@ object ParquetReadSupport {
     val clippedRepeatedGroup =
       Types
         .repeatedGroup()
-        .as(repeatedGroup.getOriginalType)
+        .as(repeatedGroup.getLogicalTypeAnnotation)
         .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
         .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
         .named(repeatedGroup.getName)
 
     Types
       .buildGroup(parquetMap.getRepetition)
-      .as(parquetMap.getOriginalType)
+      .as(parquetMap.getLogicalTypeAnnotation)
       .addField(clippedRepeatedGroup)
       .named(parquetMap.getName)
   }
@@ -310,7 +312,7 @@ object ParquetReadSupport {
     val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive)
     Types
       .buildGroup(parquetRecord.getRepetition)
-      .as(parquetRecord.getOriginalType)
+      .as(parquetRecord.getLogicalTypeAnnotation)
       .addFields(clippedParquetFields: _*)
       .named(parquetRecord.getName)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 0a1cca7..0556257 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -26,8 +26,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.{GroupType, OriginalType, Type}
-import org.apache.parquet.schema.OriginalType.LIST
+import org.apache.parquet.schema.{GroupType, Type}
+import org.apache.parquet.schema.LogicalTypeAnnotation._
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96}
 
 import org.apache.spark.internal.Logging
@@ -110,12 +110,12 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
  * - a root [[ParquetRowConverter]] for [[org.apache.parquet.schema.MessageType]] `root`,
  * which contains:
  *   - a [[ParquetPrimitiveConverter]] for required
- *   [[org.apache.parquet.schema.OriginalType.INT_32]] field `f1`, and
+ *   [[org.apache.parquet.schema.LogicalTypeAnnotation.intType(32, true)]] field `f1`, and
  *   - a nested [[ParquetRowConverter]] for optional [[GroupType]] `f2`, which contains:
  *     - a [[ParquetPrimitiveConverter]] for required
  *     [[org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE]] field `f21`, and
- *     - a [[ParquetStringConverter]] for optional [[org.apache.parquet.schema.OriginalType.UTF8]]
- *     string field `f22`
+ *     - a [[ParquetStringConverter]] for optional
+ *     [[org.apache.parquet.schema.LogicalTypeAnnotation.stringType()]] string field `f22`
  *
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
@@ -251,8 +251,15 @@ private[parquet] class ParquetRowConverter(
       catalystType: DataType,
       updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
 
+    def isUnsignedIntTypeMatched(bitWidth: Int): Boolean = {
+      parquetType.getLogicalTypeAnnotation match {
+        case i: IntLogicalTypeAnnotation if !i.isSigned => i.getBitWidth == bitWidth
+        case _ => false
+      }
+    }
+
     catalystType match {
-      case LongType if parquetType.getOriginalType == OriginalType.UINT_32 =>
+      case LongType if isUnsignedIntTypeMatched(32) =>
         new ParquetPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit =
             updater.setLong(Integer.toUnsignedLong(value))
@@ -273,20 +280,20 @@ private[parquet] class ParquetRowConverter(
         }
 
       // For INT32 backed decimals
-      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
-        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
-        if (metadata == null) {
-          // If the column is a plain INT32, we should pick the precision that can host the largest
-          // INT32 value.
-          new ParquetIntDictionaryAwareDecimalConverter(
-            DecimalType.IntDecimal.precision, 0, updater)
-        } else {
-          new ParquetIntDictionaryAwareDecimalConverter(
-            metadata.getPrecision, metadata.getScale, updater)
+      case _: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
+        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
+          case decimalType: DecimalLogicalTypeAnnotation =>
+            new ParquetIntDictionaryAwareDecimalConverter(
+              decimalType.getPrecision, decimalType.getScale, updater)
+          case _ =>
+            // If the column is a plain INT32, we should pick the precision that can host the
+            // largest INT32 value.
+            new ParquetIntDictionaryAwareDecimalConverter(
+              DecimalType.IntDecimal.precision, 0, updater)
         }
 
       // For unsigned int64
-      case _: DecimalType if parquetType.getOriginalType == OriginalType.UINT_64 =>
+      case _: DecimalType if isUnsignedIntTypeMatched(64) =>
         new ParquetPrimitiveConverter(updater) {
           override def addLong(value: Long): Unit = {
             updater.set(Decimal(java.lang.Long.toUnsignedString(value)))
@@ -295,29 +302,29 @@ private[parquet] class ParquetRowConverter(
 
       // For INT64 backed decimals
       case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
-        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
-        if (metadata == null) {
-          // If the column is a plain INT64, we should pick the precision that can host the largest
-          // INT64 value.
-          new ParquetLongDictionaryAwareDecimalConverter(
-            DecimalType.LongDecimal.precision, 0, updater)
-        } else {
-          new ParquetLongDictionaryAwareDecimalConverter(
-            metadata.getPrecision, metadata.getScale, updater)
+        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
+          case decimalType: DecimalLogicalTypeAnnotation =>
+            new ParquetLongDictionaryAwareDecimalConverter(
+              decimalType.getPrecision, decimalType.getScale, updater)
+          case _ =>
+            // If the column is a plain INT64, we should pick the precision that can host the
+            // largest INT64 value.
+            new ParquetLongDictionaryAwareDecimalConverter(
+              DecimalType.LongDecimal.precision, 0, updater)
         }
 
       // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
       case t: DecimalType
         if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
            parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
-        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
-        if (metadata == null) {
-          throw new RuntimeException(s"Unable to create Parquet converter for ${t.typeName} " +
-            s"whose Parquet type is $parquetType without decimal metadata. Please read this " +
-            "column/field as Spark BINARY type." )
-        } else {
-          new ParquetBinaryDictionaryAwareDecimalConverter(
-            metadata.getPrecision, metadata.getScale, updater)
+        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
+          case decimalType: DecimalLogicalTypeAnnotation =>
+            new ParquetBinaryDictionaryAwareDecimalConverter(
+              decimalType.getPrecision, decimalType.getScale, updater)
+          case _ =>
+            throw new RuntimeException(s"Unable to create Parquet converter for ${t.typeName} " +
+              s"whose Parquet type is $parquetType without decimal metadata. Please read this " +
+              "column/field as Spark BINARY type." )
         }
 
       case t: DecimalType =>
@@ -329,14 +336,20 @@ private[parquet] class ParquetRowConverter(
       case StringType =>
         new ParquetStringConverter(updater)
 
-      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
+      case TimestampType
+        if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
+           parquetType.getLogicalTypeAnnotation
+             .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
         new ParquetPrimitiveConverter(updater) {
           override def addLong(value: Long): Unit = {
             updater.setLong(timestampRebaseFunc(value))
           }
         }
 
-      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
+      case TimestampType
+        if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
+          parquetType.getLogicalTypeAnnotation
+            .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MILLIS =>
         new ParquetPrimitiveConverter(updater) {
           override def addLong(value: Long): Unit = {
             val micros = DateTimeUtils.millisToMicros(value)
@@ -367,7 +380,8 @@ private[parquet] class ParquetRowConverter(
       // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
       // annotated by `LIST` or `MAP` should be interpreted as a required list of required
       // elements where the element type is the type of the field.
-      case t: ArrayType if parquetType.getOriginalType != LIST =>
+      case t: ArrayType
+        if !parquetType.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation] =>
         if (parquetType.isPrimitive) {
           new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
         } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index e751c97..1b26c69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.schema._
-import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.LogicalTypeAnnotation._
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 import org.apache.parquet.schema.Type.Repetition._
 
@@ -93,10 +93,10 @@ class ParquetToSparkSchemaConverter(
 
   private def convertPrimitiveField(field: PrimitiveType): DataType = {
     val typeName = field.getPrimitiveTypeName
-    val originalType = field.getOriginalType
+    val typeAnnotation = field.getLogicalTypeAnnotation
 
     def typeString =
-      if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
+      if (typeAnnotation == null) s"$typeName" else s"$typeName ($typeAnnotation)"
 
     def typeNotImplemented() =
       throw QueryCompilationErrors.parquetTypeUnsupportedYetError(typeString)
@@ -108,8 +108,10 @@ class ParquetToSparkSchemaConverter(
     // specified in field.getDecimalMetadata.  This is useful when interpreting decimal types stored
     // as binaries with variable lengths.
     def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
-      val precision = field.getDecimalMetadata.getPrecision
-      val scale = field.getDecimalMetadata.getScale
+      val decimalLogicalTypeAnnotation = field.getLogicalTypeAnnotation
+        .asInstanceOf[DecimalLogicalTypeAnnotation]
+      val precision = decimalLogicalTypeAnnotation.getPrecision
+      val scale = decimalLogicalTypeAnnotation.getScale
 
       ParquetSchemaConverter.checkConversionRequirement(
         maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
@@ -126,26 +128,49 @@ class ParquetToSparkSchemaConverter(
       case DOUBLE => DoubleType
 
       case INT32 =>
-        originalType match {
-          case INT_8 => ByteType
-          case INT_16 | UINT_8 => ShortType
-          case INT_32 | UINT_16 | null => IntegerType
-          case DATE => DateType
-          case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS)
-          case UINT_32 => LongType
-          case TIME_MILLIS => typeNotImplemented()
+        typeAnnotation match {
+          case intTypeAnnotation: IntLogicalTypeAnnotation if intTypeAnnotation.isSigned =>
+            intTypeAnnotation.getBitWidth match {
+              case 8 => ByteType
+              case 16 => ShortType
+              case 32 => IntegerType
+              case _ => illegalType()
+            }
+          case null => IntegerType
+          case _: DateLogicalTypeAnnotation => DateType
+          case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.MAX_INT_DIGITS)
+          case intTypeAnnotation: IntLogicalTypeAnnotation if !intTypeAnnotation.isSigned =>
+            intTypeAnnotation.getBitWidth match {
+              case 8 => ShortType
+              case 16 => IntegerType
+              case 32 => LongType
+              case _ => illegalType()
+            }
+          case t: TimestampLogicalTypeAnnotation if t.getUnit == TimeUnit.MILLIS =>
+            typeNotImplemented()
           case _ => illegalType()
         }
 
       case INT64 =>
-        originalType match {
-          case INT_64 | null => LongType
-          case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
-          // The precision to hold the largest unsigned long is:
-          // `java.lang.Long.toUnsignedString(-1).length` = 20
-          case UINT_64 => DecimalType(20, 0)
-          case TIMESTAMP_MICROS => TimestampType
-          case TIMESTAMP_MILLIS => TimestampType
+        typeAnnotation match {
+          case intTypeAnnotation: IntLogicalTypeAnnotation if intTypeAnnotation.isSigned =>
+            intTypeAnnotation.getBitWidth match {
+              case 64 => LongType
+              case _ => illegalType()
+            }
+          case null => LongType
+          case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.MAX_LONG_DIGITS)
+          case intTypeAnnotation: IntLogicalTypeAnnotation if !intTypeAnnotation.isSigned =>
+            intTypeAnnotation.getBitWidth match {
+              // The precision to hold the largest unsigned long is:
+              // `java.lang.Long.toUnsignedString(-1).length` = 20
+              case 64 => DecimalType(20, 0)
+              case _ => illegalType()
+            }
+          case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.MICROS =>
+            TimestampType
+          case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.MILLIS =>
+            TimestampType
           case _ => illegalType()
         }
 
@@ -157,19 +182,21 @@ class ParquetToSparkSchemaConverter(
         TimestampType
 
       case BINARY =>
-        originalType match {
-          case UTF8 | ENUM | JSON => StringType
+        typeAnnotation match {
+          case _: StringLogicalTypeAnnotation | _: EnumLogicalTypeAnnotation |
+               _: JsonLogicalTypeAnnotation => StringType
           case null if assumeBinaryIsString => StringType
           case null => BinaryType
-          case BSON => BinaryType
-          case DECIMAL => makeDecimalType()
+          case _: BsonLogicalTypeAnnotation => BinaryType
+          case _: DecimalLogicalTypeAnnotation => makeDecimalType()
           case _ => illegalType()
         }
 
       case FIXED_LEN_BYTE_ARRAY =>
-        originalType match {
-          case DECIMAL => makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength))
-          case INTERVAL => typeNotImplemented()
+        typeAnnotation match {
+          case _: DecimalLogicalTypeAnnotation =>
+            makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength))
+          case _: IntervalLogicalTypeAnnotation => typeNotImplemented()
           case _ => illegalType()
         }
 
@@ -178,7 +205,7 @@ class ParquetToSparkSchemaConverter(
   }
 
   private def convertGroupField(field: GroupType): DataType = {
-    Option(field.getOriginalType).fold(convert(field): DataType) {
+    Option(field.getLogicalTypeAnnotation).fold(convert(field): DataType) {
       // A Parquet list is represented as a 3-level structure:
       //
       //   <list-repetition> group <name> (LIST) {
@@ -192,7 +219,7 @@ class ParquetToSparkSchemaConverter(
      // we need to check whether the 2nd level or the 3rd level refers to list element type.
       //
      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-      case LIST =>
+      case _: ListLogicalTypeAnnotation =>
         ParquetSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1, s"Invalid list type $field")
 
@@ -212,7 +239,7 @@ class ParquetToSparkSchemaConverter(
       // `MAP_KEY_VALUE` is for backwards-compatibility
      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
       // scalastyle:on
-      case MAP | MAP_KEY_VALUE =>
+      case _: MapLogicalTypeAnnotation | _: MapKeyValueTypeAnnotation =>
         ParquetSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1 && !field.getType(0).isPrimitive,
           s"Invalid map type: $field")
@@ -340,10 +367,12 @@ class SparkToParquetSchemaConverter(
         Types.primitive(BOOLEAN, repetition).named(field.name)
 
       case ByteType =>
-        Types.primitive(INT32, repetition).as(INT_8).named(field.name)
+        Types.primitive(INT32, repetition)
+          .as(LogicalTypeAnnotation.intType(8, true)).named(field.name)
 
       case ShortType =>
-        Types.primitive(INT32, repetition).as(INT_16).named(field.name)
+        Types.primitive(INT32, repetition)
+          .as(LogicalTypeAnnotation.intType(16, true)).named(field.name)
 
       case IntegerType =>
         Types.primitive(INT32, repetition).named(field.name)
@@ -358,10 +387,12 @@ class SparkToParquetSchemaConverter(
         Types.primitive(DOUBLE, repetition).named(field.name)
 
       case StringType =>
-        Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
+        Types.primitive(BINARY, repetition)
+          .as(LogicalTypeAnnotation.stringType()).named(field.name)
 
       case DateType =>
-        Types.primitive(INT32, repetition).as(DATE).named(field.name)
+        Types.primitive(INT32, repetition)
+          .as(LogicalTypeAnnotation.dateType()).named(field.name)
 
       // NOTE: Spark SQL can write timestamp values to Parquet using INT96, TIMESTAMP_MICROS or
       // TIMESTAMP_MILLIS. TIMESTAMP_MICROS is recommended but INT96 is the default to keep the
@@ -382,9 +413,11 @@ class SparkToParquetSchemaConverter(
           case SQLConf.ParquetOutputTimestampType.INT96 =>
             Types.primitive(INT96, repetition).named(field.name)
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
-            Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)
+            Types.primitive(INT64, repetition)
+              .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS)).named(field.name)
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
-            Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
+            Types.primitive(INT64, repetition)
+              .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS)).named(field.name)
         }
 
       case BinaryType =>
@@ -401,9 +434,7 @@ class SparkToParquetSchemaConverter(
       case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
         Types
           .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
+          .as(LogicalTypeAnnotation.decimalType(scale, precision))
           .length(Decimal.minBytesForPrecision(precision))
           .named(field.name)
 
@@ -416,9 +447,7 @@ class SparkToParquetSchemaConverter(
           if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat =>
         Types
           .primitive(INT32, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
+          .as(LogicalTypeAnnotation.decimalType(scale, precision))
           .named(field.name)
 
       // Uses INT64 for 1 <= precision <= 18
@@ -426,18 +455,14 @@ class SparkToParquetSchemaConverter(
           if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat =>
         Types
           .primitive(INT64, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
+          .as(LogicalTypeAnnotation.decimalType(scale, precision))
           .named(field.name)
 
       // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
       case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
         Types
           .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
+          .as(LogicalTypeAnnotation.decimalType(scale, precision))
           .length(Decimal.minBytesForPrecision(precision))
           .named(field.name)
 
@@ -462,7 +487,7 @@ class SparkToParquetSchemaConverter(
         // `array` as its element name as below. Therefore, we build manually
         // the correct group type here via the builder. (See SPARK-16777)
         Types
-          .buildGroup(repetition).as(LIST)
+          .buildGroup(repetition).as(LogicalTypeAnnotation.listType())
           .addField(Types
             .buildGroup(REPEATED)
             // "array" is the name chosen by parquet-hive (1.7.0 and prior version)
@@ -480,7 +505,7 @@ class SparkToParquetSchemaConverter(
 
         // Here too, we should not use `listOfElements`. (See SPARK-16777)
         Types
-          .buildGroup(repetition).as(LIST)
+          .buildGroup(repetition).as(LogicalTypeAnnotation.listType())
           // "array" is the name chosen by parquet-avro (1.7.0 and prior version)
           .addField(convertField(StructField("array", elementType, nullable), REPEATED))
           .named(field.name)
@@ -511,7 +536,7 @@ class SparkToParquetSchemaConverter(
         //   }
         // }
         Types
-          .buildGroup(repetition).as(LIST)
+          .buildGroup(repetition).as(LogicalTypeAnnotation.listType())
           .addField(
             Types.repeatedGroup()
               .addField(convertField(StructField("element", elementType, containsNull)))
@@ -526,7 +551,7 @@ class SparkToParquetSchemaConverter(
         //   }
         // }
         Types
-          .buildGroup(repetition).as(MAP)
+          .buildGroup(repetition).as(LogicalTypeAnnotation.mapType())
           .addField(
             Types
               .repeatedGroup()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
