http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
deleted file mode 100644
index 67bfd39..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.nio.{ByteBuffer, ByteOrder}
-import java.util
-
-import scala.collection.JavaConverters.mapAsJavaMapConverter
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.io.api.{Binary, RecordConsumer}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.minBytesForPrecision
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
-
-/**
- * A Parquet [[WriteSupport]] implementation that writes Catalyst [[InternalRow]]s as Parquet
- * messages.  This class can write Parquet data in two modes:
- *
- *  - Standard mode: Parquet data are written in standard format defined in parquet-format spec.
- *  - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.4 and prior.
- *
- * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyFormat`.  The value
- * of this option is propagated to this class by the `init()` method and its Hadoop configuration
- * argument.
- */
-private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
-  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer.
-  // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access
-  // data in `ArrayData` without the help of `SpecificMutableRow`.
-  private type ValueWriter = (SpecializedGetters, Int) => Unit
-
-  // Schema of the `InternalRow`s to be written
-  private var schema: StructType = _
-
-  // `ValueWriter`s for all fields of the schema
-  private var rootFieldWriters: Seq[ValueWriter] = _
-
-  // The Parquet `RecordConsumer` to which all `InternalRow`s are written
-  private var recordConsumer: RecordConsumer = _
-
-  // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
-  private var writeLegacyParquetFormat: Boolean = _
-
-  // Reusable byte array used to write timestamps as Parquet INT96 values
-  private val timestampBuffer = new Array[Byte](12)
-
-  // Reusable byte array used to write decimal values
-  private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))
-
-  override def init(configuration: Configuration): WriteContext = {
-    val schemaString = configuration.get(CatalystWriteSupport.SPARK_ROW_SCHEMA)
-    this.schema = StructType.fromString(schemaString)
-    this.writeLegacyParquetFormat = {
-      // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation
-      assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
-      configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
-    }
-    this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
-
-    val messageType = new CatalystSchemaConverter(configuration).convert(schema)
-    val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
-
-    logInfo(
-      s"""Initialized Parquet WriteSupport with Catalyst schema:
-         |${schema.prettyJson}
-         |and corresponding Parquet message type:
-         |$messageType
-       """.stripMargin)
-
-    new WriteContext(messageType, metadata)
-  }
-
-  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
-    this.recordConsumer = recordConsumer
-  }
-
-  override def write(row: InternalRow): Unit = {
-    consumeMessage {
-      writeFields(row, schema, rootFieldWriters)
-    }
-  }
-
-  private def writeFields(
-      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
-    var i = 0
-    while (i < row.numFields) {
-      if (!row.isNullAt(i)) {
-        consumeField(schema(i).name, i) {
-          fieldWriters(i).apply(row, i)
-        }
-      }
-      i += 1
-    }
-  }
-
-  private def makeWriter(dataType: DataType): ValueWriter = {
-    dataType match {
-      case BooleanType =>
-        (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addBoolean(row.getBoolean(ordinal))
-
-      case ByteType =>
-        (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addInteger(row.getByte(ordinal))
-
-      case ShortType =>
-        (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addInteger(row.getShort(ordinal))
-
-      case IntegerType | DateType =>
-        (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addInteger(row.getInt(ordinal))
-
-      case LongType =>
-        (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addLong(row.getLong(ordinal))
-
-      case FloatType =>
-        (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addFloat(row.getFloat(ordinal))
-
-      case DoubleType =>
-        (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addDouble(row.getDouble(ordinal))
-
-      case StringType =>
-        (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes))
-
-      case TimestampType =>
-        (row: SpecializedGetters, ordinal: Int) => {
-          // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
-          // Currently we only support timestamps stored as INT96, which is compatible with Hive
-          // and Impala.  However, INT96 is to be deprecated.  We plan to support `TIMESTAMP_MICROS`
-          // defined in the parquet-format spec.  But up until writing, the most recent parquet-mr
-          // version (1.8.1) hasn't implemented it yet.
-
-          // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
-          // precision.  Nanosecond parts of timestamp values read from INT96 are simply stripped.
-          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
-          val buf = ByteBuffer.wrap(timestampBuffer)
-          buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
-          recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer))
-        }
-
-      case BinaryType =>
-        (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
-
-      case DecimalType.Fixed(precision, scale) =>
-        makeDecimalWriter(precision, scale)
-
-      case t: StructType =>
-        val fieldWriters = t.map(_.dataType).map(makeWriter)
-        (row: SpecializedGetters, ordinal: Int) =>
-          consumeGroup {
-            writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)
-          }
-
-      case t: ArrayType => makeArrayWriter(t)
-
-      case t: MapType => makeMapWriter(t)
-
-      case t: UserDefinedType[_] => makeWriter(t.sqlType)
-
-      // TODO Adds IntervalType support
-      case _ => sys.error(s"Unsupported data type $dataType.")
-    }
-  }
-
-  private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = {
-    assert(
-      precision <= DecimalType.MAX_PRECISION,
-      s"Decimal precision $precision exceeds max precision ${DecimalType.MAX_PRECISION}")
-
-    val numBytes = minBytesForPrecision(precision)
-
-    val int32Writer =
-      (row: SpecializedGetters, ordinal: Int) => {
-        val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
-        recordConsumer.addInteger(unscaledLong.toInt)
-      }
-
-    val int64Writer =
-      (row: SpecializedGetters, ordinal: Int) => {
-        val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
-        recordConsumer.addLong(unscaledLong)
-      }
-
-    val binaryWriterUsingUnscaledLong =
-      (row: SpecializedGetters, ordinal: Int) => {
-        // When the precision is low enough (<= 18) to squeeze the decimal value into a `Long`, we
-        // can build a fixed-length byte array with length `numBytes` using the unscaled `Long`
-        // value and the `decimalBuffer` for better performance.
-        val unscaled = row.getDecimal(ordinal, precision, scale).toUnscaledLong
-        var i = 0
-        var shift = 8 * (numBytes - 1)
-
-        while (i < numBytes) {
-          decimalBuffer(i) = (unscaled >> shift).toByte
-          i += 1
-          shift -= 8
-        }
-
-        recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes))
-      }
-
-    val binaryWriterUsingUnscaledBytes =
-      (row: SpecializedGetters, ordinal: Int) => {
-        val decimal = row.getDecimal(ordinal, precision, scale)
-        val bytes = decimal.toJavaBigDecimal.unscaledValue().toByteArray
-        val fixedLengthBytes = if (bytes.length == numBytes) {
-          // If the length of the underlying byte array of the unscaled `BigInteger` happens to be
-          // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`.
-          bytes
-        } else {
-          // Otherwise, the length must be less than `numBytes`.  In this case we copy contents of
-          // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result
-          // fixed-length byte array.
-          val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
-          util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte)
-          System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length)
-          decimalBuffer
-        }
-
-        recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes))
-      }
-
-    writeLegacyParquetFormat match {
-      // Standard mode, 1 <= precision <= 9, writes as INT32
-      case false if precision <= Decimal.MAX_INT_DIGITS => int32Writer
-
-      // Standard mode, 10 <= precision <= 18, writes as INT64
-      case false if precision <= Decimal.MAX_LONG_DIGITS => int64Writer
-
-      // Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
-      case true if precision <= Decimal.MAX_LONG_DIGITS => binaryWriterUsingUnscaledLong
-
-      // Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
-      case _ => binaryWriterUsingUnscaledBytes
-    }
-  }
-
-  def makeArrayWriter(arrayType: ArrayType): ValueWriter = {
-    val elementWriter = makeWriter(arrayType.elementType)
-
-    def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: String): ValueWriter =
-      (row: SpecializedGetters, ordinal: Int) => {
-        val array = row.getArray(ordinal)
-        consumeGroup {
-          // Only creates the repeated field if the array is non-empty.
-          if (array.numElements() > 0) {
-            consumeField(repeatedGroupName, 0) {
-              var i = 0
-              while (i < array.numElements()) {
-                consumeGroup {
-                  // Only creates the element field if the current array element is not null.
-                  if (!array.isNullAt(i)) {
-                    consumeField(elementFieldName, 0) {
-                      elementWriter.apply(array, i)
-                    }
-                  }
-                }
-                i += 1
-              }
-            }
-          }
-        }
-      }
-
-    def twoLevelArrayWriter(repeatedFieldName: String): ValueWriter =
-      (row: SpecializedGetters, ordinal: Int) => {
-        val array = row.getArray(ordinal)
-        consumeGroup {
-          // Only creates the repeated field if the array is non-empty.
-          if (array.numElements() > 0) {
-            consumeField(repeatedFieldName, 0) {
-              var i = 0
-              while (i < array.numElements()) {
-                elementWriter.apply(array, i)
-                i += 1
-              }
-            }
-          }
-        }
-      }
-
-    (writeLegacyParquetFormat, arrayType.containsNull) match {
-      case (legacyMode @ false, _) =>
-        // Standard mode:
-        //
-        //   <list-repetition> group <name> (LIST) {
-        //     repeated group list {
-        //                    ^~~~  repeatedGroupName
-        //       <element-repetition> <element-type> element;
-        //                                           ^~~~~~~  elementFieldName
-        //     }
-        //   }
-        threeLevelArrayWriter(repeatedGroupName = "list", elementFieldName = "element")
-
-      case (legacyMode @ true, nullableElements @ true) =>
-        // Legacy mode, with nullable elements:
-        //
-        //   <list-repetition> group <name> (LIST) {
-        //     optional group bag {
-        //                    ^~~  repeatedGroupName
-        //       repeated <element-type> array;
-        //                               ^~~~~ elementFieldName
-        //     }
-        //   }
-        threeLevelArrayWriter(repeatedGroupName = "bag", elementFieldName = "array")
-
-      case (legacyMode @ true, nullableElements @ false) =>
-        // Legacy mode, with non-nullable elements:
-        //
-        //   <list-repetition> group <name> (LIST) {
-        //     repeated <element-type> array;
-        //                             ^~~~~  repeatedFieldName
-        //   }
-        twoLevelArrayWriter(repeatedFieldName = "array")
-    }
-  }
-
-  private def makeMapWriter(mapType: MapType): ValueWriter = {
-    val keyWriter = makeWriter(mapType.keyType)
-    val valueWriter = makeWriter(mapType.valueType)
-    val repeatedGroupName = if (writeLegacyParquetFormat) {
-      // Legacy mode:
-      //
-      //   <map-repetition> group <name> (MAP) {
-      //     repeated group map (MAP_KEY_VALUE) {
-      //                    ^~~  repeatedGroupName
-      //       required <key-type> key;
-      //       <value-repetition> <value-type> value;
-      //     }
-      //   }
-      "map"
-    } else {
-      // Standard mode:
-      //
-      //   <map-repetition> group <name> (MAP) {
-      //     repeated group key_value {
-      //                    ^~~~~~~~~  repeatedGroupName
-      //       required <key-type> key;
-      //       <value-repetition> <value-type> value;
-      //     }
-      //   }
-      "key_value"
-    }
-
-    (row: SpecializedGetters, ordinal: Int) => {
-      val map = row.getMap(ordinal)
-      val keyArray = map.keyArray()
-      val valueArray = map.valueArray()
-
-      consumeGroup {
-        // Only creates the repeated field if the map is non-empty.
-        if (map.numElements() > 0) {
-          consumeField(repeatedGroupName, 0) {
-            var i = 0
-            while (i < map.numElements()) {
-              consumeGroup {
-                consumeField("key", 0) {
-                  keyWriter.apply(keyArray, i)
-                }
-
-                // Only creates the "value" field if the value is non-null
-                if (!map.valueArray().isNullAt(i)) {
-                  consumeField("value", 1) {
-                    valueWriter.apply(valueArray, i)
-                  }
-                }
-              }
-              i += 1
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private def consumeMessage(f: => Unit): Unit = {
-    recordConsumer.startMessage()
-    f
-    recordConsumer.endMessage()
-  }
-
-  private def consumeGroup(f: => Unit): Unit = {
-    recordConsumer.startGroup()
-    f
-    recordConsumer.endGroup()
-  }
-
-  private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
-    recordConsumer.startField(field, index)
-    f
-    recordConsumer.endField(field, index)
-  }
-}
-
-private[parquet] object CatalystWriteSupport {
-  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
-
-  def setSchema(schema: StructType, configuration: Configuration): Unit = {
-    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
-    configuration.set(SPARK_ROW_SCHEMA, schema.json)
-    configuration.setIfUnset(
-      ParquetOutputFormat.WRITER_VERSION,
-      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
-  }
-}

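Aside on the INT96 path above: the 12-byte layout the writer packs into `timestampBuffer` (and which the new ParquetRowConverter.scala below unpacks) can be reproduced in isolation. The sketch below is illustrative only; `packInt96` is a hypothetical helper and assumes the Julian day and nanos-of-day were already computed (Spark does this with `DateTimeUtils.toJulianDay`).

import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helper: packs a timestamp into the 12-byte INT96 layout written above,
// i.e. 8 bytes of nanos-of-day (little-endian) followed by 4 bytes of Julian day.
def packInt96(julianDay: Int, timeOfDayNanos: Long): Array[Byte] = {
  val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
  buf.putLong(timeOfDayNanos).putInt(julianDay)
  buf.array()
}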
http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6b25e36..f38bf81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -99,13 +99,13 @@ private[sql] class ParquetFileFormat
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
 
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
     // We want to clear this temporary metadata from saving into Parquet file.
    // This metadata is only useful for detecting optional columns when pushdowning filters.
    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
 
    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -288,19 +288,19 @@ private[sql] class ParquetFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(
-      CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
     hadoopConf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
 
     // We want to clear this temporary metadata from saving into Parquet file.
    // This metadata is only useful for detecting optional columns when pushdowning filters.
    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       requiredSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
@@ -369,10 +369,10 @@ private[sql] class ParquetFileFormat
         val reader = pushed match {
           case Some(filter) =>
             new ParquetRecordReader[InternalRow](
-              new CatalystReadSupport,
+              new ParquetReadSupport,
               FilterCompat.get(filter, null))
           case _ =>
-            new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+            new ParquetRecordReader[InternalRow](new ParquetReadSupport)
         }
         reader.initialize(split, hadoopAttemptContext)
         reader
@@ -433,14 +433,14 @@ private[sql] class ParquetOutputWriterFactory(
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
 
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
     // We want to clear this temporary metadata from saving into Parquet file.
    // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(
       StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
 
    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -590,7 +590,7 @@ private[sql] object ParquetFileFormat extends Logging {
       assumeBinaryIsString: Boolean,
       assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
     val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
 
     // Try to push down filters when filter push-down is enabled.
     if (parquetFilterPushDown) {
@@ -603,14 +603,14 @@ private[sql] object ParquetFileFormat extends Logging {
         .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
     }
 
-    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+    conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
       val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+      ParquetSchemaConverter.checkFieldNames(requestedSchema).json
     })
 
     conf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetSchemaConverter.checkFieldNames(dataSchema).json)
 
    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
     conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
@@ -639,7 +639,7 @@ private[sql] object ParquetFileFormat extends Logging {
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
 
     def parseParquetSchema(schema: MessageType): StructType = {
-      val converter = new CatalystSchemaConverter(
+      val converter = new ParquetSchemaConverter(
         sparkSession.sessionState.conf.isParquetBinaryAsString,
         sparkSession.sessionState.conf.isParquetBinaryAsString,
         sparkSession.sessionState.conf.writeLegacyParquetFormat)
@@ -653,7 +653,7 @@ private[sql] object ParquetFileFormat extends Logging {
       val serializedSchema = metadata
         .getKeyValueMetaData
         .asScala.toMap
-        .get(CatalystReadSupport.SPARK_METADATA_KEY)
+        .get(ParquetReadSupport.SPARK_METADATA_KEY)
       if (serializedSchema.isEmpty) {
         // Falls back to Parquet schema if no Spark SQL schema found.
         Some(parseParquetSchema(metadata.getSchema))
@@ -820,7 +820,7 @@ private[sql] object ParquetFileFormat extends Logging {
 
          // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
           val converter =
-            new CatalystSchemaConverter(
+            new ParquetSchemaConverter(
               assumeBinaryIsString = assumeBinaryIsString,
               assumeInt96IsTimestamp = assumeInt96IsTimestamp,
               writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -864,12 +864,12 @@ private[sql] object ParquetFileFormat extends Logging {
    * a [[StructType]] converted from the [[MessageType]] stored in this footer.
    */
   def readSchemaFromFooter(
-      footer: Footer, converter: CatalystSchemaConverter): StructType = {
+      footer: Footer, converter: ParquetSchemaConverter): StructType = {
     val fileMetaData = footer.getParquetMetadata.getFileMetaData
     fileMetaData
       .getKeyValueMetaData
       .asScala.toMap
-      .get(CatalystReadSupport.SPARK_METADATA_KEY)
+      .get(ParquetReadSupport.SPARK_METADATA_KEY)
       .flatMap(deserializeSchemaString)
       .getOrElse(converter.convert(fileMetaData.getSchema))
   }

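Several hunks above repeat the same pattern: register the write support class, strip the filter-pushdown-only field metadata, then persist the schema into the Hadoop configuration. The internal helper `StructType.removeMetadata` drops just that one metadata key; the sketch below is only a rough public-API approximation (it clears all per-field metadata, so it is not equivalent, and `stripFieldMetadata` is a hypothetical name).

import org.apache.spark.sql.types.{Metadata, StructType}

// Rough sketch only: rebuild the schema with empty per-field metadata before
// persisting it, whereas Spark's internal helper removes a single metadata key.
def stripFieldMetadata(schema: StructType): StructType =
  StructType(schema.fields.map(_.copy(metadata = Metadata.empty)))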
http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
new file mode 100644
index 0000000..12f4974
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.Type.Repetition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
+ * [[InternalRow]]s.
+ *
+ * The API interface of [[ReadSupport]] is a little bit over-complicated because of historical
+ * reasons.  In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] needed to be
+ * instantiated and initialized twice on both driver side and executor side.  The [[init()]] method
+ * is for driver side initialization, while [[prepareForRead()]] is for executor side.  However,
+ * starting from parquet-mr 1.6.0, it's no longer the case, and [[ReadSupport]] is only instantiated
+ * and initialized on executor side.  So, theoretically, now it's totally fine to combine these two
+ * methods into a single initialization method.  The only reason (I could think of) to still have
+ * them here is for parquet-mr API backwards-compatibility.
+ *
+ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
+ * to [[prepareForRead()]], but use a private `var` for simplicity.
+ */
+private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging {
+  private var catalystRequestedSchema: StructType = _
+
+  /**
+   * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
+   * readers.  Responsible for figuring out Parquet requested schema used for column pruning.
+   */
+  override def init(context: InitContext): ReadContext = {
+    catalystRequestedSchema = {
+      val conf = context.getConfiguration
+      val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+      assert(schemaString != null, "Parquet requested schema not set.")
+      StructType.fromString(schemaString)
+    }
+
+    val parquetRequestedSchema =
+      ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
+   * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
+   * records to Catalyst [[InternalRow]]s.
+   */
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
+    val parquetRequestedSchema = readContext.getRequestedSchema
+
+    logInfo {
+      s"""Going to read the following fields from the Parquet file:
+         |
+         |Parquet form:
+         |$parquetRequestedSchema
+         |Catalyst form:
+         |$catalystRequestedSchema
+       """.stripMargin
+    }
+
+    new ParquetRecordMaterializer(
+      parquetRequestedSchema,
+      ParquetReadSupport.expandUDT(catalystRequestedSchema))
+  }
+}
+
+private[parquet] object ParquetReadSupport {
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  /**
+   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't exist
+   * in `catalystSchema`, and adding those that only exist in `catalystSchema`.
+   */
+  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+    Types
+      .buildMessage()
+      .addFields(clippedParquetFields: _*)
+      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+  }
+
+  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+    catalystType match {
+      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+        // Only clips array types with nested type as element type.
+        clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+      case t: MapType
+        if !isPrimitiveCatalystType(t.keyType) ||
+           !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested key type or value type
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+      case t: StructType =>
+        clipParquetGroup(parquetType.asGroupType(), t)
+
+      case _ =>
+        // UDTs and primitive types are not clipped.  For UDTs, a clipped version might not be able
+        // to be mapped to desired user-space types.  So UDTs shouldn't participate in schema merging.
+        parquetType
+    }
+  }
+
+  /**
+   * Whether a Catalyst [[DataType]] is primitive.  Primitive [[DataType]] is not equivalent to
+   * [[AtomicType]].  For example, [[CalendarIntervalType]] is primitive, but it's not an
+   * [[AtomicType]].
+   */
+  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: ArrayType | _: MapType | _: StructType => false
+      case _ => true
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]].  The element type
+   * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+   * [[StructType]].
+   */
+  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+    // Precondition of this method, should only be called for lists with nested element types.
+    assert(!isPrimitiveCatalystType(elementType))
+
+    // Unannotated repeated group should be interpreted as required list of required element, so
+    // list element type is just the group itself.  Clip it.
+    if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
+      clipParquetType(parquetList, elementType)
+    } else {
+      assert(
+        parquetList.getOriginalType == OriginalType.LIST,
+        "Invalid Parquet schema. " +
+          "Original type of annotated Parquet lists must be LIST: " +
+          parquetList.toString)
+
+      assert(
+        parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
+        "Invalid Parquet schema. " +
+          "LIST-annotated group should only have exactly one repeated field: " +
+          parquetList)
+
+      // Precondition of this method, should only be called for lists with nested element types.
+      assert(!parquetList.getType(0).isPrimitive)
+
+      val repeatedGroup = parquetList.getType(0).asGroupType()
+
+      // If the repeated field is a group with multiple fields, or the repeated field is a group
+      // with one field and is named either "array" or uses the LIST-annotated group's name with
+      // "_tuple" appended then the repeated type is the element type and elements are required.
+      // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the
+      // only field.
+      if (
+        repeatedGroup.getFieldCount > 1 ||
+        repeatedGroup.getName == "array" ||
+        repeatedGroup.getName == parquetList.getName + "_tuple"
+      ) {
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(clipParquetType(repeatedGroup, elementType))
+          .named(parquetList.getName)
+      } else {
+        // Otherwise, the repeated field's type is the element type with the repeated field's
+        // repetition.
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .named(repeatedGroup.getName))
+          .named(parquetList.getName)
+      }
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]].  Either key type or
+   * value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a [[MapType]], or
+   * a [[StructType]].
+   */
+  private def clipParquetMapType(
+      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+    // Precondition of this method, only handles maps with nested key types or value types.
+    assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
+
+    val repeatedGroup = parquetMap.getType(0).asGroupType()
+    val parquetKeyType = repeatedGroup.getType(0)
+    val parquetValueType = repeatedGroup.getType(1)
+
+    val clippedRepeatedGroup =
+      Types
+        .repeatedGroup()
+        .as(repeatedGroup.getOriginalType)
+        .addField(clipParquetType(parquetKeyType, keyType))
+        .addField(clipParquetType(parquetValueType, valueType))
+        .named(repeatedGroup.getName)
+
+    Types
+      .buildGroup(parquetMap.getRepetition)
+      .as(parquetMap.getOriginalType)
+      .addField(clippedRepeatedGroup)
+      .named(parquetMap.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A clipped [[GroupType]], which has at least one field.
+   * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty
+   *       [[MessageType]].  Because it's legal to construct an empty requested schema for column
+   *       pruning.
+   */
+  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+    Types
+      .buildGroup(parquetRecord.getRepetition)
+      .as(parquetRecord.getOriginalType)
+      .addFields(clippedParquetFields: _*)
+      .named(parquetRecord.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A list of clipped [[GroupType]] fields, which can be empty.
+   */
+  private def clipParquetGroupFields(
+      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+    val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
+    structType.map { f =>
+      parquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType))
+        .getOrElse(toParquet.convertField(f))
+    }
+  }
+
+  def expandUDT(schema: StructType): StructType = {
+    def expand(dataType: DataType): DataType = {
+      dataType match {
+        case t: ArrayType =>
+          t.copy(elementType = expand(t.elementType))
+
+        case t: MapType =>
+          t.copy(
+            keyType = expand(t.keyType),
+            valueType = expand(t.valueType))
+
+        case t: StructType =>
+          val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType)))
+          t.copy(fields = expandedFields)
+
+        case t: UserDefinedType[_] =>
+          t.sqlType
+
+        case t =>
+          t
+      }
+    }
+
+    expand(schema).asInstanceOf[StructType]
+  }
+}

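To make the clipping described above concrete, here is a small, hypothetical before/after example. The schema text and the expected output are assumptions based on the doc comments in this file; `clipParquetSchema` itself is package-private, so this only sketches its intent.

import org.apache.parquet.schema.MessageTypeParser

// Physical schema of the file on disk.
val fileSchema = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  required int32 id;
    |  optional group name {
    |    optional binary first (UTF8);
    |    optional binary last (UTF8);
    |  }
    |}
    |""".stripMargin)

// If the Catalyst requested schema only contains `id` and `name.first`, the
// clipped Parquet schema used for column pruning keeps exactly those paths:
//
//   message spark_schema {
//     required int32 id;
//     optional group name {
//       optional binary first (UTF8);
//     }
//   }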
http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
new file mode 100644
index 0000000..0818d80
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
+import org.apache.parquet.schema.MessageType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[RecordMaterializer]] for Catalyst rows.
+ *
+ * @param parquetSchema Parquet schema of the records to be read
+ * @param catalystSchema Catalyst schema of the rows to be constructed
+ */
+private[parquet] class ParquetRecordMaterializer(
+    parquetSchema: MessageType, catalystSchema: StructType)
+  extends RecordMaterializer[InternalRow] {
+
+  private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+
+  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
+
+  override def getRootConverter: GroupConverter = rootConverter
+}

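As a reminder of the contract this small class implements: parquet-mr asks the materializer for its root GroupConverter once, pushes each record's values through that converter tree, and then calls getCurrentRecord to fetch the materialized result. The minimal materializer below is a hypothetical illustration of that shape (unrelated to Spark's row conversion, and only valid for a flat schema of primitive columns).

import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter, RecordMaterializer}

// Hypothetical example: materializes each record as the count of its non-null leaf values.
class CountingMaterializer extends RecordMaterializer[Long] {
  private var count = 0L
  private val leaf = new PrimitiveConverter {
    override def addBoolean(value: Boolean): Unit = { count += 1 }
    override def addInt(value: Int): Unit = { count += 1 }
    override def addLong(value: Long): Unit = { count += 1 }
    override def addDouble(value: Double): Unit = { count += 1 }
    override def addBinary(value: Binary): Unit = { count += 1 }
  }
  private val root = new GroupConverter {
    override def getConverter(fieldIndex: Int): Converter = leaf
    override def start(): Unit = { count = 0L }
    override def end(): Unit = ()
  }
  override def getCurrentRecord: Long = count
  override def getRootConverter: GroupConverter = root
}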
http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
new file mode 100644
index 0000000..9dad596
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.math.{BigDecimal, BigInteger}
+import java.nio.ByteOrder
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.parquet.column.Dictionary
+import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
+import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
+ * corresponding parent container. For example, a converter for a `StructType` field may set
+ * converted values to a [[MutableRow]]; or a converter for array elements may append converted
+ * values to an [[ArrayBuffer]].
+ */
+private[parquet] trait ParentContainerUpdater {
+  /** Called before a record field is being converted */
+  def start(): Unit = ()
+
+  /** Called after a record field is being converted */
+  def end(): Unit = ()
+
+  def set(value: Any): Unit = ()
+  def setBoolean(value: Boolean): Unit = set(value)
+  def setByte(value: Byte): Unit = set(value)
+  def setShort(value: Short): Unit = set(value)
+  def setInt(value: Int): Unit = set(value)
+  def setLong(value: Long): Unit = set(value)
+  def setFloat(value: Float): Unit = set(value)
+  def setDouble(value: Double): Unit = set(value)
+}
+
+/** A no-op updater used for root converter (who doesn't have a parent). */
+private[parquet] object NoopUpdater extends ParentContainerUpdater
+
+private[parquet] trait HasParentContainerUpdater {
+  def updater: ParentContainerUpdater
+}
+
+/**
+ * A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]].
+ */
+private[parquet] abstract class ParquetGroupConverter(val updater: ParentContainerUpdater)
+  extends GroupConverter with HasParentContainerUpdater
+
+/**
+ * Parquet converter for Parquet primitive types.  Note that not all Spark SQL atomic types
+ * are handled by this converter.  Parquet primitive types are only a subset of those of Spark
+ * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
+ */
+private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpdater)
+  extends PrimitiveConverter with HasParentContainerUpdater {
+
+  override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
+  override def addInt(value: Int): Unit = updater.setInt(value)
+  override def addLong(value: Long): Unit = updater.setLong(value)
+  override def addFloat(value: Float): Unit = updater.setFloat(value)
+  override def addDouble(value: Double): Unit = updater.setDouble(value)
+  override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
+}
+
+/**
+ * A [[ParquetRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
+ * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
+ * converter.  Take the following Parquet type as an example:
+ * {{{
+ *   message root {
+ *     required int32 f1;
+ *     optional group f2 {
+ *       required double f21;
+ *       optional binary f22 (utf8);
+ *     }
+ *   }
+ * }}}
+ * 5 converters will be created:
+ *
+ * - a root [[ParquetRowConverter]] for [[MessageType]] `root`, which contains:
+ *   - a [[ParquetPrimitiveConverter]] for required [[INT_32]] field `f1`, and
+ *   - a nested [[ParquetRowConverter]] for optional [[GroupType]] `f2`, which contains:
+ *     - a [[ParquetPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
+ *     - a [[ParquetStringConverter]] for optional [[UTF8]] string field `f22`
+ *
+ * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
+ * any "parent" container.
+ *
+ * @param parquetType Parquet schema of Parquet records
+ * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
+ *        types should have been expanded.
+ * @param updater An updater which propagates converted field values to the parent container
+ */
+private[parquet] class ParquetRowConverter(
+    parquetType: GroupType,
+    catalystType: StructType,
+    updater: ParentContainerUpdater)
+  extends ParquetGroupConverter(updater) with Logging {
+
+  assert(
+    parquetType.getFieldCount == catalystType.length,
+    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+       |
+       |Parquet schema:
+       |$parquetType
+       |Catalyst schema:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
+  assert(
+    !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]),
+    s"""User-defined types in Catalyst schema should have already been expanded:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
+  logDebug(
+    s"""Building row converter for the following schema:
+       |
+       |Parquet form:
+       |$parquetType
+       |Catalyst form:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
+  /**
+   * Updater used together with field converters within a [[ParquetRowConverter]].  It propagates
+   * converted field values to the `ordinal`-th cell in `currentRow`.
+   */
+  private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
+    override def set(value: Any): Unit = row(ordinal) = value
+    override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
+    override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
+    override def setShort(value: Short): Unit = row.setShort(ordinal, value)
+    override def setInt(value: Int): Unit = row.setInt(ordinal, value)
+    override def setLong(value: Long): Unit = row.setLong(ordinal, value)
+    override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
+    override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
+  }
+
+  private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+
+  private val unsafeProjection = UnsafeProjection.create(catalystType)
+
+  /**
+   * The [[UnsafeRow]] converted from an entire Parquet record.
+   */
+  def currentRecord: UnsafeRow = unsafeProjection(currentRow)
+
+  // Converters for each field.
+  private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
+    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
+      case ((parquetFieldType, catalystField), ordinal) =>
+        // Converted field value should be set to the `ordinal`-th cell of `currentRow`
+        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
+    }.toArray
+  }
+
+  override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
+
+  override def end(): Unit = {
+    var i = 0
+    while (i < currentRow.numFields) {
+      fieldConverters(i).updater.end()
+      i += 1
+    }
+    updater.set(currentRow)
+  }
+
+  override def start(): Unit = {
+    var i = 0
+    while (i < currentRow.numFields) {
+      fieldConverters(i).updater.start()
+      currentRow.setNullAt(i)
+      i += 1
+    }
+  }
+
+  /**
+   * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type
+   * `catalystType`. Converted values are handled by `updater`.
+   */
+  private def newConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
+
+    catalystType match {
+      case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
+        new ParquetPrimitiveConverter(updater)
+
+      case ByteType =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit =
+            updater.setByte(value.asInstanceOf[ByteType#InternalType])
+        }
+
+      case ShortType =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit =
+            updater.setShort(value.asInstanceOf[ShortType#InternalType])
+        }
+
+      // For INT32 backed decimals
+      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
+        new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+      // For INT64 backed decimals
+      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
+        new ParquetLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+      // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
+      case t: DecimalType
+        if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
+           parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
+        new ParquetBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+      case t: DecimalType =>
+        throw new RuntimeException(
+          s"Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is " +
+            s"$parquetType.  Parquet DECIMAL type can only be backed by INT32, INT64, " +
+            "FIXED_LEN_BYTE_ARRAY, or BINARY.")
+
+      case StringType =>
+        new ParquetStringConverter(updater)
+
+      case TimestampType =>
+        // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+        new ParquetPrimitiveConverter(updater) {
+          // Converts nanosecond timestamps stored as INT96
+          override def addBinary(value: Binary): Unit = {
+            assert(
+              value.length() == 12,
+              "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " +
+              s"but got a ${value.length()}-byte binary.")
+
+            val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+            val timeOfDayNanos = buf.getLong
+            val julianDay = buf.getInt
+            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
+          }
+        }
+
+      case DateType =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit = {
+            // DateType is not specialized in `SpecificMutableRow`, have to box it here.
+            updater.set(value.asInstanceOf[DateType#InternalType])
+          }
+        }
+
+      // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+      // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+      // elements where the element type is the type of the field.
+      case t: ArrayType if parquetType.getOriginalType != LIST =>
+        if (parquetType.isPrimitive) {
+          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
+        } else {
+          new RepeatedGroupConverter(parquetType, t.elementType, updater)
+        }
+
+      case t: ArrayType =>
+        new ParquetArrayConverter(parquetType.asGroupType(), t, updater)
+
+      case t: MapType =>
+        new ParquetMapConverter(parquetType.asGroupType(), t, updater)
+
+      case t: StructType =>
+        new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
+          override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
+        })
+
+      case t =>
+        throw new RuntimeException(
+          s"Unable to create Parquet converter for data type ${t.json} " +
+            s"whose Parquet type is $parquetType")
+    }
+  }
+
+  /**
+   * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
+   */
+  private final class ParquetStringConverter(updater: ParentContainerUpdater)
+    extends ParquetPrimitiveConverter(updater) {
+
+    private var expandedDictionary: Array[UTF8String] = null
+
+    override def hasDictionarySupport: Boolean = true
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
+        UTF8String.fromBytes(dictionary.decodeToBinary(i).getBytes)
+      }
+    }
+
+    override def addValueFromDictionary(dictionaryId: Int): Unit = {
+      updater.set(expandedDictionary(dictionaryId))
+    }
+
+    override def addBinary(value: Binary): Unit = {
+      // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we
+      // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying
+      // it.
+      val buffer = value.toByteBuffer
+      val offset = buffer.arrayOffset() + buffer.position()
+      val numBytes = buffer.remaining()
+      updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))
+    }
+  }
+
+  /**
+   * Parquet converter for fixed-precision decimals.
+   */
+  private abstract class ParquetDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends ParquetPrimitiveConverter(updater) {
+
+    protected var expandedDictionary: Array[Decimal] = _
+
+    override def hasDictionarySupport: Boolean = true
+
+    override def addValueFromDictionary(dictionaryId: Int): Unit = {
+      updater.set(expandedDictionary(dictionaryId))
+    }
+
+    // Converts decimals stored as INT32
+    override def addInt(value: Int): Unit = {
+      addLong(value: Long)
+    }
+
+    // Converts decimals stored as INT64
+    override def addLong(value: Long): Unit = {
+      updater.set(decimalFromLong(value))
+    }
+
+    // Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY
+    override def addBinary(value: Binary): Unit = {
+      updater.set(decimalFromBinary(value))
+    }
+
+    protected def decimalFromLong(value: Long): Decimal = {
+      Decimal(value, precision, scale)
+    }
+
+    protected def decimalFromBinary(value: Binary): Decimal = {
+      if (precision <= Decimal.MAX_LONG_DIGITS) {
+        // Constructs a `Decimal` with an unscaled `Long` value if possible.
+        val unscaled = ParquetRowConverter.binaryToUnscaledLong(value)
+        Decimal(unscaled, precision, scale)
+      } else {
+        // Otherwise, resorts to an unscaled `BigInteger` instead.
+        Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
+      }
+    }
+  }
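
As a quick worked example (not part of the patch) of the unscaled-value representation that decimalFromLong relies on: an INT32/INT64-backed decimal column stores only the unscaled digits, and the scale comes from the DECIMAL(precision, scale) declaration in the schema. The sketch below uses only java.math.BigDecimal.

    import java.math.BigDecimal

    object UnscaledDecimalSketch {
      def main(args: Array[String]): Unit = {
        val unscaled = 12345L  // as read from an INT64-backed DECIMAL column
        val scale = 2          // taken from the DECIMAL(precision, scale) schema, not the data

        // 12345 with scale 2 denotes 12345 * 10^-2 = 123.45
        println(BigDecimal.valueOf(unscaled, scale))  // 123.45
      }
    }
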
+
+  private class ParquetIntDictionaryAwareDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends ParquetDecimalConverter(precision, scale, updater) {
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+        decimalFromLong(dictionary.decodeToInt(id).toLong)
+      }
+    }
+  }
+
+  private class ParquetLongDictionaryAwareDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends ParquetDecimalConverter(precision, scale, updater) {
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+        decimalFromLong(dictionary.decodeToLong(id))
+      }
+    }
+  }
+
+  private class ParquetBinaryDictionaryAwareDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends ParquetDecimalConverter(precision, scale, updater) {
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+        decimalFromBinary(dictionary.decodeToBinary(id))
+      }
+    }
+  }
+
+  /**
+   * Parquet converter for arrays.  Spark SQL arrays are represented as Parquet lists.  Standard
+   * Parquet lists are represented as a 3-level group annotated by `LIST`:
+   * {{{
+   *   <list-repetition> group <name> (LIST) {            <-- parquetSchema points here
+   *     repeated group list {
+   *       <element-repetition> <element-type> element;
+   *     }
+   *   }
+   * }}}
+   * The `parquetSchema` constructor argument points to the outermost group.
+   *
+   * However, before this representation was standardized, some Parquet libraries/tools used
+   * non-standard formats to represent list-like structures.  Backwards-compatibility rules for
+   * handling these cases are described in the Parquet format spec.
+   *
+   * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+   */
+  private final class ParquetArrayConverter(
+      parquetSchema: GroupType,
+      catalystSchema: ArrayType,
+      updater: ParentContainerUpdater)
+    extends ParquetGroupConverter(updater) {
+
+    private var currentArray: ArrayBuffer[Any] = _
+
+    private val elementConverter: Converter = {
+      val repeatedType = parquetSchema.getType(0)
+      val elementType = catalystSchema.elementType
+      val parentName = parquetSchema.getName
+
+      if (isElementType(repeatedType, elementType, parentName)) {
+        newConverter(repeatedType, elementType, new ParentContainerUpdater {
+          override def set(value: Any): Unit = currentArray += value
+        })
+      } else {
+        new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
+      }
+    }
+
+    override def getConverter(fieldIndex: Int): Converter = elementConverter
+
+    override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+
+    // NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for the
+    // next value.  `Row.copy()` only copies row cells; it doesn't deep-copy the objects stored in
+    // those cells.
+    override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+
+    // scalastyle:off
+    /**
+     * Returns whether the given type is the element type of a list or is a syntactic group with
+     * one field that is the element type.  This is determined by checking whether the type can be
+     * a syntactic group and by checking whether a potential syntactic group matches the expected
+     * schema.
+     * {{{
+     *   <list-repetition> group <name> (LIST) {
+     *     repeated group list {                          <-- repeatedType points here
+     *       <element-repetition> <element-type> element;
+     *     }
+     *   }
+     * }}}
+     * In short, here we handle Parquet list backwards-compatibility rules on the read path.  This
+     * method is based on `AvroIndexedRecordConverter.isElementType`.
+     *
+     * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+     */
+    // scalastyle:on
+    private def isElementType(
+        parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
+      (parquetRepeatedType, catalystElementType) match {
+        case (t: PrimitiveType, _) => true
+        case (t: GroupType, _) if t.getFieldCount > 1 => true
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
+        case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
+        case _ => false
+      }
+    }
+
+    /** Array element converter */
+    private final class ElementConverter(parquetType: Type, catalystType: DataType)
+      extends GroupConverter {
+
+      private var currentElement: Any = _
+
+      private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater {
+        override def set(value: Any): Unit = currentElement = value
+      })
+
+      override def getConverter(fieldIndex: Int): Converter = converter
+
+      override def end(): Unit = currentArray += currentElement
+
+      override def start(): Unit = currentElement = null
+    }
+  }
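
To make the backwards-compatibility rules above concrete, here is a small sketch (not part of the patch, and assuming parquet-column is on the classpath for MessageTypeParser). In the first, legacy layout the single-field repeated group named "array" is itself the element type, so isElementType would return true; in the second, standard 3-level layout the repeated "list" group is only a syntactic wrapper and the real element sits one level deeper.

    import org.apache.parquet.schema.MessageTypeParser

    object ListLayoutSketch {
      def main(args: Array[String]): Unit = {
        // Legacy layout: the repeated group named "array" is the element itself.
        val legacy = MessageTypeParser.parseMessageType(
          """message spark_schema {
            |  optional group my_list (LIST) {
            |    repeated group array {
            |      required binary str (UTF8);
            |    }
            |  }
            |}""".stripMargin)

        // Standard 3-level layout: the repeated "list" group only wraps the element.
        val standard = MessageTypeParser.parseMessageType(
          """message spark_schema {
            |  optional group my_list (LIST) {
            |    repeated group list {
            |      optional binary element (UTF8);
            |    }
            |  }
            |}""".stripMargin)

        val legacyRepeated = legacy.getType("my_list").asGroupType().getType(0)
        val standardRepeated = standard.getType("my_list").asGroupType().getType(0)
        println(legacyRepeated.getName)    // array -> treated as the element type
        println(standardRepeated.getName)  // list  -> element is one level deeper
      }
    }
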
+
+  /** Parquet converter for maps */
+  private final class ParquetMapConverter(
+      parquetType: GroupType,
+      catalystType: MapType,
+      updater: ParentContainerUpdater)
+    extends ParquetGroupConverter(updater) {
+
+    private var currentKeys: ArrayBuffer[Any] = _
+    private var currentValues: ArrayBuffer[Any] = _
+
+    private val keyValueConverter = {
+      val repeatedType = parquetType.getType(0).asGroupType()
+      new KeyValueConverter(
+        repeatedType.getType(0),
+        repeatedType.getType(1),
+        catalystType.keyType,
+        catalystType.valueType)
+    }
+
+    override def getConverter(fieldIndex: Int): Converter = keyValueConverter
+
+    override def end(): Unit =
+      updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray))
+
+    // NOTE: We can't reuse the mutable key/value buffers here and must instantiate new buffers for
+    // the next value.  `Row.copy()` only copies row cells; it doesn't deep-copy the objects stored
+    // in those cells.
+    override def start(): Unit = {
+      currentKeys = ArrayBuffer.empty[Any]
+      currentValues = ArrayBuffer.empty[Any]
+    }
+
+    /** Parquet converter for key-value pairs within the map. */
+    private final class KeyValueConverter(
+        parquetKeyType: Type,
+        parquetValueType: Type,
+        catalystKeyType: DataType,
+        catalystValueType: DataType)
+      extends GroupConverter {
+
+      private var currentKey: Any = _
+
+      private var currentValue: Any = _
+
+      private val converters = Array(
+        // Converter for keys
+        newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater {
+          override def set(value: Any): Unit = currentKey = value
+        }),
+
+        // Converter for values
+        newConverter(parquetValueType, catalystValueType, new ParentContainerUpdater {
+          override def set(value: Any): Unit = currentValue = value
+        }))
+
+      override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
+
+      override def end(): Unit = {
+        currentKeys += currentKey
+        currentValues += currentValue
+      }
+
+      override def start(): Unit = {
+        currentKey = null
+        currentValue = null
+      }
+    }
+  }
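
As a small illustration (not part of the patch, Scala standard library only) of the parallel-buffer approach above: keys and values are accumulated in lock step by the key-value converter, and only combined into a map once the whole repeated group has been consumed.

    object ParallelBuffersSketch {
      def main(args: Array[String]): Unit = {
        // Analogous to currentKeys/currentValues being filled one pair per end() call.
        val keys = scala.collection.mutable.ArrayBuffer[Any]("a", "b", "c")
        val values = scala.collection.mutable.ArrayBuffer[Any](1, 2, 3)

        // Analogous to the outer end(): materialize the map only once the group is complete.
        val result = keys.zip(values).toMap
        println(result)  // Map(a -> 1, b -> 2, c -> 3)
      }
    }
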
+
+  private trait RepeatedConverter {
+    private var currentArray: ArrayBuffer[Any] = _
+
+    protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
+      override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+      override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+      override def set(value: Any): Unit = currentArray += value
+    }
+  }
+
+  /**
+   * A primitive converter for converting unannotated repeated primitive values to required arrays
+   * of required primitive values.
+   */
+  private final class RepeatedPrimitiveConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      parentUpdater: ParentContainerUpdater)
+    extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater {
+
+    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+    private val elementConverter: PrimitiveConverter =
+      newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
+
+    override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
+    override def addInt(value: Int): Unit = elementConverter.addInt(value)
+    override def addLong(value: Long): Unit = elementConverter.addLong(value)
+    override def addFloat(value: Float): Unit = elementConverter.addFloat(value)
+    override def addDouble(value: Double): Unit = elementConverter.addDouble(value)
+    override def addBinary(value: Binary): Unit = elementConverter.addBinary(value)
+
+    override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict)
+    override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport
+    override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id)
+  }
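
For illustration (not part of the patch, and again assuming parquet-column on the classpath): an unannotated repeated primitive field like the ones RepeatedPrimitiveConverter handles. It carries no LIST/MAP annotation, so the field itself is read as a required array of required int32 elements.

    import org.apache.parquet.schema.MessageTypeParser

    object UnannotatedRepeatedSketch {
      def main(args: Array[String]): Unit = {
        val schema = MessageTypeParser.parseMessageType(
          """message m {
            |  repeated int32 num;
            |}""".stripMargin)

        val field = schema.getType(0)
        println(field.isPrimitive)              // true -> RepeatedPrimitiveConverter
        println(field.getOriginalType == null)  // true -> not annotated with LIST or MAP
      }
    }
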
+
+  /**
+   * A group converter for converting unannotated repeated group values to required arrays of
+   * required struct values.
+   */
+  private final class RepeatedGroupConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      parentUpdater: ParentContainerUpdater)
+    extends GroupConverter with HasParentContainerUpdater with RepeatedConverter {
+
+    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+    private val elementConverter: GroupConverter =
+      newConverter(parquetType, catalystType, updater).asGroupConverter()
+
+    override def getConverter(field: Int): Converter = elementConverter.getConverter(field)
+    override def end(): Unit = elementConverter.end()
+    override def start(): Unit = elementConverter.start()
+  }
+}
+
+private[parquet] object ParquetRowConverter {
+  def binaryToUnscaledLong(binary: Binary): Long = {
+    // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
+    // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
+    // copying it.
+    val buffer = binary.toByteBuffer
+    val bytes = buffer.array()
+    val start = buffer.arrayOffset() + buffer.position()
+    val end = buffer.arrayOffset() + buffer.limit()
+
+    var unscaled = 0L
+    var i = start
+
+    while (i < end) {
+      unscaled = (unscaled << 8) | (bytes(i) & 0xff)
+      i += 1
+    }
+
+    val bits = 8 * (end - start)
+    unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+    unscaled
+  }
+
+  def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
+    assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
+      s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
+    val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+    val timeOfDayNanos = buffer.getLong
+    val julianDay = buffer.getInt
+    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+  }
+}
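
To close with a worked example (not part of the patch, plain JDK only) of the two helpers above: binaryToUnscaledLong folds big-endian two's-complement bytes into a Long and then sign-extends from the original width, and binaryToSQLTimestamp reads the 12-byte INT96 layout as an 8-byte nanos-of-day followed by a 4-byte Julian day, little-endian. The byte values and day number below are arbitrary examples.

    import java.nio.{ByteBuffer, ByteOrder}

    object ConverterHelpersSketch {
      def main(args: Array[String]): Unit = {
        // --- Sign extension, as in binaryToUnscaledLong ---
        // 0xFF85 is -123 as a 16-bit two's-complement value.
        val bytes = Array(0xFF.toByte, 0x85.toByte)
        var unscaled = 0L
        bytes.foreach { b => unscaled = (unscaled << 8) | (b & 0xff) }  // 0xFF85 = 65413
        val bits = 8 * bytes.length
        unscaled = (unscaled << (64 - bits)) >> (64 - bits)             // propagate the sign bit
        println(unscaled)  // -123

        // --- INT96 timestamp layout, as read by binaryToSQLTimestamp ---
        val timeOfDayNanos = 3600L * 1000 * 1000 * 1000  // one hour past midnight
        val julianDay = 2457024                          // an example Julian day number
        val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
        buf.putLong(timeOfDayNanos).putInt(julianDay)
        buf.flip()
        println(buf.getLong)  // 3600000000000
        println(buf.getInt)   // 2457024
      }
    }
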

