http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
deleted file mode 100644
index 975fec1..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.util.{Map => JMap}
-
-import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
-import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
-
-private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
-  override def prepareForRead(
-      conf: Configuration,
-      keyValueMetaData: JMap[String, String],
-      fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[InternalRow] = {
-    log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
-
-    val toCatalyst = new CatalystSchemaConverter(conf)
-    val parquetRequestedSchema = readContext.getRequestedSchema
-
-    val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
-        metadata
-          // First tries to read requested schema, which may result from projections
-          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-          // If not available, tries to read Catalyst schema from file metadata.  It's only
-          // available if the target file is written by Spark SQL.
-          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
-      }.map(StructType.fromString).getOrElse {
-        logDebug("Catalyst schema not available, falling back to Parquet schema")
-        toCatalyst.convert(parquetRequestedSchema)
-      }
-
-    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
-    new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
-  }
-
-  override def init(context: InitContext): ReadContext = {
-    val conf = context.getConfiguration
-
-    // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
-    // schema of this file from its metadata.
-    val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
-
-    // Optional schema of requested columns, in the form of a string serialized from a Catalyst
-    // `StructType` containing all requested columns.
-    val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
-
-    // Below we construct a Parquet schema containing all requested columns.  This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema.  Otherwise,
-    // we have to fallback to the full file schema which contains all columns in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target Parquet file.  For
-    //    example, in the case of schema merging, the globally merged schema may contain extra
-    //    columns gathered from other Parquet files.  These columns will be simply filled with nulls
-    //    when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
-    //    Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
-    //    non-standard behaviors of some Parquet libraries/tools.  For example, a Parquet file
-    //    containing a single integer array field `f1` may have the following legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet file as they have
-    //    different physical structures.
-    val parquetRequestedSchema =
-      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema for all requested
-          // columns.  Note that it's possible that no columns are requested at all (e.g., count
-          // some partition column of a partitioned Parquet table). That's why `fold` is used here
-          // and always fallback to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
-      }
-
-    val metadata =
-      Map.empty[String, String] ++
-        maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
-        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
-
-    logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
-    new ReadContext(parquetRequestedSchema, metadata)
-  }
-}
-
-private[parquet] object CatalystReadSupport {
-  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
-
-  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
-}
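
For context on the fallback chain in `prepareForRead` above: the reader prefers the requested (projected) schema stored under the "row.requested_schema" key, then the Spark-written file schema under the "row.metadata" key, and only then converts the raw Parquet schema. Below is a standalone Scala sketch of that resolution order; the object and method names are illustrative only, and a plain String stands in for the serialized `StructType`.

object SchemaResolutionSketch {
  val SparkRowRequestedSchema = "org.apache.spark.sql.parquet.row.requested_schema"
  val SparkMetadataKey = "org.apache.spark.sql.parquet.row.metadata"

  // Prefer the requested (projected) schema, then the Spark-written file schema,
  // and finally fall back to converting the Parquet file schema itself.
  def resolveSchema(
      readSupportMetadata: Option[Map[String, String]],
      convertParquetSchema: () => String): String = {
    readSupportMetadata.flatMap { metadata =>
      metadata.get(SparkRowRequestedSchema).orElse(metadata.get(SparkMetadataKey))
    }.getOrElse(convertParquetSchema())
  }

  def main(args: Array[String]): Unit = {
    val metadata = Map(SparkMetadataKey -> """{"type":"struct","fields":[]}""")
    println(resolveSchema(Some(metadata), () => "converted-from-parquet-schema"))
  }
}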

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
deleted file mode 100644
index 84f1dcc..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
-
-/**
- * A [[RecordMaterializer]] for Catalyst rows.
- *
- * @param parquetSchema Parquet schema of the records to be read
- * @param catalystSchema Catalyst schema of the rows to be constructed
- */
-private[parquet] class CatalystRecordMaterializer(
-    parquetSchema: MessageType, catalystSchema: StructType)
-  extends RecordMaterializer[InternalRow] {
-
-  private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
-
-  override def getCurrentRecord: InternalRow = rootConverter.currentRow
-
-  override def getRootConverter: GroupConverter = rootConverter
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala
deleted file mode 100644
index 4fe8a39..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.math.{BigDecimal, BigInteger}
-import java.nio.ByteOrder
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.parquet.column.Dictionary
-import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-/**
- * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
- * corresponding parent container. For example, a converter for a `StructType` field may set
- * converted values to a [[MutableRow]]; or a converter for array elements may append converted
- * values to an [[ArrayBuffer]].
- */
-private[parquet] trait ParentContainerUpdater {
-  def set(value: Any): Unit = ()
-  def setBoolean(value: Boolean): Unit = set(value)
-  def setByte(value: Byte): Unit = set(value)
-  def setShort(value: Short): Unit = set(value)
-  def setInt(value: Int): Unit = set(value)
-  def setLong(value: Long): Unit = set(value)
-  def setFloat(value: Float): Unit = set(value)
-  def setDouble(value: Double): Unit = set(value)
-}
-
-/** A no-op updater used for root converter (who doesn't have a parent). */
-private[parquet] object NoopUpdater extends ParentContainerUpdater
-
-/**
- * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
- * Since any Parquet record is also a struct, this converter can also be used as root converter.
- *
- * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
- * any "parent" container.
- *
- * @param parquetType Parquet schema of Parquet records
- * @param catalystType Spark SQL schema that corresponds to the Parquet record type
- * @param updater An updater which propagates converted field values to the parent container
- */
-private[parquet] class CatalystRowConverter(
-    parquetType: GroupType,
-    catalystType: StructType,
-    updater: ParentContainerUpdater)
-  extends GroupConverter {
-
-  /**
-   * Updater used together with field converters within a [[CatalystRowConverter]].  It propagates
-   * converted field values to the `ordinal`-th cell in `currentRow`.
-   */
-  private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
-    override def set(value: Any): Unit = row(ordinal) = value
-    override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
-    override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
-    override def setShort(value: Short): Unit = row.setShort(ordinal, value)
-    override def setInt(value: Int): Unit = row.setInt(ordinal, value)
-    override def setLong(value: Long): Unit = row.setLong(ordinal, value)
-    override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
-    override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
-  }
-
-  /**
-   * Represents the converted row object once an entire Parquet record is converted.
-   *
-   * @todo Uses [[UnsafeRow]] for better performance.
-   */
-  val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
-
-  // Converters for each field.
-  private val fieldConverters: Array[Converter] = {
-    parquetType.getFields.zip(catalystType).zipWithIndex.map {
-      case ((parquetFieldType, catalystField), ordinal) =>
-        // Converted field value should be set to the `ordinal`-th cell of `currentRow`
-        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
-    }.toArray
-  }
-
-  override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
-
-  override def end(): Unit = updater.set(currentRow)
-
-  override def start(): Unit = {
-    var i = 0
-    while (i < currentRow.numFields) {
-      currentRow.setNullAt(i)
-      i += 1
-    }
-  }
-
-  /**
-   * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type
-   * `catalystType`. Converted values are handled by `updater`.
-   */
-  private def newConverter(
-      parquetType: Type,
-      catalystType: DataType,
-      updater: ParentContainerUpdater): Converter = {
-
-    catalystType match {
-      case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
-        new CatalystPrimitiveConverter(updater)
-
-      case ByteType =>
-        new PrimitiveConverter {
-          override def addInt(value: Int): Unit =
-            updater.setByte(value.asInstanceOf[ByteType#InternalType])
-        }
-
-      case ShortType =>
-        new PrimitiveConverter {
-          override def addInt(value: Int): Unit =
-            updater.setShort(value.asInstanceOf[ShortType#InternalType])
-        }
-
-      case t: DecimalType =>
-        new CatalystDecimalConverter(t, updater)
-
-      case StringType =>
-        new CatalystStringConverter(updater)
-
-      case TimestampType =>
-        // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
-        new PrimitiveConverter {
-          // Converts nanosecond timestamps stored as INT96
-          override def addBinary(value: Binary): Unit = {
-            assert(
-              value.length() == 12,
-              "Timestamps (with nanoseconds) are expected to be stored in 
12-byte long binaries, " +
-              s"but got a ${value.length()}-byte binary.")
-
-            val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
-            val timeOfDayNanos = buf.getLong
-            val julianDay = buf.getInt
-            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
-          }
-        }
-
-      case DateType =>
-        new PrimitiveConverter {
-          override def addInt(value: Int): Unit = {
-            // DateType is not specialized in `SpecificMutableRow`, have to box it here.
-            updater.set(value.asInstanceOf[DateType#InternalType])
-          }
-        }
-
-      case t: ArrayType =>
-        new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
-
-      case t: MapType =>
-        new CatalystMapConverter(parquetType.asGroupType(), t, updater)
-
-      case t: StructType =>
-        new CatalystRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
-          override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-        })
-
-      case t: UserDefinedType[_] =>
-        val catalystTypeForUDT = t.sqlType
-        val nullable = parquetType.isRepetition(Repetition.OPTIONAL)
-        val field = StructField("udt", catalystTypeForUDT, nullable)
-        val parquetTypeForUDT = new CatalystSchemaConverter().convertField(field)
-        newConverter(parquetTypeForUDT, catalystTypeForUDT, updater)
-
-      case _ =>
-        throw new RuntimeException(
-          s"Unable to create Parquet converter for data type 
${catalystType.json}")
-    }
-  }
-
-  /**
-   * Parquet converter for Parquet primitive types.  Note that not all Spark SQL atomic types
-   * are handled by this converter.  Parquet primitive types are only a subset of those of Spark
-   * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
-   */
-  private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
-
-    override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
-    override def addInt(value: Int): Unit = updater.setInt(value)
-    override def addLong(value: Long): Unit = updater.setLong(value)
-    override def addFloat(value: Float): Unit = updater.setFloat(value)
-    override def addDouble(value: Double): Unit = updater.setDouble(value)
-    override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
-  }
-
-  /**
-   * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
-   */
-  private final class CatalystStringConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
-
-    private var expandedDictionary: Array[UTF8String] = null
-
-    override def hasDictionarySupport: Boolean = true
-
-    override def setDictionary(dictionary: Dictionary): Unit = {
-      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
-        UTF8String.fromBytes(dictionary.decodeToBinary(i).getBytes)
-      }
-    }
-
-    override def addValueFromDictionary(dictionaryId: Int): Unit = {
-      updater.set(expandedDictionary(dictionaryId))
-    }
-
-    override def addBinary(value: Binary): Unit = {
-      updater.set(UTF8String.fromBytes(value.getBytes))
-    }
-  }
-
-  /**
-   * Parquet converter for fixed-precision decimals.
-   */
-  private final class CatalystDecimalConverter(
-      decimalType: DecimalType,
-      updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
-
-    // Converts decimals stored as INT32
-    override def addInt(value: Int): Unit = {
-      addLong(value: Long)
-    }
-
-    // Converts decimals stored as INT64
-    override def addLong(value: Long): Unit = {
-      updater.set(Decimal(value, decimalType.precision, decimalType.scale))
-    }
-
-    // Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY
-    override def addBinary(value: Binary): Unit = {
-      updater.set(toDecimal(value))
-    }
-
-    private def toDecimal(value: Binary): Decimal = {
-      val precision = decimalType.precision
-      val scale = decimalType.scale
-      val bytes = value.getBytes
-
-      if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.
-        var unscaled = 0L
-        var i = 0
-
-        while (i < bytes.length) {
-          unscaled = (unscaled << 8) | (bytes(i) & 0xff)
-          i += 1
-        }
-
-        val bits = 8 * bytes.length
-        unscaled = (unscaled << (64 - bits)) >> (64 - bits)
-        Decimal(unscaled, precision, scale)
-      } else {
-        // Otherwise, resorts to an unscaled `BigInteger` instead.
-        Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale)
-      }
-    }
-  }
-
-  /**
-   * Parquet converter for arrays.  Spark SQL arrays are represented as Parquet lists.  Standard
-   * Parquet lists are represented as a 3-level group annotated by `LIST`:
-   * {{{
-   *   <list-repetition> group <name> (LIST) {            <-- parquetSchema points here
-   *     repeated group list {
-   *       <element-repetition> <element-type> element;
-   *     }
-   *   }
-   * }}}
-   * The `parquetSchema` constructor argument points to the outermost group.
-   *
-   * However, before this representation is standardized, some Parquet libraries/tools also use some
-   * non-standard formats to represent list-like structures.  Backwards-compatibility rules for
-   * handling these cases are described in Parquet format spec.
-   *
-   * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-   */
-  private final class CatalystArrayConverter(
-      parquetSchema: GroupType,
-      catalystSchema: ArrayType,
-      updater: ParentContainerUpdater)
-    extends GroupConverter {
-
-    private var currentArray: ArrayBuffer[Any] = _
-
-    private val elementConverter: Converter = {
-      val repeatedType = parquetSchema.getType(0)
-      val elementType = catalystSchema.elementType
-
-      if (isElementType(repeatedType, elementType)) {
-        newConverter(repeatedType, elementType, new ParentContainerUpdater {
-          override def set(value: Any): Unit = currentArray += value
-        })
-      } else {
-        new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
-      }
-    }
-
-    override def getConverter(fieldIndex: Int): Converter = elementConverter
-
-    override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
-
-    // NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for the
-    // next value.  `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored
-    // in row cells.
-    override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
-
-    // scalastyle:off
-    /**
-     * Returns whether the given type is the element type of a list or is a syntactic group with
-     * one field that is the element type.  This is determined by checking whether the type can be
-     * a syntactic group and by checking whether a potential syntactic group matches the expected
-     * schema.
-     * {{{
-     *   <list-repetition> group <name> (LIST) {
-     *     repeated group list {                          <-- repeatedType points here
-     *       <element-repetition> <element-type> element;
-     *     }
-     *   }
-     * }}}
-     * In short, here we handle Parquet list backwards-compatibility rules on the read path.  This
-     * method is based on `AvroIndexedRecordConverter.isElementType`.
-     *
-     * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-     */
-    // scalastyle:on
-    private def isElementType(parquetRepeatedType: Type, catalystElementType: DataType): Boolean = {
-      (parquetRepeatedType, catalystElementType) match {
-        case (t: PrimitiveType, _) => true
-        case (t: GroupType, _) if t.getFieldCount > 1 => true
-        case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
-        case _ => false
-      }
-    }
-
-    /** Array element converter */
-    private final class ElementConverter(parquetType: Type, catalystType: DataType)
-      extends GroupConverter {
-
-      private var currentElement: Any = _
-
-      private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater {
-        override def set(value: Any): Unit = currentElement = value
-      })
-
-      override def getConverter(fieldIndex: Int): Converter = converter
-
-      override def end(): Unit = currentArray += currentElement
-
-      override def start(): Unit = currentElement = null
-    }
-  }
-
-  /** Parquet converter for maps */
-  private final class CatalystMapConverter(
-      parquetType: GroupType,
-      catalystType: MapType,
-      updater: ParentContainerUpdater)
-    extends GroupConverter {
-
-    private var currentKeys: ArrayBuffer[Any] = _
-    private var currentValues: ArrayBuffer[Any] = _
-
-    private val keyValueConverter = {
-      val repeatedType = parquetType.getType(0).asGroupType()
-      new KeyValueConverter(
-        repeatedType.getType(0),
-        repeatedType.getType(1),
-        catalystType.keyType,
-        catalystType.valueType)
-    }
-
-    override def getConverter(fieldIndex: Int): Converter = keyValueConverter
-
-    override def end(): Unit =
-      updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray))
-
-    // NOTE: We can't reuse the mutable Map here and must instantiate a new `Map` for the next
-    // value.  `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored in row
-    // cells.
-    override def start(): Unit = {
-      currentKeys = ArrayBuffer.empty[Any]
-      currentValues = ArrayBuffer.empty[Any]
-    }
-
-    /** Parquet converter for key-value pairs within the map. */
-    private final class KeyValueConverter(
-        parquetKeyType: Type,
-        parquetValueType: Type,
-        catalystKeyType: DataType,
-        catalystValueType: DataType)
-      extends GroupConverter {
-
-      private var currentKey: Any = _
-
-      private var currentValue: Any = _
-
-      private val converters = Array(
-        // Converter for keys
-        newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater {
-          override def set(value: Any): Unit = currentKey = value
-        }),
-
-        // Converter for values
-        newConverter(parquetValueType, catalystValueType, new ParentContainerUpdater {
-          override def set(value: Any): Unit = currentValue = value
-        }))
-
-      override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
-
-      override def end(): Unit = {
-        currentKeys += currentKey
-        currentValues += currentValue
-      }
-
-      override def start(): Unit = {
-        currentKey = null
-        currentValue = null
-      }
-    }
-  }
-}
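
The decimal handling in `CatalystDecimalConverter.toDecimal` above packs a big-endian two's-complement byte array into an unscaled `Long` when the precision fits, using a shift-left/shift-right pair to sign-extend the highest byte. Below is a small standalone sketch of just that numeric step; the object and method names are illustrative, not part of the original code.

object DecimalBytesSketch {
  // Rebuild the unscaled value from big-endian two's-complement bytes, then sign-extend.
  def unscaledLongFromBytes(bytes: Array[Byte]): Long = {
    var unscaled = 0L
    var i = 0
    while (i < bytes.length) {
      unscaled = (unscaled << 8) | (bytes(i) & 0xff)  // accumulate big-endian bytes
      i += 1
    }
    val bits = 8 * bytes.length
    (unscaled << (64 - bits)) >> (64 - bits)          // sign-extend the top byte
  }

  def main(args: Array[String]): Unit = {
    // 0xFF38 is -200 as a 2-byte two's-complement value; with scale 2 it denotes -2.00.
    println(unscaledLongFromBytes(Array(0xFF.toByte, 0x38.toByte)))  // prints -200
  }
}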

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
deleted file mode 100644
index b12149d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
+++ /dev/null
@@ -1,592 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.schema.OriginalType._
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
-import org.apache.parquet.schema.Type.Repetition._
-import org.apache.parquet.schema._
-
-import org.apache.spark.sql.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, maxPrecisionForBytes}
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, SQLConf}
-
-/**
- * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and
- * vice versa.
- *
- * Parquet format backwards-compatibility rules are respected when converting Parquet
- * [[MessageType]] schemas.
- *
- * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
- *
- * @constructor
- * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
- *        [[StringType]] fields when converting a Parquet [[MessageType]] to Spark SQL
- *        [[StructType]].
- * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
- *        [[TimestampType]] fields when converting a Parquet [[MessageType]] to Spark SQL
- *        [[StructType]].  Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
- *        has optional nanosecond precision, but different from `TIME_MILLIS` and `TIMESTAMP_MILLIS`
- *        described in Parquet format spec.
- * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structure when
- *        converting Spark SQL [[StructType]] to Parquet [[MessageType]].  For Spark 1.4.x and
- *        prior versions, Spark SQL only supports decimals with a max precision of 18 digits, and
- *        uses non-standard LIST and MAP structure.  Note that the current Parquet format spec is
- *        backwards-compatible with these settings.  If this argument is set to `false`, we fallback
- *        to old style non-standard behaviors.
- */
-private[parquet] class CatalystSchemaConverter(
-    private val assumeBinaryIsString: Boolean,
-    private val assumeInt96IsTimestamp: Boolean,
-    private val followParquetFormatSpec: Boolean) {
-
-  // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
-  // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
-  def this() = this(
-    assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
-
-  def this(conf: SQLConf) = this(
-    assumeBinaryIsString = conf.isParquetBinaryAsString,
-    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
-    followParquetFormatSpec = conf.followParquetFormatSpec)
-
-  def this(conf: Configuration) = this(
-    assumeBinaryIsString =
-      conf.getBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
-    assumeInt96IsTimestamp =
-      conf.getBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
-    followParquetFormatSpec =
-      conf.getBoolean(
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
-
-  /**
-   * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
-   */
-  def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())
-
-  private def convert(parquetSchema: GroupType): StructType = {
-    val fields = parquetSchema.getFields.map { field =>
-      field.getRepetition match {
-        case OPTIONAL =>
-          StructField(field.getName, convertField(field), nullable = true)
-
-        case REQUIRED =>
-          StructField(field.getName, convertField(field), nullable = false)
-
-        case REPEATED =>
-          throw new AnalysisException(
-            s"REPEATED not supported outside LIST or MAP. Type: $field")
-      }
-    }
-
-    StructType(fields)
-  }
-
-  /**
-   * Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
-   */
-  def convertField(parquetType: Type): DataType = parquetType match {
-    case t: PrimitiveType => convertPrimitiveField(t)
-    case t: GroupType => convertGroupField(t.asGroupType())
-  }
-
-  private def convertPrimitiveField(field: PrimitiveType): DataType = {
-    val typeName = field.getPrimitiveTypeName
-    val originalType = field.getOriginalType
-
-    def typeString =
-      if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
-
-    def typeNotImplemented() =
-      throw new AnalysisException(s"Parquet type not yet supported: 
$typeString")
-
-    def illegalType() =
-      throw new AnalysisException(s"Illegal Parquet type: $typeString")
-
-    // When maxPrecision = -1, we skip precision range check, and always respect the precision
-    // specified in field.getDecimalMetadata.  This is useful when interpreting decimal types stored
-    // as binaries with variable lengths.
-    def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
-      val precision = field.getDecimalMetadata.getPrecision
-      val scale = field.getDecimalMetadata.getScale
-
-      CatalystSchemaConverter.analysisRequire(
-        maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
-        s"Invalid decimal precision: $typeName cannot store $precision digits 
(max $maxPrecision)")
-
-      DecimalType(precision, scale)
-    }
-
-    typeName match {
-      case BOOLEAN => BooleanType
-
-      case FLOAT => FloatType
-
-      case DOUBLE => DoubleType
-
-      case INT32 =>
-        originalType match {
-          case INT_8 => ByteType
-          case INT_16 => ShortType
-          case INT_32 | null => IntegerType
-          case DATE => DateType
-          case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT32)
-          case TIME_MILLIS => typeNotImplemented()
-          case _ => illegalType()
-        }
-
-      case INT64 =>
-        originalType match {
-          case INT_64 | null => LongType
-          case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT64)
-          case TIMESTAMP_MILLIS => typeNotImplemented()
-          case _ => illegalType()
-        }
-
-      case INT96 =>
-        CatalystSchemaConverter.analysisRequire(
-          assumeInt96IsTimestamp,
-          "INT96 is not supported unless it's interpreted as timestamp. " +
-            s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to 
true.")
-        TimestampType
-
-      case BINARY =>
-        originalType match {
-          case UTF8 | ENUM => StringType
-          case null if assumeBinaryIsString => StringType
-          case null => BinaryType
-          case DECIMAL => makeDecimalType()
-          case _ => illegalType()
-        }
-
-      case FIXED_LEN_BYTE_ARRAY =>
-        originalType match {
-          case DECIMAL => makeDecimalType(maxPrecisionForBytes(field.getTypeLength))
-          case INTERVAL => typeNotImplemented()
-          case _ => illegalType()
-        }
-
-      case _ => illegalType()
-    }
-  }
-
-  private def convertGroupField(field: GroupType): DataType = {
-    Option(field.getOriginalType).fold(convert(field): DataType) {
-      // A Parquet list is represented as a 3-level structure:
-      //
-      //   <list-repetition> group <name> (LIST) {
-      //     repeated group list {
-      //       <element-repetition> <element-type> element;
-      //     }
-      //   }
-      //
-      // However, according to the most recent Parquet format spec (not released yet up until
-      // writing), some 2-level structures are also recognized for backwards-compatibility.  Thus,
-      // we need to check whether the 2nd level or the 3rd level refers to list element type.
-      //
-      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-      case LIST =>
-        CatalystSchemaConverter.analysisRequire(
-          field.getFieldCount == 1, s"Invalid list type $field")
-
-        val repeatedType = field.getType(0)
-        CatalystSchemaConverter.analysisRequire(
-          repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
-
-        if (isElementType(repeatedType, field.getName)) {
-          ArrayType(convertField(repeatedType), containsNull = false)
-        } else {
-          val elementType = repeatedType.asGroupType().getType(0)
-          val optional = elementType.isRepetition(OPTIONAL)
-          ArrayType(convertField(elementType), containsNull = optional)
-        }
-
-      // scalastyle:off
-      // `MAP_KEY_VALUE` is for backwards-compatibility
-      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
-      // scalastyle:on
-      case MAP | MAP_KEY_VALUE =>
-        CatalystSchemaConverter.analysisRequire(
-          field.getFieldCount == 1 && !field.getType(0).isPrimitive,
-          s"Invalid map type: $field")
-
-        val keyValueType = field.getType(0).asGroupType()
-        CatalystSchemaConverter.analysisRequire(
-          keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
-          s"Invalid map type: $field")
-
-        val keyType = keyValueType.getType(0)
-        CatalystSchemaConverter.analysisRequire(
-          keyType.isPrimitive,
-          s"Map key type is expected to be a primitive type, but found: 
$keyType")
-
-        val valueType = keyValueType.getType(1)
-        val valueOptional = valueType.isRepetition(OPTIONAL)
-        MapType(
-          convertField(keyType),
-          convertField(valueType),
-          valueContainsNull = valueOptional)
-
-      case _ =>
-        throw new AnalysisException(s"Unrecognized Parquet type: $field")
-    }
-  }
-
-  // scalastyle:off
-  // Here we implement Parquet LIST backwards-compatibility rules.
-  // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-  // scalastyle:on
-  private def isElementType(repeatedType: Type, parentName: String): Boolean = {
-    {
-      // For legacy 2-level list types with primitive element type, e.g.:
-      //
-      //    // List<Integer> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated int32 element;
-      //    }
-      //
-      repeatedType.isPrimitive
-    } || {
-      // For legacy 2-level list types whose element type is a group type with 2 or more fields,
-      // e.g.:
-      //
-      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated group element {
-      //        required binary str (UTF8);
-      //        required int32 num;
-      //      };
-      //    }
-      //
-      repeatedType.asGroupType().getFieldCount > 1
-    } || {
-      // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
-      //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated group array {
-      //        required binary str (UTF8);
-      //      };
-      //    }
-      //
-      repeatedType.getName == "array"
-    } || {
-      // For Parquet data generated by parquet-thrift, e.g.:
-      //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
-      //    optional group my_list (LIST) {
-      //      repeated group my_list_tuple {
-      //        required binary str (UTF8);
-      //      };
-      //    }
-      //
-      repeatedType.getName == s"${parentName}_tuple"
-    }
-  }
-
-  /**
-   * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
-   */
-  def convert(catalystSchema: StructType): MessageType = {
-    Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root")
-  }
-
-  /**
-   * Converts a Spark SQL [[StructField]] to a Parquet [[Type]].
-   */
-  def convertField(field: StructField): Type = {
-    convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
-  }
-
-  private def convertField(field: StructField, repetition: Type.Repetition): Type = {
-    CatalystSchemaConverter.checkFieldName(field.name)
-
-    field.dataType match {
-      // ===================
-      // Simple atomic types
-      // ===================
-
-      case BooleanType =>
-        Types.primitive(BOOLEAN, repetition).named(field.name)
-
-      case ByteType =>
-        Types.primitive(INT32, repetition).as(INT_8).named(field.name)
-
-      case ShortType =>
-        Types.primitive(INT32, repetition).as(INT_16).named(field.name)
-
-      case IntegerType =>
-        Types.primitive(INT32, repetition).named(field.name)
-
-      case LongType =>
-        Types.primitive(INT64, repetition).named(field.name)
-
-      case FloatType =>
-        Types.primitive(FLOAT, repetition).named(field.name)
-
-      case DoubleType =>
-        Types.primitive(DOUBLE, repetition).named(field.name)
-
-      case StringType =>
-        Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
-
-      case DateType =>
-        Types.primitive(INT32, repetition).as(DATE).named(field.name)
-
-      // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
-      //
-      // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
-      // timestamp in Impala for some historical reasons, it's not recommended to be used for any
-      // other types and will probably be deprecated in future Parquet format spec.  That's the
-      // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which
-      // are both logical types annotating `INT64`.
-      //
-      // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive.  Starting
-      // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
-      // a timestamp into a `Long`.  This design decision is subject to change though, for example,
-      // we may resort to microsecond precision in the future.
-      //
-      // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
-      // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
-      // hasn't implemented `TIMESTAMP_MICROS` yet.
-      //
-      // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
-      case TimestampType =>
-        Types.primitive(INT96, repetition).named(field.name)
-
-      case BinaryType =>
-        Types.primitive(BINARY, repetition).named(field.name)
-
-      // =====================================
-      // Decimals (for Spark version <= 1.4.x)
-      // =====================================
-
-      // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
-      // always store decimals in fixed-length byte arrays.  To keep compatibility with these older
-      // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
-      // by `DECIMAL`.
-      case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec =>
-        Types
-          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .length(CatalystSchemaConverter.minBytesForPrecision(precision))
-          .named(field.name)
-
-      // =====================================
-      // Decimals (follow Parquet format spec)
-      // =====================================
-
-      // Uses INT32 for 1 <= precision <= 9
-      case DecimalType.Fixed(precision, scale)
-          if precision <= MAX_PRECISION_FOR_INT32 && followParquetFormatSpec =>
-        Types
-          .primitive(INT32, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .named(field.name)
-
-      // Uses INT64 for 1 <= precision <= 18
-      case DecimalType.Fixed(precision, scale)
-          if precision <= MAX_PRECISION_FOR_INT64 && followParquetFormatSpec =>
-        Types
-          .primitive(INT64, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .named(field.name)
-
-      // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
-      case DecimalType.Fixed(precision, scale) if followParquetFormatSpec =>
-        Types
-          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .length(CatalystSchemaConverter.minBytesForPrecision(precision))
-          .named(field.name)
-
-      // ===================================================
-      // ArrayType and MapType (for Spark versions <= 1.4.x)
-      // ===================================================
-
-      // Spark 1.4.x and prior versions convert ArrayType with nullable elements into a 3-level
-      // LIST structure.  This behavior mimics parquet-hive (1.6.0rc3).  Note that this case is
-      // covered by the backwards-compatibility rules implemented in `isElementType()`.
-      case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec =>
-        // <list-repetition> group <name> (LIST) {
-        //   optional group bag {
-        //     repeated <element-type> element;
-        //   }
-        // }
-        ConversionPatterns.listType(
-          repetition,
-          field.name,
-          Types
-            .buildGroup(REPEATED)
-            // "array_element" is the name chosen by parquet-hive (1.7.0 and 
prior version)
-            .addField(convertField(StructField("array_element", elementType, 
nullable)))
-            .named(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME))
-
-      // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
-      // LIST structure.  This behavior mimics parquet-avro (1.6.0rc3).  Note that this case is
-      // covered by the backwards-compatibility rules implemented in `isElementType()`.
-      case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec =>
-        // <list-repetition> group <name> (LIST) {
-        //   repeated <element-type> element;
-        // }
-        ConversionPatterns.listType(
-          repetition,
-          field.name,
-          // "array" is the name chosen by parquet-avro (1.7.0 and prior 
version)
-          convertField(StructField("array", elementType, nullable), REPEATED))
-
-      // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
-      // MAP_KEY_VALUE.  This is covered by `convertGroupField(field: GroupType): DataType`.
-      case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec =>
-        // <map-repetition> group <name> (MAP) {
-        //   repeated group map (MAP_KEY_VALUE) {
-        //     required <key-type> key;
-        //     <value-repetition> <value-type> value;
-        //   }
-        // }
-        ConversionPatterns.mapType(
-          repetition,
-          field.name,
-          convertField(StructField("key", keyType, nullable = false)),
-          convertField(StructField("value", valueType, valueContainsNull)))
-
-      // ==================================================
-      // ArrayType and MapType (follow Parquet format spec)
-      // ==================================================
-
-      case ArrayType(elementType, containsNull) if followParquetFormatSpec =>
-        // <list-repetition> group <name> (LIST) {
-        //   repeated group list {
-        //     <element-repetition> <element-type> element;
-        //   }
-        // }
-        Types
-          .buildGroup(repetition).as(LIST)
-          .addField(
-            Types.repeatedGroup()
-              .addField(convertField(StructField("element", elementType, 
containsNull)))
-              .named("list"))
-          .named(field.name)
-
-      case MapType(keyType, valueType, valueContainsNull) =>
-        // <map-repetition> group <name> (MAP) {
-        //   repeated group key_value {
-        //     required <key-type> key;
-        //     <value-repetition> <value-type> value;
-        //   }
-        // }
-        Types
-          .buildGroup(repetition).as(MAP)
-          .addField(
-            Types
-              .repeatedGroup()
-              .addField(convertField(StructField("key", keyType, nullable = 
false)))
-              .addField(convertField(StructField("value", valueType, 
valueContainsNull)))
-              .named("key_value"))
-          .named(field.name)
-
-      // ===========
-      // Other types
-      // ===========
-
-      case StructType(fields) =>
-        fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
-          builder.addField(convertField(field))
-        }.named(field.name)
-
-      case udt: UserDefinedType[_] =>
-        convertField(field.copy(dataType = udt.sqlType))
-
-      case _ =>
-        throw new AnalysisException(s"Unsupported data type $field.dataType")
-    }
-  }
-}
-
-
-private[parquet] object CatalystSchemaConverter {
-  def checkFieldName(name: String): Unit = {
-    // ,;{}()\n\t= and space are special characters in Parquet schema
-    analysisRequire(
-      !name.matches(".*[ ,;{}()\n\t=].*"),
-      s"""Attribute name "$name" contains invalid character(s) among " 
,;{}()\\n\\t=".
-         |Please use alias to rename it.
-       """.stripMargin.split("\n").mkString(" "))
-  }
-
-  def checkFieldNames(schema: StructType): StructType = {
-    schema.fieldNames.foreach(checkFieldName)
-    schema
-  }
-
-  def analysisRequire(f: => Boolean, message: String): Unit = {
-    if (!f) {
-      throw new AnalysisException(message)
-    }
-  }
-
-  private def computeMinBytesForPrecision(precision : Int) : Int = {
-    var numBytes = 1
-    while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
-      numBytes += 1
-    }
-    numBytes
-  }
-
-  private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision)
-
-  // Returns the minimum number of bytes needed to store a decimal with a given `precision`.
-  def minBytesForPrecision(precision : Int) : Int = {
-    if (precision < MIN_BYTES_FOR_PRECISION.length) {
-      MIN_BYTES_FOR_PRECISION(precision)
-    } else {
-      computeMinBytesForPrecision(precision)
-    }
-  }
-
-  val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4)
-
-  val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8)
-
-  // Max precision of a decimal value stored in `numBytes` bytes
-  def maxPrecisionForBytes(numBytes: Int): Int = {
-    Math.round(                               // convert double to long
-      Math.floor(Math.log10(                  // number of base-10 digits
-        Math.pow(2, 8 * numBytes - 1) - 1)))  // max value stored in numBytes
-      .asInstanceOf[Int]
-  }
-}
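
The companion object above derives the INT32/INT64 cut-offs for DECIMAL from simple byte-length arithmetic: `maxPrecisionForBytes(4)` is 9 and `maxPrecisionForBytes(8)` is 18, which is why the converter emits INT32 for precision <= 9 and INT64 for precision <= 18 when following the Parquet format spec. A standalone sketch of that arithmetic follows; the object name is illustrative, and the formulas mirror `computeMinBytesForPrecision` and `maxPrecisionForBytes` shown in the diff.

object PrecisionMathSketch {
  // Smallest number of bytes whose signed two's-complement range can hold 10^precision - 1.
  def minBytesForPrecision(precision: Int): Int = {
    var numBytes = 1
    while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
      numBytes += 1
    }
    numBytes
  }

  // Largest decimal precision that always fits in `numBytes` signed bytes.
  def maxPrecisionForBytes(numBytes: Int): Int =
    math.floor(math.log10(math.pow(2, 8 * numBytes - 1) - 1)).toInt

  def main(args: Array[String]): Unit = {
    println(maxPrecisionForBytes(4))   // 9: DECIMAL up to precision 9 fits in INT32
    println(maxPrecisionForBytes(8))   // 18: DECIMAL up to precision 18 fits in INT64
    println(minBytesForPrecision(38))  // 16: bytes needed for the maximum precision 38
  }
}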

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
deleted file mode 100644
index 1551afd..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.parquet.Log
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
-
-/**
- * An output committer for writing Parquet files.  Instead of writing to the `_temporary` folder
- * like what [[ParquetOutputCommitter]] does, this output committer writes data directly to the
- * destination folder.  This can be useful for data stored in S3, where directory operations are
- * relatively expensive.
- *
- * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
- * property via Hadoop [[Configuration]].  Note that this property overrides
- * "spark.sql.sources.outputCommitterClass".
- *
- * *NOTE*
- *
- *   NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
- *   no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
- *   left empty).
- */
-private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
-  extends ParquetOutputCommitter(outputPath, context) {
-  val LOG = Log.getLog(classOf[ParquetOutputCommitter])
-
-  override def getWorkPath: Path = outputPath
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
-  override def setupJob(jobContext: JobContext): Unit = {}
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def commitJob(jobContext: JobContext) {
-    val configuration = ContextUtil.getConfiguration(jobContext)
-    val fileSystem = outputPath.getFileSystem(configuration)
-
-    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
-      try {
-        val outputStatus = fileSystem.getFileStatus(outputPath)
-        val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
-        try {
-          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
-        } catch { case e: Exception =>
-          LOG.warn("could not write summary file for " + outputPath, e)
-          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
-          if (fileSystem.exists(metadataPath)) {
-            fileSystem.delete(metadataPath, true)
-          }
-        }
-      } catch {
-        case e: Exception => LOG.warn("could not write summary file for " + 
outputPath, e)
-      }
-    }
-
-    if 
(configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", 
true)) {
-      try {
-        val successPath = new Path(outputPath, 
FileOutputCommitter.SUCCEEDED_FILE_NAME)
-        fileSystem.create(successPath).close()
-      } catch {
-        case e: Exception => LOG.warn("could not write success file for " + 
outputPath, e)
-      }
-    }
-  }
-}
-
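
For context, here is a minimal sketch (not part of this commit) of how this committer was
typically enabled through the Hadoop configuration property named in the Javadoc above. The
SQLContext `sqlContext`, the DataFrame `df`, and the S3 path are illustrative placeholders:

    // Point Spark SQL's Parquet writer at the direct committer (Spark 1.x style).
    sqlContext.sparkContext.hadoopConfiguration.set(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

    // Non-append writes then commit task output straight to the destination directory,
    // avoiding the `_temporary` rename step that is expensive on S3.
    df.write.mode("overwrite").parquet("s3n://bucket/table")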

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
deleted file mode 100644
index 6ed3580..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{MapData, ArrayData}
-
-// TODO: Remove this while fixing SPARK-8848
-private[sql] object CatalystConverter {
-  // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
-  // Note that "array" for the array elements is chosen by ParquetAvro.
-  // Using a different value will result in Parquet silently dropping columns.
-  val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
-  val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
-
-  val MAP_KEY_SCHEMA_NAME = "key"
-  val MAP_VALUE_SCHEMA_NAME = "value"
-  val MAP_SCHEMA_NAME = "map"
-
-  // TODO: consider using Array[T] for arrays to avoid boxing of primitive types
-  type ArrayScalaType = ArrayData
-  type StructScalaType = InternalRow
-  type MapScalaType = MapData
-}
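
As a side note, here is a minimal sketch (assuming parquet-column is on the classpath; the schema
text is illustrative) of the legacy array layout these schema names describe. As the comment above
warns, renaming the repeated group ("bag") or the element field ("array") causes readers written
against this convention to silently drop the column:

    import org.apache.parquet.schema.MessageTypeParser

    object LegacyArrayLayoutSketch {
      def main(args: Array[String]): Unit = {
        // An array-of-int column encoded with the "bag"/"array" names from CatalystConverter.
        val schema = MessageTypeParser.parseMessageType(
          """message spark_schema {
            |  optional group ints (LIST) {
            |    repeated group bag {
            |      optional int32 array;
            |    }
            |  }
            |}""".stripMargin)
        println(schema)
      }
    }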

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
deleted file mode 100644
index d57b789..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.io.Serializable
-import java.nio.ByteBuffer
-
-import com.google.common.io.BaseEncoding
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.compat.FilterCompat._
-import org.apache.parquet.filter2.predicate.FilterApi._
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics}
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.io.api.Binary
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-private[sql] object ParquetFilters {
-  val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-
-  def createRecordFilter(filterExpressions: Seq[Expression]): Option[Filter] = {
-    filterExpressions.flatMap { filter =>
-      createFilter(filter)
-    }.reduceOption(FilterApi.and).map(FilterCompat.get)
-  }
-
-  case class SetInFilter[T <: Comparable[T]](
-    valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
-
-    override def keep(value: T): Boolean = {
-      value != null && valueSet.contains(value)
-    }
-
-    override def canDrop(statistics: Statistics[T]): Boolean = false
-
-    override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
-  }
-
-  private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
-      (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
-    // Binary.fromString and Binary.fromByteArray don't accept null values
-    case StringType =>
-      (n: String, v: Any) => FilterApi.eq(
-        binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
-    case BinaryType =>
-      (n: String, v: Any) => FilterApi.eq(
-        binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-  }
-
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
-      (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) => FilterApi.notEq(
-        binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
-    case BinaryType =>
-      (n: String, v: Any) => FilterApi.notEq(
-        binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-  }
-
-  private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-    case BinaryType =>
-      (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-  }
-
-  private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-    case BinaryType =>
-      (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-  }
-
-  private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-    case BinaryType =>
-      (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-  }
-
-  private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
-      (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
-      (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
-      (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
-      (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-    case BinaryType =>
-      (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-  }
-
-  private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
-    case LongType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
-    case FloatType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
-    case DoubleType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-    case StringType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[UTF8String].getBytes))))
-    case BinaryType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
-  }
-
-  /**
-   * Converts data sources filters to Parquet filter predicates.
-   */
-  def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
-
-    // NOTE:
-    //
-    // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
-    // which can be cast to `false` implicitly. Please refer to the `eval` method of these
-    // operators and the `SimplifyFilters` rule for details.
-    predicate match {
-      case sources.IsNull(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, null))
-      case sources.IsNotNull(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
-
-      case sources.EqualTo(name, value) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualTo(name, value)) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.LessThan(name, value) =>
-        makeLt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.LessThanOrEqual(name, value) =>
-        makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.GreaterThan(name, value) =>
-        makeGt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.GreaterThanOrEqual(name, value) =>
-        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.And(lhs, rhs) =>
-        (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
-
-      case sources.Or(lhs, rhs) =>
-        for {
-          lhsFilter <- createFilter(schema, lhs)
-          rhsFilter <- createFilter(schema, rhs)
-        } yield FilterApi.or(lhsFilter, rhsFilter)
-
-      case sources.Not(pred) =>
-        createFilter(schema, pred).map(FilterApi.not)
-
-      case _ => None
-    }
-  }
-
-  /**
-   * Converts Catalyst predicate expressions to Parquet filter predicates.
-   *
-   * @todo This can be removed once we get rid of the old Parquet support.
-   */
-  def createFilter(predicate: Expression): Option[FilterPredicate] = {
-    // NOTE:
-    //
-    // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
-    // which can be cast to `false` implicitly. Please refer to the `eval` method of these
-    // operators and the `SimplifyFilters` rule for details.
-    predicate match {
-      case IsNull(NamedExpression(name, dataType)) =>
-        makeEq.lift(dataType).map(_(name, null))
-      case IsNotNull(NamedExpression(name, dataType)) =>
-        makeNotEq.lift(dataType).map(_(name, null))
-
-      case EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeEq.lift(dataType).map(_(name, value))
-      case EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeEq.lift(dataType).map(_(name, value))
-      case EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeEq.lift(dataType).map(_(name, value))
-      case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeEq.lift(dataType).map(_(name, value))
-
-      case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-      case Not(EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-      case Not(EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-      case Not(EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-
-      case LessThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeLt.lift(dataType).map(_(name, value))
-      case LessThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeLt.lift(dataType).map(_(name, value))
-      case LessThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeGt.lift(dataType).map(_(name, value))
-      case LessThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeGt.lift(dataType).map(_(name, value))
-
-      case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-      case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-      case LessThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-      case LessThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-
-      case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeGt.lift(dataType).map(_(name, value))
-      case GreaterThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeGt.lift(dataType).map(_(name, value))
-      case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeLt.lift(dataType).map(_(name, value))
-      case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeLt.lift(dataType).map(_(name, value))
-
-      case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-      case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-      case GreaterThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-      case GreaterThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-
-      case And(lhs, rhs) =>
-        (createFilter(lhs) ++ createFilter(rhs)).reduceOption(FilterApi.and)
-
-      case Or(lhs, rhs) =>
-        for {
-          lhsFilter <- createFilter(lhs)
-          rhsFilter <- createFilter(rhs)
-        } yield FilterApi.or(lhsFilter, rhsFilter)
-
-      case Not(pred) =>
-        createFilter(pred).map(FilterApi.not)
-
-      case InSet(NamedExpression(name, dataType), valueSet) =>
-        makeInSet.lift(dataType).map(_(name, valueSet))
-
-      case _ => None
-    }
-  }
-
-  /**
-   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
-   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
-   * the actual filter predicate.
-   */
-  def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
-    if (filters.nonEmpty) {
-      val serialized: Array[Byte] =
-        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
-      val encoded: String = BaseEncoding.base64().encode(serialized)
-      conf.set(PARQUET_FILTER_DATA, encoded)
-    }
-  }
-
-  /**
-   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
-   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
-   * the actual filter predicate.
-   */
-  def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = {
-    val data = conf.get(PARQUET_FILTER_DATA)
-    if (data != null) {
-      val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
-    } else {
-      Seq()
-    }
-  }
-}
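
For readers unfamiliar with the Parquet side of this code, here is a minimal sketch (not part of
this commit; column names and values are illustrative) of the parquet filter2 API that
`createFilter` targets: per-column predicates are built with `FilterApi`, combined with `and`/`or`,
and finally wrapped through `FilterCompat` for the record reader:

    import org.apache.parquet.filter2.compat.FilterCompat
    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.filter2.predicate.FilterApi._
    import org.apache.parquet.io.api.Binary

    object FilterPredicateSketch {
      def main(args: Array[String]): Unit = {
        // Roughly what a pushed-down `age > 18 AND name = 'foo'` lowers to.
        val agePredicate = FilterApi.gt(intColumn("age"), java.lang.Integer.valueOf(18))
        val namePredicate = FilterApi.eq(binaryColumn("name"), Binary.fromString("foo"))
        val combined = FilterApi.and(agePredicate, namePredicate)

        // FilterCompat.get wraps the predicate in the form the Parquet record reader expects.
        println(FilterCompat.get(combined))
      }
    }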


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
