Repository: spark
Updated Branches:
  refs/heads/branch-2.0 35c0a60a6 -> 52cb1ad38


http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
new file mode 100644
index 0000000..1ac083f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
+import org.apache.parquet.schema.Type.Repetition._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.maxPrecisionForBytes
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and
+ * vice versa.
+ *
+ * Parquet format backwards-compatibility rules are respected when converting Parquet
+ * [[MessageType]] schemas.
+ *
+ * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+ * @constructor
+ * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
+ *        [[StringType]] fields when converting a Parquet [[MessageType]] to Spark SQL
+ *        [[StructType]].  This argument only affects the Parquet read path.
+ * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
+ *        [[TimestampType]] fields when converting a Parquet [[MessageType]] to Spark SQL
+ *        [[StructType]].  Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
+ *        has optional nanosecond precision, but different from `TIME_MILLIS` and `TIMESTAMP_MILLIS`
+ *        described in the Parquet format spec.  This argument only affects the Parquet read path.
+ * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
+ *        and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
+ *        When set to false, use the standard format defined in the parquet-format spec.  This
+ *        argument only affects the Parquet write path.
+ */
+private[parquet] class ParquetSchemaConverter(
+    assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
+
+  def this(conf: SQLConf) = this(
+    assumeBinaryIsString = conf.isParquetBinaryAsString,
+    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
+    writeLegacyParquetFormat = conf.writeLegacyParquetFormat)
+
+  def this(conf: Configuration) = this(
+    assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
+    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean)
+
+  /**
+   * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
+   */
+  def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())
+
+  private def convert(parquetSchema: GroupType): StructType = {
+    val fields = parquetSchema.getFields.asScala.map { field =>
+      field.getRepetition match {
+        case OPTIONAL =>
+          StructField(field.getName, convertField(field), nullable = true)
+
+        case REQUIRED =>
+          StructField(field.getName, convertField(field), nullable = false)
+
+        case REPEATED =>
+          // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+          // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+          // elements where the element type is the type of the field.
+          val arrayType = ArrayType(convertField(field), containsNull = false)
+          StructField(field.getName, arrayType, nullable = false)
+      }
+    }
+
+    StructType(fields)
+  }
+
+  /**
+   * Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
+   */
+  def convertField(parquetType: Type): DataType = parquetType match {
+    case t: PrimitiveType => convertPrimitiveField(t)
+    case t: GroupType => convertGroupField(t.asGroupType())
+  }
+
+  private def convertPrimitiveField(field: PrimitiveType): DataType = {
+    val typeName = field.getPrimitiveTypeName
+    val originalType = field.getOriginalType
+
+    def typeString =
+      if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
+
+    def typeNotSupported() =
+      throw new AnalysisException(s"Parquet type not supported: $typeString")
+
+    def typeNotImplemented() =
+      throw new AnalysisException(s"Parquet type not yet supported: 
$typeString")
+
+    def illegalType() =
+      throw new AnalysisException(s"Illegal Parquet type: $typeString")
+
+    // When maxPrecision = -1, we skip precision range check, and always respect the precision
+    // specified in field.getDecimalMetadata.  This is useful when interpreting decimal types stored
+    // as binaries with variable lengths.
+    def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
+      val precision = field.getDecimalMetadata.getPrecision
+      val scale = field.getDecimalMetadata.getScale
+
+      ParquetSchemaConverter.checkConversionRequirement(
+        maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
+        s"Invalid decimal precision: $typeName cannot store $precision digits 
(max $maxPrecision)")
+
+      DecimalType(precision, scale)
+    }
+
+    typeName match {
+      case BOOLEAN => BooleanType
+
+      case FLOAT => FloatType
+
+      case DOUBLE => DoubleType
+
+      case INT32 =>
+        originalType match {
+          case INT_8 => ByteType
+          case INT_16 => ShortType
+          case INT_32 | null => IntegerType
+          case DATE => DateType
+          case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS)
+          case UINT_8 => typeNotSupported()
+          case UINT_16 => typeNotSupported()
+          case UINT_32 => typeNotSupported()
+          case TIME_MILLIS => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case INT64 =>
+        originalType match {
+          case INT_64 | null => LongType
+          case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
+          case UINT_64 => typeNotSupported()
+          case TIMESTAMP_MILLIS => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case INT96 =>
+        ParquetSchemaConverter.checkConversionRequirement(
+          assumeInt96IsTimestamp,
+          "INT96 is not supported unless it's interpreted as timestamp. " +
+            s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to 
true.")
+        TimestampType
+
+      case BINARY =>
+        originalType match {
+          case UTF8 | ENUM | JSON => StringType
+          case null if assumeBinaryIsString => StringType
+          case null => BinaryType
+          case BSON => BinaryType
+          case DECIMAL => makeDecimalType()
+          case _ => illegalType()
+        }
+
+      case FIXED_LEN_BYTE_ARRAY =>
+        originalType match {
+          case DECIMAL => makeDecimalType(maxPrecisionForBytes(field.getTypeLength))
+          case INTERVAL => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case _ => illegalType()
+    }
+  }
+
+  private def convertGroupField(field: GroupType): DataType = {
+    Option(field.getOriginalType).fold(convert(field): DataType) {
+      // A Parquet list is represented as a 3-level structure:
+      //
+      //   <list-repetition> group <name> (LIST) {
+      //     repeated group list {
+      //       <element-repetition> <element-type> element;
+      //     }
+      //   }
+      //
+      // However, according to the most recent Parquet format spec (not yet released as of this
+      // writing), some 2-level structures are also recognized for backwards-compatibility.  Thus,
+      // we need to check whether the 2nd level or the 3rd level refers to the list element type.
+      //
+      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+      case LIST =>
+        ParquetSchemaConverter.checkConversionRequirement(
+          field.getFieldCount == 1, s"Invalid list type $field")
+
+        val repeatedType = field.getType(0)
+        ParquetSchemaConverter.checkConversionRequirement(
+          repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
+
+        if (isElementType(repeatedType, field.getName)) {
+          ArrayType(convertField(repeatedType), containsNull = false)
+        } else {
+          val elementType = repeatedType.asGroupType().getType(0)
+          val optional = elementType.isRepetition(OPTIONAL)
+          ArrayType(convertField(elementType), containsNull = optional)
+        }
+
+      // scalastyle:off
+      // `MAP_KEY_VALUE` is for backwards-compatibility
+      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
+      // scalastyle:on
+      case MAP | MAP_KEY_VALUE =>
+        ParquetSchemaConverter.checkConversionRequirement(
+          field.getFieldCount == 1 && !field.getType(0).isPrimitive,
+          s"Invalid map type: $field")
+
+        val keyValueType = field.getType(0).asGroupType()
+        ParquetSchemaConverter.checkConversionRequirement(
+          keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
+          s"Invalid map type: $field")
+
+        val keyType = keyValueType.getType(0)
+        ParquetSchemaConverter.checkConversionRequirement(
+          keyType.isPrimitive,
+          s"Map key type is expected to be a primitive type, but found: 
$keyType")
+
+        val valueType = keyValueType.getType(1)
+        val valueOptional = valueType.isRepetition(OPTIONAL)
+        MapType(
+          convertField(keyType),
+          convertField(valueType),
+          valueContainsNull = valueOptional)
+
+      case _ =>
+        throw new AnalysisException(s"Unrecognized Parquet type: $field")
+    }
+  }
+
+  // scalastyle:off
+  // Here we implement Parquet LIST backwards-compatibility rules.
+  // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+  // scalastyle:on
+  private def isElementType(repeatedType: Type, parentName: String): Boolean = {
+    {
+      // For legacy 2-level list types with primitive element type, e.g.:
+      //
+      //    // List<Integer> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated int32 element;
+      //    }
+      //
+      repeatedType.isPrimitive
+    } || {
+      // For legacy 2-level list types whose element type is a group type with 2 or more fields,
+      // e.g.:
+      //
+      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group element {
+      //        required binary str (UTF8);
+      //        required int32 num;
+      //      };
+      //    }
+      //
+      repeatedType.asGroupType().getFieldCount > 1
+    } || {
+      // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
+      //
+      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group array {
+      //        required binary str (UTF8);
+      //      };
+      //    }
+      //
+      repeatedType.getName == "array"
+    } || {
+      // For Parquet data generated by parquet-thrift, e.g.:
+      //
+      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group my_list_tuple {
+      //        required binary str (UTF8);
+      //      };
+      //    }
+      //
+      repeatedType.getName == s"${parentName}_tuple"
+    }
+  }
+
+  /**
+   * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
+   */
+  def convert(catalystSchema: StructType): MessageType = {
+    Types
+      .buildMessage()
+      .addFields(catalystSchema.map(convertField): _*)
+      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+  }
+
+  /**
+   * Converts a Spark SQL [[StructField]] to a Parquet [[Type]].
+   */
+  def convertField(field: StructField): Type = {
+    convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
+  }
+
+  private def convertField(field: StructField, repetition: Type.Repetition): Type = {
+    ParquetSchemaConverter.checkFieldName(field.name)
+
+    field.dataType match {
+      // ===================
+      // Simple atomic types
+      // ===================
+
+      case BooleanType =>
+        Types.primitive(BOOLEAN, repetition).named(field.name)
+
+      case ByteType =>
+        Types.primitive(INT32, repetition).as(INT_8).named(field.name)
+
+      case ShortType =>
+        Types.primitive(INT32, repetition).as(INT_16).named(field.name)
+
+      case IntegerType =>
+        Types.primitive(INT32, repetition).named(field.name)
+
+      case LongType =>
+        Types.primitive(INT64, repetition).named(field.name)
+
+      case FloatType =>
+        Types.primitive(FLOAT, repetition).named(field.name)
+
+      case DoubleType =>
+        Types.primitive(DOUBLE, repetition).named(field.name)
+
+      case StringType =>
+        Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
+
+      case DateType =>
+        Types.primitive(INT32, repetition).as(DATE).named(field.name)
+
+      // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
+      //
+      // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
+      // timestamp in Impala for some historical reasons.  It's not recommended to be used for any
+      // other types and will probably be deprecated in some future version of parquet-format spec.
+      // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and
+      // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
+      //
+      // Originally, Spark SQL used the same nanosecond timestamp type as Impala and Hive.  Starting
+      // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
+      // a timestamp into a `Long`.  This design decision is subject to change though, for example,
+      // we may resort to microsecond precision in the future.
+      //
+      // For Parquet, we plan to write all `TimestampType` values as `TIMESTAMP_MICROS`, but it's
+      // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
+      // hasn't implemented `TIMESTAMP_MICROS` yet.
+      //
+      // TODO Convert to `TIMESTAMP_MICROS` once parquet-mr implements it.
+      case TimestampType =>
+        Types.primitive(INT96, repetition).named(field.name)
+
+      case BinaryType =>
+        Types.primitive(BINARY, repetition).named(field.name)
+
+      // ======================
+      // Decimals (legacy mode)
+      // ======================
+
+      // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
+      // always store decimals in fixed-length byte arrays.  To keep compatibility with these older
+      // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
+      // by `DECIMAL`.
+      case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
+        Types
+          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .length(ParquetSchemaConverter.minBytesForPrecision(precision))
+          .named(field.name)
+
+      // ========================
+      // Decimals (standard mode)
+      // ========================
+
+      // Uses INT32 for 1 <= precision <= 9
+      case DecimalType.Fixed(precision, scale)
+          if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat =>
+        Types
+          .primitive(INT32, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .named(field.name)
+
+      // Uses INT64 for 1 <= precision <= 18
+      case DecimalType.Fixed(precision, scale)
+          if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat =>
+        Types
+          .primitive(INT64, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .named(field.name)
+
+      // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
+      case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
+        Types
+          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .length(ParquetSchemaConverter.minBytesForPrecision(precision))
+          .named(field.name)
+
+      // ===================================
+      // ArrayType and MapType (legacy mode)
+      // ===================================
+
+      // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level
+      // `LIST` structure.  This behavior is somewhat a hybrid of parquet-hive and parquet-avro
+      // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element
+      // field name "array" is borrowed from parquet-avro.
+      case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat =>
+        // <list-repetition> group <name> (LIST) {
+        //   optional group bag {
+        //     repeated <element-type> array;
+        //   }
+        // }
+        ConversionPatterns.listType(
+          repetition,
+          field.name,
+          Types
+            .buildGroup(REPEATED)
+            // "array_element" is the name chosen by parquet-hive (1.7.0 and 
prior version)
+            .addField(convertField(StructField("array", elementType, 
nullable)))
+            .named("bag"))
+
+      // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
+      // LIST structure.  This behavior mimics parquet-avro (1.6.0rc3).  Note that this case is
+      // covered by the backwards-compatibility rules implemented in `isElementType()`.
+      case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat =>
+        // <list-repetition> group <name> (LIST) {
+        //   repeated <element-type> element;
+        // }
+        ConversionPatterns.listType(
+          repetition,
+          field.name,
+          // "array" is the name chosen by parquet-avro (1.7.0 and prior 
version)
+          convertField(StructField("array", elementType, nullable), REPEATED))
+
+      // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
+      // MAP_KEY_VALUE.  This is covered by `convertGroupField(field: GroupType): DataType`.
+      case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat =>
+        // <map-repetition> group <name> (MAP) {
+        //   repeated group map (MAP_KEY_VALUE) {
+        //     required <key-type> key;
+        //     <value-repetition> <value-type> value;
+        //   }
+        // }
+        ConversionPatterns.mapType(
+          repetition,
+          field.name,
+          convertField(StructField("key", keyType, nullable = false)),
+          convertField(StructField("value", valueType, valueContainsNull)))
+
+      // =====================================
+      // ArrayType and MapType (standard mode)
+      // =====================================
+
+      case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
+        // <list-repetition> group <name> (LIST) {
+        //   repeated group list {
+        //     <element-repetition> <element-type> element;
+        //   }
+        // }
+        Types
+          .buildGroup(repetition).as(LIST)
+          .addField(
+            Types.repeatedGroup()
+              .addField(convertField(StructField("element", elementType, 
containsNull)))
+              .named("list"))
+          .named(field.name)
+
+      case MapType(keyType, valueType, valueContainsNull) =>
+        // <map-repetition> group <name> (MAP) {
+        //   repeated group key_value {
+        //     required <key-type> key;
+        //     <value-repetition> <value-type> value;
+        //   }
+        // }
+        Types
+          .buildGroup(repetition).as(MAP)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(convertField(StructField("key", keyType, nullable = 
false)))
+              .addField(convertField(StructField("value", valueType, 
valueContainsNull)))
+              .named("key_value"))
+          .named(field.name)
+
+      // ===========
+      // Other types
+      // ===========
+
+      case StructType(fields) =>
+        fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
+          builder.addField(convertField(field))
+        }.named(field.name)
+
+      case udt: UserDefinedType[_] =>
+        convertField(field.copy(dataType = udt.sqlType))
+
+      case _ =>
+        throw new AnalysisException(s"Unsupported data type $field.dataType")
+    }
+  }
+}
+
+private[parquet] object ParquetSchemaConverter {
+  val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
+
+  def checkFieldName(name: String): Unit = {
+    // ,;{}()\n\t= and space are special characters in Parquet schema
+    checkConversionRequirement(
+      !name.matches(".*[ ,;{}()\n\t=].*"),
+      s"""Attribute name "$name" contains invalid character(s) among " 
,;{}()\\n\\t=".
+         |Please use alias to rename it.
+       """.stripMargin.split("\n").mkString(" ").trim)
+  }
+
+  def checkFieldNames(schema: StructType): StructType = {
+    schema.fieldNames.foreach(checkFieldName)
+    schema
+  }
+
+  def checkConversionRequirement(f: => Boolean, message: String): Unit = {
+    if (!f) {
+      throw new AnalysisException(message)
+    }
+  }
+
+  private def computeMinBytesForPrecision(precision: Int): Int = {
+    var numBytes = 1
+    while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
+      numBytes += 1
+    }
+    numBytes
+  }
+
+  // Returns the minimum number of bytes needed to store a decimal with a given `precision`.
+  val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision)
+
+  // Max precision of a decimal value stored in `numBytes` bytes
+  def maxPrecisionForBytes(numBytes: Int): Int = {
+    Math.round(                               // convert double to long
+      Math.floor(Math.log10(                  // number of base-10 digits
+        Math.pow(2, 8 * numBytes - 1) - 1)))  // max value stored in numBytes
+      .asInstanceOf[Int]
+  }
+}
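
As a quick illustration, the converter added above can be exercised as follows.  This sketch is
not part of the patch; the example schema is invented, and since the class is `private[parquet]`
it only compiles from inside that package:

    import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
    import org.apache.spark.sql.types._

    // Build a small Catalyst schema and round-trip it through Parquet's MessageType.
    val catalystSchema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true),
      StructField("scores", ArrayType(DoubleType, containsNull = true), nullable = true)))

    val converter = new ParquetSchemaConverter(
      assumeBinaryIsString = true,
      assumeInt96IsTimestamp = true,
      writeLegacyParquetFormat = false)

    val messageType = converter.convert(catalystSchema)   // StructType -> MessageType (write path)
    val roundTripped = converter.convert(messageType)     // MessageType -> StructType (read path)
    println(messageType)                                  // "scores" uses the standard 3-level LIST encoding
    println(roundTripped.treeString)

With `writeLegacyParquetFormat = true`, decimals of every precision would instead be written as
DECIMAL-annotated FIXED_LEN_BYTE_ARRAY sized by `minBytesForPrecision`: precision 18 needs 8 bytes
because 2^63 - 1 is roughly 9.2e18, which is also why `maxPrecisionForBytes(8)` returns 18.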

http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
new file mode 100644
index 0000000..307c64d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.nio.{ByteBuffer, ByteOrder}
+import java.util
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.io.api.{Binary, RecordConsumer}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * A Parquet [[WriteSupport]] implementation that writes Catalyst [[InternalRow]]s as Parquet
+ * messages.  This class can write Parquet data in two modes:
+ *
+ *  - Standard mode: Parquet data are written in standard format defined in parquet-format spec.
+ *  - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.4 and prior.
+ *
+ * This behavior can be controlled by the SQL option `spark.sql.parquet.writeLegacyFormat`.  The
+ * value of this option is propagated to this class by the `init()` method and its Hadoop
+ * configuration argument.
+ */
+private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
+  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer.
+  // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access
+  // data in `ArrayData` without the help of `SpecificMutableRow`.
+  private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+  // Schema of the `InternalRow`s to be written
+  private var schema: StructType = _
+
+  // `ValueWriter`s for all fields of the schema
+  private var rootFieldWriters: Seq[ValueWriter] = _
+
+  // The Parquet `RecordConsumer` to which all `InternalRow`s are written
+  private var recordConsumer: RecordConsumer = _
+
+  // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
+  private var writeLegacyParquetFormat: Boolean = _
+
+  // Reusable byte array used to write timestamps as Parquet INT96 values
+  private val timestampBuffer = new Array[Byte](12)
+
+  // Reusable byte array used to write decimal values
+  private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))
+
+  override def init(configuration: Configuration): WriteContext = {
+    val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
+    this.schema = StructType.fromString(schemaString)
+    this.writeLegacyParquetFormat = {
+      // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation
+      assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
+      configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
+    }
+    this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
+
+    val messageType = new ParquetSchemaConverter(configuration).convert(schema)
+    val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
+
+    logInfo(
+      s"""Initialized Parquet WriteSupport with Catalyst schema:
+         |${schema.prettyJson}
+         |and corresponding Parquet message type:
+         |$messageType
+       """.stripMargin)
+
+    new WriteContext(messageType, metadata)
+  }
+
+  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+    this.recordConsumer = recordConsumer
+  }
+
+  override def write(row: InternalRow): Unit = {
+    consumeMessage {
+      writeFields(row, schema, rootFieldWriters)
+    }
+  }
+
+  private def writeFields(
+      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+    var i = 0
+    while (i < row.numFields) {
+      if (!row.isNullAt(i)) {
+        consumeField(schema(i).name, i) {
+          fieldWriters(i).apply(row, i)
+        }
+      }
+      i += 1
+    }
+  }
+
+  private def makeWriter(dataType: DataType): ValueWriter = {
+    dataType match {
+      case BooleanType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addBoolean(row.getBoolean(ordinal))
+
+      case ByteType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addInteger(row.getByte(ordinal))
+
+      case ShortType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addInteger(row.getShort(ordinal))
+
+      case IntegerType | DateType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addInteger(row.getInt(ordinal))
+
+      case LongType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addLong(row.getLong(ordinal))
+
+      case FloatType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addFloat(row.getFloat(ordinal))
+
+      case DoubleType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addDouble(row.getDouble(ordinal))
+
+      case StringType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes))
+
+      case TimestampType =>
+        (row: SpecializedGetters, ordinal: Int) => {
+          // TODO Write `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
+          // Currently we only support timestamps stored as INT96, which is compatible with Hive
+          // and Impala.  However, INT96 is to be deprecated.  We plan to support `TIMESTAMP_MICROS`
+          // defined in the parquet-format spec.  But as of this writing, the most recent parquet-mr
+          // version (1.8.1) hasn't implemented it yet.
+
+          // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
+          // precision.  Nanosecond parts of timestamp values read from INT96 are simply stripped.
+          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
+          val buf = ByteBuffer.wrap(timestampBuffer)
+          buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
+          recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer))
+        }
+
+      case BinaryType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
+
+      case DecimalType.Fixed(precision, scale) =>
+        makeDecimalWriter(precision, scale)
+
+      case t: StructType =>
+        val fieldWriters = t.map(_.dataType).map(makeWriter)
+        (row: SpecializedGetters, ordinal: Int) =>
+          consumeGroup {
+            writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)
+          }
+
+      case t: ArrayType => makeArrayWriter(t)
+
+      case t: MapType => makeMapWriter(t)
+
+      case t: UserDefinedType[_] => makeWriter(t.sqlType)
+
+      // TODO Adds IntervalType support
+      case _ => sys.error(s"Unsupported data type $dataType.")
+    }
+  }
+
+  private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = {
+    assert(
+      precision <= DecimalType.MAX_PRECISION,
+      s"Decimal precision $precision exceeds max precision 
${DecimalType.MAX_PRECISION}")
+
+    val numBytes = minBytesForPrecision(precision)
+
+    val int32Writer =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+        recordConsumer.addInteger(unscaledLong.toInt)
+      }
+
+    val int64Writer =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+        recordConsumer.addLong(unscaledLong)
+      }
+
+    val binaryWriterUsingUnscaledLong =
+      (row: SpecializedGetters, ordinal: Int) => {
+        // When the precision is low enough (<= 18) to squeeze the decimal value into a `Long`, we
+        // can build a fixed-length byte array with length `numBytes` using the unscaled `Long`
+        // value and the `decimalBuffer` for better performance.
+        val unscaled = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+        var i = 0
+        var shift = 8 * (numBytes - 1)
+
+        while (i < numBytes) {
+          decimalBuffer(i) = (unscaled >> shift).toByte
+          i += 1
+          shift -= 8
+        }
+
+        recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes))
+      }
+
+    val binaryWriterUsingUnscaledBytes =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val decimal = row.getDecimal(ordinal, precision, scale)
+        val bytes = decimal.toJavaBigDecimal.unscaledValue().toByteArray
+        val fixedLengthBytes = if (bytes.length == numBytes) {
+          // If the length of the underlying byte array of the unscaled `BigInteger` happens to be
+          // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`.
+          bytes
+        } else {
+          // Otherwise, the length must be less than `numBytes`.  In this case we copy contents of
+          // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result
+          // fixed-length byte array.
+          val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+          util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte)
+          System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length)
+          decimalBuffer
+        }
+
+        recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes))
+      }
+
+    writeLegacyParquetFormat match {
+      // Standard mode, 1 <= precision <= 9, writes as INT32
+      case false if precision <= Decimal.MAX_INT_DIGITS => int32Writer
+
+      // Standard mode, 10 <= precision <= 18, writes as INT64
+      case false if precision <= Decimal.MAX_LONG_DIGITS => int64Writer
+
+      // Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
+      case true if precision <= Decimal.MAX_LONG_DIGITS => binaryWriterUsingUnscaledLong
+
+      // Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
+      case _ => binaryWriterUsingUnscaledBytes
+    }
+  }
+
+  def makeArrayWriter(arrayType: ArrayType): ValueWriter = {
+    val elementWriter = makeWriter(arrayType.elementType)
+
+    def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: String): ValueWriter =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val array = row.getArray(ordinal)
+        consumeGroup {
+          // Only creates the repeated field if the array is non-empty.
+          if (array.numElements() > 0) {
+            consumeField(repeatedGroupName, 0) {
+              var i = 0
+              while (i < array.numElements()) {
+                consumeGroup {
+                  // Only creates the element field if the current array element is not null.
+                  if (!array.isNullAt(i)) {
+                    consumeField(elementFieldName, 0) {
+                      elementWriter.apply(array, i)
+                    }
+                  }
+                }
+                i += 1
+              }
+            }
+          }
+        }
+      }
+
+    def twoLevelArrayWriter(repeatedFieldName: String): ValueWriter =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val array = row.getArray(ordinal)
+        consumeGroup {
+          // Only creates the repeated field if the array is non-empty.
+          if (array.numElements() > 0) {
+            consumeField(repeatedFieldName, 0) {
+              var i = 0
+              while (i < array.numElements()) {
+                elementWriter.apply(array, i)
+                i += 1
+              }
+            }
+          }
+        }
+      }
+
+    (writeLegacyParquetFormat, arrayType.containsNull) match {
+      case (legacyMode @ false, _) =>
+        // Standard mode:
+        //
+        //   <list-repetition> group <name> (LIST) {
+        //     repeated group list {
+        //                    ^~~~  repeatedGroupName
+        //       <element-repetition> <element-type> element;
+        //                                           ^~~~~~~  elementFieldName
+        //     }
+        //   }
+        threeLevelArrayWriter(repeatedGroupName = "list", elementFieldName = 
"element")
+
+      case (legacyMode @ true, nullableElements @ true) =>
+        // Legacy mode, with nullable elements:
+        //
+        //   <list-repetition> group <name> (LIST) {
+        //     optional group bag {
+        //                    ^~~  repeatedGroupName
+        //       repeated <element-type> array;
+        //                               ^~~~~ elementFieldName
+        //     }
+        //   }
+        threeLevelArrayWriter(repeatedGroupName = "bag", elementFieldName = 
"array")
+
+      case (legacyMode @ true, nullableElements @ false) =>
+        // Legacy mode, with non-nullable elements:
+        //
+        //   <list-repetition> group <name> (LIST) {
+        //     repeated <element-type> array;
+        //                             ^~~~~  repeatedFieldName
+        //   }
+        twoLevelArrayWriter(repeatedFieldName = "array")
+    }
+  }
+
+  private def makeMapWriter(mapType: MapType): ValueWriter = {
+    val keyWriter = makeWriter(mapType.keyType)
+    val valueWriter = makeWriter(mapType.valueType)
+    val repeatedGroupName = if (writeLegacyParquetFormat) {
+      // Legacy mode:
+      //
+      //   <map-repetition> group <name> (MAP) {
+      //     repeated group map (MAP_KEY_VALUE) {
+      //                    ^~~  repeatedGroupName
+      //       required <key-type> key;
+      //       <value-repetition> <value-type> value;
+      //     }
+      //   }
+      "map"
+    } else {
+      // Standard mode:
+      //
+      //   <map-repetition> group <name> (MAP) {
+      //     repeated group key_value {
+      //                    ^~~~~~~~~  repeatedGroupName
+      //       required <key-type> key;
+      //       <value-repetition> <value-type> value;
+      //     }
+      //   }
+      "key_value"
+    }
+
+    (row: SpecializedGetters, ordinal: Int) => {
+      val map = row.getMap(ordinal)
+      val keyArray = map.keyArray()
+      val valueArray = map.valueArray()
+
+      consumeGroup {
+        // Only creates the repeated field if the map is non-empty.
+        if (map.numElements() > 0) {
+          consumeField(repeatedGroupName, 0) {
+            var i = 0
+            while (i < map.numElements()) {
+              consumeGroup {
+                consumeField("key", 0) {
+                  keyWriter.apply(keyArray, i)
+                }
+
+                // Only creates the "value" field if the value if non-empty
+                if (!map.valueArray().isNullAt(i)) {
+                  consumeField("value", 1) {
+                    valueWriter.apply(valueArray, i)
+                  }
+                }
+              }
+              i += 1
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private def consumeMessage(f: => Unit): Unit = {
+    recordConsumer.startMessage()
+    f
+    recordConsumer.endMessage()
+  }
+
+  private def consumeGroup(f: => Unit): Unit = {
+    recordConsumer.startGroup()
+    f
+    recordConsumer.endGroup()
+  }
+
+  private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
+    recordConsumer.startField(field, index)
+    f
+    recordConsumer.endField(field, index)
+  }
+}
+
+private[parquet] object ParquetWriteSupport {
+  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
+
+  def setSchema(schema: StructType, configuration: Configuration): Unit = {
+    schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName)
+    configuration.set(SPARK_ROW_SCHEMA, schema.json)
+    configuration.setIfUnset(
+      ParquetOutputFormat.WRITER_VERSION,
+      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
+  }
+}
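
A similar sketch of how this write support can be wired into a Hadoop configuration.  Again, this
is not part of the patch; the schema is invented, `ParquetOutputFormat.WRITE_SUPPORT_CLASS` comes
from parquet-mr, and the classes are `private[parquet]`, so it only compiles from inside that
package:

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.ParquetOutputFormat
    import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types._

    val conf = new Configuration()
    val schema = StructType(Seq(
      StructField("ts", TimestampType, nullable = true),          // written as INT96 (see makeWriter)
      StructField("price", DecimalType(9, 2), nullable = true)))  // fits in INT32 in standard mode

    // Publish the Catalyst schema (this also validates field names) and choose the format mode;
    // init() asserts that the legacy-format key has been set explicitly.
    ParquetWriteSupport.setSchema(schema, conf)
    conf.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")

    // Let parquet-mr instantiate the write support when the output format runs.
    conf.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, classOf[ParquetWriteSupport].getName)

In Spark itself this wiring is done when a Parquet write job is prepared (see the ParquetRelation
note in `init()` above).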

http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 92f2db3..fc9ce6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -362,7 +362,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
 
-      val expectedSchema = new CatalystSchemaConverter().convert(schema)
+      val expectedSchema = new ParquetSchemaConverter().convert(schema)
       val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema
 
       actualSchema.checkContains(expectedSchema)
@@ -432,7 +432,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       """.stripMargin)
 
     withTempPath { location =>
-      val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
+      val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
       val path = new Path(location.getCanonicalPath)
       val conf = spark.sessionState.newHadoopConf()
       writeMetadata(parquetSchema, path, conf, extraMetadata)

http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index b4fd0ef..83d1001 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -574,7 +574,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
   test("expand UDT in StructType") {
     val schema = new StructType().add("n", new NestedStructUDT, nullable = true)
     val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true)
-    assert(CatalystReadSupport.expandUDT(schema) === expected)
+    assert(ParquetReadSupport.expandUDT(schema) === expected)
   }
 
   test("expand UDT in ArrayType") {
@@ -592,7 +592,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
         containsNull = false),
       nullable = true)
 
-    assert(CatalystReadSupport.expandUDT(schema) === expected)
+    assert(ParquetReadSupport.expandUDT(schema) === expected)
   }
 
   test("expand UDT in MapType") {
@@ -612,7 +612,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
         valueContainsNull = false),
       nullable = true)
 
-    assert(CatalystReadSupport.expandUDT(schema) === expected)
+    assert(ParquetReadSupport.expandUDT(schema) === expected)
   }
 
   test("returning batch for wide table") {

http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 1bc6f70..51bb236 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -54,7 +54,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean): Unit = {
-    val converter = new CatalystSchemaConverter(
+    val converter = new ParquetSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -78,7 +78,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean): Unit = {
-    val converter = new CatalystSchemaConverter(
+    val converter = new ParquetSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -1054,7 +1054,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       expectedSchema: String): Unit = {
     test(s"Clipping - $testName") {
       val expected = MessageTypeParser.parseMessageType(expectedSchema)
-      val actual = CatalystReadSupport.clipParquetSchema(
+      val actual = ParquetReadSupport.clipParquetSchema(
         MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
 
       try {

http://git-wip-us.apache.org/repos/asf/spark/blob/52cb1ad3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 1953d6f..9fb34e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -124,8 +124,8 @@ private[sql] trait ParquetTest extends SQLTestUtils {
 
   protected def writeMetadata(
       schema: StructType, path: Path, configuration: Configuration): Unit = {
-    val parquetSchema = new CatalystSchemaConverter().convert(schema)
-    val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
+    val parquetSchema = new ParquetSchemaConverter().convert(schema)
+    val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
     val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
     val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy)
     val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava)

