http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
deleted file mode 100644
index b6db71b..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.net.URI
-import java.util.logging.{Level, Logger => JLogger}
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.{Failure, Try}
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, 
_}
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.{Log => ParquetLog}
-
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
-import org.apache.spark.rdd.RDD._
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionSpec
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-
-
-private[sql] class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
-
-  def format(): String = "parquet"
-
-  override def createRelation(
-      sqlContext: SQLContext,
-      paths: Array[String],
-      schema: Option[StructType],
-      partitionColumns: Option[StructType],
-      parameters: Map[String, String]): HadoopFsRelation = {
-    new ParquetRelation(paths, schema, None, partitionColumns, 
parameters)(sqlContext)
-  }
-}
-
-// NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
-private[sql] class ParquetOutputWriter(path: String, context: 
TaskAttemptContext)
-  extends OutputWriter {
-
-  private val recordWriter: RecordWriter[Void, InternalRow] = {
-    val outputFormat = {
-      new ParquetOutputFormat[InternalRow]() {
-        // Here we override `getDefaultWorkFile` for two reasons:
-        //
-        //  1. To allow appending.  We need to generate unique output file 
names to avoid
-        //     overwriting existing files (either exist before the write job, 
or are just written
-        //     by other tasks within the same write job).
-        //
-        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` 
uses
-        //     `FileOutputCommitter.getWorkPath()`, which points to the base 
directory of all
-        //     partitions in the case of dynamic partitioning.
-        override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
-          val uniqueWriteJobId = 
context.getConfiguration.get("spark.sql.sources.writeJobUUID")
-          val split = context.getTaskAttemptID.getTaskID.getId
-          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
-        }
-      }
-    }
-
-    outputFormat.getRecordWriter(context)
-  }
-
-  override def write(row: Row): Unit = throw new 
UnsupportedOperationException("call writeInternal")
-
-  override protected[sql] def writeInternal(row: InternalRow): Unit = 
recordWriter.write(null, row)
-
-  override def close(): Unit = recordWriter.close(context)
-}
-
-private[sql] class ParquetRelation(
-    override val paths: Array[String],
-    private val maybeDataSchema: Option[StructType],
-    // This is for metastore conversion.
-    private val maybePartitionSpec: Option[PartitionSpec],
-    override val userDefinedPartitionColumns: Option[StructType],
-    parameters: Map[String, String])(
-    val sqlContext: SQLContext)
-  extends HadoopFsRelation(maybePartitionSpec)
-  with Logging {
-
-  private[sql] def this(
-      paths: Array[String],
-      maybeDataSchema: Option[StructType],
-      maybePartitionSpec: Option[PartitionSpec],
-      parameters: Map[String, String])(
-      sqlContext: SQLContext) = {
-    this(
-      paths,
-      maybeDataSchema,
-      maybePartitionSpec,
-      maybePartitionSpec.map(_.partitionColumns),
-      parameters)(sqlContext)
-  }
-
-  // Should we merge schemas from all Parquet part-files?
-  private val shouldMergeSchemas =
-    parameters
-      .get(ParquetRelation.MERGE_SCHEMA)
-      .map(_.toBoolean)
-      
.getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
-
-  private val mergeRespectSummaries =
-    sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
-
-  private val maybeMetastoreSchema = parameters
-    .get(ParquetRelation.METASTORE_SCHEMA)
-    .map(DataType.fromJson(_).asInstanceOf[StructType])
-
-  private lazy val metadataCache: MetadataCache = {
-    val meta = new MetadataCache
-    meta.refresh()
-    meta
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ParquetRelation =>
-      val schemaEquality = if (shouldMergeSchemas) {
-        this.shouldMergeSchemas == that.shouldMergeSchemas
-      } else {
-        this.dataSchema == that.dataSchema &&
-          this.schema == that.schema
-      }
-
-      this.paths.toSet == that.paths.toSet &&
-        schemaEquality &&
-        this.maybeDataSchema == that.maybeDataSchema &&
-        this.partitionColumns == that.partitionColumns
-
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    if (shouldMergeSchemas) {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        maybeDataSchema,
-        partitionColumns)
-    } else {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        dataSchema,
-        schema,
-        maybeDataSchema,
-        partitionColumns)
-    }
-  }
-
-  /** Constraints on schema of dataframe to be stored. */
-  private def checkConstraints(schema: StructType): Unit = {
-    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
-      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns 
found, " +
-        s"cannot save to parquet format")
-    }
-  }
-
-  override def dataSchema: StructType = {
-    val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema)
-    // check if schema satisfies the constraints
-    // before moving forward
-    checkConstraints(schema)
-    schema
-  }
-
-  override private[sql] def refresh(): Unit = {
-    super.refresh()
-    metadataCache.refresh()
-  }
-
-  // Parquet data source always uses Catalyst internal representations.
-  override val needConversion: Boolean = false
-
-  override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
-
-  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
-    val conf = ContextUtil.getConfiguration(job)
-
-    val committerClass =
-      conf.getClass(
-        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
-        classOf[ParquetOutputCommitter],
-        classOf[ParquetOutputCommitter])
-
-    if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
-      logInfo("Using default output committer for Parquet: " +
-        classOf[ParquetOutputCommitter].getCanonicalName)
-    } else {
-      logInfo("Using user defined output committer for Parquet: " + 
committerClass.getCanonicalName)
-    }
-
-    conf.setClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key,
-      committerClass,
-      classOf[ParquetOutputCommitter])
-
-    // We're not really using `ParquetOutputFormat[Row]` for writing data 
here, because we override
-    // it in `ParquetOutputWriter` to support appending and dynamic 
partitioning.  The reason why
-    // we set it here is to setup the output committer class to 
`ParquetOutputCommitter`, which is
-    // bundled with `ParquetOutputFormat[Row]`.
-    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-
-    // TODO There's no need to use two kinds of WriteSupport
-    // We should unify them. `SpecificMutableRow` can process both atomic 
(primitive) types and
-    // complex types.
-    val writeSupportClass =
-      if 
(dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
-        classOf[MutableRowWriteSupport]
-      } else {
-        classOf[RowWriteSupport]
-      }
-
-    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
-    RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
-
-    // Sets compression scheme
-    conf.set(
-      ParquetOutputFormat.COMPRESSION,
-      ParquetRelation
-        .shortParquetCompressionCodecNames
-        .getOrElse(
-          sqlContext.conf.parquetCompressionCodec.toUpperCase,
-          CompressionCodecName.UNCOMPRESSED).name())
-
-    new OutputWriterFactory {
-      override def newInstance(
-          path: String, dataSchema: StructType, context: TaskAttemptContext): 
OutputWriter = {
-        new ParquetOutputWriter(path, context)
-      }
-    }
-  }
-
-  override def buildScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputFiles: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
-    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
-    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
-    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
-    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
-
-    // Create the function to set variable Parquet confs at both driver and 
executor side.
-    val initLocalJobFuncOpt =
-      ParquetRelation.initializeLocalJobFunc(
-        requiredColumns,
-        filters,
-        dataSchema,
-        useMetadataCache,
-        parquetFilterPushDown,
-        assumeBinaryIsString,
-        assumeInt96IsTimestamp,
-        followParquetFormatSpec) _
-
-    // Create the function to set input paths at the driver side.
-    val setInputPaths = 
ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
-
-    Utils.withDummyCallSite(sqlContext.sparkContext) {
-      new SqlNewHadoopRDD(
-        sc = sqlContext.sparkContext,
-        broadcastedConf = broadcastedConf,
-        initDriverSideJobFuncOpt = Some(setInputPaths),
-        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-        inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
-        valueClass = classOf[InternalRow]) {
-
-        val cacheMetadata = useMetadataCache
-
-        @transient val cachedStatuses = inputFiles.map { f =>
-          // In order to encode the authority of a Path containing special 
characters such as '/'
-          // (which does happen in some S3N credentials), we need to use the 
string returned by the
-          // URI of the path to create a new Path.
-          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
-          new FileStatus(
-            f.getLen, f.isDir, f.getReplication, f.getBlockSize, 
f.getModificationTime,
-            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, 
pathWithEscapedAuthority)
-        }.toSeq
-
-        private def escapePathUserInfo(path: Path): Path = {
-          val uri = path.toUri
-          new Path(new URI(
-            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, 
uri.getPath,
-            uri.getQuery, uri.getFragment))
-        }
-
-        // Overridden so we can inject our own cached files statuses.
-        override def getPartitions: Array[SparkPartition] = {
-          val inputFormat = new ParquetInputFormat[InternalRow] {
-            override def listStatus(jobContext: JobContext): JList[FileStatus] 
= {
-              if (cacheMetadata) cachedStatuses else 
super.listStatus(jobContext)
-            }
-          }
-
-          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
-          val rawSplits = inputFormat.getSplits(jobContext)
-
-          Array.tabulate[SparkPartition](rawSplits.size) { i =>
-            new SqlNewHadoopPartition(id, i, 
rawSplits(i).asInstanceOf[InputSplit with Writable])
-          }
-        }
-      }.asInstanceOf[RDD[Row]]  // type erasure hack to pass RDD[InternalRow] 
as RDD[Row]
-    }
-  }
-
-  private class MetadataCache {
-    // `FileStatus` objects of all "_metadata" files.
-    private var metadataStatuses: Array[FileStatus] = _
-
-    // `FileStatus` objects of all "_common_metadata" files.
-    private var commonMetadataStatuses: Array[FileStatus] = _
-
-    // `FileStatus` objects of all data files (Parquet part-files).
-    var dataStatuses: Array[FileStatus] = _
-
-    // Schema of the actual Parquet files, without partition columns 
discovered from partition
-    // directory paths.
-    var dataSchema: StructType = null
-
-    // Schema of the whole table, including partition columns.
-    var schema: StructType = _
-
-    // Cached leaves
-    var cachedLeaves: Set[FileStatus] = null
-
-    /**
-     * Refreshes `FileStatus`es, footers, partition spec, and table schema.
-     */
-    def refresh(): Unit = {
-      val currentLeafStatuses = cachedLeafStatuses()
-
-      // Check if cachedLeafStatuses is changed or not
-      val leafStatusesChanged = (cachedLeaves == null) ||
-        !cachedLeaves.equals(currentLeafStatuses)
-
-      if (leafStatusesChanged) {
-        cachedLeaves = currentLeafStatuses.toIterator.toSet
-
-        // Lists `FileStatus`es of all leaf nodes (files) under all base 
directories.
-        val leaves = currentLeafStatuses.filter { f =>
-          isSummaryFile(f.getPath) ||
-            !(f.getPath.getName.startsWith("_") || 
f.getPath.getName.startsWith("."))
-        }.toArray
-
-        dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
-        metadataStatuses =
-          leaves.filter(_.getPath.getName == 
ParquetFileWriter.PARQUET_METADATA_FILE)
-        commonMetadataStatuses =
-          leaves.filter(_.getPath.getName == 
ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
-        dataSchema = {
-          val dataSchema0 = maybeDataSchema
-            .orElse(readSchema())
-            .orElse(maybeMetastoreSchema)
-            .getOrElse(throw new AnalysisException(
-              s"Failed to discover schema of Parquet file(s) in the following 
location(s):\n" +
-                paths.mkString("\n\t")))
-
-          // If this Parquet relation is converted from a Hive Metastore 
table, must reconcile case
-          // case insensitivity issue and possible schema mismatch (probably 
caused by schema
-          // evolution).
-          maybeMetastoreSchema
-            .map(ParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
-            .getOrElse(dataSchema0)
-        }
-      }
-    }
-
-    private def isSummaryFile(file: Path): Boolean = {
-      file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
-        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
-    }
-
-    private def readSchema(): Option[StructType] = {
-      // Sees which file(s) we need to touch in order to figure out the schema.
-      //
-      // Always tries the summary files first if users don't require a merged 
schema.  In this case,
-      // "_common_metadata" is more preferable than "_metadata" because it 
doesn't contain row
-      // groups information, and could be much smaller for large Parquet files 
with lots of row
-      // groups.  If no summary file is available, falls back to some random 
part-file.
-      //
-      // NOTE: Metadata stored in the summary files are merged from all 
part-files.  However, for
-      // user defined key-value metadata (in which we store Spark SQL schema), 
Parquet doesn't know
-      // how to merge them correctly if some key is associated with different 
values in different
-      // part-files.  When this happens, Parquet simply gives up generating 
the summary file.  This
-      // implies that if a summary file presents, then:
-      //
-      //   1. Either all part-files have exactly the same Spark SQL schema, or
-      //   2. Some part-files don't contain Spark SQL schema in the key-value 
metadata at all (thus
-      //      their schemas may differ from each other).
-      //
-      // Here we tend to be pessimistic and take the second case into account. 
 Basically this means
-      // we can't trust the summary files if users require a merged schema, 
and must touch all part-
-      // files to do the merge.
-      val filesToTouch =
-        if (shouldMergeSchemas) {
-          // Also includes summary files, 'cause there might be empty 
partition directories.
-
-          // If mergeRespectSummaries config is true, we assume that all 
part-files are the same for
-          // their schema with summary files, so we ignore them when merging 
schema.
-          // If the config is disabled, which is the default setting, we merge 
all part-files.
-          // In this mode, we only need to merge schemas contained in all 
those summary files.
-          // You should enable this configuration only if you are very sure 
that for the parquet
-          // part-files to read there are corresponding summary files 
containing correct schema.
-
-          val needMerged: Seq[FileStatus] =
-            if (mergeRespectSummaries) {
-              Seq()
-            } else {
-              dataStatuses
-            }
-          (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
-        } else {
-          // Tries any "_common_metadata" first. Parquet files written by old 
versions or Parquet
-          // don't have this.
-          commonMetadataStatuses.headOption
-            // Falls back to "_metadata"
-            .orElse(metadataStatuses.headOption)
-            // Summary file(s) not found, the Parquet file is either 
corrupted, or different part-
-            // files contain conflicting user defined metadata (two or more 
values are associated
-            // with a same key in different files).  In either case, we fall 
back to any of the
-            // first part-file, and just assume all schemas are consistent.
-            .orElse(dataStatuses.headOption)
-            .toSeq
-        }
-
-      assert(
-        filesToTouch.nonEmpty || maybeDataSchema.isDefined || 
maybeMetastoreSchema.isDefined,
-        "No predefined schema found, " +
-          s"and no Parquet data files or summary files found under 
${paths.mkString(", ")}.")
-
-      ParquetRelation.mergeSchemasInParallel(filesToTouch, sqlContext)
-    }
-  }
-}
-
-private[sql] object ParquetRelation extends Logging {
-  // Whether we should merge schemas collected from all Parquet part-files.
-  private[sql] val MERGE_SCHEMA = "mergeSchema"
-
-  // Hive Metastore schema, used when converting Metastore Parquet tables.  
This option is only used
-  // internally.
-  private[sql] val METASTORE_SCHEMA = "metastoreSchema"
-
-  /** This closure sets various Parquet configurations at both driver side and 
executor side. */
-  private[parquet] def initializeLocalJobFunc(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      dataSchema: StructType,
-      useMetadataCache: Boolean,
-      parquetFilterPushDown: Boolean,
-      assumeBinaryIsString: Boolean,
-      assumeInt96IsTimestamp: Boolean,
-      followParquetFormatSpec: Boolean)(job: Job): Unit = {
-    val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[CatalystReadSupport].getName)
-
-    // Try to push down filters when filter push-down is enabled.
-    if (parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
-    })
-
-    conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet 
and FS metadata
-    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
-
-    // Sets flags for Parquet schema conversion
-    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
-    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, 
assumeInt96IsTimestamp)
-    conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, 
followParquetFormatSpec)
-  }
-
-  /** This closure sets input paths at the driver side. */
-  private[parquet] def initializeDriverSideJobFunc(
-      inputFiles: Array[FileStatus])(job: Job): Unit = {
-    // We side the input paths at the driver side.
-    logInfo(s"Reading Parquet file(s) from 
${inputFiles.map(_.getPath).mkString(", ")}")
-    if (inputFiles.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
-    }
-  }
-
-  private[parquet] def readSchema(
-      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-
-    def parseParquetSchema(schema: MessageType): StructType = {
-      val converter = new CatalystSchemaConverter(
-        sqlContext.conf.isParquetBinaryAsString,
-        sqlContext.conf.isParquetBinaryAsString,
-        sqlContext.conf.followParquetFormatSpec)
-
-      converter.convert(schema)
-    }
-
-    val seen = mutable.HashSet[String]()
-    val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
-      val metadata = footer.getParquetMetadata.getFileMetaData
-      val serializedSchema = metadata
-        .getKeyValueMetaData
-        .toMap
-        .get(CatalystReadSupport.SPARK_METADATA_KEY)
-      if (serializedSchema.isEmpty) {
-        // Falls back to Parquet schema if no Spark SQL schema found.
-        Some(parseParquetSchema(metadata.getSchema))
-      } else if (!seen.contains(serializedSchema.get)) {
-        seen += serializedSchema.get
-
-        // Don't throw even if we failed to parse the serialized Spark schema. 
Just fallback to
-        // whatever is available.
-        Some(Try(DataType.fromJson(serializedSchema.get))
-          .recover { case _: Throwable =>
-            logInfo(
-              s"Serialized Spark schema in Parquet key-value metadata is not 
in JSON format, " +
-                "falling back to the deprecated DataType.fromCaseClassString 
parser.")
-            DataType.fromCaseClassString(serializedSchema.get)
-          }
-          .recover { case cause: Throwable =>
-            logWarning(
-              s"""Failed to parse serialized Spark schema in Parquet key-value 
metadata:
-                 |\t$serializedSchema
-               """.stripMargin,
-              cause)
-          }
-          .map(_.asInstanceOf[StructType])
-          .getOrElse {
-            // Falls back to Parquet schema if Spark SQL schema can't be 
parsed.
-            parseParquetSchema(metadata.getSchema)
-          })
-      } else {
-        None
-      }
-    }
-
-    finalSchemas.reduceOption { (left, right) =>
-      try left.merge(right) catch { case e: Throwable =>
-        throw new SparkException(s"Failed to merge incompatible schemas $left 
and $right", e)
-      }
-    }
-  }
-
-  /**
-   * Reconciles Hive Metastore case insensitivity issue and data type 
conflicts between Metastore
-   * schema and Parquet schema.
-   *
-   * Hive doesn't retain case information, while Parquet is case sensitive. On 
the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of 
Parquet doesn't
-   * distinguish binary and string).  This method generates a correct schema 
by merging Metastore
-   * schema data types and Parquet schema field names.
-   */
-  private[parquet] def mergeMetastoreParquetSchema(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    def schemaConflictMessage: String =
-      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. 
Metastore schema:
-         |${metastoreSchema.prettyJson}
-         |
-         |Parquet schema:
-         |${parquetSchema.prettyJson}
-       """.stripMargin
-
-    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, 
parquetSchema)
-
-    assert(metastoreSchema.size <= mergedParquetSchema.size, 
schemaConflictMessage)
-
-    val ordinalMap = metastoreSchema.zipWithIndex.map {
-      case (field, index) => field.name.toLowerCase -> index
-    }.toMap
-
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
-      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
-    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
-      // Uses Parquet field names but retains Metastore data types.
-      case (mSchema, pSchema) if mSchema.name.toLowerCase == 
pSchema.name.toLowerCase =>
-        mSchema.copy(name = pSchema.name)
-      case _ =>
-        throw new SparkException(schemaConflictMessage)
-    })
-  }
-
-  /**
-   * Returns the original schema from the Parquet file with any missing 
nullable fields from the
-   * Hive Metastore schema merged in.
-   *
-   * When constructing a DataFrame from a collection of structured data, the 
resulting object has
-   * a schema corresponding to the union of the fields present in each element 
of the collection.
-   * Spark SQL simply assigns a null value to any field that isn't present for 
a particular row.
-   * In some cases, it is possible that a given table partition stored as a 
Parquet file doesn't
-   * contain a particular nullable field in its schema despite that field 
being present in the
-   * table schema obtained from the Hive Metastore. This method returns a 
schema representing the
-   * Parquet file schema along with any additional nullable fields from the 
Metastore schema
-   * merged in.
-   */
-  private[parquet] def mergeMissingNullableFields(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
-    val missingFields = metastoreSchema
-      .map(_.name.toLowerCase)
-      .diff(parquetSchema.map(_.name.toLowerCase))
-      .map(fieldMap(_))
-      .filter(_.nullable)
-    StructType(parquetSchema ++ missingFields)
-  }
-
-  /**
-   * Figures out a merged Parquet schema with a distributed Spark job.
-   *
-   * Note that locality is not taken into consideration here because:
-   *
-   *  1. For a single Parquet part-file, in most cases the footer only resides 
in the last block of
-   *     that file.  Thus we only need to retrieve the location of the last 
block.  However, Hadoop
-   *     `FileSystem` only provides API to retrieve locations of all blocks, 
which can be
-   *     potentially expensive.
-   *
-   *  2. This optimization is mainly useful for S3, where file metadata 
operations can be pretty
-   *     slow.  And basically locality is not available when using S3 (you 
can't run computation on
-   *     S3 nodes).
-   */
-  def mergeSchemasInParallel(
-      filesToTouch: Seq[FileStatus], sqlContext: SQLContext): 
Option[StructType] = {
-    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
-    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
-    val serializedConf = new 
SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
-
-    // HACK ALERT:
-    //
-    // Parquet requires `FileStatus`es to read footers.  Here we try to send 
cached `FileStatus`es
-    // to executor side to avoid fetching them again.  However, `FileStatus` 
is not `Serializable`
-    // but only `Writable`.  What makes it worth, for some reason, 
`FileStatus` doesn't play well
-    // with `SerializableWritable[T]` and always causes a weird 
`IllegalStateException`.  These
-    // facts virtually prevents us to serialize `FileStatus`es.
-    //
-    // Since Parquet only relies on path and length information of those 
`FileStatus`es to read
-    // footers, here we just extract them (which can be easily serialized), 
send them to executor
-    // side, and resemble fake `FileStatus`es there.
-    val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, 
f.getLen))
-
-    // Issues a Spark job to read Parquet schema in parallel.
-    val partiallyMergedSchemas =
-      sqlContext
-        .sparkContext
-        .parallelize(partialFileStatusInfo)
-        .mapPartitions { iterator =>
-          // Resembles fake `FileStatus`es with serialized path and length 
information.
-          val fakeFileStatuses = iterator.map { case (path, length) =>
-            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new 
Path(path))
-          }.toSeq
-
-          // Skips row group information since we only need the schema
-          val skipRowGroups = true
-
-          // Reads footers in multi-threaded manner within each task
-          val footers =
-            ParquetFileReader.readAllFootersInParallel(
-              serializedConf.value, fakeFileStatuses, skipRowGroups)
-
-          // Converter used to convert Parquet `MessageType` to Spark SQL 
`StructType`
-          val converter =
-            new CatalystSchemaConverter(
-              assumeBinaryIsString = assumeBinaryIsString,
-              assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-              followParquetFormatSpec = followParquetFormatSpec)
-
-          footers.map { footer =>
-            ParquetRelation.readSchemaFromFooter(footer, converter)
-          }.reduceOption(_ merge _).iterator
-        }.collect()
-
-    partiallyMergedSchemas.reduceOption(_ merge _)
-  }
-
-  /**
-   * Reads Spark SQL schema from a Parquet footer.  If a valid serialized 
Spark SQL schema string
-   * can be found in the file metadata, returns the deserialized 
[[StructType]], otherwise, returns
-   * a [[StructType]] converted from the [[MessageType]] stored in this footer.
-   */
-  def readSchemaFromFooter(
-      footer: Footer, converter: CatalystSchemaConverter): StructType = {
-    val fileMetaData = footer.getParquetMetadata.getFileMetaData
-    fileMetaData
-      .getKeyValueMetaData
-      .toMap
-      .get(CatalystReadSupport.SPARK_METADATA_KEY)
-      .flatMap(deserializeSchemaString)
-      .getOrElse(converter.convert(fileMetaData.getSchema))
-  }
-
-  private def deserializeSchemaString(schemaString: String): 
Option[StructType] = {
-    // Tries to deserialize the schema string as JSON first, then falls back 
to the case class
-    // string parser (data generated by older versions of Spark SQL uses this 
format).
-    Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
-      case _: Throwable =>
-        logInfo(
-          s"Serialized Spark schema in Parquet key-value metadata is not in 
JSON format, " +
-            "falling back to the deprecated DataType.fromCaseClassString 
parser.")
-        DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
-    }.recoverWith {
-      case cause: Throwable =>
-        logWarning(
-          "Failed to parse and ignored serialized Spark schema in " +
-            s"Parquet key-value metadata:\n\t$schemaString", cause)
-        Failure(cause)
-    }.toOption
-  }
-
-  def enableLogForwarding() {
-    // Note: the org.apache.parquet.Log class has a static initializer that
-    // sets the java.util.logging Logger for "org.apache.parquet". This
-    // checks first to see if there's any handlers already set
-    // and if not it creates them. If this method executes prior
-    // to that class being loaded then:
-    //  1) there's no handlers installed so there's none to
-    // remove. But when it IS finally loaded the desired affect
-    // of removing them is circumvented.
-    //  2) The parquet.Log static initializer calls setUseParentHandlers(false)
-    // undoing the attempt to override the logging here.
-    //
-    // Therefore we need to force the class to be loaded.
-    // This should really be resolved by Parquet.
-    Utils.classForName(classOf[ParquetLog].getName)
-
-    // Note: Logger.getLogger("parquet") has a default logger
-    // that appends to Console which needs to be cleared.
-    val parquetLogger = 
JLogger.getLogger(classOf[ParquetLog].getPackage.getName)
-    parquetLogger.getHandlers.foreach(parquetLogger.removeHandler)
-    parquetLogger.setUseParentHandlers(true)
-
-    // Disables a WARN log message in ParquetOutputCommitter.  We first ensure 
that
-    // ParquetOutputCommitter is loaded and the static LOG field gets 
initialized.
-    // See https://issues.apache.org/jira/browse/SPARK-5968 for details
-    Utils.classForName(classOf[ParquetOutputCommitter].getName)
-    
JLogger.getLogger(classOf[ParquetOutputCommitter].getName).setLevel(Level.OFF)
-
-    // Similar as above, disables a unnecessary WARN log message in 
ParquetRecordReader.
-    // See https://issues.apache.org/jira/browse/PARQUET-220 for details
-    Utils.classForName(classOf[ParquetRecordReader[_]].getName)
-    
JLogger.getLogger(classOf[ParquetRecordReader[_]].getName).setLevel(Level.OFF)
-  }
-
-  // The parquet compression short names
-  val shortParquetCompressionCodecNames = Map(
-    "NONE"         -> CompressionCodecName.UNCOMPRESSED,
-    "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
-    "SNAPPY"       -> CompressionCodecName.SNAPPY,
-    "GZIP"         -> CompressionCodecName.GZIP,
-    "LZO"          -> CompressionCodecName.LZO)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
deleted file mode 100644
index 9cd0250..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.math.BigInteger
-import java.nio.{ByteBuffer, ByteOrder}
-import java.util.{HashMap => JHashMap}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api._
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-/**
- * A `parquet.hadoop.api.WriteSupport` for Row objects.
- */
-private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with 
Logging {
-
-  private[parquet] var writer: RecordConsumer = null
-  private[parquet] var attributes: Array[Attribute] = null
-
-  override def init(configuration: Configuration): WriteSupport.WriteContext = 
{
-    val origAttributesStr: String = 
configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
-    val metadata = new JHashMap[String, String]()
-    metadata.put(CatalystReadSupport.SPARK_METADATA_KEY, origAttributesStr)
-
-    if (attributes == null) {
-      attributes = 
ParquetTypesConverter.convertFromString(origAttributesStr).toArray
-    }
-
-    log.debug(s"write support initialized for requested schema $attributes")
-    ParquetRelation.enableLogForwarding()
-    new 
WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes),
 metadata)
-  }
-
-  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
-    writer = recordConsumer
-    log.debug(s"preparing for write with schema $attributes")
-  }
-
-  override def write(record: InternalRow): Unit = {
-    val attributesSize = attributes.size
-    if (attributesSize > record.numFields) {
-      throw new IndexOutOfBoundsException("Trying to write more fields than 
contained in row " +
-        s"($attributesSize > ${record.numFields})")
-    }
-
-    var index = 0
-    writer.startMessage()
-    while(index < attributesSize) {
-      // null values indicate optional fields but we do not check currently
-      if (!record.isNullAt(index)) {
-        writer.startField(attributes(index).name, index)
-        writeValue(attributes(index).dataType, record.get(index, 
attributes(index).dataType))
-        writer.endField(attributes(index).name, index)
-      }
-      index = index + 1
-    }
-    writer.endMessage()
-  }
-
-  private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
-    if (value != null) {
-      schema match {
-        case t: UserDefinedType[_] => writeValue(t.sqlType, value)
-        case t @ ArrayType(_, _) => writeArray(
-          t,
-          value.asInstanceOf[CatalystConverter.ArrayScalaType])
-        case t @ MapType(_, _, _) => writeMap(
-          t,
-          value.asInstanceOf[CatalystConverter.MapScalaType])
-        case t @ StructType(_) => writeStruct(
-          t,
-          value.asInstanceOf[CatalystConverter.StructScalaType])
-        case _ => writePrimitive(schema.asInstanceOf[AtomicType], value)
-      }
-    }
-  }
-
-  private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
-    if (value != null) {
-      schema match {
-        case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
-        case ByteType => writer.addInteger(value.asInstanceOf[Byte])
-        case ShortType => writer.addInteger(value.asInstanceOf[Short])
-        case IntegerType | DateType => 
writer.addInteger(value.asInstanceOf[Int])
-        case LongType => writer.addLong(value.asInstanceOf[Long])
-        case TimestampType => writeTimestamp(value.asInstanceOf[Long])
-        case FloatType => writer.addFloat(value.asInstanceOf[Float])
-        case DoubleType => writer.addDouble(value.asInstanceOf[Double])
-        case StringType => writer.addBinary(
-          Binary.fromByteArray(value.asInstanceOf[UTF8String].getBytes))
-        case BinaryType => writer.addBinary(
-          Binary.fromByteArray(value.asInstanceOf[Array[Byte]]))
-        case DecimalType.Fixed(precision, _) =>
-          writeDecimal(value.asInstanceOf[Decimal], precision)
-        case _ => sys.error(s"Do not know how to writer $schema to consumer")
-      }
-    }
-  }
-
-  private[parquet] def writeStruct(
-      schema: StructType,
-      struct: CatalystConverter.StructScalaType): Unit = {
-    if (struct != null) {
-      val fields = schema.fields.toArray
-      writer.startGroup()
-      var i = 0
-      while(i < fields.length) {
-        if (!struct.isNullAt(i)) {
-          writer.startField(fields(i).name, i)
-          writeValue(fields(i).dataType, struct.get(i, fields(i).dataType))
-          writer.endField(fields(i).name, i)
-        }
-        i = i + 1
-      }
-      writer.endGroup()
-    }
-  }
-
-  private[parquet] def writeArray(
-      schema: ArrayType,
-      array: CatalystConverter.ArrayScalaType): Unit = {
-    val elementType = schema.elementType
-    writer.startGroup()
-    if (array.numElements() > 0) {
-      if (schema.containsNull) {
-        
writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
-        var i = 0
-        while (i < array.numElements()) {
-          writer.startGroup()
-          if (!array.isNullAt(i)) {
-            writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-            writeValue(elementType, array.get(i, elementType))
-            writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-          }
-          writer.endGroup()
-          i = i + 1
-        }
-        writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 
0)
-      } else {
-        writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-        var i = 0
-        while (i < array.numElements()) {
-          writeValue(elementType, array.get(i, elementType))
-          i = i + 1
-        }
-        writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-      }
-    }
-    writer.endGroup()
-  }
-
-  private[parquet] def writeMap(
-      schema: MapType,
-      map: CatalystConverter.MapScalaType): Unit = {
-    writer.startGroup()
-    val length = map.numElements()
-    if (length > 0) {
-      writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0)
-      map.foreach(schema.keyType, schema.valueType, (key, value) => {
-        writer.startGroup()
-        writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-        writeValue(schema.keyType, key)
-        writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-        if (value != null) {
-          writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-          writeValue(schema.valueType, value)
-          writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-        }
-        writer.endGroup()
-      })
-      writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0)
-    }
-    writer.endGroup()
-  }
-
-  // Scratch array used to write decimals as fixed-length byte array
-  private[this] var reusableDecimalBytes = new Array[Byte](16)
-
-  private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = {
-    val numBytes = CatalystSchemaConverter.minBytesForPrecision(precision)
-
-    def longToBinary(unscaled: Long): Binary = {
-      var i = 0
-      var shift = 8 * (numBytes - 1)
-      while (i < numBytes) {
-        reusableDecimalBytes(i) = (unscaled >> shift).toByte
-        i += 1
-        shift -= 8
-      }
-      Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)
-    }
-
-    def bigIntegerToBinary(unscaled: BigInteger): Binary = {
-      unscaled.toByteArray match {
-        case bytes if bytes.length == numBytes =>
-          Binary.fromByteArray(bytes)
-
-        case bytes if bytes.length <= reusableDecimalBytes.length =>
-          val signedByte = (if (bytes.head < 0) -1 else 0).toByte
-          java.util.Arrays.fill(reusableDecimalBytes, 0, numBytes - 
bytes.length, signedByte)
-          System.arraycopy(bytes, 0, reusableDecimalBytes, numBytes - 
bytes.length, bytes.length)
-          Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)
-
-        case bytes =>
-          reusableDecimalBytes = new Array[Byte](bytes.length)
-          bigIntegerToBinary(unscaled)
-      }
-    }
-
-    val binary = if (numBytes <= 8) {
-      longToBinary(decimal.toUnscaledLong)
-    } else {
-      bigIntegerToBinary(decimal.toJavaBigDecimal.unscaledValue())
-    }
-
-    writer.addBinary(binary)
-  }
-
-  // array used to write Timestamp as Int96 (fixed-length binary)
-  private[this] val int96buf = new Array[Byte](12)
-
-  private[parquet] def writeTimestamp(ts: Long): Unit = {
-    val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts)
-    val buf = ByteBuffer.wrap(int96buf)
-    buf.order(ByteOrder.LITTLE_ENDIAN)
-    buf.putLong(timeOfDayNanos)
-    buf.putInt(julianDay)
-    writer.addBinary(Binary.fromByteArray(int96buf))
-  }
-}
-
-// Optimized for non-nested rows
-private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
-  override def write(record: InternalRow): Unit = {
-    val attributesSize = attributes.size
-    if (attributesSize > record.numFields) {
-      throw new IndexOutOfBoundsException("Trying to write more fields than 
contained in row " +
-        s"($attributesSize > ${record.numFields})")
-    }
-
-    var index = 0
-    writer.startMessage()
-    while(index < attributesSize) {
-      // null values indicate optional fields but we do not check currently
-      if (!record.isNullAt(index) && !record.isNullAt(index)) {
-        writer.startField(attributes(index).name, index)
-        consumeType(attributes(index).dataType, record, index)
-        writer.endField(attributes(index).name, index)
-      }
-      index = index + 1
-    }
-    writer.endMessage()
-  }
-
-  private def consumeType(
-      ctype: DataType,
-      record: InternalRow,
-      index: Int): Unit = {
-    ctype match {
-      case BooleanType => writer.addBoolean(record.getBoolean(index))
-      case ByteType => writer.addInteger(record.getByte(index))
-      case ShortType => writer.addInteger(record.getShort(index))
-      case IntegerType | DateType => writer.addInteger(record.getInt(index))
-      case LongType => writer.addLong(record.getLong(index))
-      case TimestampType => writeTimestamp(record.getLong(index))
-      case FloatType => writer.addFloat(record.getFloat(index))
-      case DoubleType => writer.addDouble(record.getDouble(index))
-      case StringType =>
-        
writer.addBinary(Binary.fromByteArray(record.getUTF8String(index).getBytes))
-      case BinaryType =>
-        writer.addBinary(Binary.fromByteArray(record.getBinary(index)))
-      case DecimalType.Fixed(precision, scale) =>
-        writeDecimal(record.getDecimal(index, precision, scale), precision)
-      case _ => sys.error(s"Unsupported datatype $ctype, cannot write to 
consumer")
-    }
-  }
-}
-
-private[parquet] object RowWriteSupport {
-  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
-
-  def getSchema(configuration: Configuration): Seq[Attribute] = {
-    val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
-    if (schemaString == null) {
-      throw new RuntimeException("Missing schema!")
-    }
-    ParquetTypesConverter.convertFromString(schemaString)
-  }
-
-  def setSchema(schema: Seq[Attribute], configuration: Configuration) {
-    val encoded = ParquetTypesConverter.convertToString(schema)
-    configuration.set(SPARK_ROW_SCHEMA, encoded)
-    configuration.set(
-      ParquetOutputFormat.WRITER_VERSION,
-      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
deleted file mode 100644
index 3854f5b..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.io.IOException
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
-import org.apache.parquet.format.converter.ParquetMetadataConverter
-import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types._
-
-
-private[parquet] object ParquetTypesConverter extends Logging {
-  def isPrimitiveType(ctype: DataType): Boolean = ctype match {
-    case _: NumericType | BooleanType | DateType | TimestampType | StringType 
| BinaryType => true
-    case _ => false
-  }
-
-  /**
-   * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given 
DECIMAL precision.
-   */
-  private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { 
precision =>
-    var length = 1
-    while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) {
-      length += 1
-    }
-    length
-  }
-
-  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
-    val converter = new CatalystSchemaConverter()
-    converter.convert(StructType.fromAttributes(attributes))
-  }
-
-  def convertFromString(string: String): Seq[Attribute] = {
-    
Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) 
match {
-      case s: StructType => s.toAttributes
-      case other => sys.error(s"Can convert $string to row")
-    }
-  }
-
-  def convertToString(schema: Seq[Attribute]): String = {
-    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
-    StructType.fromAttributes(schema).json
-  }
-
-  def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: 
Configuration): Unit = {
-    if (origPath == null) {
-      throw new IllegalArgumentException("Unable to write Parquet metadata: 
path is null")
-    }
-    val fs = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(
-        s"Unable to write Parquet metadata: path $origPath is incorrectly 
formatted")
-    }
-    val path = origPath.makeQualified(fs)
-    if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(s"Expected to write to directory 
$path but found file")
-    }
-    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
-    if (fs.exists(metadataPath)) {
-      try {
-        fs.delete(metadataPath, true)
-      } catch {
-        case e: IOException =>
-          throw new IOException(s"Unable to delete previous 
PARQUET_METADATA_FILE at $metadataPath")
-      }
-    }
-    val extraMetadata = new java.util.HashMap[String, String]()
-    extraMetadata.put(
-      CatalystReadSupport.SPARK_METADATA_KEY,
-      ParquetTypesConverter.convertToString(attributes))
-    // TODO: add extra data, e.g., table name, date, etc.?
-
-    val parquetSchema: MessageType = 
ParquetTypesConverter.convertFromAttributes(attributes)
-    val metaData: FileMetaData = new FileMetaData(
-      parquetSchema,
-      extraMetadata,
-      "Spark")
-
-    ParquetRelation.enableLogForwarding()
-    ParquetFileWriter.writeMetadataFile(
-      conf,
-      path,
-      new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil)
-  }
-
-  /**
-   * Try to read Parquet metadata at the given Path. We first see if there is 
a summary file
-   * in the parent directory. If so, this is used. Else we read the actual 
footer at the given
-   * location.
-   * @param origPath The path at which we expect one (or more) Parquet files.
-   * @param configuration The Hadoop configuration to use.
-   * @return The `ParquetMetadata` containing among other things the schema.
-   */
-  def readMetaData(origPath: Path, configuration: Option[Configuration]): 
ParquetMetadata = {
-    if (origPath == null) {
-      throw new IllegalArgumentException("Unable to read Parquet metadata: 
path is null")
-    }
-    val job = new Job()
-    val conf = configuration.getOrElse(ContextUtil.getConfiguration(job))
-    val fs: FileSystem = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(s"Incorrectly formatted Parquet 
metadata path $origPath")
-    }
-    val path = origPath.makeQualified(fs)
-
-    val children =
-      fs
-        .globStatus(path)
-        .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) 
else List(status) }
-        .filterNot { status =>
-          val name = status.getPath.getName
-          (name(0) == '.' || name(0) == '_') && name != 
ParquetFileWriter.PARQUET_METADATA_FILE
-        }
-
-    ParquetRelation.enableLogForwarding()
-
-    // NOTE (lian): Parquet "_metadata" file can be very slow if the file 
consists of lots of row
-    // groups. Since Parquet schema is replicated among all row groups, we 
only need to touch a
-    // single row group to read schema related metadata. Notice that we are 
making assumptions that
-    // all data in a single Parquet file have the same schema, which is 
normally true.
-    children
-      // Try any non-"_metadata" file first...
-      .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
-      // ... and fallback to "_metadata" if no such file exists (which implies 
the Parquet file is
-      // empty, thus normally the "_metadata" file is expected to be fairly 
small).
-      .orElse(children.find(_.getPath.getName == 
ParquetFileWriter.PARQUET_METADATA_FILE))
-      .map(ParquetFileReader.readFooter(conf, _, 
ParquetMetadataConverter.NO_FILTER))
-      .getOrElse(
-        throw new IllegalArgumentException(s"Could not find Parquet metadata 
at path $path"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6bcabba..2f8417a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -43,19 +43,24 @@ import org.apache.spark.util.SerializableConfiguration
  * This allows users to give the data source alias as the format type over the 
fully qualified
  * class name.
  *
- * ex: parquet.DefaultSource.format = "parquet".
- *
  * A new instance of this class with be instantiated each time a DDL call is 
made.
+ *
+ * @since 1.5.0
  */
 @DeveloperApi
 trait DataSourceRegister {
 
   /**
    * The string that represents the format that this data source provider 
uses. This is
-   * overridden by children to provide a nice alias for the data source,
-   * ex: override def format(): String = "parquet"
+   * overridden by children to provide a nice alias for the data source. For 
example:
+   *
+   * {{{
+   *   override def shortName(): String = "parquet"
+   * }}}
+   *
+   * @since 1.5.0
    */
-  def format(): String
+  def shortName(): String
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala
deleted file mode 100644
index cb7ca60..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.mutable
-import scala.xml.Node
-
-import org.apache.commons.lang3.StringEscapeUtils
-
-import org.apache.spark.Logging
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-
-private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with 
Logging {
-
-  private val listener = parent.listener
-
-  override def render(request: HttpServletRequest): Seq[Node] = {
-    val currentTime = System.currentTimeMillis()
-    val content = listener.synchronized {
-      val _content = mutable.ListBuffer[Node]()
-      if (listener.getRunningExecutions.nonEmpty) {
-        _content ++=
-          new RunningExecutionTable(
-            parent, "Running Queries", currentTime,
-            
listener.getRunningExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
-      }
-      if (listener.getCompletedExecutions.nonEmpty) {
-        _content ++=
-          new CompletedExecutionTable(
-            parent, "Completed Queries", currentTime,
-            
listener.getCompletedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
-      }
-      if (listener.getFailedExecutions.nonEmpty) {
-        _content ++=
-          new FailedExecutionTable(
-            parent, "Failed Queries", currentTime,
-            
listener.getFailedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
-      }
-      _content
-    }
-    UIUtils.headerSparkPage("SQL", content, parent, Some(5000))
-  }
-}
-
-private[ui] abstract class ExecutionTable(
-    parent: SQLTab,
-    tableId: String,
-    tableName: String,
-    currentTime: Long,
-    executionUIDatas: Seq[SQLExecutionUIData],
-    showRunningJobs: Boolean,
-    showSucceededJobs: Boolean,
-    showFailedJobs: Boolean) {
-
-  protected def baseHeader: Seq[String] = Seq(
-    "ID",
-    "Description",
-    "Submitted",
-    "Duration")
-
-  protected def header: Seq[String]
-
-  protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): 
Seq[Node] = {
-    val submissionTime = executionUIData.submissionTime
-    val duration = executionUIData.completionTime.getOrElse(currentTime) - 
submissionTime
-
-    val runningJobs = executionUIData.runningJobs.map { jobId =>
-      <a href={jobURL(jobId)}>{jobId.toString}</a><br/>
-    }
-    val succeededJobs = executionUIData.succeededJobs.sorted.map { jobId =>
-      <a href={jobURL(jobId)}>{jobId.toString}</a><br/>
-    }
-    val failedJobs = executionUIData.failedJobs.sorted.map { jobId =>
-      <a href={jobURL(jobId)}>{jobId.toString}</a><br/>
-    }
-    <tr>
-      <td>
-        {executionUIData.executionId.toString}
-      </td>
-      <td>
-        {descriptionCell(executionUIData)}
-      </td>
-      <td sorttable_customkey={submissionTime.toString}>
-        {UIUtils.formatDate(submissionTime)}
-      </td>
-      <td sorttable_customkey={duration.toString}>
-        {UIUtils.formatDuration(duration)}
-      </td>
-      {if (showRunningJobs) {
-        <td>
-          {runningJobs}
-        </td>
-      }}
-      {if (showSucceededJobs) {
-        <td>
-          {succeededJobs}
-        </td>
-      }}
-      {if (showFailedJobs) {
-        <td>
-          {failedJobs}
-        </td>
-      }}
-      {detailCell(executionUIData.physicalPlanDescription)}
-    </tr>
-  }
-
-  private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = {
-    val details = if (execution.details.nonEmpty) {
-      <span 
onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
-            class="expand-details">
-        +details
-      </span> ++
-      <div class="stage-details collapsed">
-        <pre>{execution.details}</pre>
-      </div>
-    } else {
-      Nil
-    }
-
-    val desc = {
-      <a href={executionURL(execution.executionId)}>{execution.description}</a>
-    }
-
-    <div>{desc} {details}</div>
-  }
-
-  private def detailCell(physicalPlan: String): Seq[Node] = {
-    val isMultiline = physicalPlan.indexOf('\n') >= 0
-    val summary = StringEscapeUtils.escapeHtml4(
-      if (isMultiline) {
-        physicalPlan.substring(0, physicalPlan.indexOf('\n'))
-      } else {
-        physicalPlan
-      })
-    val details = if (isMultiline) {
-      // scalastyle:off
-      <span 
onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
-            class="expand-details">
-        +details
-      </span> ++
-        <div class="stacktrace-details collapsed">
-          <pre>{physicalPlan}</pre>
-        </div>
-      // scalastyle:on
-    } else {
-      ""
-    }
-    <td>{summary}{details}</td>
-  }
-
-  def toNodeSeq: Seq[Node] = {
-    <div>
-      <h4>{tableName}</h4>
-      {UIUtils.listingTable[SQLExecutionUIData](
-        header, row(currentTime, _), executionUIDatas, id = Some(tableId))}
-    </div>
-  }
-
-  private def jobURL(jobId: Long): String =
-    "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId)
-
-  private def executionURL(executionID: Long): String =
-    
s"${UIUtils.prependBaseUri(parent.basePath)}/${parent.prefix}/execution?id=$executionID"
-}
-
-private[ui] class RunningExecutionTable(
-    parent: SQLTab,
-    tableName: String,
-    currentTime: Long,
-    executionUIDatas: Seq[SQLExecutionUIData])
-  extends ExecutionTable(
-    parent,
-    "running-execution-table",
-    tableName,
-    currentTime,
-    executionUIDatas,
-    showRunningJobs = true,
-    showSucceededJobs = true,
-    showFailedJobs = true) {
-
-  override protected def header: Seq[String] =
-    baseHeader ++ Seq("Running Jobs", "Succeeded Jobs", "Failed Jobs", 
"Detail")
-}
-
-private[ui] class CompletedExecutionTable(
-    parent: SQLTab,
-    tableName: String,
-    currentTime: Long,
-    executionUIDatas: Seq[SQLExecutionUIData])
-  extends ExecutionTable(
-    parent,
-    "completed-execution-table",
-    tableName,
-    currentTime,
-    executionUIDatas,
-    showRunningJobs = false,
-    showSucceededJobs = true,
-    showFailedJobs = false) {
-
-  override protected def header: Seq[String] = baseHeader ++ Seq("Jobs", 
"Detail")
-}
-
-private[ui] class FailedExecutionTable(
-    parent: SQLTab,
-    tableName: String,
-    currentTime: Long,
-    executionUIDatas: Seq[SQLExecutionUIData])
-  extends ExecutionTable(
-    parent,
-    "failed-execution-table",
-    tableName,
-    currentTime,
-    executionUIDatas,
-    showRunningJobs = false,
-    showSucceededJobs = true,
-    showFailedJobs = true) {
-
-  override protected def header: Seq[String] =
-    baseHeader ++ Seq("Succeeded Jobs", "Failed Jobs", "Detail")
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
deleted file mode 100644
index 52ddf99..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.{Node, Unparsed}
-
-import org.apache.commons.lang3.StringEscapeUtils
-
-import org.apache.spark.Logging
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-
-private[sql] class ExecutionPage(parent: SQLTab) extends 
WebUIPage("execution") with Logging {
-
-  private val listener = parent.listener
-
-  override def render(request: HttpServletRequest): Seq[Node] = 
listener.synchronized {
-    val parameterExecutionId = request.getParameter("id")
-    require(parameterExecutionId != null && parameterExecutionId.nonEmpty,
-      "Missing execution id parameter")
-
-    val executionId = parameterExecutionId.toLong
-    val content = listener.getExecution(executionId).map { executionUIData =>
-      val currentTime = System.currentTimeMillis()
-      val duration =
-        executionUIData.completionTime.getOrElse(currentTime) - 
executionUIData.submissionTime
-
-      val summary =
-        <div>
-          <ul class="unstyled">
-            <li>
-              <strong>Submitted Time: 
</strong>{UIUtils.formatDate(executionUIData.submissionTime)}
-            </li>
-            <li>
-              <strong>Duration: </strong>{UIUtils.formatDuration(duration)}
-            </li>
-            {if (executionUIData.runningJobs.nonEmpty) {
-              <li>
-                <strong>Running Jobs: </strong>
-                {executionUIData.runningJobs.sorted.map { jobId =>
-                <a href={jobURL(jobId)}>{jobId.toString}</a><span>&nbsp;</span>
-              }}
-              </li>
-            }}
-            {if (executionUIData.succeededJobs.nonEmpty) {
-              <li>
-                <strong>Succeeded Jobs: </strong>
-                {executionUIData.succeededJobs.sorted.map { jobId =>
-                  <a 
href={jobURL(jobId)}>{jobId.toString}</a><span>&nbsp;</span>
-                }}
-              </li>
-            }}
-            {if (executionUIData.failedJobs.nonEmpty) {
-              <li>
-                <strong>Failed Jobs: </strong>
-                {executionUIData.failedJobs.sorted.map { jobId =>
-                  <a 
href={jobURL(jobId)}>{jobId.toString}</a><span>&nbsp;</span>
-                }}
-              </li>
-            }}
-            <li>
-              <strong>Detail: </strong><br/>
-              <pre>{executionUIData.physicalPlanDescription}</pre>
-            </li>
-          </ul>
-        </div>
-
-      val metrics = listener.getExecutionMetrics(executionId)
-
-      summary ++ planVisualization(metrics, executionUIData.physicalPlanGraph)
-    }.getOrElse {
-      <div>No information to display for Plan {executionId}</div>
-    }
-
-    UIUtils.headerSparkPage(s"Details for Query $executionId", content, 
parent, Some(5000))
-  }
-
-
-  private def planVisualizationResources: Seq[Node] = {
-    // scalastyle:off
-    <link rel="stylesheet" 
href={UIUtils.prependBaseUri("/static/sql/spark-sql-viz.css")} type="text/css"/>
-    <script src={UIUtils.prependBaseUri("/static/d3.min.js")}></script>
-    <script src={UIUtils.prependBaseUri("/static/dagre-d3.min.js")}></script>
-    <script 
src={UIUtils.prependBaseUri("/static/graphlib-dot.min.js")}></script>
-    <script 
src={UIUtils.prependBaseUri("/static/sql/spark-sql-viz.js")}></script>
-    // scalastyle:on
-  }
-
-  private def planVisualization(metrics: Map[Long, Any], graph: 
SparkPlanGraph): Seq[Node] = {
-    val metadata = graph.nodes.flatMap { node =>
-      val nodeId = s"plan-meta-data-${node.id}"
-      <div id={nodeId}>{node.desc}</div>
-    }
-
-    <div>
-      <div id="plan-viz-graph"></div>
-      <div id="plan-viz-metadata" style="display:none">
-        <div class="dot-file">
-          {graph.makeDotFile(metrics)}
-        </div>
-        <div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
-        {metadata}
-      </div>
-      {planVisualizationResources}
-      <script>$(function(){{ renderPlanViz(); }})</script>
-    </div>
-  }
-
-  private def jobURL(jobId: Long): String =
-    "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
deleted file mode 100644
index 2fd4fc6..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.ui
-
-import scala.collection.mutable
-
-import com.google.common.annotations.VisibleForTesting
-
-import org.apache.spark.{JobExecutionStatus, Logging}
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler._
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.metric.{SQLMetricParam, SQLMetricValue}
-
-private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener 
with Logging {
-
-  private val retainedExecutions =
-    sqlContext.sparkContext.conf.getInt("spark.sql.ui.retainedExecutions", 
1000)
-
-  private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  // Old data in the following fields must be removed in 
"trimExecutionsIfNecessary".
-  // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up 
old data
-  private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  /**
-   * Maintain the relation between job id and execution id so that we can get 
the execution id in
-   * the "onJobEnd" method.
-   */
-  private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
-
-  private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
-
-  private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  @VisibleForTesting
-  def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
-    _executionIdToData.toMap
-  }
-
-  @VisibleForTesting
-  def jobIdToExecutionId: Map[Long, Long] = synchronized {
-    _jobIdToExecutionId.toMap
-  }
-
-  @VisibleForTesting
-  def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
-    _stageIdToStageMetrics.toMap
-  }
-
-  private def trimExecutionsIfNecessary(
-      executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = {
-    if (executions.size > retainedExecutions) {
-      val toRemove = math.max(retainedExecutions / 10, 1)
-      executions.take(toRemove).foreach { execution =>
-        for (executionUIData <- 
_executionIdToData.remove(execution.executionId)) {
-          for (jobId <- executionUIData.jobs.keys) {
-            _jobIdToExecutionId.remove(jobId)
-          }
-          for (stageId <- executionUIData.stages) {
-            _stageIdToStageMetrics.remove(stageId)
-          }
-        }
-      }
-      executions.trimStart(toRemove)
-    }
-  }
-
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-    val executionIdString = 
jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
-    if (executionIdString == null) {
-      // This is not a job created by SQL
-      return
-    }
-    val executionId = executionIdString.toLong
-    val jobId = jobStart.jobId
-    val stageIds = jobStart.stageIds
-
-    synchronized {
-      activeExecutions.get(executionId).foreach { executionUIData =>
-        executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING
-        executionUIData.stages ++= stageIds
-        stageIds.foreach(stageId =>
-          _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId 
= 0))
-        _jobIdToExecutionId(jobId) = executionId
-      }
-    }
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
-    val jobId = jobEnd.jobId
-    for (executionId <- _jobIdToExecutionId.get(jobId);
-         executionUIData <- _executionIdToData.get(executionId)) {
-      jobEnd.jobResult match {
-        case JobSucceeded => executionUIData.jobs(jobId) = 
JobExecutionStatus.SUCCEEDED
-        case JobFailed(_) => executionUIData.jobs(jobId) = 
JobExecutionStatus.FAILED
-      }
-      if (executionUIData.completionTime.nonEmpty && 
!executionUIData.hasRunningJobs) {
-        // We are the last job of this execution, so mark the execution as 
finished. Note that
-        // `onExecutionEnd` also does this, but currently that can be called 
before `onJobEnd`
-        // since these are called on different threads.
-        markExecutionFinished(executionId)
-      }
-    }
-  }
-
-  override def onExecutorMetricsUpdate(
-      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = 
synchronized {
-    for ((taskId, stageId, stageAttemptID, metrics) <- 
executorMetricsUpdate.taskMetrics) {
-      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, 
finishTask = false)
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): 
Unit = synchronized {
-    val stageId = stageSubmitted.stageInfo.stageId
-    val stageAttemptId = stageSubmitted.stageInfo.attemptId
-    // Always override metrics for old stage attempt
-    _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    updateTaskAccumulatorValues(
-      taskEnd.taskInfo.taskId,
-      taskEnd.stageId,
-      taskEnd.stageAttemptId,
-      taskEnd.taskMetrics,
-      finishTask = true)
-  }
-
-  /**
-   * Update the accumulator values of a task with the latest metrics for this 
task. This is called
-   * every time we receive an executor heartbeat or when a task finishes.
-   */
-  private def updateTaskAccumulatorValues(
-      taskId: Long,
-      stageId: Int,
-      stageAttemptID: Int,
-      metrics: TaskMetrics,
-      finishTask: Boolean): Unit = {
-    if (metrics == null) {
-      return
-    }
-
-    _stageIdToStageMetrics.get(stageId) match {
-      case Some(stageMetrics) =>
-        if (stageAttemptID < stageMetrics.stageAttemptId) {
-          // A task of an old stage attempt. Because a new stage is submitted, 
we can ignore it.
-        } else if (stageAttemptID > stageMetrics.stageAttemptId) {
-          logWarning(s"A task should not have a higher stageAttemptID 
($stageAttemptID) then " +
-            s"what we have seen (${stageMetrics.stageAttemptId}})")
-        } else {
-          // TODO We don't know the attemptId. Currently, what we can do is 
overriding the
-          // accumulator updates. However, if there are two same task are 
running, such as
-          // speculation, the accumulator updates will be overriding by 
different task attempts,
-          // the results will be weird.
-          stageMetrics.taskIdToMetricUpdates.get(taskId) match {
-            case Some(taskMetrics) =>
-              if (finishTask) {
-                taskMetrics.finished = true
-                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
-              } else if (!taskMetrics.finished) {
-                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
-              } else {
-                // If a task is finished, we should not override with 
accumulator updates from
-                // heartbeat reports
-              }
-            case None =>
-              // TODO Now just set attemptId to 0. Should fix here when we can 
get the attempt
-              // id from SparkListenerExecutorMetricsUpdate
-              stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
-                  attemptId = 0, finished = finishTask, 
metrics.accumulatorUpdates())
-          }
-        }
-      case None =>
-      // This execution and its stage have been dropped
-    }
-  }
-
-  def onExecutionStart(
-      executionId: Long,
-      description: String,
-      details: String,
-      physicalPlanDescription: String,
-      physicalPlanGraph: SparkPlanGraph,
-      time: Long): Unit = {
-    val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
-      node.metrics.map(metric => metric.accumulatorId -> metric)
-    }
-
-    val executionUIData = new SQLExecutionUIData(executionId, description, 
details,
-      physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time)
-    synchronized {
-      activeExecutions(executionId) = executionUIData
-      _executionIdToData(executionId) = executionUIData
-    }
-  }
-
-  def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized {
-    _executionIdToData.get(executionId).foreach { executionUIData =>
-      executionUIData.completionTime = Some(time)
-      if (!executionUIData.hasRunningJobs) {
-        // onExecutionEnd happens after all "onJobEnd"s
-        // So we should update the execution lists.
-        markExecutionFinished(executionId)
-      } else {
-        // There are some running jobs, onExecutionEnd happens before some 
"onJobEnd"s.
-        // Then we don't if the execution is successful, so let the last 
onJobEnd updates the
-        // execution lists.
-      }
-    }
-  }
-
-  private def markExecutionFinished(executionId: Long): Unit = {
-    activeExecutions.remove(executionId).foreach { executionUIData =>
-      if (executionUIData.isFailed) {
-        failedExecutions += executionUIData
-        trimExecutionsIfNecessary(failedExecutions)
-      } else {
-        completedExecutions += executionUIData
-        trimExecutionsIfNecessary(completedExecutions)
-      }
-    }
-  }
-
-  def getRunningExecutions: Seq[SQLExecutionUIData] = synchronized {
-    activeExecutions.values.toSeq
-  }
-
-  def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized {
-    failedExecutions
-  }
-
-  def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized {
-    completedExecutions
-  }
-
-  def getExecution(executionId: Long): Option[SQLExecutionUIData] = 
synchronized {
-    _executionIdToData.get(executionId)
-  }
-
-  /**
-   * Get all accumulator updates from all tasks which belong to this execution 
and merge them.
-   */
-  def getExecutionMetrics(executionId: Long): Map[Long, Any] = synchronized {
-    _executionIdToData.get(executionId) match {
-      case Some(executionUIData) =>
-        val accumulatorUpdates = {
-          for (stageId <- executionUIData.stages;
-               stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
-               taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
-               accumulatorUpdate <- taskMetrics.accumulatorUpdates.toSeq) 
yield {
-            accumulatorUpdate
-          }
-        }.filter { case (id, _) => 
executionUIData.accumulatorMetrics.contains(id) }
-        mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
-          executionUIData.accumulatorMetrics(accumulatorId).metricParam).
-          mapValues(_.asInstanceOf[SQLMetricValue[_]].value)
-      case None =>
-        // This execution has been dropped
-        Map.empty
-    }
-  }
-
-  private def mergeAccumulatorUpdates(
-      accumulatorUpdates: Seq[(Long, Any)],
-      paramFunc: Long => SQLMetricParam[SQLMetricValue[Any], Any]): Map[Long, 
Any] = {
-    accumulatorUpdates.groupBy(_._1).map { case (accumulatorId, values) =>
-      val param = paramFunc(accumulatorId)
-      (accumulatorId,
-        
values.map(_._2.asInstanceOf[SQLMetricValue[Any]]).foldLeft(param.zero)(param.addInPlace))
-    }
-  }
-
-}
-
-/**
- * Represent all necessary data for an execution that will be used in Web UI.
- */
-private[ui] class SQLExecutionUIData(
-    val executionId: Long,
-    val description: String,
-    val details: String,
-    val physicalPlanDescription: String,
-    val physicalPlanGraph: SparkPlanGraph,
-    val accumulatorMetrics: Map[Long, SQLPlanMetric],
-    val submissionTime: Long,
-    var completionTime: Option[Long] = None,
-    val jobs: mutable.HashMap[Long, JobExecutionStatus] = 
mutable.HashMap.empty,
-    val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()) {
-
-  /**
-   * Return whether there are running jobs in this execution.
-   */
-  def hasRunningJobs: Boolean = jobs.values.exists(_ == 
JobExecutionStatus.RUNNING)
-
-  /**
-   * Return whether there are any failed jobs in this execution.
-   */
-  def isFailed: Boolean = jobs.values.exists(_ == JobExecutionStatus.FAILED)
-
-  def runningJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.RUNNING 
}.keys.toSeq
-
-  def succeededJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.SUCCEEDED 
}.keys.toSeq
-
-  def failedJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.FAILED 
}.keys.toSeq
-}
-
-/**
- * Represent a metric in a SQLPlan.
- *
- * Because we cannot revert our changes for an "Accumulator", we need to 
maintain accumulator
- * updates for each task. So that if a task is retried, we can simply override 
the old updates with
- * the new updates of the new attempt task. Since we cannot add them to 
accumulator, we need to use
- * "AccumulatorParam" to get the aggregation value.
- */
-private[ui] case class SQLPlanMetric(
-    name: String,
-    accumulatorId: Long,
-    metricParam: SQLMetricParam[SQLMetricValue[Any], Any])
-
-/**
- * Store all accumulatorUpdates for all tasks in a Spark stage.
- */
-private[ui] class SQLStageMetrics(
-    val stageAttemptId: Long,
-    val taskIdToMetricUpdates: mutable.HashMap[Long, SQLTaskMetrics] = 
mutable.HashMap.empty)
-
-/**
- * Store all accumulatorUpdates for a Spark task.
- */
-private[ui] class SQLTaskMetrics(
-    val attemptId: Long, // TODO not used yet
-    var finished: Boolean,
-    var accumulatorUpdates: Map[Long, Any])

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
deleted file mode 100644
index 3bba0af..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.ui
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.ui.{SparkUI, SparkUITab}
-
-private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
-  extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
-
-  val parent = sparkUI
-  val listener = sqlContext.listener
-
-  attachPage(new AllExecutionsPage(this))
-  attachPage(new ExecutionPage(this))
-  parent.attachTab(this)
-
-  parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, "/static/sql")
-}
-
-private[sql] object SQLTab {
-
-  private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/ui/static"
-
-  private val nextTabId = new AtomicInteger(0)
-
-  private def nextTabName: String = {
-    val nextId = nextTabId.getAndIncrement()
-    if (nextId == 0) "SQL" else s"SQL${nextId}"
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to