http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
new file mode 100644
index 0000000..3542dfb
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.math.{BigDecimal, BigInteger}
+import java.nio.ByteOrder
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.parquet.column.Dictionary
+import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, 
PrimitiveConverter}
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ParentContainerUpdater]] is used by a Parquet converter to set 
converted values to some
+ * corresponding parent container. For example, a converter for a `StructType` 
field may set
+ * converted values to a [[MutableRow]]; or a converter for array elements may 
append converted
+ * values to an [[ArrayBuffer]].
+ */
+private[parquet] trait ParentContainerUpdater {
+  def set(value: Any): Unit = ()
+  def setBoolean(value: Boolean): Unit = set(value)
+  def setByte(value: Byte): Unit = set(value)
+  def setShort(value: Short): Unit = set(value)
+  def setInt(value: Int): Unit = set(value)
+  def setLong(value: Long): Unit = set(value)
+  def setFloat(value: Float): Unit = set(value)
+  def setDouble(value: Double): Unit = set(value)
+}
+
+/** A no-op updater used for root converter (who doesn't have a parent). */
+private[parquet] object NoopUpdater extends ParentContainerUpdater
+
+/**
+ * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark 
SQL [[InternalRow]]s.
+ * Since any Parquet record is also a struct, this converter can also be used 
as root converter.
+ *
+ * When used as a root converter, [[NoopUpdater]] should be used since root 
converters don't have
+ * any "parent" container.
+ *
+ * @param parquetType Parquet schema of Parquet records
+ * @param catalystType Spark SQL schema that corresponds to the Parquet record 
type
+ * @param updater An updater which propagates converted field values to the 
parent container
+ */
+private[parquet] class CatalystRowConverter(
+    parquetType: GroupType,
+    catalystType: StructType,
+    updater: ParentContainerUpdater)
+  extends GroupConverter {
+
+  /**
+   * Updater used together with field converters within a 
[[CatalystRowConverter]].  It propagates
+   * converted filed values to the `ordinal`-th cell in `currentRow`.
+   */
+  private final class RowUpdater(row: MutableRow, ordinal: Int) extends 
ParentContainerUpdater {
+    override def set(value: Any): Unit = row(ordinal) = value
+    override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, 
value)
+    override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
+    override def setShort(value: Short): Unit = row.setShort(ordinal, value)
+    override def setInt(value: Int): Unit = row.setInt(ordinal, value)
+    override def setLong(value: Long): Unit = row.setLong(ordinal, value)
+    override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
+    override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
+  }
+
+  /**
+   * Represents the converted row object once an entire Parquet record is 
converted.
+   *
+   * @todo Uses [[UnsafeRow]] for better performance.
+   */
+  val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+
+  // Converters for each field.
+  private val fieldConverters: Array[Converter] = {
+    parquetType.getFields.zip(catalystType).zipWithIndex.map {
+      case ((parquetFieldType, catalystField), ordinal) =>
+        // Converted field value should be set to the `ordinal`-th cell of 
`currentRow`
+        newConverter(parquetFieldType, catalystField.dataType, new 
RowUpdater(currentRow, ordinal))
+    }.toArray
+  }
+
+  override def getConverter(fieldIndex: Int): Converter = 
fieldConverters(fieldIndex)
+
+  override def end(): Unit = updater.set(currentRow)
+
+  override def start(): Unit = {
+    var i = 0
+    while (i < currentRow.numFields) {
+      currentRow.setNullAt(i)
+      i += 1
+    }
+  }
+
+  /**
+   * Creates a converter for the given Parquet type `parquetType` and Spark 
SQL data type
+   * `catalystType`. Converted values are handled by `updater`.
+   */
+  private def newConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      updater: ParentContainerUpdater): Converter = {
+
+    catalystType match {
+      case BooleanType | IntegerType | LongType | FloatType | DoubleType | 
BinaryType =>
+        new CatalystPrimitiveConverter(updater)
+
+      case ByteType =>
+        new PrimitiveConverter {
+          override def addInt(value: Int): Unit =
+            updater.setByte(value.asInstanceOf[ByteType#InternalType])
+        }
+
+      case ShortType =>
+        new PrimitiveConverter {
+          override def addInt(value: Int): Unit =
+            updater.setShort(value.asInstanceOf[ShortType#InternalType])
+        }
+
+      case t: DecimalType =>
+        new CatalystDecimalConverter(t, updater)
+
+      case StringType =>
+        new CatalystStringConverter(updater)
+
+      case TimestampType =>
+        // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+        new PrimitiveConverter {
+          // Converts nanosecond timestamps stored as INT96
+          override def addBinary(value: Binary): Unit = {
+            assert(
+              value.length() == 12,
+              "Timestamps (with nanoseconds) are expected to be stored in 
12-byte long binaries, " +
+              s"but got a ${value.length()}-byte binary.")
+
+            val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+            val timeOfDayNanos = buf.getLong
+            val julianDay = buf.getInt
+            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, 
timeOfDayNanos))
+          }
+        }
+
+      case DateType =>
+        new PrimitiveConverter {
+          override def addInt(value: Int): Unit = {
+            // DateType is not specialized in `SpecificMutableRow`, have to 
box it here.
+            updater.set(value.asInstanceOf[DateType#InternalType])
+          }
+        }
+
+      case t: ArrayType =>
+        new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
+
+      case t: MapType =>
+        new CatalystMapConverter(parquetType.asGroupType(), t, updater)
+
+      case t: StructType =>
+        new CatalystRowConverter(parquetType.asGroupType(), t, new 
ParentContainerUpdater {
+          override def set(value: Any): Unit = 
updater.set(value.asInstanceOf[InternalRow].copy())
+        })
+
+      case t: UserDefinedType[_] =>
+        val catalystTypeForUDT = t.sqlType
+        val nullable = parquetType.isRepetition(Repetition.OPTIONAL)
+        val field = StructField("udt", catalystTypeForUDT, nullable)
+        val parquetTypeForUDT = new 
CatalystSchemaConverter().convertField(field)
+        newConverter(parquetTypeForUDT, catalystTypeForUDT, updater)
+
+      case _ =>
+        throw new RuntimeException(
+          s"Unable to create Parquet converter for data type 
${catalystType.json}")
+    }
+  }
+
+  /**
+   * Parquet converter for Parquet primitive types.  Note that not all Spark 
SQL atomic types
+   * are handled by this converter.  Parquet primitive types are only a subset 
of those of Spark
+   * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by 
INT32 in Parquet.
+   */
+  private final class CatalystPrimitiveConverter(updater: 
ParentContainerUpdater)
+    extends PrimitiveConverter {
+
+    override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
+    override def addInt(value: Int): Unit = updater.setInt(value)
+    override def addLong(value: Long): Unit = updater.setLong(value)
+    override def addFloat(value: Float): Unit = updater.setFloat(value)
+    override def addDouble(value: Double): Unit = updater.setDouble(value)
+    override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
+  }
+
+  /**
+   * Parquet converter for strings. A dictionary is used to minimize string 
decoding cost.
+   */
+  private final class CatalystStringConverter(updater: ParentContainerUpdater)
+    extends PrimitiveConverter {
+
+    private var expandedDictionary: Array[UTF8String] = null
+
+    override def hasDictionarySupport: Boolean = true
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
+        UTF8String.fromBytes(dictionary.decodeToBinary(i).getBytes)
+      }
+    }
+
+    override def addValueFromDictionary(dictionaryId: Int): Unit = {
+      updater.set(expandedDictionary(dictionaryId))
+    }
+
+    override def addBinary(value: Binary): Unit = {
+      updater.set(UTF8String.fromBytes(value.getBytes))
+    }
+  }
+
+  /**
+   * Parquet converter for fixed-precision decimals.
+   */
+  private final class CatalystDecimalConverter(
+      decimalType: DecimalType,
+      updater: ParentContainerUpdater)
+    extends PrimitiveConverter {
+
+    // Converts decimals stored as INT32
+    override def addInt(value: Int): Unit = {
+      addLong(value: Long)
+    }
+
+    // Converts decimals stored as INT64
+    override def addLong(value: Long): Unit = {
+      updater.set(Decimal(value, decimalType.precision, decimalType.scale))
+    }
+
+    // Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY
+    override def addBinary(value: Binary): Unit = {
+      updater.set(toDecimal(value))
+    }
+
+    private def toDecimal(value: Binary): Decimal = {
+      val precision = decimalType.precision
+      val scale = decimalType.scale
+      val bytes = value.getBytes
+
+      if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
+        // Constructs a `Decimal` with an unscaled `Long` value if possible.
+        var unscaled = 0L
+        var i = 0
+
+        while (i < bytes.length) {
+          unscaled = (unscaled << 8) | (bytes(i) & 0xff)
+          i += 1
+        }
+
+        val bits = 8 * bytes.length
+        unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+        Decimal(unscaled, precision, scale)
+      } else {
+        // Otherwise, resorts to an unscaled `BigInteger` instead.
+        Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale)
+      }
+    }
+  }
+
+  /**
+   * Parquet converter for arrays.  Spark SQL arrays are represented as 
Parquet lists.  Standard
+   * Parquet lists are represented as a 3-level group annotated by `LIST`:
+   * {{{
+   *   <list-repetition> group <name> (LIST) {            <-- parquetSchema 
points here
+   *     repeated group list {
+   *       <element-repetition> <element-type> element;
+   *     }
+   *   }
+   * }}}
+   * The `parquetSchema` constructor argument points to the outermost group.
+   *
+   * However, before this representation is standardized, some Parquet 
libraries/tools also use some
+   * non-standard formats to represent list-like structures.  
Backwards-compatibility rules for
+   * handling these cases are described in Parquet format spec.
+   *
+   * @see 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+   */
+  private final class CatalystArrayConverter(
+      parquetSchema: GroupType,
+      catalystSchema: ArrayType,
+      updater: ParentContainerUpdater)
+    extends GroupConverter {
+
+    private var currentArray: ArrayBuffer[Any] = _
+
+    private val elementConverter: Converter = {
+      val repeatedType = parquetSchema.getType(0)
+      val elementType = catalystSchema.elementType
+
+      if (isElementType(repeatedType, elementType)) {
+        newConverter(repeatedType, elementType, new ParentContainerUpdater {
+          override def set(value: Any): Unit = currentArray += value
+        })
+      } else {
+        new ElementConverter(repeatedType.asGroupType().getType(0), 
elementType)
+      }
+    }
+
+    override def getConverter(fieldIndex: Int): Converter = elementConverter
+
+    override def end(): Unit = updater.set(new 
GenericArrayData(currentArray.toArray))
+
+    // NOTE: We can't reuse the mutable `ArrayBuffer` here and must 
instantiate a new buffer for the
+    // next value.  `Row.copy()` only copies row cells, it doesn't do deep 
copy to objects stored
+    // in row cells.
+    override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+
+    // scalastyle:off
+    /**
+     * Returns whether the given type is the element type of a list or is a 
syntactic group with
+     * one field that is the element type.  This is determined by checking 
whether the type can be
+     * a syntactic group and by checking whether a potential syntactic group 
matches the expected
+     * schema.
+     * {{{
+     *   <list-repetition> group <name> (LIST) {
+     *     repeated group list {                          <-- repeatedType 
points here
+     *       <element-repetition> <element-type> element;
+     *     }
+     *   }
+     * }}}
+     * In short, here we handle Parquet list backwards-compatibility rules on 
the read path.  This
+     * method is based on `AvroIndexedRecordConverter.isElementType`.
+     *
+     * @see 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+     */
+    // scalastyle:on
+    private def isElementType(parquetRepeatedType: Type, catalystElementType: 
DataType): Boolean = {
+      (parquetRepeatedType, catalystElementType) match {
+        case (t: PrimitiveType, _) => true
+        case (t: GroupType, _) if t.getFieldCount > 1 => true
+        case (t: GroupType, StructType(Array(f))) if f.name == 
t.getFieldName(0) => true
+        case _ => false
+      }
+    }
+
+    /** Array element converter */
+    private final class ElementConverter(parquetType: Type, catalystType: 
DataType)
+      extends GroupConverter {
+
+      private var currentElement: Any = _
+
+      private val converter = newConverter(parquetType, catalystType, new 
ParentContainerUpdater {
+        override def set(value: Any): Unit = currentElement = value
+      })
+
+      override def getConverter(fieldIndex: Int): Converter = converter
+
+      override def end(): Unit = currentArray += currentElement
+
+      override def start(): Unit = currentElement = null
+    }
+  }
+
+  /** Parquet converter for maps */
+  private final class CatalystMapConverter(
+      parquetType: GroupType,
+      catalystType: MapType,
+      updater: ParentContainerUpdater)
+    extends GroupConverter {
+
+    private var currentKeys: ArrayBuffer[Any] = _
+    private var currentValues: ArrayBuffer[Any] = _
+
+    private val keyValueConverter = {
+      val repeatedType = parquetType.getType(0).asGroupType()
+      new KeyValueConverter(
+        repeatedType.getType(0),
+        repeatedType.getType(1),
+        catalystType.keyType,
+        catalystType.valueType)
+    }
+
+    override def getConverter(fieldIndex: Int): Converter = keyValueConverter
+
+    override def end(): Unit =
+      updater.set(ArrayBasedMapData(currentKeys.toArray, 
currentValues.toArray))
+
+    // NOTE: We can't reuse the mutable Map here and must instantiate a new 
`Map` for the next
+    // value.  `Row.copy()` only copies row cells, it doesn't do deep copy to 
objects stored in row
+    // cells.
+    override def start(): Unit = {
+      currentKeys = ArrayBuffer.empty[Any]
+      currentValues = ArrayBuffer.empty[Any]
+    }
+
+    /** Parquet converter for key-value pairs within the map. */
+    private final class KeyValueConverter(
+        parquetKeyType: Type,
+        parquetValueType: Type,
+        catalystKeyType: DataType,
+        catalystValueType: DataType)
+      extends GroupConverter {
+
+      private var currentKey: Any = _
+
+      private var currentValue: Any = _
+
+      private val converters = Array(
+        // Converter for keys
+        newConverter(parquetKeyType, catalystKeyType, new 
ParentContainerUpdater {
+          override def set(value: Any): Unit = currentKey = value
+        }),
+
+        // Converter for values
+        newConverter(parquetValueType, catalystValueType, new 
ParentContainerUpdater {
+          override def set(value: Any): Unit = currentValue = value
+        }))
+
+      override def getConverter(fieldIndex: Int): Converter = 
converters(fieldIndex)
+
+      override def end(): Unit = {
+        currentKeys += currentKey
+        currentValues += currentValue
+      }
+
+      override def start(): Unit = {
+        currentKey = null
+        currentValue = null
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
new file mode 100644
index 0000000..a3fc74c
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
+import org.apache.parquet.schema.Type.Repetition._
+import org.apache.parquet.schema._
+
+import 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32,
 MAX_PRECISION_FOR_INT64, maxPrecisionForBytes}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, SQLConf}
+
+/**
+ * This converter class is used to convert Parquet [[MessageType]] to Spark 
SQL [[StructType]] and
+ * vice versa.
+ *
+ * Parquet format backwards-compatibility rules are respected when converting 
Parquet
+ * [[MessageType]] schemas.
+ *
+ * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+ *
+ * @constructor
+ * @param assumeBinaryIsString Whether unannotated BINARY fields should be 
assumed to be Spark SQL
+ *        [[StringType]] fields when converting Parquet a [[MessageType]] to 
Spark SQL
+ *        [[StructType]].
+ * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be 
assumed to be Spark SQL
+ *        [[TimestampType]] fields when converting Parquet a [[MessageType]] 
to Spark SQL
+ *        [[StructType]].  Note that Spark SQL [[TimestampType]] is similar to 
Hive timestamp, which
+ *        has optional nanosecond precision, but different from `TIME_MILLS` 
and `TIMESTAMP_MILLIS`
+ *        described in Parquet format spec.
+ * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, 
and MAP structure when
+ *        converting Spark SQL [[StructType]] to Parquet [[MessageType]].  For 
Spark 1.4.x and
+ *        prior versions, Spark SQL only supports decimals with a max 
precision of 18 digits, and
+ *        uses non-standard LIST and MAP structure.  Note that the current 
Parquet format spec is
+ *        backwards-compatible with these settings.  If this argument is set 
to `false`, we fallback
+ *        to old style non-standard behaviors.
+ */
+private[parquet] class CatalystSchemaConverter(
+    private val assumeBinaryIsString: Boolean,
+    private val assumeInt96IsTimestamp: Boolean,
+    private val followParquetFormatSpec: Boolean) {
+
+  // Only used when constructing converter for converting Spark SQL schema to 
Parquet schema, in
+  // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are 
irrelevant.
+  def this() = this(
+    assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+    assumeInt96IsTimestamp = 
SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    followParquetFormatSpec = 
SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
+
+  def this(conf: SQLConf) = this(
+    assumeBinaryIsString = conf.isParquetBinaryAsString,
+    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
+    followParquetFormatSpec = conf.followParquetFormatSpec)
+
+  def this(conf: Configuration) = this(
+    assumeBinaryIsString =
+      conf.getBoolean(
+        SQLConf.PARQUET_BINARY_AS_STRING.key,
+        SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
+    assumeInt96IsTimestamp =
+      conf.getBoolean(
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
+    followParquetFormatSpec =
+      conf.getBoolean(
+        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
+        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
+
+  /**
+   * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL 
[[StructType]].
+   */
+  def convert(parquetSchema: MessageType): StructType = 
convert(parquetSchema.asGroupType())
+
+  private def convert(parquetSchema: GroupType): StructType = {
+    val fields = parquetSchema.getFields.map { field =>
+      field.getRepetition match {
+        case OPTIONAL =>
+          StructField(field.getName, convertField(field), nullable = true)
+
+        case REQUIRED =>
+          StructField(field.getName, convertField(field), nullable = false)
+
+        case REPEATED =>
+          throw new AnalysisException(
+            s"REPEATED not supported outside LIST or MAP. Type: $field")
+      }
+    }
+
+    StructType(fields)
+  }
+
+  /**
+   * Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
+   */
+  def convertField(parquetType: Type): DataType = parquetType match {
+    case t: PrimitiveType => convertPrimitiveField(t)
+    case t: GroupType => convertGroupField(t.asGroupType())
+  }
+
+  private def convertPrimitiveField(field: PrimitiveType): DataType = {
+    val typeName = field.getPrimitiveTypeName
+    val originalType = field.getOriginalType
+
+    def typeString =
+      if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
+
+    def typeNotImplemented() =
+      throw new AnalysisException(s"Parquet type not yet supported: 
$typeString")
+
+    def illegalType() =
+      throw new AnalysisException(s"Illegal Parquet type: $typeString")
+
+    // When maxPrecision = -1, we skip precision range check, and always 
respect the precision
+    // specified in field.getDecimalMetadata.  This is useful when 
interpreting decimal types stored
+    // as binaries with variable lengths.
+    def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
+      val precision = field.getDecimalMetadata.getPrecision
+      val scale = field.getDecimalMetadata.getScale
+
+      CatalystSchemaConverter.analysisRequire(
+        maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
+        s"Invalid decimal precision: $typeName cannot store $precision digits 
(max $maxPrecision)")
+
+      DecimalType(precision, scale)
+    }
+
+    typeName match {
+      case BOOLEAN => BooleanType
+
+      case FLOAT => FloatType
+
+      case DOUBLE => DoubleType
+
+      case INT32 =>
+        originalType match {
+          case INT_8 => ByteType
+          case INT_16 => ShortType
+          case INT_32 | null => IntegerType
+          case DATE => DateType
+          case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT32)
+          case TIME_MILLIS => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case INT64 =>
+        originalType match {
+          case INT_64 | null => LongType
+          case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT64)
+          case TIMESTAMP_MILLIS => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case INT96 =>
+        CatalystSchemaConverter.analysisRequire(
+          assumeInt96IsTimestamp,
+          "INT96 is not supported unless it's interpreted as timestamp. " +
+            s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to 
true.")
+        TimestampType
+
+      case BINARY =>
+        originalType match {
+          case UTF8 | ENUM => StringType
+          case null if assumeBinaryIsString => StringType
+          case null => BinaryType
+          case DECIMAL => makeDecimalType()
+          case _ => illegalType()
+        }
+
+      case FIXED_LEN_BYTE_ARRAY =>
+        originalType match {
+          case DECIMAL => 
makeDecimalType(maxPrecisionForBytes(field.getTypeLength))
+          case INTERVAL => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case _ => illegalType()
+    }
+  }
+
+  private def convertGroupField(field: GroupType): DataType = {
+    Option(field.getOriginalType).fold(convert(field): DataType) {
+      // A Parquet list is represented as a 3-level structure:
+      //
+      //   <list-repetition> group <name> (LIST) {
+      //     repeated group list {
+      //       <element-repetition> <element-type> element;
+      //     }
+      //   }
+      //
+      // However, according to the most recent Parquet format spec (not 
released yet up until
+      // writing), some 2-level structures are also recognized for 
backwards-compatibility.  Thus,
+      // we need to check whether the 2nd level or the 3rd level refers to 
list element type.
+      //
+      // See: 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+      case LIST =>
+        CatalystSchemaConverter.analysisRequire(
+          field.getFieldCount == 1, s"Invalid list type $field")
+
+        val repeatedType = field.getType(0)
+        CatalystSchemaConverter.analysisRequire(
+          repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
+
+        if (isElementType(repeatedType, field.getName)) {
+          ArrayType(convertField(repeatedType), containsNull = false)
+        } else {
+          val elementType = repeatedType.asGroupType().getType(0)
+          val optional = elementType.isRepetition(OPTIONAL)
+          ArrayType(convertField(elementType), containsNull = optional)
+        }
+
+      // scalastyle:off
+      // `MAP_KEY_VALUE` is for backwards-compatibility
+      // See: 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
+      // scalastyle:on
+      case MAP | MAP_KEY_VALUE =>
+        CatalystSchemaConverter.analysisRequire(
+          field.getFieldCount == 1 && !field.getType(0).isPrimitive,
+          s"Invalid map type: $field")
+
+        val keyValueType = field.getType(0).asGroupType()
+        CatalystSchemaConverter.analysisRequire(
+          keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 
2,
+          s"Invalid map type: $field")
+
+        val keyType = keyValueType.getType(0)
+        CatalystSchemaConverter.analysisRequire(
+          keyType.isPrimitive,
+          s"Map key type is expected to be a primitive type, but found: 
$keyType")
+
+        val valueType = keyValueType.getType(1)
+        val valueOptional = valueType.isRepetition(OPTIONAL)
+        MapType(
+          convertField(keyType),
+          convertField(valueType),
+          valueContainsNull = valueOptional)
+
+      case _ =>
+        throw new AnalysisException(s"Unrecognized Parquet type: $field")
+    }
+  }
+
+  // scalastyle:off
+  // Here we implement Parquet LIST backwards-compatibility rules.
+  // See: 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+  // scalastyle:on
+  private def isElementType(repeatedType: Type, parentName: String): Boolean = 
{
+    {
+      // For legacy 2-level list types with primitive element type, e.g.:
+      //
+      //    // List<Integer> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated int32 element;
+      //    }
+      //
+      repeatedType.isPrimitive
+    } || {
+      // For legacy 2-level list types whose element type is a group type with 
2 or more fields,
+      // e.g.:
+      //
+      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group element {
+      //        required binary str (UTF8);
+      //        required int32 num;
+      //      };
+      //    }
+      //
+      repeatedType.asGroupType().getFieldCount > 1
+    } || {
+      // For legacy 2-level list types generated by parquet-avro (Parquet 
version < 1.6.0), e.g.:
+      //
+      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group array {
+      //        required binary str (UTF8);
+      //      };
+      //    }
+      //
+      repeatedType.getName == "array"
+    } || {
+      // For Parquet data generated by parquet-thrift, e.g.:
+      //
+      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group my_list_tuple {
+      //        required binary str (UTF8);
+      //      };
+      //    }
+      //
+      repeatedType.getName == s"${parentName}_tuple"
+    }
+  }
+
+  /**
+   * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
+   */
+  def convert(catalystSchema: StructType): MessageType = {
+    Types.buildMessage().addFields(catalystSchema.map(convertField): 
_*).named("root")
+  }
+
+  /**
+   * Converts a Spark SQL [[StructField]] to a Parquet [[Type]].
+   */
+  def convertField(field: StructField): Type = {
+    convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
+  }
+
+  private def convertField(field: StructField, repetition: Type.Repetition): 
Type = {
+    CatalystSchemaConverter.checkFieldName(field.name)
+
+    field.dataType match {
+      // ===================
+      // Simple atomic types
+      // ===================
+
+      case BooleanType =>
+        Types.primitive(BOOLEAN, repetition).named(field.name)
+
+      case ByteType =>
+        Types.primitive(INT32, repetition).as(INT_8).named(field.name)
+
+      case ShortType =>
+        Types.primitive(INT32, repetition).as(INT_16).named(field.name)
+
+      case IntegerType =>
+        Types.primitive(INT32, repetition).named(field.name)
+
+      case LongType =>
+        Types.primitive(INT64, repetition).named(field.name)
+
+      case FloatType =>
+        Types.primitive(FLOAT, repetition).named(field.name)
+
+      case DoubleType =>
+        Types.primitive(DOUBLE, repetition).named(field.name)
+
+      case StringType =>
+        Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
+
+      case DateType =>
+        Types.primitive(INT32, repetition).as(DATE).named(field.name)
+
+      // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet 
format spec.
+      //
+      // As stated in PARQUET-323, Parquet `INT96` was originally introduced 
to represent nanosecond
+      // timestamp in Impala for some historical reasons, it's not recommended 
to be used for any
+      // other types and will probably be deprecated in future Parquet format 
spec.  That's the
+      // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and 
`TIMESTAMP_MICROS` which
+      // are both logical types annotating `INT64`.
+      //
+      // Originally, Spark SQL uses the same nanosecond timestamp type as 
Impala and Hive.  Starting
+      // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision 
so that we can store
+      // a timestamp into a `Long`.  This design decision is subject to change 
though, for example,
+      // we may resort to microsecond precision in the future.
+      //
+      // For Parquet, we plan to write all `TimestampType` value as 
`TIMESTAMP_MICROS`, but it's
+      // currently not implemented yet because parquet-mr 1.7.0 (the version 
we're currently using)
+      // hasn't implemented `TIMESTAMP_MICROS` yet.
+      //
+      // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+      case TimestampType =>
+        Types.primitive(INT96, repetition).named(field.name)
+
+      case BinaryType =>
+        Types.primitive(BINARY, repetition).named(field.name)
+
+      // =====================================
+      // Decimals (for Spark version <= 1.4.x)
+      // =====================================
+
+      // Spark 1.4.x and prior versions only support decimals with a maximum 
precision of 18 and
+      // always store decimals in fixed-length byte arrays.  To keep 
compatibility with these older
+      // versions, here we convert decimals with all precisions to 
`FIXED_LEN_BYTE_ARRAY` annotated
+      // by `DECIMAL`.
+      case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec =>
+        Types
+          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .length(CatalystSchemaConverter.minBytesForPrecision(precision))
+          .named(field.name)
+
+      // =====================================
+      // Decimals (follow Parquet format spec)
+      // =====================================
+
+      // Uses INT32 for 1 <= precision <= 9
+      case DecimalType.Fixed(precision, scale)
+          if precision <= MAX_PRECISION_FOR_INT32 && followParquetFormatSpec =>
+        Types
+          .primitive(INT32, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .named(field.name)
+
+      // Uses INT64 for 1 <= precision <= 18
+      case DecimalType.Fixed(precision, scale)
+          if precision <= MAX_PRECISION_FOR_INT64 && followParquetFormatSpec =>
+        Types
+          .primitive(INT64, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .named(field.name)
+
+      // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
+      case DecimalType.Fixed(precision, scale) if followParquetFormatSpec =>
+        Types
+          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .length(CatalystSchemaConverter.minBytesForPrecision(precision))
+          .named(field.name)
+
+      // ===================================================
+      // ArrayType and MapType (for Spark versions <= 1.4.x)
+      // ===================================================
+
+      // Spark 1.4.x and prior versions convert ArrayType with nullable 
elements into a 3-level
+      // LIST structure.  This behavior mimics parquet-hive (1.6.0rc3).  Note 
that this case is
+      // covered by the backwards-compatibility rules implemented in 
`isElementType()`.
+      case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec 
=>
+        // <list-repetition> group <name> (LIST) {
+        //   optional group bag {
+        //     repeated <element-type> element;
+        //   }
+        // }
+        ConversionPatterns.listType(
+          repetition,
+          field.name,
+          Types
+            .buildGroup(REPEATED)
+            // "array_element" is the name chosen by parquet-hive (1.7.0 and 
prior version)
+            .addField(convertField(StructField("array_element", elementType, 
nullable)))
+            .named(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME))
+
+      // Spark 1.4.x and prior versions convert ArrayType with non-nullable 
elements into a 2-level
+      // LIST structure.  This behavior mimics parquet-avro (1.6.0rc3).  Note 
that this case is
+      // covered by the backwards-compatibility rules implemented in 
`isElementType()`.
+      case ArrayType(elementType, nullable @ false) if 
!followParquetFormatSpec =>
+        // <list-repetition> group <name> (LIST) {
+        //   repeated <element-type> element;
+        // }
+        ConversionPatterns.listType(
+          repetition,
+          field.name,
+          // "array" is the name chosen by parquet-avro (1.7.0 and prior 
version)
+          convertField(StructField("array", elementType, nullable), REPEATED))
+
+      // Spark 1.4.x and prior versions convert MapType into a 3-level group 
annotated by
+      // MAP_KEY_VALUE.  This is covered by `convertGroupField(field: 
GroupType): DataType`.
+      case MapType(keyType, valueType, valueContainsNull) if 
!followParquetFormatSpec =>
+        // <map-repetition> group <name> (MAP) {
+        //   repeated group map (MAP_KEY_VALUE) {
+        //     required <key-type> key;
+        //     <value-repetition> <value-type> value;
+        //   }
+        // }
+        ConversionPatterns.mapType(
+          repetition,
+          field.name,
+          convertField(StructField("key", keyType, nullable = false)),
+          convertField(StructField("value", valueType, valueContainsNull)))
+
+      // ==================================================
+      // ArrayType and MapType (follow Parquet format spec)
+      // ==================================================
+
+      case ArrayType(elementType, containsNull) if followParquetFormatSpec =>
+        // <list-repetition> group <name> (LIST) {
+        //   repeated group list {
+        //     <element-repetition> <element-type> element;
+        //   }
+        // }
+        Types
+          .buildGroup(repetition).as(LIST)
+          .addField(
+            Types.repeatedGroup()
+              .addField(convertField(StructField("element", elementType, 
containsNull)))
+              .named("list"))
+          .named(field.name)
+
+      case MapType(keyType, valueType, valueContainsNull) =>
+        // <map-repetition> group <name> (MAP) {
+        //   repeated group key_value {
+        //     required <key-type> key;
+        //     <value-repetition> <value-type> value;
+        //   }
+        // }
+        Types
+          .buildGroup(repetition).as(MAP)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(convertField(StructField("key", keyType, nullable = 
false)))
+              .addField(convertField(StructField("value", valueType, 
valueContainsNull)))
+              .named("key_value"))
+          .named(field.name)
+
+      // ===========
+      // Other types
+      // ===========
+
+      case StructType(fields) =>
+        fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
+          builder.addField(convertField(field))
+        }.named(field.name)
+
+      case udt: UserDefinedType[_] =>
+        convertField(field.copy(dataType = udt.sqlType))
+
+      case _ =>
+        throw new AnalysisException(s"Unsupported data type $field.dataType")
+    }
+  }
+}
+
+
+private[parquet] object CatalystSchemaConverter {
+  def checkFieldName(name: String): Unit = {
+    // ,;{}()\n\t= and space are special characters in Parquet schema
+    analysisRequire(
+      !name.matches(".*[ ,;{}()\n\t=].*"),
+      s"""Attribute name "$name" contains invalid character(s) among " 
,;{}()\\n\\t=".
+         |Please use alias to rename it.
+       """.stripMargin.split("\n").mkString(" "))
+  }
+
+  def checkFieldNames(schema: StructType): StructType = {
+    schema.fieldNames.foreach(checkFieldName)
+    schema
+  }
+
+  def analysisRequire(f: => Boolean, message: String): Unit = {
+    if (!f) {
+      throw new AnalysisException(message)
+    }
+  }
+
+  private def computeMinBytesForPrecision(precision : Int) : Int = {
+    var numBytes = 1
+    while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
+      numBytes += 1
+    }
+    numBytes
+  }
+
+  private val MIN_BYTES_FOR_PRECISION = 
Array.tabulate[Int](39)(computeMinBytesForPrecision)
+
+  // Returns the minimum number of bytes needed to store a decimal with a 
given `precision`.
+  def minBytesForPrecision(precision : Int) : Int = {
+    if (precision < MIN_BYTES_FOR_PRECISION.length) {
+      MIN_BYTES_FOR_PRECISION(precision)
+    } else {
+      computeMinBytesForPrecision(precision)
+    }
+  }
+
+  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4)
+
+  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8)
+
+  // Max precision of a decimal value stored in `numBytes` bytes
+  def maxPrecisionForBytes(numBytes: Int): Int = {
+    Math.round(                               // convert double to long
+      Math.floor(Math.log10(                  // number of base-10 digits
+        Math.pow(2, 8 * numBytes - 1) - 1)))  // max value stored in numBytes
+      .asInstanceOf[Int]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
new file mode 100644
index 0000000..2c6b914
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.parquet.Log
+import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, 
ParquetOutputCommitter, ParquetOutputFormat}
+
+/**
+ * An output committer for writing Parquet files.  In stead of writing to the 
`_temporary` folder
+ * like what [[ParquetOutputCommitter]] does, this output committer writes 
data directly to the
+ * destination folder.  This can be useful for data stored in S3, where 
directory operations are
+ * relatively expensive.
+ *
+ * To enable this output committer, users may set the 
"spark.sql.parquet.output.committer.class"
+ * property via Hadoop [[Configuration]].  Not that this property overrides
+ * "spark.sql.sources.outputCommitterClass".
+ *
+ * *NOTE*
+ *
+ *   NEVER use [[DirectParquetOutputCommitter]] when appending data, because 
currently there's
+ *   no safe way undo a failed appending job (that's why both `abortTask()` 
and `abortJob()` are
+ *   left * empty).
+ */
+private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+  val LOG = Log.getLog(classOf[ParquetOutputCommitter])
+
+  override def getWorkPath: Path = outputPath
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
+  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
+  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
+  override def setupJob(jobContext: JobContext): Unit = {}
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
+
+  override def commitJob(jobContext: JobContext) {
+    val configuration = ContextUtil.getConfiguration(jobContext)
+    val fileSystem = outputPath.getFileSystem(configuration)
+
+    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, 
true)) {
+      try {
+        val outputStatus = fileSystem.getFileStatus(outputPath)
+        val footers = 
ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
+        try {
+          ParquetFileWriter.writeMetadataFile(configuration, outputPath, 
footers)
+        } catch { case e: Exception =>
+          LOG.warn("could not write summary file for " + outputPath, e)
+          val metadataPath = new Path(outputPath, 
ParquetFileWriter.PARQUET_METADATA_FILE)
+          if (fileSystem.exists(metadataPath)) {
+            fileSystem.delete(metadataPath, true)
+          }
+        }
+      } catch {
+        case e: Exception => LOG.warn("could not write summary file for " + 
outputPath, e)
+      }
+    }
+
+    if 
(configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", 
true)) {
+      try {
+        val successPath = new Path(outputPath, 
FileOutputCommitter.SUCCEEDED_FILE_NAME)
+        fileSystem.create(successPath).close()
+      } catch {
+        case e: Exception => LOG.warn("could not write success file for " + 
outputPath, e)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
new file mode 100644
index 0000000..ccd7ebf
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{MapData, ArrayData}
+
+// TODO Removes this while fixing SPARK-8848
+private[sql] object CatalystConverter {
+  // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
+  // Note that "array" for the array elements is chosen by ParquetAvro.
+  // Using a different value will result in Parquet silently dropping columns.
+  val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
+  val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
+
+  val MAP_KEY_SCHEMA_NAME = "key"
+  val MAP_VALUE_SCHEMA_NAME = "value"
+  val MAP_SCHEMA_NAME = "map"
+
+  // TODO: consider using Array[T] for arrays to avoid boxing of primitive 
types
+  type ArrayScalaType = ArrayData
+  type StructScalaType = InternalRow
+  type MapScalaType = MapData
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
new file mode 100644
index 0000000..9e2e232
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.Serializable
+import java.nio.ByteBuffer
+
+import com.google.common.io.BaseEncoding
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.compat.FilterCompat._
+import org.apache.parquet.filter2.predicate.FilterApi._
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, 
Statistics}
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
+import org.apache.parquet.io.api.Binary
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+private[sql] object ParquetFilters {
+  val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
+
+  def createRecordFilter(filterExpressions: Seq[Expression]): Option[Filter] = 
{
+    filterExpressions.flatMap { filter =>
+      createFilter(filter)
+    }.reduceOption(FilterApi.and).map(FilterCompat.get)
+  }
+
+  case class SetInFilter[T <: Comparable[T]](
+    valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
+
+    override def keep(value: T): Boolean = {
+      value != null && valueSet.contains(value)
+    }
+
+    override def canDrop(statistics: Statistics[T]): Boolean = false
+
+    override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
+  }
+
+  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
+    case BooleanType =>
+      (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.eq(intColumn(n), 
v.asInstanceOf[Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.eq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.eq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.eq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
+
+    // Binary.fromString and Binary.fromByteArray don't accept null values
+    case StringType =>
+      (n: String, v: Any) => FilterApi.eq(
+        binaryColumn(n),
+        Option(v).map(s => 
Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
+    case BinaryType =>
+      (n: String, v: Any) => FilterApi.eq(
+        binaryColumn(n),
+        Option(v).map(b => 
Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+  }
+
+  private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
+    case BooleanType =>
+      (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.notEq(intColumn(n), 
v.asInstanceOf[Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.notEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.notEq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) => FilterApi.notEq(
+        binaryColumn(n),
+        Option(v).map(s => 
Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
+    case BinaryType =>
+      (n: String, v: Any) => FilterApi.notEq(
+        binaryColumn(n),
+        Option(v).map(b => 
Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+  }
+
+  private val makeLt: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.lt(intColumn(n), 
v.asInstanceOf[Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.lt(longColumn(n), 
v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.lt(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.lt(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) =>
+        FilterApi.lt(binaryColumn(n), 
Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+    case BinaryType =>
+      (n: String, v: Any) =>
+        FilterApi.lt(binaryColumn(n), 
Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+  }
+
+  private val makeLtEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.ltEq(intColumn(n), 
v.asInstanceOf[java.lang.Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.ltEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) =>
+        FilterApi.ltEq(binaryColumn(n), 
Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+    case BinaryType =>
+      (n: String, v: Any) =>
+        FilterApi.ltEq(binaryColumn(n), 
Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+  }
+
+  private val makeGt: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.gt(intColumn(n), 
v.asInstanceOf[java.lang.Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.gt(longColumn(n), 
v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.gt(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.gt(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) =>
+        FilterApi.gt(binaryColumn(n), 
Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+    case BinaryType =>
+      (n: String, v: Any) =>
+        FilterApi.gt(binaryColumn(n), 
Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+  }
+
+  private val makeGtEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.gtEq(intColumn(n), 
v.asInstanceOf[java.lang.Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.gtEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) =>
+        FilterApi.gtEq(binaryColumn(n), 
Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+    case BinaryType =>
+      (n: String, v: Any) =>
+        FilterApi.gtEq(binaryColumn(n), 
Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+  }
+
+  private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => 
FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(intColumn(n), 
SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
+    case LongType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(longColumn(n), 
SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
+    case FloatType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(floatColumn(n), 
SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
+    case DoubleType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(doubleColumn(n), 
SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
+    case StringType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(binaryColumn(n),
+          SetInFilter(v.map(e => 
Binary.fromByteArray(e.asInstanceOf[UTF8String].getBytes))))
+    case BinaryType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(binaryColumn(n),
+          SetInFilter(v.map(e => 
Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+  }
+
+  /**
+   * Converts data sources filters to Parquet filter predicates.
+   */
+  def createFilter(schema: StructType, predicate: sources.Filter): 
Option[FilterPredicate] = {
+    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+    // NOTE:
+    //
+    // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` 
evaluate to `NULL`,
+    // which can be casted to `false` implicitly. Please refer to the `eval` 
method of these
+    // operators and the `SimplifyFilters` rule for details.
+    predicate match {
+      case sources.IsNull(name) =>
+        makeEq.lift(dataTypeOf(name)).map(_(name, null))
+      case sources.IsNotNull(name) =>
+        makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
+
+      case sources.EqualTo(name, value) =>
+        makeEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.Not(sources.EqualTo(name, value)) =>
+        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
+
+      case sources.LessThan(name, value) =>
+        makeLt.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.LessThanOrEqual(name, value) =>
+        makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
+
+      case sources.GreaterThan(name, value) =>
+        makeGt.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.GreaterThanOrEqual(name, value) =>
+        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+
+      case sources.And(lhs, rhs) =>
+        (createFilter(schema, lhs) ++ createFilter(schema, 
rhs)).reduceOption(FilterApi.and)
+
+      case sources.Or(lhs, rhs) =>
+        for {
+          lhsFilter <- createFilter(schema, lhs)
+          rhsFilter <- createFilter(schema, rhs)
+        } yield FilterApi.or(lhsFilter, rhsFilter)
+
+      case sources.Not(pred) =>
+        createFilter(schema, pred).map(FilterApi.not)
+
+      case _ => None
+    }
+  }
+
+  /**
+   * Converts Catalyst predicate expressions to Parquet filter predicates.
+   *
+   * @todo This can be removed once we get rid of the old Parquet support.
+   */
+  def createFilter(predicate: Expression): Option[FilterPredicate] = {
+    // NOTE:
+    //
+    // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` 
evaluate to `NULL`,
+    // which can be casted to `false` implicitly. Please refer to the `eval` 
method of these
+    // operators and the `SimplifyFilters` rule for details.
+    predicate match {
+      case IsNull(NamedExpression(name, dataType)) =>
+        makeEq.lift(dataType).map(_(name, null))
+      case IsNotNull(NamedExpression(name, dataType)) =>
+        makeNotEq.lift(dataType).map(_(name, null))
+
+      case EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType)) 
=>
+        makeEq.lift(dataType).map(_(name, value))
+      case EqualTo(Cast(NamedExpression(name, _), dataType), 
NonNullLiteral(value, _)) =>
+        makeEq.lift(dataType).map(_(name, value))
+      case EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _)) 
=>
+        makeEq.lift(dataType).map(_(name, value))
+      case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), 
dataType)) =>
+        makeEq.lift(dataType).map(_(name, value))
+
+      case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, 
dataType))) =>
+        makeNotEq.lift(dataType).map(_(name, value))
+      case Not(EqualTo(Cast(NamedExpression(name, _), dataType), 
NonNullLiteral(value, _))) =>
+        makeNotEq.lift(dataType).map(_(name, value))
+      case Not(EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, 
_))) =>
+        makeNotEq.lift(dataType).map(_(name, value))
+      case Not(EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, 
_), dataType))) =>
+        makeNotEq.lift(dataType).map(_(name, value))
+
+      case LessThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) 
=>
+        makeLt.lift(dataType).map(_(name, value))
+      case LessThan(Cast(NamedExpression(name, _), dataType), 
NonNullLiteral(value, _)) =>
+        makeLt.lift(dataType).map(_(name, value))
+      case LessThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) 
=>
+        makeGt.lift(dataType).map(_(name, value))
+      case LessThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), 
dataType)) =>
+        makeGt.lift(dataType).map(_(name, value))
+
+      case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, 
dataType)) =>
+        makeLtEq.lift(dataType).map(_(name, value))
+      case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), 
NonNullLiteral(value, _)) =>
+        makeLtEq.lift(dataType).map(_(name, value))
+      case LessThanOrEqual(NonNullLiteral(value, dataType), 
NamedExpression(name, _)) =>
+        makeGtEq.lift(dataType).map(_(name, value))
+      case LessThanOrEqual(NonNullLiteral(value, _), 
Cast(NamedExpression(name, _), dataType)) =>
+        makeGtEq.lift(dataType).map(_(name, value))
+
+      case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, 
dataType)) =>
+        makeGt.lift(dataType).map(_(name, value))
+      case GreaterThan(Cast(NamedExpression(name, _), dataType), 
NonNullLiteral(value, _)) =>
+        makeGt.lift(dataType).map(_(name, value))
+      case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, 
_)) =>
+        makeLt.lift(dataType).map(_(name, value))
+      case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, 
_), dataType)) =>
+        makeLt.lift(dataType).map(_(name, value))
+
+      case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, 
dataType)) =>
+        makeGtEq.lift(dataType).map(_(name, value))
+      case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), 
NonNullLiteral(value, _)) =>
+        makeGtEq.lift(dataType).map(_(name, value))
+      case GreaterThanOrEqual(NonNullLiteral(value, dataType), 
NamedExpression(name, _)) =>
+        makeLtEq.lift(dataType).map(_(name, value))
+      case GreaterThanOrEqual(NonNullLiteral(value, _), 
Cast(NamedExpression(name, _), dataType)) =>
+        makeLtEq.lift(dataType).map(_(name, value))
+
+      case And(lhs, rhs) =>
+        (createFilter(lhs) ++ createFilter(rhs)).reduceOption(FilterApi.and)
+
+      case Or(lhs, rhs) =>
+        for {
+          lhsFilter <- createFilter(lhs)
+          rhsFilter <- createFilter(rhs)
+        } yield FilterApi.or(lhsFilter, rhsFilter)
+
+      case Not(pred) =>
+        createFilter(pred).map(FilterApi.not)
+
+      case InSet(NamedExpression(name, dataType), valueSet) =>
+        makeInSet.lift(dataType).map(_(name, valueSet))
+
+      case _ => None
+    }
+  }
+
+  /**
+   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
+   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
+   * the actual filter predicate.
+   */
+  def serializeFilterExpressions(filters: Seq[Expression], conf: 
Configuration): Unit = {
+    if (filters.nonEmpty) {
+      val serialized: Array[Byte] =
+        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
+      val encoded: String = BaseEncoding.base64().encode(serialized)
+      conf.set(PARQUET_FILTER_DATA, encoded)
+    }
+  }
+
+  /**
+   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
+   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
+   * the actual filter predicate.
+   */
+  def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = {
+    val data = conf.get(PARQUET_FILTER_DATA)
+    if (data != null) {
+      val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
+      
SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
+    } else {
+      Seq()
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to