[SPARK-15979][SQL] Rename various Parquet support classes.

## What changes were proposed in this pull request?
This patch renames various Parquet support classes from CatalystAbc to
ParquetAbc. The new naming makes more sense for two reasons:

1. These are not optimizer-related (i.e. Catalyst) classes.
2. We are in the Spark code base, so it is clearer to call out that these are
Parquet support classes rather than generic Spark classes (a short sketch follows).
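
A minimal sketch (not part of the patch) of the renamed schema converter in use. It
assumes the renamed classes keep their old signatures, and it lives in the same
package because the converter is `private[parquet]`; the schema string is made up.

```scala
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.schema.MessageTypeParser

object RenameSketch {
  def main(args: Array[String]): Unit = {
    val parquetSchema = MessageTypeParser.parseMessageType(
      "message spark_schema { required int32 id; optional binary name (UTF8); }")
    // Previously `new CatalystSchemaConverter()`; only the name changes.
    val sparkSchema = new ParquetSchemaConverter().convert(parquetSchema)
    // Roughly: StructType(StructField(id,IntegerType,false), StructField(name,StringType,true))
    println(sparkSchema)
  }
}
```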

## How was this patch tested?
Renamed test cases as well.

Author: Reynold Xin <r...@databricks.com>

Closes #13696 from rxin/parquet-rename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/865e7cc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/865e7cc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/865e7cc3

Branch: refs/heads/master
Commit: 865e7cc38d2b7cf2d4f7e7b04ccb7b17791a693b
Parents: 3e6d567
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Jun 15 20:05:08 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jun 15 20:05:08 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala |   6 +-
 .../SpecificParquetRecordReaderBase.java        |   8 +-
 .../parquet/VectorizedColumnReader.java         |  12 +-
 .../datasources/fileSourceInterfaces.scala      |  27 +-
 .../parquet/CatalystReadSupport.scala           | 306 ---------
 .../parquet/CatalystRecordMaterializer.scala    |  41 --
 .../parquet/CatalystRowConverter.scala          | 672 -------------------
 .../parquet/CatalystSchemaConverter.scala       | 595 ----------------
 .../parquet/CatalystWriteSupport.scala          |   8 +-
 .../datasources/parquet/ParquetFileFormat.scala |  30 +-
 .../parquet/ParquetReadSupport.scala            | 306 +++++++++
 .../parquet/ParquetRecordMaterializer.scala     |  41 ++
 .../parquet/ParquetRowConverter.scala           | 672 +++++++++++++++++++
 .../parquet/ParquetSchemaConverter.scala        | 595 ++++++++++++++++
 .../datasources/parquet/ParquetIOSuite.scala    |   4 +-
 .../datasources/parquet/ParquetQuerySuite.scala |   6 +-
 .../parquet/ParquetSchemaSuite.scala            |   8 +-
 .../datasources/parquet/ParquetTest.scala       |   4 +-
 18 files changed, 1669 insertions(+), 1672 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index b5b2a68..62e09d2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -141,7 +141,7 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Map[String, String] = {
-    def computeNumFeatures(): Int = {
+    val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
       val dataFiles = files.filterNot(_.getPath.getName startsWith "_")
       val path = if (dataFiles.length == 1) {
         dataFiles.head.getPath.toUri.toString
@@ -156,10 +156,6 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
       MLUtils.computeNumFeatures(parsed)
     }
 
-    val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
-      computeNumFeatures()
-    }
-
     new CaseInsensitiveMap(options + ("numFeatures" -> numFeatures.toString))
   }
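
As an aside, the hunk above folds `computeNumFeatures()` into an `Option` chain. A
self-contained sketch of that pattern with made-up values (`computeNumFeatures()`
below stands in for the `MLUtils.computeNumFeatures` call): the option is used when
present and positive, otherwise the fallback block runs; the result is widened to
`Any`, which is fine because the caller stringifies it.

```scala
object NumFeaturesSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical user-supplied options; "0" fails the `_.toInt > 0` filter.
    val options = Map("numFeatures" -> "0")
    def computeNumFeatures(): Int = 784  // stand-in for scanning the data files
    val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
      computeNumFeatures()
    }
    println(numFeatures.toString)  // "784": the fallback was used
  }
}
```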
 

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 14626e5..d823275 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -136,7 +136,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
-    this.sparkSchema = new CatalystSchemaConverter(configuration).convert(requestedSchema);
+    this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema);
     this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns());
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
@@ -195,12 +195,12 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
           }
           builder.addFields(fileSchema.getType(s));
         }
-        this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
+        this.requestedSchema = builder.named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
       } else {
-        this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
+        this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE();
       }
     }
-    this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
+    this.sparkSchema = new ParquetSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index ea37a08..662a03d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -259,7 +259,7 @@ public class VectorizedColumnReader {
           for (int i = rowId; i < rowId + num; ++i) {
             // TODO: Convert dictionary of Binaries to dictionary of Longs
             Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v));
+            column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
           }
         } else {
           throw new NotImplementedException();
@@ -280,12 +280,12 @@ public class VectorizedColumnReader {
         if (DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
             Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v));
+            column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
           }
         } else if (DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
             Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
+            column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
           }
         } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
@@ -375,7 +375,7 @@ public class VectorizedColumnReader {
         if (defColumn.readInteger() == maxDefLevel) {
           column.putLong(rowId + i,
               // Read 12 bytes for INT96
-              CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
+              ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
         } else {
           column.putNull(rowId + i);
         }
@@ -394,7 +394,7 @@ public class VectorizedColumnReader {
       for (int i = 0; i < num; i++) {
         if (defColumn.readInteger() == maxDefLevel) {
           column.putInt(rowId + i,
-              (int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+              (int) ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
         } else {
           column.putNull(rowId + i);
         }
@@ -403,7 +403,7 @@ public class VectorizedColumnReader {
       for (int i = 0; i < num; i++) {
         if (defColumn.readInteger() == maxDefLevel) {
           column.putLong(rowId + i,
-              CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+              ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
         } else {
           column.putNull(rowId + i);
         }
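
The call sites above all move to the renamed `ParquetRowConverter` helpers. As a
reminder of what `binaryToSQLTimestamp` consumes, a small self-contained sketch
(not from the patch) of the 12-byte INT96 layout, with made-up values: a
little-endian `Long` of nanoseconds within the day followed by a little-endian
`Int` Julian day.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object Int96LayoutSketch {
  def main(args: Array[String]): Unit = {
    val timeOfDayNanos = 12L * 3600L * 1000L * 1000L * 1000L  // noon, in nanoseconds
    val julianDay = 2457555                                   // an arbitrary Julian day
    // Encode a 12-byte INT96 value: 8-byte nanos-of-day, then 4-byte Julian day.
    val int96 = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
      .putLong(timeOfDayNanos).putInt(julianDay).array()
    // Decoding reads the two fields back in the same order and byte order.
    val buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN)
    println((buf.getLong, buf.getInt))  // (43200000000000,2457555)
  }
}
```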

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 9c18989..641c5cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -207,6 +207,20 @@ trait FileFormat {
       dataSchema: StructType): OutputWriterFactory
 
   /**
+   * Returns a [[OutputWriterFactory]] for generating output writers that can write data.
+   * This method is current used only by FileStreamSinkWriter to generate output writers that
+   * does not use output committers to write data. The OutputWriter generated by the returned
+   * [[OutputWriterFactory]] must implement the method `newWriter(path)`..
+   */
+  def buildWriter(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      options: Map[String, String]): OutputWriterFactory = {
+    // TODO: Remove this default implementation when the other formats have been ported
+    throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
+  }
+
+  /**
    * Returns whether this format support returning columnar batch or not.
    *
    * TODO: we should just have different traits for the different formats.
@@ -293,19 +307,6 @@ trait FileFormat {
     }
   }
 
-  /**
-   * Returns a [[OutputWriterFactory]] for generating output writers that can write data.
-   * This method is current used only by FileStreamSinkWriter to generate output writers that
-   * does not use output committers to write data. The OutputWriter generated by the returned
-   * [[OutputWriterFactory]] must implement the method `newWriter(path)`..
-   */
-  def buildWriter(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      options: Map[String, String]): OutputWriterFactory = {
-    // TODO: Remove this default implementation when the other formats have been ported
-    throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
deleted file mode 100644
index 9c885b2..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.util.{Map => JMap}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
-import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema._
-import org.apache.parquet.schema.Type.Repetition
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types._
-
-/**
- * A Parquet [[ReadSupport]] implementation for reading Parquet records as 
Catalyst
- * [[InternalRow]]s.
- *
- * The API interface of [[ReadSupport]] is a little bit over complicated 
because of historical
- * reasons.  In older versions of parquet-mr (say 1.6.0rc3 and prior), 
[[ReadSupport]] need to be
- * instantiated and initialized twice on both driver side and executor side.  
The [[init()]] method
- * is for driver side initialization, while [[prepareForRead()]] is for 
executor side.  However,
- * starting from parquet-mr 1.6.0, it's no longer the case, and 
[[ReadSupport]] is only instantiated
- * and initialized on executor side.  So, theoretically, now it's totally fine 
to combine these two
- * methods into a single initialization method.  The only reason (I could 
think of) to still have
- * them here is for parquet-mr API backwards-compatibility.
- *
- * Due to this reason, we no longer rely on [[ReadContext]] to pass requested 
schema from [[init()]]
- * to [[prepareForRead()]], but use a private `var` for simplicity.
- */
-private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] 
with Logging {
-  private var catalystRequestedSchema: StructType = _
-
-  /**
-   * Called on executor side before [[prepareForRead()]] and instantiating 
actual Parquet record
-   * readers.  Responsible for figuring out Parquet requested schema used for 
column pruning.
-   */
-  override def init(context: InitContext): ReadContext = {
-    catalystRequestedSchema = {
-      val conf = context.getConfiguration
-      val schemaString = 
conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-      assert(schemaString != null, "Parquet requested schema not set.")
-      StructType.fromString(schemaString)
-    }
-
-    val parquetRequestedSchema =
-      CatalystReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
-
-    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
-  }
-
-  /**
-   * Called on executor side after [[init()]], before instantiating actual 
Parquet record readers.
-   * Responsible for instantiating [[RecordMaterializer]], which is used for 
converting Parquet
-   * records to Catalyst [[InternalRow]]s.
-   */
-  override def prepareForRead(
-      conf: Configuration,
-      keyValueMetaData: JMap[String, String],
-      fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[InternalRow] = {
-    log.debug(s"Preparing for read Parquet file with message type: 
$fileSchema")
-    val parquetRequestedSchema = readContext.getRequestedSchema
-
-    logInfo {
-      s"""Going to read the following fields from the Parquet file:
-         |
-         |Parquet form:
-         |$parquetRequestedSchema
-         |Catalyst form:
-         |$catalystRequestedSchema
-       """.stripMargin
-    }
-
-    new CatalystRecordMaterializer(
-      parquetRequestedSchema,
-      CatalystReadSupport.expandUDT(catalystRequestedSchema))
-  }
-}
-
-private[parquet] object CatalystReadSupport {
-  val SPARK_ROW_REQUESTED_SCHEMA = 
"org.apache.spark.sql.parquet.row.requested_schema"
-
-  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
-
-  /**
-   * Tailors `parquetSchema` according to `catalystSchema` by removing column 
paths don't exist
-   * in `catalystSchema`, and adding those only exist in `catalystSchema`.
-   */
-  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: 
StructType): MessageType = {
-    val clippedParquetFields = 
clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    if (clippedParquetFields.isEmpty) {
-      CatalystSchemaConverter.EMPTY_MESSAGE
-    } else {
-      Types
-        .buildMessage()
-        .addFields(clippedParquetFields: _*)
-        .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-    }
-  }
-
-  private def clipParquetType(parquetType: Type, catalystType: DataType): Type 
= {
-    catalystType match {
-      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
-        // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType)
-
-      case t: MapType
-        if !isPrimitiveCatalystType(t.keyType) ||
-           !isPrimitiveCatalystType(t.valueType) =>
-        // Only clips map types with nested key type or value type
-        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
-
-      case t: StructType =>
-        clipParquetGroup(parquetType.asGroupType(), t)
-
-      case _ =>
-        // UDTs and primitive types are not clipped.  For UDTs, a clipped 
version might not be able
-        // to be mapped to desired user-space types.  So UDTs shouldn't 
participate schema merging.
-        parquetType
-    }
-  }
-
-  /**
-   * Whether a Catalyst [[DataType]] is primitive.  Primitive [[DataType]] is 
not equivalent to
-   * [[AtomicType]].  For example, [[CalendarIntervalType]] is primitive, but 
it's not an
-   * [[AtomicType]].
-   */
-  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
-    dataType match {
-      case _: ArrayType | _: MapType | _: StructType => false
-      case _ => true
-    }
-  }
-
-  /**
-   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[ArrayType]].  The element type
-   * of the [[ArrayType]] should also be a nested type, namely an 
[[ArrayType]], a [[MapType]], or a
-   * [[StructType]].
-   */
-  private def clipParquetListType(parquetList: GroupType, elementType: 
DataType): Type = {
-    // Precondition of this method, should only be called for lists with 
nested element types.
-    assert(!isPrimitiveCatalystType(elementType))
-
-    // Unannotated repeated group should be interpreted as required list of 
required element, so
-    // list element type is just the group itself.  Clip it.
-    if (parquetList.getOriginalType == null && 
parquetList.isRepetition(Repetition.REPEATED)) {
-      clipParquetType(parquetList, elementType)
-    } else {
-      assert(
-        parquetList.getOriginalType == OriginalType.LIST,
-        "Invalid Parquet schema. " +
-          "Original type of annotated Parquet lists must be LIST: " +
-          parquetList.toString)
-
-      assert(
-        parquetList.getFieldCount == 1 && 
parquetList.getType(0).isRepetition(Repetition.REPEATED),
-        "Invalid Parquet schema. " +
-          "LIST-annotated group should only have exactly one repeated field: " 
+
-          parquetList)
-
-      // Precondition of this method, should only be called for lists with 
nested element types.
-      assert(!parquetList.getType(0).isPrimitive)
-
-      val repeatedGroup = parquetList.getType(0).asGroupType()
-
-      // If the repeated field is a group with multiple fields, or the 
repeated field is a group
-      // with one field and is named either "array" or uses the LIST-annotated 
group's name with
-      // "_tuple" appended then the repeated type is the element type and 
elements are required.
-      // Build a new LIST-annotated group with clipped `repeatedGroup` as 
element type and the
-      // only field.
-      if (
-        repeatedGroup.getFieldCount > 1 ||
-        repeatedGroup.getName == "array" ||
-        repeatedGroup.getName == parquetList.getName + "_tuple"
-      ) {
-        Types
-          .buildGroup(parquetList.getRepetition)
-          .as(OriginalType.LIST)
-          .addField(clipParquetType(repeatedGroup, elementType))
-          .named(parquetList.getName)
-      } else {
-        // Otherwise, the repeated field's type is the element type with the 
repeated field's
-        // repetition.
-        Types
-          .buildGroup(parquetList.getRepetition)
-          .as(OriginalType.LIST)
-          .addField(
-            Types
-              .repeatedGroup()
-              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
-              .named(repeatedGroup.getName))
-          .named(parquetList.getName)
-      }
-    }
-  }
-
-  /**
-   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[MapType]].  Either key type or
-   * value type of the [[MapType]] must be a nested type, namely an 
[[ArrayType]], a [[MapType]], or
-   * a [[StructType]].
-   */
-  private def clipParquetMapType(
-      parquetMap: GroupType, keyType: DataType, valueType: DataType): 
GroupType = {
-    // Precondition of this method, only handles maps with nested key types or 
value types.
-    assert(!isPrimitiveCatalystType(keyType) || 
!isPrimitiveCatalystType(valueType))
-
-    val repeatedGroup = parquetMap.getType(0).asGroupType()
-    val parquetKeyType = repeatedGroup.getType(0)
-    val parquetValueType = repeatedGroup.getType(1)
-
-    val clippedRepeatedGroup =
-      Types
-        .repeatedGroup()
-        .as(repeatedGroup.getOriginalType)
-        .addField(clipParquetType(parquetKeyType, keyType))
-        .addField(clipParquetType(parquetValueType, valueType))
-        .named(repeatedGroup.getName)
-
-    Types
-      .buildGroup(parquetMap.getRepetition)
-      .as(parquetMap.getOriginalType)
-      .addField(clippedRepeatedGroup)
-      .named(parquetMap.getName)
-  }
-
-  /**
-   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[StructType]].
-   *
-   * @return A clipped [[GroupType]], which has at least one field.
-   * @note Parquet doesn't allow creating empty [[GroupType]] instances except 
for empty
-   *       [[MessageType]].  Because it's legal to construct an empty 
requested schema for column
-   *       pruning.
-   */
-  private def clipParquetGroup(parquetRecord: GroupType, structType: 
StructType): GroupType = {
-    val clippedParquetFields = clipParquetGroupFields(parquetRecord, 
structType)
-    Types
-      .buildGroup(parquetRecord.getRepetition)
-      .as(parquetRecord.getOriginalType)
-      .addFields(clippedParquetFields: _*)
-      .named(parquetRecord.getName)
-  }
-
-  /**
-   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[StructType]].
-   *
-   * @return A list of clipped [[GroupType]] fields, which can be empty.
-   */
-  private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
-    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName 
-> f).toMap
-    val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = 
false)
-    structType.map { f =>
-      parquetFieldMap
-        .get(f.name)
-        .map(clipParquetType(_, f.dataType))
-        .getOrElse(toParquet.convertField(f))
-    }
-  }
-
-  def expandUDT(schema: StructType): StructType = {
-    def expand(dataType: DataType): DataType = {
-      dataType match {
-        case t: ArrayType =>
-          t.copy(elementType = expand(t.elementType))
-
-        case t: MapType =>
-          t.copy(
-            keyType = expand(t.keyType),
-            valueType = expand(t.valueType))
-
-        case t: StructType =>
-          val expandedFields = t.fields.map(f => f.copy(dataType = 
expand(f.dataType)))
-          t.copy(fields = expandedFields)
-
-        case t: UserDefinedType[_] =>
-          t.sqlType
-
-        case t =>
-          t
-      }
-    }
-
-    expand(schema).asInstanceOf[StructType]
-  }
-}
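
The file above moves verbatim to ParquetReadSupport.scala. A minimal sketch of the
schema clipping it documents, assuming the renamed object keeps the same public
`clipParquetSchema` method and placed in the same package because the object is
`private[parquet]`; the schemas are made up. Columns absent from the Catalyst schema
are pruned from the Parquet schema, and requested columns absent from the file are
added back from the Catalyst side.

```scala
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.types._

object ClipSchemaSketch {
  def main(args: Array[String]): Unit = {
    val fileSchema = MessageTypeParser.parseMessageType(
      """message spark_schema {
        |  required int32 id;
        |  optional binary name (UTF8);
        |  optional double score;
        |}""".stripMargin)
    // Request only `id` plus a column that does not exist in the file.
    val catalystSchema = new StructType()
      .add("id", IntegerType, nullable = false)
      .add("extra", StringType)
    // `name` and `score` are clipped away; `extra` comes from the Catalyst side.
    println(ParquetReadSupport.clipParquetSchema(fileSchema, catalystSchema))
  }
}
```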

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
deleted file mode 100644
index eeead9f..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
-
-/**
- * A [[RecordMaterializer]] for Catalyst rows.
- *
- * @param parquetSchema Parquet schema of the records to be read
- * @param catalystSchema Catalyst schema of the rows to be constructed
- */
-private[parquet] class CatalystRecordMaterializer(
-    parquetSchema: MessageType, catalystSchema: StructType)
-  extends RecordMaterializer[InternalRow] {
-
-  private val rootConverter = new CatalystRowConverter(parquetSchema, 
catalystSchema, NoopUpdater)
-
-  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
-
-  override def getRootConverter: GroupConverter = rootConverter
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
deleted file mode 100644
index 85b0bc1..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ /dev/null
@@ -1,672 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.math.{BigDecimal, BigInteger}
-import java.nio.ByteOrder
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.parquet.column.Dictionary
-import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, 
PrimitiveConverter}
-import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
-import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, 
DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, 
GenericArrayData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-/**
- * A [[ParentContainerUpdater]] is used by a Parquet converter to set 
converted values to some
- * corresponding parent container. For example, a converter for a `StructType` 
field may set
- * converted values to a [[MutableRow]]; or a converter for array elements may 
append converted
- * values to an [[ArrayBuffer]].
- */
-private[parquet] trait ParentContainerUpdater {
-  /** Called before a record field is being converted */
-  def start(): Unit = ()
-
-  /** Called after a record field is being converted */
-  def end(): Unit = ()
-
-  def set(value: Any): Unit = ()
-  def setBoolean(value: Boolean): Unit = set(value)
-  def setByte(value: Byte): Unit = set(value)
-  def setShort(value: Short): Unit = set(value)
-  def setInt(value: Int): Unit = set(value)
-  def setLong(value: Long): Unit = set(value)
-  def setFloat(value: Float): Unit = set(value)
-  def setDouble(value: Double): Unit = set(value)
-}
-
-/** A no-op updater used for root converter (who doesn't have a parent). */
-private[parquet] object NoopUpdater extends ParentContainerUpdater
-
-private[parquet] trait HasParentContainerUpdater {
-  def updater: ParentContainerUpdater
-}
-
-/**
- * A convenient converter class for Parquet group types with a 
[[HasParentContainerUpdater]].
- */
-private[parquet] abstract class CatalystGroupConverter(val updater: 
ParentContainerUpdater)
-  extends GroupConverter with HasParentContainerUpdater
-
-/**
- * Parquet converter for Parquet primitive types.  Note that not all Spark SQL 
atomic types
- * are handled by this converter.  Parquet primitive types are only a subset 
of those of Spark
- * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by 
INT32 in Parquet.
- */
-private[parquet] class CatalystPrimitiveConverter(val updater: 
ParentContainerUpdater)
-  extends PrimitiveConverter with HasParentContainerUpdater {
-
-  override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
-  override def addInt(value: Int): Unit = updater.setInt(value)
-  override def addLong(value: Long): Unit = updater.setLong(value)
-  override def addFloat(value: Float): Unit = updater.setFloat(value)
-  override def addDouble(value: Double): Unit = updater.setDouble(value)
-  override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
-}
-
-/**
- * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst 
[[InternalRow]]s.
- * Since Catalyst `StructType` is also a Parquet record, this converter can be 
used as root
- * converter.  Take the following Parquet type as an example:
- * {{{
- *   message root {
- *     required int32 f1;
- *     optional group f2 {
- *       required double f21;
- *       optional binary f22 (utf8);
- *     }
- *   }
- * }}}
- * 5 converters will be created:
- *
- * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which 
contains:
- *   - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and
- *   - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, 
which contains:
- *     - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, 
and
- *     - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22`
- *
- * When used as a root converter, [[NoopUpdater]] should be used since root 
converters don't have
- * any "parent" container.
- *
- * @param parquetType Parquet schema of Parquet records
- * @param catalystType Spark SQL schema that corresponds to the Parquet record 
type. User-defined
- *        types should have been expanded.
- * @param updater An updater which propagates converted field values to the 
parent container
- */
-private[parquet] class CatalystRowConverter(
-    parquetType: GroupType,
-    catalystType: StructType,
-    updater: ParentContainerUpdater)
-  extends CatalystGroupConverter(updater) with Logging {
-
-  assert(
-    parquetType.getFieldCount == catalystType.length,
-    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
-       |
-       |Parquet schema:
-       |$parquetType
-       |Catalyst schema:
-       |${catalystType.prettyJson}
-     """.stripMargin)
-
-  assert(
-    !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]),
-    s"""User-defined types in Catalyst schema should have already been 
expanded:
-       |${catalystType.prettyJson}
-     """.stripMargin)
-
-  logDebug(
-    s"""Building row converter for the following schema:
-       |
-       |Parquet form:
-       |$parquetType
-       |Catalyst form:
-       |${catalystType.prettyJson}
-     """.stripMargin)
-
-  /**
-   * Updater used together with field converters within a 
[[CatalystRowConverter]].  It propagates
-   * converted filed values to the `ordinal`-th cell in `currentRow`.
-   */
-  private final class RowUpdater(row: MutableRow, ordinal: Int) extends 
ParentContainerUpdater {
-    override def set(value: Any): Unit = row(ordinal) = value
-    override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, 
value)
-    override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
-    override def setShort(value: Short): Unit = row.setShort(ordinal, value)
-    override def setInt(value: Int): Unit = row.setInt(ordinal, value)
-    override def setLong(value: Long): Unit = row.setLong(ordinal, value)
-    override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
-    override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
-  }
-
-  private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
-
-  private val unsafeProjection = UnsafeProjection.create(catalystType)
-
-  /**
-   * The [[UnsafeRow]] converted from an entire Parquet record.
-   */
-  def currentRecord: UnsafeRow = unsafeProjection(currentRow)
-
-  // Converters for each field.
-  private val fieldConverters: Array[Converter with HasParentContainerUpdater] 
= {
-    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
-      case ((parquetFieldType, catalystField), ordinal) =>
-        // Converted field value should be set to the `ordinal`-th cell of 
`currentRow`
-        newConverter(parquetFieldType, catalystField.dataType, new 
RowUpdater(currentRow, ordinal))
-    }.toArray
-  }
-
-  override def getConverter(fieldIndex: Int): Converter = 
fieldConverters(fieldIndex)
-
-  override def end(): Unit = {
-    var i = 0
-    while (i < currentRow.numFields) {
-      fieldConverters(i).updater.end()
-      i += 1
-    }
-    updater.set(currentRow)
-  }
-
-  override def start(): Unit = {
-    var i = 0
-    while (i < currentRow.numFields) {
-      fieldConverters(i).updater.start()
-      currentRow.setNullAt(i)
-      i += 1
-    }
-  }
-
-  /**
-   * Creates a converter for the given Parquet type `parquetType` and Spark 
SQL data type
-   * `catalystType`. Converted values are handled by `updater`.
-   */
-  private def newConverter(
-      parquetType: Type,
-      catalystType: DataType,
-      updater: ParentContainerUpdater): Converter with 
HasParentContainerUpdater = {
-
-    catalystType match {
-      case BooleanType | IntegerType | LongType | FloatType | DoubleType | 
BinaryType =>
-        new CatalystPrimitiveConverter(updater)
-
-      case ByteType =>
-        new CatalystPrimitiveConverter(updater) {
-          override def addInt(value: Int): Unit =
-            updater.setByte(value.asInstanceOf[ByteType#InternalType])
-        }
-
-      case ShortType =>
-        new CatalystPrimitiveConverter(updater) {
-          override def addInt(value: Int): Unit =
-            updater.setShort(value.asInstanceOf[ShortType#InternalType])
-        }
-
-      // For INT32 backed decimals
-      case t: DecimalType if 
parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
-        new CatalystIntDictionaryAwareDecimalConverter(t.precision, t.scale, 
updater)
-
-      // For INT64 backed decimals
-      case t: DecimalType if 
parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
-        new CatalystLongDictionaryAwareDecimalConverter(t.precision, t.scale, 
updater)
-
-      // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
-      case t: DecimalType
-        if parquetType.asPrimitiveType().getPrimitiveTypeName == 
FIXED_LEN_BYTE_ARRAY ||
-           parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
-        new CatalystBinaryDictionaryAwareDecimalConverter(t.precision, 
t.scale, updater)
-
-      case t: DecimalType =>
-        throw new RuntimeException(
-          s"Unable to create Parquet converter for decimal type ${t.json} 
whose Parquet type is " +
-            s"$parquetType.  Parquet DECIMAL type can only be backed by INT32, 
INT64, " +
-            "FIXED_LEN_BYTE_ARRAY, or BINARY.")
-
-      case StringType =>
-        new CatalystStringConverter(updater)
-
-      case TimestampType =>
-        // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
-        new CatalystPrimitiveConverter(updater) {
-          // Converts nanosecond timestamps stored as INT96
-          override def addBinary(value: Binary): Unit = {
-            assert(
-              value.length() == 12,
-              "Timestamps (with nanoseconds) are expected to be stored in 
12-byte long binaries, " +
-              s"but got a ${value.length()}-byte binary.")
-
-            val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
-            val timeOfDayNanos = buf.getLong
-            val julianDay = buf.getInt
-            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, 
timeOfDayNanos))
-          }
-        }
-
-      case DateType =>
-        new CatalystPrimitiveConverter(updater) {
-          override def addInt(value: Int): Unit = {
-            // DateType is not specialized in `SpecificMutableRow`, have to 
box it here.
-            updater.set(value.asInstanceOf[DateType#InternalType])
-          }
-        }
-
-      // A repeated field that is neither contained by a `LIST`- or 
`MAP`-annotated group nor
-      // annotated by `LIST` or `MAP` should be interpreted as a required list 
of required
-      // elements where the element type is the type of the field.
-      case t: ArrayType if parquetType.getOriginalType != LIST =>
-        if (parquetType.isPrimitive) {
-          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
-        } else {
-          new RepeatedGroupConverter(parquetType, t.elementType, updater)
-        }
-
-      case t: ArrayType =>
-        new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
-
-      case t: MapType =>
-        new CatalystMapConverter(parquetType.asGroupType(), t, updater)
-
-      case t: StructType =>
-        new CatalystRowConverter(parquetType.asGroupType(), t, new 
ParentContainerUpdater {
-          override def set(value: Any): Unit = 
updater.set(value.asInstanceOf[InternalRow].copy())
-        })
-
-      case t =>
-        throw new RuntimeException(
-          s"Unable to create Parquet converter for data type ${t.json} " +
-            s"whose Parquet type is $parquetType")
-    }
-  }
-
-  /**
-   * Parquet converter for strings. A dictionary is used to minimize string 
decoding cost.
-   */
-  private final class CatalystStringConverter(updater: ParentContainerUpdater)
-    extends CatalystPrimitiveConverter(updater) {
-
-    private var expandedDictionary: Array[UTF8String] = null
-
-    override def hasDictionarySupport: Boolean = true
-
-    override def setDictionary(dictionary: Dictionary): Unit = {
-      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
-        UTF8String.fromBytes(dictionary.decodeToBinary(i).getBytes)
-      }
-    }
-
-    override def addValueFromDictionary(dictionaryId: Int): Unit = {
-      updater.set(expandedDictionary(dictionaryId))
-    }
-
-    override def addBinary(value: Binary): Unit = {
-      // The underlying `ByteBuffer` implementation is guaranteed to be 
`HeapByteBuffer`, so here we
-      // are using `Binary.toByteBuffer.array()` to steal the underlying byte 
array without copying
-      // it.
-      val buffer = value.toByteBuffer
-      val offset = buffer.arrayOffset() + buffer.position()
-      val numBytes = buffer.remaining()
-      updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))
-    }
-  }
-
-  /**
-   * Parquet converter for fixed-precision decimals.
-   */
-  private abstract class CatalystDecimalConverter(
-      precision: Int, scale: Int, updater: ParentContainerUpdater)
-    extends CatalystPrimitiveConverter(updater) {
-
-    protected var expandedDictionary: Array[Decimal] = _
-
-    override def hasDictionarySupport: Boolean = true
-
-    override def addValueFromDictionary(dictionaryId: Int): Unit = {
-      updater.set(expandedDictionary(dictionaryId))
-    }
-
-    // Converts decimals stored as INT32
-    override def addInt(value: Int): Unit = {
-      addLong(value: Long)
-    }
-
-    // Converts decimals stored as INT64
-    override def addLong(value: Long): Unit = {
-      updater.set(decimalFromLong(value))
-    }
-
-    // Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY
-    override def addBinary(value: Binary): Unit = {
-      updater.set(decimalFromBinary(value))
-    }
-
-    protected def decimalFromLong(value: Long): Decimal = {
-      Decimal(value, precision, scale)
-    }
-
-    protected def decimalFromBinary(value: Binary): Decimal = {
-      if (precision <= Decimal.MAX_LONG_DIGITS) {
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.
-        val unscaled = CatalystRowConverter.binaryToUnscaledLong(value)
-        Decimal(unscaled, precision, scale)
-      } else {
-        // Otherwise, resorts to an unscaled `BigInteger` instead.
-        Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), 
precision, scale)
-      }
-    }
-  }
-
-  private class CatalystIntDictionaryAwareDecimalConverter(
-      precision: Int, scale: Int, updater: ParentContainerUpdater)
-    extends CatalystDecimalConverter(precision, scale, updater) {
-
-    override def setDictionary(dictionary: Dictionary): Unit = {
-      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
-        decimalFromLong(dictionary.decodeToInt(id).toLong)
-      }
-    }
-  }
-
-  private class CatalystLongDictionaryAwareDecimalConverter(
-      precision: Int, scale: Int, updater: ParentContainerUpdater)
-    extends CatalystDecimalConverter(precision, scale, updater) {
-
-    override def setDictionary(dictionary: Dictionary): Unit = {
-      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
-        decimalFromLong(dictionary.decodeToLong(id))
-      }
-    }
-  }
-
-  private class CatalystBinaryDictionaryAwareDecimalConverter(
-      precision: Int, scale: Int, updater: ParentContainerUpdater)
-    extends CatalystDecimalConverter(precision, scale, updater) {
-
-    override def setDictionary(dictionary: Dictionary): Unit = {
-      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
-        decimalFromBinary(dictionary.decodeToBinary(id))
-      }
-    }
-  }
-
-  /**
-   * Parquet converter for arrays.  Spark SQL arrays are represented as 
Parquet lists.  Standard
-   * Parquet lists are represented as a 3-level group annotated by `LIST`:
-   * {{{
-   *   <list-repetition> group <name> (LIST) {            <-- parquetSchema 
points here
-   *     repeated group list {
-   *       <element-repetition> <element-type> element;
-   *     }
-   *   }
-   * }}}
-   * The `parquetSchema` constructor argument points to the outermost group.
-   *
-   * However, before this representation is standardized, some Parquet 
libraries/tools also use some
-   * non-standard formats to represent list-like structures.  
Backwards-compatibility rules for
-   * handling these cases are described in Parquet format spec.
-   *
-   * @see 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-   */
-  private final class CatalystArrayConverter(
-      parquetSchema: GroupType,
-      catalystSchema: ArrayType,
-      updater: ParentContainerUpdater)
-    extends CatalystGroupConverter(updater) {
-
-    private var currentArray: ArrayBuffer[Any] = _
-
-    private val elementConverter: Converter = {
-      val repeatedType = parquetSchema.getType(0)
-      val elementType = catalystSchema.elementType
-      val parentName = parquetSchema.getName
-
-      if (isElementType(repeatedType, elementType, parentName)) {
-        newConverter(repeatedType, elementType, new ParentContainerUpdater {
-          override def set(value: Any): Unit = currentArray += value
-        })
-      } else {
-        new ElementConverter(repeatedType.asGroupType().getType(0), 
elementType)
-      }
-    }
-
-    override def getConverter(fieldIndex: Int): Converter = elementConverter
-
-    override def end(): Unit = updater.set(new 
GenericArrayData(currentArray.toArray))
-
-    // NOTE: We can't reuse the mutable `ArrayBuffer` here and must 
instantiate a new buffer for the
-    // next value.  `Row.copy()` only copies row cells, it doesn't do deep 
copy to objects stored
-    // in row cells.
-    override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
-
-    // scalastyle:off
-    /**
-     * Returns whether the given type is the element type of a list or is a 
syntactic group with
-     * one field that is the element type.  This is determined by checking 
whether the type can be
-     * a syntactic group and by checking whether a potential syntactic group 
matches the expected
-     * schema.
-     * {{{
-     *   <list-repetition> group <name> (LIST) {
-     *     repeated group list {                          <-- repeatedType 
points here
-     *       <element-repetition> <element-type> element;
-     *     }
-     *   }
-     * }}}
-     * In short, here we handle Parquet list backwards-compatibility rules on 
the read path.  This
-     * method is based on `AvroIndexedRecordConverter.isElementType`.
-     *
-     * @see 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-     */
-    // scalastyle:on
-    private def isElementType(
-        parquetRepeatedType: Type, catalystElementType: DataType, parentName: 
String): Boolean = {
-      (parquetRepeatedType, catalystElementType) match {
-        case (t: PrimitiveType, _) => true
-        case (t: GroupType, _) if t.getFieldCount > 1 => true
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" 
=> true
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == 
parentName + "_tuple" => true
-        case (t: GroupType, StructType(Array(f))) if f.name == 
t.getFieldName(0) => true
-        case _ => false
-      }
-    }
-
-    /** Array element converter */
-    private final class ElementConverter(parquetType: Type, catalystType: 
DataType)
-      extends GroupConverter {
-
-      private var currentElement: Any = _
-
-      private val converter = newConverter(parquetType, catalystType, new 
ParentContainerUpdater {
-        override def set(value: Any): Unit = currentElement = value
-      })
-
-      override def getConverter(fieldIndex: Int): Converter = converter
-
-      override def end(): Unit = currentArray += currentElement
-
-      override def start(): Unit = currentElement = null
-    }
-  }
-
-  /** Parquet converter for maps */
-  private final class CatalystMapConverter(
-      parquetType: GroupType,
-      catalystType: MapType,
-      updater: ParentContainerUpdater)
-    extends CatalystGroupConverter(updater) {
-
-    private var currentKeys: ArrayBuffer[Any] = _
-    private var currentValues: ArrayBuffer[Any] = _
-
-    private val keyValueConverter = {
-      val repeatedType = parquetType.getType(0).asGroupType()
-      new KeyValueConverter(
-        repeatedType.getType(0),
-        repeatedType.getType(1),
-        catalystType.keyType,
-        catalystType.valueType)
-    }
-
-    override def getConverter(fieldIndex: Int): Converter = keyValueConverter
-
-    override def end(): Unit =
-      updater.set(ArrayBasedMapData(currentKeys.toArray, 
currentValues.toArray))
-
-    // NOTE: We can't reuse the mutable Map here and must instantiate a new 
`Map` for the next
-    // value.  `Row.copy()` only copies row cells, it doesn't do deep copy to 
objects stored in row
-    // cells.
-    override def start(): Unit = {
-      currentKeys = ArrayBuffer.empty[Any]
-      currentValues = ArrayBuffer.empty[Any]
-    }
-
-    /** Parquet converter for key-value pairs within the map. */
-    private final class KeyValueConverter(
-        parquetKeyType: Type,
-        parquetValueType: Type,
-        catalystKeyType: DataType,
-        catalystValueType: DataType)
-      extends GroupConverter {
-
-      private var currentKey: Any = _
-
-      private var currentValue: Any = _
-
-      private val converters = Array(
-        // Converter for keys
-        newConverter(parquetKeyType, catalystKeyType, new 
ParentContainerUpdater {
-          override def set(value: Any): Unit = currentKey = value
-        }),
-
-        // Converter for values
-        newConverter(parquetValueType, catalystValueType, new 
ParentContainerUpdater {
-          override def set(value: Any): Unit = currentValue = value
-        }))
-
-      override def getConverter(fieldIndex: Int): Converter = 
converters(fieldIndex)
-
-      override def end(): Unit = {
-        currentKeys += currentKey
-        currentValues += currentValue
-      }
-
-      override def start(): Unit = {
-        currentKey = null
-        currentValue = null
-      }
-    }
-  }
-
-  private trait RepeatedConverter {
-    private var currentArray: ArrayBuffer[Any] = _
-
-    protected def newArrayUpdater(updater: ParentContainerUpdater) = new 
ParentContainerUpdater {
-      override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
-      override def end(): Unit = updater.set(new 
GenericArrayData(currentArray.toArray))
-      override def set(value: Any): Unit = currentArray += value
-    }
-  }
-
-  /**
-   * A primitive converter for converting unannotated repeated primitive 
values to required arrays
-   * of required primitives values.
-   */
-  private final class RepeatedPrimitiveConverter(
-      parquetType: Type,
-      catalystType: DataType,
-      parentUpdater: ParentContainerUpdater)
-    extends PrimitiveConverter with RepeatedConverter with 
HasParentContainerUpdater {
-
-    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
-
-    private val elementConverter: PrimitiveConverter =
-      newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
-
-    override def addBoolean(value: Boolean): Unit = 
elementConverter.addBoolean(value)
-    override def addInt(value: Int): Unit = elementConverter.addInt(value)
-    override def addLong(value: Long): Unit = elementConverter.addLong(value)
-    override def addFloat(value: Float): Unit = 
elementConverter.addFloat(value)
-    override def addDouble(value: Double): Unit = 
elementConverter.addDouble(value)
-    override def addBinary(value: Binary): Unit = 
elementConverter.addBinary(value)
-
-    override def setDictionary(dict: Dictionary): Unit = 
elementConverter.setDictionary(dict)
-    override def hasDictionarySupport: Boolean = 
elementConverter.hasDictionarySupport
-    override def addValueFromDictionary(id: Int): Unit = 
elementConverter.addValueFromDictionary(id)
-  }
-
-  /**
-   * A group converter for converting unannotated repeated group values to 
required arrays of
-   * required struct values.
-   */
-  private final class RepeatedGroupConverter(
-      parquetType: Type,
-      catalystType: DataType,
-      parentUpdater: ParentContainerUpdater)
-    extends GroupConverter with HasParentContainerUpdater with 
RepeatedConverter {
-
-    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
-
-    private val elementConverter: GroupConverter =
-      newConverter(parquetType, catalystType, updater).asGroupConverter()
-
-    override def getConverter(field: Int): Converter = 
elementConverter.getConverter(field)
-    override def end(): Unit = elementConverter.end()
-    override def start(): Unit = elementConverter.start()
-  }
-}
-
-private[parquet] object CatalystRowConverter {
-  def binaryToUnscaledLong(binary: Binary): Long = {
-    // The underlying `ByteBuffer` implementation is guaranteed to be 
`HeapByteBuffer`, so here
-    // we are using `Binary.toByteBuffer.array()` to steal the underlying byte 
array without
-    // copying it.
-    val buffer = binary.toByteBuffer
-    val bytes = buffer.array()
-    val start = buffer.arrayOffset() + buffer.position()
-    val end = buffer.arrayOffset() + buffer.limit()
-
-    var unscaled = 0L
-    var i = start
-
-    while (i < end) {
-      unscaled = (unscaled << 8) | (bytes(i) & 0xff)
-      i += 1
-    }
-
-    val bits = 8 * (end - start)
-    unscaled = (unscaled << (64 - bits)) >> (64 - bits)
-    unscaled
-  }
-
-  def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
-    assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected 
to be stored in" +
-      s" 12-byte long binaries. Found a ${binary.length()}-byte binary 
instead.")
-    val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
-    val timeOfDayNanos = buffer.getLong
-    val julianDay = buffer.getInt
-    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
-  }
-}
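
A tiny worked example (not part of the patch) of the sign-extension arithmetic in
`binaryToUnscaledLong` above: the decimal's bytes are folded in big-endian order,
then the value is shifted left and arithmetically back right so that the top bit of
the fixed-width value propagates as the sign.

```scala
object UnscaledLongSketch {
  def main(args: Array[String]): Unit = {
    // Two's-complement big-endian bytes of -123 stored in a 2-byte decimal.
    val bytes = Array[Byte](0xFF.toByte, 0x85.toByte)
    var unscaled = 0L
    for (b <- bytes) {
      unscaled = (unscaled << 8) | (b & 0xff)
    }
    val bits = 8 * bytes.length  // 16 bits of payload
    unscaled = (unscaled << (64 - bits)) >> (64 - bits)
    println(unscaled)  // -123
  }
}
```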

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
deleted file mode 100644
index 3688c3e..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ /dev/null
@@ -1,595 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.schema._
-import org.apache.parquet.schema.OriginalType._
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
-import org.apache.parquet.schema.Type.Repetition._
-
-import org.apache.spark.sql.AnalysisException
-import 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.maxPrecisionForBytes
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
-
-/**
- * This converter class is used to convert Parquet [[MessageType]] to Spark 
SQL [[StructType]] and
- * vice versa.
- *
- * Parquet format backwards-compatibility rules are respected when converting 
Parquet
- * [[MessageType]] schemas.
- *
- * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
- * @constructor
- * @param assumeBinaryIsString Whether unannotated BINARY fields should be 
assumed to be Spark SQL
- *        [[StringType]] fields when converting a Parquet [[MessageType]] to 
Spark SQL
- *        [[StructType]].  This argument only affects the Parquet read path.
- * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be 
assumed to be Spark SQL
- *        [[TimestampType]] fields when converting a Parquet [[MessageType]] 
to Spark SQL
- *        [[StructType]].  Note that Spark SQL [[TimestampType]] is similar to 
Hive timestamp, which
- *        has optional nanosecond precision, but different from `TIME_MILLIS` 
and `TIMESTAMP_MILLIS`
- *        described in the Parquet format spec.  This argument only affects 
the Parquet read path.
- * @param writeLegacyParquetFormat Whether to use legacy Parquet format 
compatible with Spark 1.4
- *        and prior versions when converting a Catalyst [[StructType]] to a 
Parquet [[MessageType]].
- *        When set to false, use the standard format defined in the parquet-format 
spec.  This argument only
- *        affects the Parquet write path.
- */
-private[parquet] class CatalystSchemaConverter(
-    assumeBinaryIsString: Boolean = 
SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp: Boolean = 
SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    writeLegacyParquetFormat: Boolean = 
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
-
-  def this(conf: SQLConf) = this(
-    assumeBinaryIsString = conf.isParquetBinaryAsString,
-    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
-    writeLegacyParquetFormat = conf.writeLegacyParquetFormat)
-
-  def this(conf: Configuration) = this(
-    assumeBinaryIsString = 
conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
-    assumeInt96IsTimestamp = 
conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
-    writeLegacyParquetFormat = 
conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
-      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean)
-
-  /**
-   * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL 
[[StructType]].
-   */
-  def convert(parquetSchema: MessageType): StructType = 
convert(parquetSchema.asGroupType())
-
-  private def convert(parquetSchema: GroupType): StructType = {
-    val fields = parquetSchema.getFields.asScala.map { field =>
-      field.getRepetition match {
-        case OPTIONAL =>
-          StructField(field.getName, convertField(field), nullable = true)
-
-        case REQUIRED =>
-          StructField(field.getName, convertField(field), nullable = false)
-
-        case REPEATED =>
-          // A repeated field that is neither contained by a `LIST`- or 
`MAP`-annotated group nor
-          // annotated by `LIST` or `MAP` should be interpreted as a required 
list of required
-          // elements where the element type is the type of the field.
-          val arrayType = ArrayType(convertField(field), containsNull = false)
-          StructField(field.getName, arrayType, nullable = false)
-      }
-    }
-
-    StructType(fields)
-  }
-
-  /**
-   * Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
-   */
-  def convertField(parquetType: Type): DataType = parquetType match {
-    case t: PrimitiveType => convertPrimitiveField(t)
-    case t: GroupType => convertGroupField(t.asGroupType())
-  }
-
-  private def convertPrimitiveField(field: PrimitiveType): DataType = {
-    val typeName = field.getPrimitiveTypeName
-    val originalType = field.getOriginalType
-
-    def typeString =
-      if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
-
-    def typeNotSupported() =
-      throw new AnalysisException(s"Parquet type not supported: $typeString")
-
-    def typeNotImplemented() =
-      throw new AnalysisException(s"Parquet type not yet supported: 
$typeString")
-
-    def illegalType() =
-      throw new AnalysisException(s"Illegal Parquet type: $typeString")
-
-    // When maxPrecision = -1, we skip precision range check, and always 
respect the precision
-    // specified in field.getDecimalMetadata.  This is useful when 
interpreting decimal types stored
-    // as binaries with variable lengths.
-    def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
-      val precision = field.getDecimalMetadata.getPrecision
-      val scale = field.getDecimalMetadata.getScale
-
-      CatalystSchemaConverter.checkConversionRequirement(
-        maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
-        s"Invalid decimal precision: $typeName cannot store $precision digits 
(max $maxPrecision)")
-
-      DecimalType(precision, scale)
-    }
-
-    typeName match {
-      case BOOLEAN => BooleanType
-
-      case FLOAT => FloatType
-
-      case DOUBLE => DoubleType
-
-      case INT32 =>
-        originalType match {
-          case INT_8 => ByteType
-          case INT_16 => ShortType
-          case INT_32 | null => IntegerType
-          case DATE => DateType
-          case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS)
-          case UINT_8 => typeNotSupported()
-          case UINT_16 => typeNotSupported()
-          case UINT_32 => typeNotSupported()
-          case TIME_MILLIS => typeNotImplemented()
-          case _ => illegalType()
-        }
-
-      case INT64 =>
-        originalType match {
-          case INT_64 | null => LongType
-          case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
-          case UINT_64 => typeNotSupported()
-          case TIMESTAMP_MILLIS => typeNotImplemented()
-          case _ => illegalType()
-        }
-
-      case INT96 =>
-        CatalystSchemaConverter.checkConversionRequirement(
-          assumeInt96IsTimestamp,
-          "INT96 is not supported unless it's interpreted as timestamp. " +
-            s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to 
true.")
-        TimestampType
-
-      case BINARY =>
-        originalType match {
-          case UTF8 | ENUM | JSON => StringType
-          case null if assumeBinaryIsString => StringType
-          case null => BinaryType
-          case BSON => BinaryType
-          case DECIMAL => makeDecimalType()
-          case _ => illegalType()
-        }
-
-      case FIXED_LEN_BYTE_ARRAY =>
-        originalType match {
-          case DECIMAL => 
makeDecimalType(maxPrecisionForBytes(field.getTypeLength))
-          case INTERVAL => typeNotImplemented()
-          case _ => illegalType()
-        }
-
-      case _ => illegalType()
-    }
-  }
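
A small usage sketch of the conversion above, assuming it runs inside the org.apache.spark.sql.execution.datasources.parquet package (the converter is private[parquet]; it is named ParquetSchemaConverter after this patch):

  import org.apache.parquet.schema.MessageTypeParser

  val parquetSchema = MessageTypeParser.parseMessageType(
    """message spark_schema {
      |  required int32 id;
      |  optional binary name (UTF8);
      |  repeated int64 scores;
      |}
    """.stripMargin)

  val converter = new ParquetSchemaConverter(
    assumeBinaryIsString = true,
    assumeInt96IsTimestamp = true,
    writeLegacyParquetFormat = false)

  // id     -> IntegerType, nullable = false              (REQUIRED)
  // name   -> StringType,  nullable = true               (OPTIONAL, UTF8-annotated BINARY)
  // scores -> ArrayType(LongType, containsNull = false), nullable = false (bare REPEATED field)
  val sparkSchema = converter.convert(parquetSchema)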
-
-  private def convertGroupField(field: GroupType): DataType = {
-    Option(field.getOriginalType).fold(convert(field): DataType) {
-      // A Parquet list is represented as a 3-level structure:
-      //
-      //   <list-repetition> group <name> (LIST) {
-      //     repeated group list {
-      //       <element-repetition> <element-type> element;
-      //     }
-      //   }
-      //
-      // However, according to the most recent Parquet format spec (not yet 
released as of this
-      // writing), some 2-level structures are also recognized for 
backwards-compatibility.  Thus,
-      // we need to check whether the 2nd level or the 3rd level refers to 
list element type.
-      //
-      // See: 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-      case LIST =>
-        CatalystSchemaConverter.checkConversionRequirement(
-          field.getFieldCount == 1, s"Invalid list type $field")
-
-        val repeatedType = field.getType(0)
-        CatalystSchemaConverter.checkConversionRequirement(
-          repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
-
-        if (isElementType(repeatedType, field.getName)) {
-          ArrayType(convertField(repeatedType), containsNull = false)
-        } else {
-          val elementType = repeatedType.asGroupType().getType(0)
-          val optional = elementType.isRepetition(OPTIONAL)
-          ArrayType(convertField(elementType), containsNull = optional)
-        }
-
-      // scalastyle:off
-      // `MAP_KEY_VALUE` is for backwards-compatibility
-      // See: 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
-      // scalastyle:on
-      case MAP | MAP_KEY_VALUE =>
-        CatalystSchemaConverter.checkConversionRequirement(
-          field.getFieldCount == 1 && !field.getType(0).isPrimitive,
-          s"Invalid map type: $field")
-
-        val keyValueType = field.getType(0).asGroupType()
-        CatalystSchemaConverter.checkConversionRequirement(
-          keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 
2,
-          s"Invalid map type: $field")
-
-        val keyType = keyValueType.getType(0)
-        CatalystSchemaConverter.checkConversionRequirement(
-          keyType.isPrimitive,
-          s"Map key type is expected to be a primitive type, but found: 
$keyType")
-
-        val valueType = keyValueType.getType(1)
-        val valueOptional = valueType.isRepetition(OPTIONAL)
-        MapType(
-          convertField(keyType),
-          convertField(valueType),
-          valueContainsNull = valueOptional)
-
-      case _ =>
-        throw new AnalysisException(s"Unrecognized Parquet type: $field")
-    }
-  }
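
To make the LIST and MAP rules above concrete, a hedged example under the same package-visibility assumption:

  import org.apache.parquet.schema.MessageTypeParser

  val nested = MessageTypeParser.parseMessageType(
    """message spark_schema {
      |  optional group tags (LIST) {
      |    repeated group list {
      |      optional binary element (UTF8);
      |    }
      |  }
      |  required group props (MAP) {
      |    repeated group key_value {
      |      required binary key (UTF8);
      |      optional int32 value;
      |    }
      |  }
      |}
    """.stripMargin)

  // tags  -> ArrayType(StringType, containsNull = true),                 nullable = true
  // props -> MapType(StringType, IntegerType, valueContainsNull = true), nullable = false
  val converted = new ParquetSchemaConverter().convert(nested)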
-
-  // scalastyle:off
-  // Here we implement Parquet LIST backwards-compatibility rules.
-  // See: 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-  // scalastyle:on
-  private def isElementType(repeatedType: Type, parentName: String): Boolean = 
{
-    {
-      // For legacy 2-level list types with primitive element type, e.g.:
-      //
-      //    // List<Integer> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated int32 element;
-      //    }
-      //
-      repeatedType.isPrimitive
-    } || {
-      // For legacy 2-level list types whose element type is a group type with 
2 or more fields,
-      // e.g.:
-      //
-      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated group element {
-      //        required binary str (UTF8);
-      //        required int32 num;
-      //      };
-      //    }
-      //
-      repeatedType.asGroupType().getFieldCount > 1
-    } || {
-      // For legacy 2-level list types generated by parquet-avro (Parquet 
version < 1.6.0), e.g.:
-      //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated group array {
-      //        required binary str (UTF8);
-      //      };
-      //    }
-      //
-      repeatedType.getName == "array"
-    } || {
-      // For Parquet data generated by parquet-thrift, e.g.:
-      //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated group my_list_tuple {
-      //        required binary str (UTF8);
-      //      };
-      //    }
-      //
-      repeatedType.getName == s"${parentName}_tuple"
-    }
-  }
-
-  /**
-   * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
-   */
-  def convert(catalystSchema: StructType): MessageType = {
-    Types
-      .buildMessage()
-      .addFields(catalystSchema.map(convertField): _*)
-      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-  }
-
-  /**
-   * Converts a Spark SQL [[StructField]] to a Parquet [[Type]].
-   */
-  def convertField(field: StructField): Type = {
-    convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
-  }
-
-  private def convertField(field: StructField, repetition: Type.Repetition): 
Type = {
-    CatalystSchemaConverter.checkFieldName(field.name)
-
-    field.dataType match {
-      // ===================
-      // Simple atomic types
-      // ===================
-
-      case BooleanType =>
-        Types.primitive(BOOLEAN, repetition).named(field.name)
-
-      case ByteType =>
-        Types.primitive(INT32, repetition).as(INT_8).named(field.name)
-
-      case ShortType =>
-        Types.primitive(INT32, repetition).as(INT_16).named(field.name)
-
-      case IntegerType =>
-        Types.primitive(INT32, repetition).named(field.name)
-
-      case LongType =>
-        Types.primitive(INT64, repetition).named(field.name)
-
-      case FloatType =>
-        Types.primitive(FLOAT, repetition).named(field.name)
-
-      case DoubleType =>
-        Types.primitive(DOUBLE, repetition).named(field.name)
-
-      case StringType =>
-        Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
-
-      case DateType =>
-        Types.primitive(INT32, repetition).as(DATE).named(field.name)
-
-      // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet 
format spec.
-      //
-      // As stated in PARQUET-323, Parquet `INT96` was originally introduced 
to represent nanosecond
-      // timestamp in Impala for some historical reasons.  It's not 
recommended to be used for any
-      // other types and will probably be deprecated in some future version of 
parquet-format spec.
-      // That's the reason why parquet-format spec only defines 
`TIMESTAMP_MILLIS` and
-      // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
-      //
-      // Originally, Spark SQL used the same nanosecond timestamp type as 
Impala and Hive.  Starting
-      // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision 
so that we can store
-      // a timestamp into a `Long`.  This design decision is subject to change 
though, for example,
-      // we may resort to microsecond precision in the future.
-      //
-      // For Parquet, we plan to write all `TimestampType` value as 
`TIMESTAMP_MICROS`, but it's
-      // currently not implemented yet because parquet-mr 1.7.0 (the version 
we're currently using)
-      // hasn't implemented `TIMESTAMP_MICROS` yet.
-      //
-      // TODO Convert to `TIMESTAMP_MICROS` once parquet-mr implements it.
-      case TimestampType =>
-        Types.primitive(INT96, repetition).named(field.name)
-
-      case BinaryType =>
-        Types.primitive(BINARY, repetition).named(field.name)
-
-      // ======================
-      // Decimals (legacy mode)
-      // ======================
-
-      // Spark 1.4.x and prior versions only support decimals with a maximum 
precision of 18 and
-      // always store decimals in fixed-length byte arrays.  To keep 
compatibility with these older
-      // versions, here we convert decimals with all precisions to 
`FIXED_LEN_BYTE_ARRAY` annotated
-      // by `DECIMAL`.
-      case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
-        Types
-          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .length(CatalystSchemaConverter.minBytesForPrecision(precision))
-          .named(field.name)
-
-      // ========================
-      // Decimals (standard mode)
-      // ========================
-
-      // Uses INT32 for 1 <= precision <= 9
-      case DecimalType.Fixed(precision, scale)
-          if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat 
=>
-        Types
-          .primitive(INT32, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .named(field.name)
-
-      // Uses INT64 for 1 <= precision <= 18
-      case DecimalType.Fixed(precision, scale)
-          if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat 
=>
-        Types
-          .primitive(INT64, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .named(field.name)
-
-      // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
-      case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
-        Types
-          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .length(CatalystSchemaConverter.minBytesForPrecision(precision))
-          .named(field.name)
-
-      // ===================================
-      // ArrayType and MapType (legacy mode)
-      // ===================================
-
-      // Spark 1.4.x and prior versions convert `ArrayType` with nullable 
elements into a 3-level
-      // `LIST` structure.  This behavior is somewhat a hybrid of parquet-hive 
and parquet-avro
-      // (1.6.0rc3): the 3-level structure is similar to parquet-hive while 
the 3rd level element
-      // field name "array" is borrowed from parquet-avro.
-      case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat 
=>
-        // <list-repetition> group <name> (LIST) {
-        //   optional group bag {
-        //     repeated <element-type> array;
-        //   }
-        // }
-        ConversionPatterns.listType(
-          repetition,
-          field.name,
-          Types
-            .buildGroup(REPEATED)
-            // "array_element" is the name chosen by parquet-hive (1.7.0 and 
prior version)
-            .addField(convertField(StructField("array", elementType, 
nullable)))
-            .named("bag"))
-
-      // Spark 1.4.x and prior versions convert ArrayType with non-nullable 
elements into a 2-level
-      // LIST structure.  This behavior mimics parquet-avro (1.6.0rc3).  Note 
that this case is
-      // covered by the backwards-compatibility rules implemented in 
`isElementType()`.
-      case ArrayType(elementType, nullable @ false) if 
writeLegacyParquetFormat =>
-        // <list-repetition> group <name> (LIST) {
-        //   repeated <element-type> element;
-        // }
-        ConversionPatterns.listType(
-          repetition,
-          field.name,
-          // "array" is the name chosen by parquet-avro (1.7.0 and prior 
version)
-          convertField(StructField("array", elementType, nullable), REPEATED))
-
-      // Spark 1.4.x and prior versions convert MapType into a 3-level group 
annotated by
-      // MAP_KEY_VALUE.  This is covered by `convertGroupField(field: 
GroupType): DataType`.
-      case MapType(keyType, valueType, valueContainsNull) if 
writeLegacyParquetFormat =>
-        // <map-repetition> group <name> (MAP) {
-        //   repeated group map (MAP_KEY_VALUE) {
-        //     required <key-type> key;
-        //     <value-repetition> <value-type> value;
-        //   }
-        // }
-        ConversionPatterns.mapType(
-          repetition,
-          field.name,
-          convertField(StructField("key", keyType, nullable = false)),
-          convertField(StructField("value", valueType, valueContainsNull)))
-
-      // =====================================
-      // ArrayType and MapType (standard mode)
-      // =====================================
-
-      case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
-        // <list-repetition> group <name> (LIST) {
-        //   repeated group list {
-        //     <element-repetition> <element-type> element;
-        //   }
-        // }
-        Types
-          .buildGroup(repetition).as(LIST)
-          .addField(
-            Types.repeatedGroup()
-              .addField(convertField(StructField("element", elementType, 
containsNull)))
-              .named("list"))
-          .named(field.name)
-
-      case MapType(keyType, valueType, valueContainsNull) =>
-        // <map-repetition> group <name> (MAP) {
-        //   repeated group key_value {
-        //     required <key-type> key;
-        //     <value-repetition> <value-type> value;
-        //   }
-        // }
-        Types
-          .buildGroup(repetition).as(MAP)
-          .addField(
-            Types
-              .repeatedGroup()
-              .addField(convertField(StructField("key", keyType, nullable = 
false)))
-              .addField(convertField(StructField("value", valueType, 
valueContainsNull)))
-              .named("key_value"))
-          .named(field.name)
-
-      // ===========
-      // Other types
-      // ===========
-
-      case StructType(fields) =>
-        fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
-          builder.addField(convertField(field))
-        }.named(field.name)
-
-      case udt: UserDefinedType[_] =>
-        convertField(field.copy(dataType = udt.sqlType))
-
-      case _ =>
-        throw new AnalysisException(s"Unsupported data type ${field.dataType}")
-    }
-  }
-}
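
The decimal cases above pick the physical type from the precision; a sketch of both modes, again assuming package-private access:

  import org.apache.spark.sql.types._

  val decimals = StructType(Seq(
    StructField("small", DecimalType(9, 2)),     // standard mode: INT32 (precision <= 9)
    StructField("medium", DecimalType(18, 4)),   // standard mode: INT64 (precision <= 18)
    StructField("large", DecimalType(38, 18))))  // standard mode: FIXED_LEN_BYTE_ARRAY(16)

  val standard = new ParquetSchemaConverter(writeLegacyParquetFormat = false).convert(decimals)

  // Legacy (Spark 1.4-compatible) mode stores every decimal as FIXED_LEN_BYTE_ARRAY,
  // sized by minBytesForPrecision: 4, 8 and 16 bytes for the three fields above.
  val legacy = new ParquetSchemaConverter(writeLegacyParquetFormat = true).convert(decimals)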
-
-private[parquet] object CatalystSchemaConverter {
-  val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
-
-  // !! HACK ALERT !!
-  //
-  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing 
empty GroupType,
-  // which makes it impossible to select no columns for queries like `SELECT 
COUNT(*) FROM t`.
-  // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
-  //
-  // To workaround this problem, here we first construct a `MessageType` with 
a single dummy
-  // field, and then remove the field to obtain an empty `MessageType`.
-  //
-  // TODO Revert this change after upgrading parquet-mr to 1.8.2+
-  val EMPTY_MESSAGE = Types
-      .buildMessage()
-      .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
-      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-  EMPTY_MESSAGE.getFields.clear()
-
-  def checkFieldName(name: String): Unit = {
-    // ,;{}()\n\t= and space are special characters in Parquet schema
-    checkConversionRequirement(
-      !name.matches(".*[ ,;{}()\n\t=].*"),
-      s"""Attribute name "$name" contains invalid character(s) among " 
,;{}()\\n\\t=".
-         |Please use alias to rename it.
-       """.stripMargin.split("\n").mkString(" ").trim)
-  }
-
-  def checkFieldNames(schema: StructType): StructType = {
-    schema.fieldNames.foreach(checkFieldName)
-    schema
-  }
-
-  def checkConversionRequirement(f: => Boolean, message: String): Unit = {
-    if (!f) {
-      throw new AnalysisException(message)
-    }
-  }
-
-  private def computeMinBytesForPrecision(precision : Int) : Int = {
-    var numBytes = 1
-    while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
-      numBytes += 1
-    }
-    numBytes
-  }
-
-  // Returns the minimum number of bytes needed to store a decimal with a 
given `precision`.
-  val minBytesForPrecision = 
Array.tabulate[Int](39)(computeMinBytesForPrecision)
-
-  // Max precision of a decimal value stored in `numBytes` bytes
-  def maxPrecisionForBytes(numBytes: Int): Int = {
-    Math.round(                               // convert double to long
-      Math.floor(Math.log10(                  // number of base-10 digits
-        Math.pow(2, 8 * numBytes - 1) - 1)))  // max value stored in numBytes
-      .asInstanceOf[Int]
-  }
-}
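
The byte/precision arithmetic above can be sanity-checked in isolation; this standalone re-implementation (names local to the sketch) reproduces the boundary values:

  def minBytesForPrecision(precision: Int): Int = {
    var numBytes = 1
    while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) numBytes += 1
    numBytes
  }

  def maxPrecisionForBytes(numBytes: Int): Int =
    math.floor(math.log10(math.pow(2, 8 * numBytes - 1) - 1)).toInt

  assert(minBytesForPrecision(9) == 4)    // 10^9 - 1 fits in a signed 32-bit value
  assert(minBytesForPrecision(18) == 8)   // 10^18 - 1 fits in a signed 64-bit value
  assert(maxPrecisionForBytes(4) == 9)    // floor(log10(2^31 - 1))  = 9
  assert(maxPrecisionForBytes(16) == 38)  // floor(log10(2^127 - 1)) = 38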

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index 67bfd39..cf974af 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.minBytesForPrecision
+import 
org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -82,8 +82,8 @@ private[parquet] class CatalystWriteSupport extends 
WriteSupport[InternalRow] wi
     }
     this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
 
-    val messageType = new 
CatalystSchemaConverter(configuration).convert(schema)
-    val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> 
schemaString).asJava
+    val messageType = new ParquetSchemaConverter(configuration).convert(schema)
+    val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> 
schemaString).asJava
 
     logInfo(
       s"""Initialized Parquet WriteSupport with Catalyst schema:
@@ -427,7 +427,7 @@ private[parquet] object CatalystWriteSupport {
   val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
 
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
-    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
+    schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
     configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
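
setSchema in the hunk above is what wires the write path into a Hadoop job; a minimal sketch of the call, assuming package-private access and a made-up schema:

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.sql.types._

  val schema = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = true)))

  val hadoopConf = new Configuration()
  // Validates every field name via ParquetSchemaConverter.checkFieldName, then stores
  // schema.json under "org.apache.spark.sql.parquet.row.attributes".
  CatalystWriteSupport.setSchema(schema, hadoopConf)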

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6b25e36..2cce3db 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -288,13 +288,13 @@ private[sql] class ParquetFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[CatalystReadSupport].getName)
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
     hadoopConf.set(
-      CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
     hadoopConf.set(
       CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
+      ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when 
pushing down filters.
@@ -369,10 +369,10 @@ private[sql] class ParquetFileFormat
         val reader = pushed match {
           case Some(filter) =>
             new ParquetRecordReader[InternalRow](
-              new CatalystReadSupport,
+              new ParquetReadSupport,
               FilterCompat.get(filter, null))
           case _ =>
-            new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+            new ParquetRecordReader[InternalRow](new ParquetReadSupport)
         }
         reader.initialize(split, hadoopAttemptContext)
         reader
@@ -590,7 +590,7 @@ private[sql] object ParquetFileFormat extends Logging {
       assumeBinaryIsString: Boolean,
       assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
     val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[CatalystReadSupport].getName)
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
 
     // Try to push down filters when filter push-down is enabled.
     if (parquetFilterPushDown) {
@@ -603,14 +603,14 @@ private[sql] object ParquetFileFormat extends Logging {
         .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
     }
 
-    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+    conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
       val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+      ParquetSchemaConverter.checkFieldNames(requestedSchema).json
     })
 
     conf.set(
       CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+      ParquetSchemaConverter.checkFieldNames(dataSchema).json)
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet 
and FS metadata
     conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
@@ -639,7 +639,7 @@ private[sql] object ParquetFileFormat extends Logging {
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
 
     def parseParquetSchema(schema: MessageType): StructType = {
-      val converter = new CatalystSchemaConverter(
+      val converter = new ParquetSchemaConverter(
         sparkSession.sessionState.conf.isParquetBinaryAsString,
         sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
         sparkSession.sessionState.conf.writeLegacyParquetFormat)
@@ -653,7 +653,7 @@ private[sql] object ParquetFileFormat extends Logging {
       val serializedSchema = metadata
         .getKeyValueMetaData
         .asScala.toMap
-        .get(CatalystReadSupport.SPARK_METADATA_KEY)
+        .get(ParquetReadSupport.SPARK_METADATA_KEY)
       if (serializedSchema.isEmpty) {
         // Falls back to Parquet schema if no Spark SQL schema found.
         Some(parseParquetSchema(metadata.getSchema))
@@ -820,7 +820,7 @@ private[sql] object ParquetFileFormat extends Logging {
 
           // Converter used to convert Parquet `MessageType` to Spark SQL 
`StructType`
           val converter =
-            new CatalystSchemaConverter(
+            new ParquetSchemaConverter(
               assumeBinaryIsString = assumeBinaryIsString,
               assumeInt96IsTimestamp = assumeInt96IsTimestamp,
               writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -864,12 +864,12 @@ private[sql] object ParquetFileFormat extends Logging {
    * a [[StructType]] converted from the [[MessageType]] stored in this footer.
    */
   def readSchemaFromFooter(
-      footer: Footer, converter: CatalystSchemaConverter): StructType = {
+      footer: Footer, converter: ParquetSchemaConverter): StructType = {
     val fileMetaData = footer.getParquetMetadata.getFileMetaData
     fileMetaData
       .getKeyValueMetaData
       .asScala.toMap
-      .get(CatalystReadSupport.SPARK_METADATA_KEY)
+      .get(ParquetReadSupport.SPARK_METADATA_KEY)
       .flatMap(deserializeSchemaString)
       .getOrElse(converter.convert(fileMetaData.getSchema))
   }
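
End to end, the renamed read-path classes and the schema converter are exercised through the ordinary DataFrame reader and writer; a small sketch with placeholder paths:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("parquet-demo").master("local[*]").getOrCreate()

  // The three flags that parameterize ParquetSchemaConverter:
  spark.conf.set("spark.sql.parquet.binaryAsString", "false")    // unannotated BINARY stays BinaryType
  spark.conf.set("spark.sql.parquet.int96AsTimestamp", "true")   // INT96 is read as TimestampType
  spark.conf.set("spark.sql.parquet.writeLegacyFormat", "false") // standard LIST/MAP/decimal layout on write

  val df = spark.range(0, 10).selectExpr("id", "cast(id as decimal(18, 2)) as amount")
  df.write.mode("overwrite").parquet("/tmp/parquet_rename_demo")   // placeholder path
  // Reads go through ParquetReadSupport / ParquetRecordMaterializer / ParquetRowConverter.
  val back = spark.read.parquet("/tmp/parquet_rename_demo")
  back.show()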

