http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala deleted file mode 100644 index b6db71b..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ /dev/null @@ -1,796 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.net.URI -import java.util.logging.{Level, Logger => JLogger} -import java.util.{List => JList} - -import scala.collection.JavaConversions._ -import scala.collection.mutable -import scala.util.{Failure, Try} - -import com.google.common.base.Objects -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _} -import org.apache.parquet.schema.MessageType -import org.apache.parquet.{Log => ParquetLog} - -import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD} -import org.apache.spark.rdd.RDD._ -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionSpec -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.util.{SerializableConfiguration, Utils} - - -private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { - - def format(): String = "parquet" - - override def createRelation( - sqlContext: SQLContext, - paths: Array[String], - schema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): HadoopFsRelation = { - new ParquetRelation(paths, schema, None, partitionColumns, parameters)(sqlContext) - } -} - -// NOTE: This class is instantiated and used on executor side only, no need to be serializable. 
-private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext) - extends OutputWriter { - - private val recordWriter: RecordWriter[Void, InternalRow] = { - val outputFormat = { - new ParquetOutputFormat[InternalRow]() { - // Here we override `getDefaultWorkFile` for two reasons: - // - // 1. To allow appending. We need to generate unique output file names to avoid - // overwriting existing files (either exist before the write job, or are just written - // by other tasks within the same write job). - // - // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses - // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all - // partitions in the case of dynamic partitioning. - override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID") - val split = context.getTaskAttemptID.getTaskID.getId - new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") - } - } - } - - outputFormat.getRecordWriter(context) - } - - override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") - - override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) - - override def close(): Unit = recordWriter.close(context) -} - -private[sql] class ParquetRelation( - override val paths: Array[String], - private val maybeDataSchema: Option[StructType], - // This is for metastore conversion. - private val maybePartitionSpec: Option[PartitionSpec], - override val userDefinedPartitionColumns: Option[StructType], - parameters: Map[String, String])( - val sqlContext: SQLContext) - extends HadoopFsRelation(maybePartitionSpec) - with Logging { - - private[sql] def this( - paths: Array[String], - maybeDataSchema: Option[StructType], - maybePartitionSpec: Option[PartitionSpec], - parameters: Map[String, String])( - sqlContext: SQLContext) = { - this( - paths, - maybeDataSchema, - maybePartitionSpec, - maybePartitionSpec.map(_.partitionColumns), - parameters)(sqlContext) - } - - // Should we merge schemas from all Parquet part-files? 
- private val shouldMergeSchemas = - parameters - .get(ParquetRelation.MERGE_SCHEMA) - .map(_.toBoolean) - .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)) - - private val mergeRespectSummaries = - sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES) - - private val maybeMetastoreSchema = parameters - .get(ParquetRelation.METASTORE_SCHEMA) - .map(DataType.fromJson(_).asInstanceOf[StructType]) - - private lazy val metadataCache: MetadataCache = { - val meta = new MetadataCache - meta.refresh() - meta - } - - override def equals(other: Any): Boolean = other match { - case that: ParquetRelation => - val schemaEquality = if (shouldMergeSchemas) { - this.shouldMergeSchemas == that.shouldMergeSchemas - } else { - this.dataSchema == that.dataSchema && - this.schema == that.schema - } - - this.paths.toSet == that.paths.toSet && - schemaEquality && - this.maybeDataSchema == that.maybeDataSchema && - this.partitionColumns == that.partitionColumns - - case _ => false - } - - override def hashCode(): Int = { - if (shouldMergeSchemas) { - Objects.hashCode( - Boolean.box(shouldMergeSchemas), - paths.toSet, - maybeDataSchema, - partitionColumns) - } else { - Objects.hashCode( - Boolean.box(shouldMergeSchemas), - paths.toSet, - dataSchema, - schema, - maybeDataSchema, - partitionColumns) - } - } - - /** Constraints on the schema of the DataFrame to be stored. */ - private def checkConstraints(schema: StructType): Unit = { - if (schema.fieldNames.length != schema.fieldNames.distinct.length) { - val duplicateColumns = schema.fieldNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => "\"" + x + "\"" - }.mkString(", ") - throw new AnalysisException(s"Duplicate column(s): $duplicateColumns found, " + - s"cannot save to parquet format") - } - } - - override def dataSchema: StructType = { - val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema) - // check if schema satisfies the constraints - // before moving forward - checkConstraints(schema) - schema - } - - override private[sql] def refresh(): Unit = { - super.refresh() - metadataCache.refresh() - } - - // Parquet data source always uses Catalyst internal representations. - override val needConversion: Boolean = false - - override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum - - override def prepareJobForWrite(job: Job): OutputWriterFactory = { - val conf = ContextUtil.getConfiguration(job) - - val committerClass = - conf.getClass( - SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key, - classOf[ParquetOutputCommitter], - classOf[ParquetOutputCommitter]) - - if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) { - logInfo("Using default output committer for Parquet: " + - classOf[ParquetOutputCommitter].getCanonicalName) - } else { - logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName) - } - - conf.setClass( - SQLConf.OUTPUT_COMMITTER_CLASS.key, - committerClass, - classOf[ParquetOutputCommitter]) - - // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override - // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why - // we set it here is to set up the output committer class to `ParquetOutputCommitter`, which is - // bundled with `ParquetOutputFormat[Row]`. - job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) - - // TODO There's no need to use two kinds of WriteSupport - // We should unify them.
`SpecificMutableRow` can process both atomic (primitive) types and - // complex types. - val writeSupportClass = - if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) { - classOf[MutableRowWriteSupport] - } else { - classOf[RowWriteSupport] - } - - ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass) - RowWriteSupport.setSchema(dataSchema.toAttributes, conf) - - // Sets compression scheme - conf.set( - ParquetOutputFormat.COMPRESSION, - ParquetRelation - .shortParquetCompressionCodecNames - .getOrElse( - sqlContext.conf.parquetCompressionCodec.toUpperCase, - CompressionCodecName.UNCOMPRESSED).name()) - - new OutputWriterFactory { - override def newInstance( - path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new ParquetOutputWriter(path, context) - } - } - } - - override def buildScan( - requiredColumns: Array[String], - filters: Array[Filter], - inputFiles: Array[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = { - val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA) - val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown - val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString - val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec - - // Create the function to set variable Parquet confs at both driver and executor side. - val initLocalJobFuncOpt = - ParquetRelation.initializeLocalJobFunc( - requiredColumns, - filters, - dataSchema, - useMetadataCache, - parquetFilterPushDown, - assumeBinaryIsString, - assumeInt96IsTimestamp, - followParquetFormatSpec) _ - - // Create the function to set input paths at the driver side. - val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _ - - Utils.withDummyCallSite(sqlContext.sparkContext) { - new SqlNewHadoopRDD( - sc = sqlContext.sparkContext, - broadcastedConf = broadcastedConf, - initDriverSideJobFuncOpt = Some(setInputPaths), - initLocalJobFuncOpt = Some(initLocalJobFuncOpt), - inputFormatClass = classOf[ParquetInputFormat[InternalRow]], - valueClass = classOf[InternalRow]) { - - val cacheMetadata = useMetadataCache - - @transient val cachedStatuses = inputFiles.map { f => - // In order to encode the authority of a Path containing special characters such as '/' - // (which does happen in some S3N credentials), we need to use the string returned by the - // URI of the path to create a new Path. - val pathWithEscapedAuthority = escapePathUserInfo(f.getPath) - new FileStatus( - f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime, - f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority) - }.toSeq - - private def escapePathUserInfo(path: Path): Path = { - val uri = path.toUri - new Path(new URI( - uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath, - uri.getQuery, uri.getFragment)) - } - - // Overridden so we can inject our own cached file statuses.
- override def getPartitions: Array[SparkPartition] = { - val inputFormat = new ParquetInputFormat[InternalRow] { - override def listStatus(jobContext: JobContext): JList[FileStatus] = { - if (cacheMetadata) cachedStatuses else super.listStatus(jobContext) - } - } - - val jobContext = newJobContext(getConf(isDriverSide = true), jobId) - val rawSplits = inputFormat.getSplits(jobContext) - - Array.tabulate[SparkPartition](rawSplits.size) { i => - new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) - } - } - }.asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row] - } - } - - private class MetadataCache { - // `FileStatus` objects of all "_metadata" files. - private var metadataStatuses: Array[FileStatus] = _ - - // `FileStatus` objects of all "_common_metadata" files. - private var commonMetadataStatuses: Array[FileStatus] = _ - - // `FileStatus` objects of all data files (Parquet part-files). - var dataStatuses: Array[FileStatus] = _ - - // Schema of the actual Parquet files, without partition columns discovered from partition - // directory paths. - var dataSchema: StructType = null - - // Schema of the whole table, including partition columns. - var schema: StructType = _ - - // Cached leaves - var cachedLeaves: Set[FileStatus] = null - - /** - * Refreshes `FileStatus`es, footers, partition spec, and table schema. - */ - def refresh(): Unit = { - val currentLeafStatuses = cachedLeafStatuses() - - // Check whether cachedLeafStatuses has changed - val leafStatusesChanged = (cachedLeaves == null) || - !cachedLeaves.equals(currentLeafStatuses) - - if (leafStatusesChanged) { - cachedLeaves = currentLeafStatuses.toIterator.toSet - - // Lists `FileStatus`es of all leaf nodes (files) under all base directories. - val leaves = currentLeafStatuses.filter { f => - isSummaryFile(f.getPath) || - !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) - }.toArray - - dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath)) - metadataStatuses = - leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE) - commonMetadataStatuses = - leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE) - - dataSchema = { - val dataSchema0 = maybeDataSchema - .orElse(readSchema()) - .orElse(maybeMetastoreSchema) - .getOrElse(throw new AnalysisException( - s"Failed to discover schema of Parquet file(s) in the following location(s):\n" + - paths.mkString("\n\t"))) - - // If this Parquet relation is converted from a Hive Metastore table, we must reconcile - // the case insensitivity issue and possible schema mismatches (probably caused by schema - // evolution). - maybeMetastoreSchema - .map(ParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0)) - .getOrElse(dataSchema0) - } - } - } - - private def isSummaryFile(file: Path): Boolean = { - file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || - file.getName == ParquetFileWriter.PARQUET_METADATA_FILE - } - - private def readSchema(): Option[StructType] = { - // Sees which file(s) we need to touch in order to figure out the schema. - // - // Always tries the summary files first if users don't require a merged schema. In this case, - // "_common_metadata" is preferable to "_metadata" because it doesn't contain row - // group information, and could be much smaller for large Parquet files with lots of row - // groups. If no summary file is available, falls back to an arbitrary part-file.
- // - // NOTE: Metadata stored in the summary files are merged from all part-files. However, for - // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know - // how to merge them correctly if some key is associated with different values in different - // part-files. When this happens, Parquet simply gives up generating the summary file. This - // implies that if a summary file is present, then: - // - // 1. Either all part-files have exactly the same Spark SQL schema, or - // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus - // their schemas may differ from each other). - // - // Here we tend to be pessimistic and take the second case into account. Basically this means - // we can't trust the summary files if users require a merged schema, and must touch all part- - // files to do the merge. - val filesToTouch = - if (shouldMergeSchemas) { - // Also includes summary files, because there might be empty partition directories. - - // If the mergeRespectSummaries config is true, we assume that the schema of each part-file - // matches its summary file, so we skip the part-files and only merge the schemas contained - // in the summary files. If the config is disabled, which is the default setting, we merge - // all part-files. You should enable this configuration only if you are very sure that the - // Parquet part-files to read have corresponding summary files containing the correct schema. - - val needMerged: Seq[FileStatus] = - if (mergeRespectSummaries) { - Seq() - } else { - dataStatuses - } - (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq - } else { - // Tries any "_common_metadata" first. Parquet files written by old versions of Parquet - // don't have this. - commonMetadataStatuses.headOption - // Falls back to "_metadata" - .orElse(metadataStatuses.headOption) - // If no summary file is found, the Parquet file is either corrupted, or different part- - // files contain conflicting user defined metadata (two or more values are associated - // with the same key in different files). In either case, we fall back to the first - // part-file, and just assume all schemas are consistent. - .orElse(dataStatuses.headOption) - .toSeq - } - - assert( - filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined, - "No predefined schema found, " + - s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.") - - ParquetRelation.mergeSchemasInParallel(filesToTouch, sqlContext) - } - } -} - -private[sql] object ParquetRelation extends Logging { - // Whether we should merge schemas collected from all Parquet part-files. - private[sql] val MERGE_SCHEMA = "mergeSchema" - - // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used - // internally. - private[sql] val METASTORE_SCHEMA = "metastoreSchema"
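For context, `MERGE_SCHEMA` above is the user-facing knob for the schema-merging behavior this relation implements: a per-relation option that, when absent, falls back to the session-wide `spark.sql.parquet.mergeSchema` flag read from `SQLConf`. A minimal usage sketch (the input path is hypothetical; the API shown is the Spark 1.5-era `DataFrameReader`):

{{{
// Relation-level option; takes precedence over the SQLConf setting.
val merged = sqlContext.read
  .option("mergeSchema", "true")   // the ParquetRelation.MERGE_SCHEMA key
  .parquet("/data/events")         // hypothetical input path

// Session-wide default, consulted only when the option above is not given.
sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")
}}}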
- /** This closure sets various Parquet configurations at both driver side and executor side. */ - private[parquet] def initializeLocalJobFunc( - requiredColumns: Array[String], - filters: Array[Filter], - dataSchema: StructType, - useMetadataCache: Boolean, - parquetFilterPushDown: Boolean, - assumeBinaryIsString: Boolean, - assumeInt96IsTimestamp: Boolean, - followParquetFormatSpec: Boolean)(job: Job): Unit = { - val conf = job.getConfiguration - conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) - - // Try to push down filters when filter push-down is enabled. - if (parquetFilterPushDown) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(ParquetFilters.createFilter(dataSchema, _)) - .reduceOption(FilterApi.and) - .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) - } - - conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { - val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) - CatalystSchemaConverter.checkFieldNames(requestedSchema).json - }) - - conf.set( - RowWriteSupport.SPARK_ROW_SCHEMA, - CatalystSchemaConverter.checkFieldNames(dataSchema).json) - - // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata - conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache) - - // Sets flags for Parquet schema conversion - conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString) - conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp) - conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec) - } - - /** This closure sets input paths at the driver side. */ - private[parquet] def initializeDriverSideJobFunc( - inputFiles: Array[FileStatus])(job: Job): Unit = { - // We set the input paths at the driver side. - logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}") - if (inputFiles.nonEmpty) { - FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*) - } - }
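One detail of `initializeLocalJobFunc` worth spelling out: predicates that `ParquetFilters.createFilter` cannot convert are silently dropped by the `flatMap`, and the survivors are folded into a single conjunction with `FilterApi.and`. A toy sketch of that composition against parquet-mr's public `FilterApi` (the column names and bounds are made up):

{{{
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

val a = FilterApi.intColumn("a")   // hypothetical INT32 columns
val b = FilterApi.intColumn("b")
val converted: Seq[FilterPredicate] =
  Seq(FilterApi.gt(a, Int.box(1)), FilterApi.lt(b, Int.box(10)))

// Mirrors `.reduceOption(FilterApi.and).foreach(...)` above: an empty list
// sets no filter at all; two or more predicates become one AND tree.
val combined: Option[FilterPredicate] = converted.reduceOption(FilterApi.and)
}}}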
- private[parquet] def readSchema( - footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { - - def parseParquetSchema(schema: MessageType): StructType = { - val converter = new CatalystSchemaConverter( - sqlContext.conf.isParquetBinaryAsString, - sqlContext.conf.isParquetINT96AsTimestamp, - sqlContext.conf.followParquetFormatSpec) - - converter.convert(schema) - } - - val seen = mutable.HashSet[String]() - val finalSchemas: Seq[StructType] = footers.flatMap { footer => - val metadata = footer.getParquetMetadata.getFileMetaData - val serializedSchema = metadata - .getKeyValueMetaData - .toMap - .get(CatalystReadSupport.SPARK_METADATA_KEY) - if (serializedSchema.isEmpty) { - // Falls back to Parquet schema if no Spark SQL schema found. - Some(parseParquetSchema(metadata.getSchema)) - } else if (!seen.contains(serializedSchema.get)) { - seen += serializedSchema.get - - // Don't throw even if we failed to parse the serialized Spark schema. Just fall back to - // whatever is available. - Some(Try(DataType.fromJson(serializedSchema.get)) - .recover { case _: Throwable => - logInfo( - s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + - "falling back to the deprecated DataType.fromCaseClassString parser.") - DataType.fromCaseClassString(serializedSchema.get) - } - .recover { case cause: Throwable => - logWarning( - s"""Failed to parse serialized Spark schema in Parquet key-value metadata: - |\t$serializedSchema - """.stripMargin, - cause) - } - .map(_.asInstanceOf[StructType]) - .getOrElse { - // Falls back to Parquet schema if Spark SQL schema can't be parsed. - parseParquetSchema(metadata.getSchema) - }) - } else { - None - } - } - - finalSchemas.reduceOption { (left, right) => - try left.merge(right) catch { case e: Throwable => - throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) - } - } - } - - /** - * Reconciles the Hive Metastore case insensitivity issue and data type conflicts between the - * Metastore schema and the Parquet schema. - * - * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the - * schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't - * distinguish binary and string). This method generates a correct schema by merging Metastore - * schema data types and Parquet schema field names. - */ - private[parquet] def mergeMetastoreParquetSchema( - metastoreSchema: StructType, - parquetSchema: StructType): StructType = { - def schemaConflictMessage: String = - s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema: - |${metastoreSchema.prettyJson} - | - |Parquet schema: - |${parquetSchema.prettyJson} - """.stripMargin - - val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema) - - assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage) - - val ordinalMap = metastoreSchema.zipWithIndex.map { - case (field, index) => field.name.toLowerCase -> index - }.toMap - - val reorderedParquetSchema = mergedParquetSchema.sortBy(f => - ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1)) - - StructType(metastoreSchema.zip(reorderedParquetSchema).map { - // Uses Parquet field names but retains Metastore data types. - case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase => - mSchema.copy(name = pSchema.name) - case _ => - throw new SparkException(schemaConflictMessage) - }) - }
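A worked example of the reconciliation `mergeMetastoreParquetSchema` performs, with made-up schemas; the Metastore supplies authoritative data types, while Parquet supplies authoritative field-name casing:

{{{
import org.apache.spark.sql.types._

val metastoreSchema = StructType(Seq(
  StructField("lowercase", StringType),            // Hive lower-cases all names
  StructField("uppercasefield", IntegerType)))

val parquetSchema = StructType(Seq(
  StructField("lowerCase", BinaryType),            // e.g. string written as binary
  StructField("UPPERCaseField", IntegerType)))

// mergeMetastoreParquetSchema(metastoreSchema, parquetSchema) yields:
//   StructField("lowerCase", StringType)          // Parquet name, Metastore type
//   StructField("UPPERCaseField", IntegerType)
}}}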
- - /** - * Returns the original schema from the Parquet file with any missing nullable fields from the - * Hive Metastore schema merged in. - * - * When constructing a DataFrame from a collection of structured data, the resulting object has - * a schema corresponding to the union of the fields present in each element of the collection. - * Spark SQL simply assigns a null value to any field that isn't present for a particular row. - * In some cases, it is possible that a given table partition stored as a Parquet file doesn't - * contain a particular nullable field in its schema despite that field being present in the - * table schema obtained from the Hive Metastore. This method returns a schema representing the - * Parquet file schema along with any additional nullable fields from the Metastore schema - * merged in. - */ - private[parquet] def mergeMissingNullableFields( - metastoreSchema: StructType, - parquetSchema: StructType): StructType = { - val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap - val missingFields = metastoreSchema - .map(_.name.toLowerCase) - .diff(parquetSchema.map(_.name.toLowerCase)) - .map(fieldMap(_)) - .filter(_.nullable) - StructType(parquetSchema ++ missingFields) - } - - /** - * Figures out a merged Parquet schema with a distributed Spark job. - * - * Note that locality is not taken into consideration here because: - * - * 1. For a single Parquet part-file, in most cases the footer only resides in the last block of - * that file. Thus we only need to retrieve the location of the last block. However, Hadoop - * `FileSystem` only provides API to retrieve locations of all blocks, which can be - * potentially expensive. - * - * 2. This optimization is mainly useful for S3, where file metadata operations can be pretty - * slow. And locality is generally not available when using S3 (you can't run computation on - * S3 nodes). - */ - def mergeSchemasInParallel( - filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = { - val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString - val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec - val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration) - - // HACK ALERT: - // - // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es - // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable` - // but only `Writable`. What makes it worse, for some reason `FileStatus` doesn't play well - // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These - // facts effectively prevent us from serializing `FileStatus`es. - // - // Since Parquet only relies on path and length information of those `FileStatus`es to read - // footers, here we just extract them (which can be easily serialized), send them to executor - // side, and reassemble fake `FileStatus`es there. - val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen)) - - // Issues a Spark job to read Parquet schema in parallel. - val partiallyMergedSchemas = - sqlContext - .sparkContext - .parallelize(partialFileStatusInfo) - .mapPartitions { iterator => - // Reassembles fake `FileStatus`es from the serialized path and length information. - val fakeFileStatuses = iterator.map { case (path, length) => - new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path)) - }.toSeq - - // Skips row group information since we only need the schema - val skipRowGroups = true - - // Reads footers in multi-threaded manner within each task - val footers = - ParquetFileReader.readAllFootersInParallel( - serializedConf.value, fakeFileStatuses, skipRowGroups) - - // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` - val converter = - new CatalystSchemaConverter( - assumeBinaryIsString = assumeBinaryIsString, - assumeInt96IsTimestamp = assumeInt96IsTimestamp, - followParquetFormatSpec = followParquetFormatSpec) - - footers.map { footer => - ParquetRelation.readSchemaFromFooter(footer, converter) - }.reduceOption(_ merge _).iterator - }.collect() - - partiallyMergedSchemas.reduceOption(_ merge _) - }
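Both reductions in `mergeSchemasInParallel` (`reduceOption(_ merge _)` inside each task, then once more on the driver) lean on `StructType#merge`, which unions the field sets and throws when two part-files disagree on the type of a shared field. Since `merge` is `private[sql]`, the following only illustrates its semantics:

{{{
import org.apache.spark.sql.types._

val fromPartFile1 = StructType(Seq(
  StructField("a", IntegerType), StructField("b", StringType)))
val fromPartFile2 = StructType(Seq(
  StructField("a", IntegerType), StructField("c", DoubleType)))

// fromPartFile1 merge fromPartFile2 == StructType(a INT, b STRING, c DOUBLE);
// had part-file 2 declared `a` as StringType, merge would throw, which is what
// readSchema surfaces as "Failed to merge incompatible schemas".
}}}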
- - /** - * Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string - * can be found in the file metadata, returns the deserialized [[StructType]]; otherwise, returns - * a [[StructType]] converted from the [[MessageType]] stored in this footer. - */ - def readSchemaFromFooter( - footer: Footer, converter: CatalystSchemaConverter): StructType = { - val fileMetaData = footer.getParquetMetadata.getFileMetaData - fileMetaData - .getKeyValueMetaData - .toMap - .get(CatalystReadSupport.SPARK_METADATA_KEY) - .flatMap(deserializeSchemaString) - .getOrElse(converter.convert(fileMetaData.getSchema)) - } - - private def deserializeSchemaString(schemaString: String): Option[StructType] = { - // Tries to deserialize the schema string as JSON first, then falls back to the case class - // string parser (data generated by older versions of Spark SQL used this format). - Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover { - case _: Throwable => - logInfo( - s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + - "falling back to the deprecated DataType.fromCaseClassString parser.") - DataType.fromCaseClassString(schemaString).asInstanceOf[StructType] - }.recoverWith { - case cause: Throwable => - logWarning( - "Failed to parse and ignored the serialized Spark schema in " + - s"Parquet key-value metadata:\n\t$schemaString", cause) - Failure(cause) - }.toOption - } - - def enableLogForwarding() { - // Note: the org.apache.parquet.Log class has a static initializer that - // sets the java.util.logging Logger for "org.apache.parquet". This - // checks first to see if there are any handlers already set - // and if not it creates them. If this method executes prior - // to that class being loaded then: - // 1) there are no handlers installed so there are none to - // remove. But when it IS finally loaded the desired effect - // of removing them is circumvented. - // 2) The parquet.Log static initializer calls setUseParentHandlers(false) - // undoing the attempt to override the logging here. - // - // Therefore we need to force the class to be loaded. - // This should really be resolved by Parquet. - Utils.classForName(classOf[ParquetLog].getName) - - // Note: Logger.getLogger("parquet") has a default logger - // that appends to Console which needs to be cleared. - val parquetLogger = JLogger.getLogger(classOf[ParquetLog].getPackage.getName) - parquetLogger.getHandlers.foreach(parquetLogger.removeHandler) - parquetLogger.setUseParentHandlers(true) - - // Disables a WARN log message in ParquetOutputCommitter. We first ensure that - // ParquetOutputCommitter is loaded and the static LOG field gets initialized. - // See https://issues.apache.org/jira/browse/SPARK-5968 for details - Utils.classForName(classOf[ParquetOutputCommitter].getName) - JLogger.getLogger(classOf[ParquetOutputCommitter].getName).setLevel(Level.OFF) - - // Similar to the above, disables an unnecessary WARN log message in ParquetRecordReader. - // See https://issues.apache.org/jira/browse/PARQUET-220 for details - Utils.classForName(classOf[ParquetRecordReader[_]].getName) - JLogger.getLogger(classOf[ParquetRecordReader[_]].getName).setLevel(Level.OFF) - } - - // The Parquet compression short names - val shortParquetCompressionCodecNames = Map( - "NONE" -> CompressionCodecName.UNCOMPRESSED, - "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED, - "SNAPPY" -> CompressionCodecName.SNAPPY, - "GZIP" -> CompressionCodecName.GZIP, - "LZO" -> CompressionCodecName.LZO) -}
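A closing note on the codec table just above: `prepareJobForWrite` upper-cases the session's `spark.sql.parquet.compression.codec` value before the lookup and quietly falls back to UNCOMPRESSED for unrecognized names rather than failing. A short usage sketch (output path hypothetical):

{{{
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")  // case-insensitive
sqlContext.range(100).write.parquet("/tmp/parquet-snappy")           // hypothetical path
// Other recognized values: "none", "uncompressed", "gzip", "lzo".
}}}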
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala deleted file mode 100644 index 9cd0250..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ /dev/null @@ -1,322 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.math.BigInteger -import java.nio.{ByteBuffer, ByteOrder} -import java.util.{HashMap => JHashMap} - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api._ - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -/** - * A `parquet.hadoop.api.WriteSupport` for Row objects. 
- */ -private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging { - - private[parquet] var writer: RecordConsumer = null - private[parquet] var attributes: Array[Attribute] = null - - override def init(configuration: Configuration): WriteSupport.WriteContext = { - val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) - val metadata = new JHashMap[String, String]() - metadata.put(CatalystReadSupport.SPARK_METADATA_KEY, origAttributesStr) - - if (attributes == null) { - attributes = ParquetTypesConverter.convertFromString(origAttributesStr).toArray - } - - log.debug(s"write support initialized for requested schema $attributes") - ParquetRelation.enableLogForwarding() - new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata) - } - - override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { - writer = recordConsumer - log.debug(s"preparing for write with schema $attributes") - } - - override def write(record: InternalRow): Unit = { - val attributesSize = attributes.size - if (attributesSize > record.numFields) { - throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " + - s"($attributesSize > ${record.numFields})") - } - - var index = 0 - writer.startMessage() - while(index < attributesSize) { - // null values indicate optional fields, but we do not currently check for them - if (!record.isNullAt(index)) { - writer.startField(attributes(index).name, index) - writeValue(attributes(index).dataType, record.get(index, attributes(index).dataType)) - writer.endField(attributes(index).name, index) - } - index = index + 1 - } - writer.endMessage() - } - - private[parquet] def writeValue(schema: DataType, value: Any): Unit = { - if (value != null) { - schema match { - case t: UserDefinedType[_] => writeValue(t.sqlType, value) - case t @ ArrayType(_, _) => writeArray( - t, - value.asInstanceOf[CatalystConverter.ArrayScalaType]) - case t @ MapType(_, _, _) => writeMap( - t, - value.asInstanceOf[CatalystConverter.MapScalaType]) - case t @ StructType(_) => writeStruct( - t, - value.asInstanceOf[CatalystConverter.StructScalaType]) - case _ => writePrimitive(schema.asInstanceOf[AtomicType], value) - } - } - } - - private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = { - if (value != null) { - schema match { - case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean]) - case ByteType => writer.addInteger(value.asInstanceOf[Byte]) - case ShortType => writer.addInteger(value.asInstanceOf[Short]) - case IntegerType | DateType => writer.addInteger(value.asInstanceOf[Int]) - case LongType => writer.addLong(value.asInstanceOf[Long]) - case TimestampType => writeTimestamp(value.asInstanceOf[Long]) - case FloatType => writer.addFloat(value.asInstanceOf[Float]) - case DoubleType => writer.addDouble(value.asInstanceOf[Double]) - case StringType => writer.addBinary( - Binary.fromByteArray(value.asInstanceOf[UTF8String].getBytes)) - case BinaryType => writer.addBinary( - Binary.fromByteArray(value.asInstanceOf[Array[Byte]])) - case DecimalType.Fixed(precision, _) => - writeDecimal(value.asInstanceOf[Decimal], precision) - case _ => sys.error(s"Do not know how to write $schema to consumer") - } - } - } - - private[parquet] def writeStruct( - schema: StructType, - struct: CatalystConverter.StructScalaType): Unit = { - if (struct != null) { - val fields = schema.fields.toArray - writer.startGroup() - var i = 0 - while(i < fields.length) { - if 
(!struct.isNullAt(i)) { - writer.startField(fields(i).name, i) - writeValue(fields(i).dataType, struct.get(i, fields(i).dataType)) - writer.endField(fields(i).name, i) - } - i = i + 1 - } - writer.endGroup() - } - } - - private[parquet] def writeArray( - schema: ArrayType, - array: CatalystConverter.ArrayScalaType): Unit = { - val elementType = schema.elementType - writer.startGroup() - if (array.numElements() > 0) { - if (schema.containsNull) { - writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) - var i = 0 - while (i < array.numElements()) { - writer.startGroup() - if (!array.isNullAt(i)) { - writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - writeValue(elementType, array.get(i, elementType)) - writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - } - writer.endGroup() - i = i + 1 - } - writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) - } else { - writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - var i = 0 - while (i < array.numElements()) { - writeValue(elementType, array.get(i, elementType)) - i = i + 1 - } - writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - } - } - writer.endGroup() - } - - private[parquet] def writeMap( - schema: MapType, - map: CatalystConverter.MapScalaType): Unit = { - writer.startGroup() - val length = map.numElements() - if (length > 0) { - writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0) - map.foreach(schema.keyType, schema.valueType, (key, value) => { - writer.startGroup() - writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - writeValue(schema.keyType, key) - writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - if (value != null) { - writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - writeValue(schema.valueType, value) - writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - } - writer.endGroup() - }) - writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0) - } - writer.endGroup() - } - - // Scratch array used to write decimals as fixed-length byte array - private[this] var reusableDecimalBytes = new Array[Byte](16) - - private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = { - val numBytes = CatalystSchemaConverter.minBytesForPrecision(precision) - - def longToBinary(unscaled: Long): Binary = { - var i = 0 - var shift = 8 * (numBytes - 1) - while (i < numBytes) { - reusableDecimalBytes(i) = (unscaled >> shift).toByte - i += 1 - shift -= 8 - } - Binary.fromByteArray(reusableDecimalBytes, 0, numBytes) - } - - def bigIntegerToBinary(unscaled: BigInteger): Binary = { - unscaled.toByteArray match { - case bytes if bytes.length == numBytes => - Binary.fromByteArray(bytes) - - case bytes if bytes.length <= reusableDecimalBytes.length => - val signedByte = (if (bytes.head < 0) -1 else 0).toByte - java.util.Arrays.fill(reusableDecimalBytes, 0, numBytes - bytes.length, signedByte) - System.arraycopy(bytes, 0, reusableDecimalBytes, numBytes - bytes.length, bytes.length) - Binary.fromByteArray(reusableDecimalBytes, 0, numBytes) - - case bytes => - reusableDecimalBytes = new Array[Byte](bytes.length) - bigIntegerToBinary(unscaled) - } - } - - val binary = if (numBytes <= 8) { - longToBinary(decimal.toUnscaledLong) - } else { - bigIntegerToBinary(decimal.toJavaBigDecimal.unscaledValue()) - } - - writer.addBinary(binary) - } - - // array used to write Timestamp as Int96 (fixed-length binary) - private[this] val int96buf = new Array[Byte](12) - - private[parquet] def writeTimestamp(ts: Long): 
Unit = { - val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts) - val buf = ByteBuffer.wrap(int96buf) - buf.order(ByteOrder.LITTLE_ENDIAN) - buf.putLong(timeOfDayNanos) - buf.putInt(julianDay) - writer.addBinary(Binary.fromByteArray(int96buf)) - } -} - -// Optimized for non-nested rows -private[parquet] class MutableRowWriteSupport extends RowWriteSupport { - override def write(record: InternalRow): Unit = { - val attributesSize = attributes.size - if (attributesSize > record.numFields) { - throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " + - s"($attributesSize > ${record.numFields})") - } - - var index = 0 - writer.startMessage() - while(index < attributesSize) { - // null values indicate optional fields, but we do not currently check for them - if (!record.isNullAt(index)) { - writer.startField(attributes(index).name, index) - consumeType(attributes(index).dataType, record, index) - writer.endField(attributes(index).name, index) - } - index = index + 1 - } - writer.endMessage() - } - - private def consumeType( - ctype: DataType, - record: InternalRow, - index: Int): Unit = { - ctype match { - case BooleanType => writer.addBoolean(record.getBoolean(index)) - case ByteType => writer.addInteger(record.getByte(index)) - case ShortType => writer.addInteger(record.getShort(index)) - case IntegerType | DateType => writer.addInteger(record.getInt(index)) - case LongType => writer.addLong(record.getLong(index)) - case TimestampType => writeTimestamp(record.getLong(index)) - case FloatType => writer.addFloat(record.getFloat(index)) - case DoubleType => writer.addDouble(record.getDouble(index)) - case StringType => - writer.addBinary(Binary.fromByteArray(record.getUTF8String(index).getBytes)) - case BinaryType => - writer.addBinary(Binary.fromByteArray(record.getBinary(index))) - case DecimalType.Fixed(precision, scale) => - writeDecimal(record.getDecimal(index, precision, scale), precision) - case _ => sys.error(s"Unsupported data type $ctype, cannot write to consumer") - } - } -} - -private[parquet] object RowWriteSupport { - val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" - - def getSchema(configuration: Configuration): Seq[Attribute] = { - val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) - if (schemaString == null) { - throw new RuntimeException("Missing schema!") - } - ParquetTypesConverter.convertFromString(schemaString) - } - - def setSchema(schema: Seq[Attribute], configuration: Configuration) { - val encoded = ParquetTypesConverter.convertToString(schema) - configuration.set(SPARK_ROW_SCHEMA, encoded) - configuration.set( - ParquetOutputFormat.WRITER_VERSION, - ParquetProperties.WriterVersion.PARQUET_1_0.toString) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala deleted file mode 100644 index 3854f5b..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.io.IOException - -import scala.collection.JavaConversions._ -import scala.util.Try - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapreduce.Job -import org.apache.parquet.format.converter.ParquetMetadataConverter -import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata} -import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter} -import org.apache.parquet.schema.MessageType - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.types._ - - -private[parquet] object ParquetTypesConverter extends Logging { - def isPrimitiveType(ctype: DataType): Boolean = ctype match { - case _: NumericType | BooleanType | DateType | TimestampType | StringType | BinaryType => true - case _ => false - } - - /** - * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given DECIMAL precision. 
- */ - private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { precision => - var length = 1 - while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) { - length += 1 - } - length - } - - def convertFromAttributes(attributes: Seq[Attribute]): MessageType = { - val converter = new CatalystSchemaConverter() - converter.convert(StructType.fromAttributes(attributes)) - } - - def convertFromString(string: String): Seq[Attribute] = { - Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match { - case s: StructType => s.toAttributes - case other => sys.error(s"Cannot convert $string to row") - } - } - - def convertToString(schema: Seq[Attribute]): String = { - schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName) - StructType.fromAttributes(schema).json - } - - def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = { - if (origPath == null) { - throw new IllegalArgumentException("Unable to write Parquet metadata: path is null") - } - val fs = origPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException( - s"Unable to write Parquet metadata: path $origPath is incorrectly formatted") - } - val path = origPath.makeQualified(fs) - if (fs.exists(path) && !fs.getFileStatus(path).isDir) { - throw new IllegalArgumentException(s"Expected to write to directory $path but found file") - } - val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) - if (fs.exists(metadataPath)) { - try { - fs.delete(metadataPath, true) - } catch { - case e: IOException => - throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath") - } - } - val extraMetadata = new java.util.HashMap[String, String]() - extraMetadata.put( - CatalystReadSupport.SPARK_METADATA_KEY, - ParquetTypesConverter.convertToString(attributes)) - // TODO: add extra data, e.g., table name, date, etc.? - - val parquetSchema: MessageType = ParquetTypesConverter.convertFromAttributes(attributes) - val metaData: FileMetaData = new FileMetaData( - parquetSchema, - extraMetadata, - "Spark") - - ParquetRelation.enableLogForwarding() - ParquetFileWriter.writeMetadataFile( - conf, - path, - new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil) - }
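To make the `BYTES_FOR_PRECISION` table near the top of this file concrete: for each precision it finds the smallest byte count n such that a signed two's-complement value of 8n bits can represent 10^precision unscaled values. A few spot checks, computed with the same loop:

{{{
def bytesFor(precision: Int): Int = {
  var length = 1
  while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) length += 1
  length
}
Seq(2, 9, 18, 38).map(bytesFor)  // List(1, 4, 8, 16)
}}}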
- - /** - * Try to read Parquet metadata at the given Path. We first see if there is a summary file - * in the parent directory. If so, this is used. Otherwise we read the actual footer at the given - * location. - * @param origPath The path at which we expect one (or more) Parquet files. - * @param configuration The Hadoop configuration to use. - * @return The `ParquetMetadata` containing among other things the schema. - */ - def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = { - if (origPath == null) { - throw new IllegalArgumentException("Unable to read Parquet metadata: path is null") - } - val job = new Job() - val conf = configuration.getOrElse(ContextUtil.getConfiguration(job)) - val fs: FileSystem = origPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath") - } - val path = origPath.makeQualified(fs) - - val children = - fs - .globStatus(path) - .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) } - .filterNot { status => - val name = status.getPath.getName - (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE - } - - ParquetRelation.enableLogForwarding() - - // NOTE (lian): Reading the Parquet "_metadata" file can be very slow if the file consists of - // lots of row groups. Since the Parquet schema is replicated among all row groups, we only - // need to touch a single row group to read schema related metadata. Notice that we are making - // the assumption that all data in a single Parquet file have the same schema, which is - // normally true. - children - // Try any non-"_metadata" file first... - .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE) - // ... and fall back to "_metadata" if no such file exists (which implies the Parquet file is - // empty, thus normally the "_metadata" file is expected to be fairly small). - .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)) - .map(ParquetFileReader.readFooter(conf, _, ParquetMetadataConverter.NO_FILTER)) - .getOrElse( - throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 6bcabba..2f8417a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -43,19 +43,24 @@ import org.apache.spark.util.SerializableConfiguration * This allows users to give the data source alias as the format type over the fully qualified * class name. * - * ex: parquet.DefaultSource.format = "parquet". - * * A new instance of this class will be instantiated each time a DDL call is made. + * + * @since 1.5.0 */ @DeveloperApi trait DataSourceRegister { /** * The string that represents the format that this data source provider uses. This is - * overridden by children to provide a nice alias for the data source, - * ex: override def format(): String = "parquet" + * overridden by children to provide a nice alias for the data source. For example: + * + * {{{ + * override def shortName(): String = "parquet" + * }}} + * + * @since 1.5.0 */ - def format(): String + def shortName(): String } /** http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala deleted file mode 100644 index cb7ca60..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.ui - -import javax.servlet.http.HttpServletRequest - -import scala.collection.mutable -import scala.xml.Node - -import org.apache.commons.lang3.StringEscapeUtils - -import org.apache.spark.Logging -import org.apache.spark.ui.{UIUtils, WebUIPage} - -private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging { - - private val listener = parent.listener - - override def render(request: HttpServletRequest): Seq[Node] = { - val currentTime = System.currentTimeMillis() - val content = listener.synchronized { - val _content = mutable.ListBuffer[Node]() - if (listener.getRunningExecutions.nonEmpty) { - _content ++= - new RunningExecutionTable( - parent, "Running Queries", currentTime, - listener.getRunningExecutions.sortBy(_.submissionTime).reverse).toNodeSeq - } - if (listener.getCompletedExecutions.nonEmpty) { - _content ++= - new CompletedExecutionTable( - parent, "Completed Queries", currentTime, - listener.getCompletedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq - } - if (listener.getFailedExecutions.nonEmpty) { - _content ++= - new FailedExecutionTable( - parent, "Failed Queries", currentTime, - listener.getFailedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq - } - _content - } - UIUtils.headerSparkPage("SQL", content, parent, Some(5000)) - } -} - -private[ui] abstract class ExecutionTable( - parent: SQLTab, - tableId: String, - tableName: String, - currentTime: Long, - executionUIDatas: Seq[SQLExecutionUIData], - showRunningJobs: Boolean, - showSucceededJobs: Boolean, - showFailedJobs: Boolean) { - - protected def baseHeader: Seq[String] = Seq( - "ID", - "Description", - "Submitted", - "Duration") - - protected def header: Seq[String] - - protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): Seq[Node] = { - val submissionTime = executionUIData.submissionTime - val duration = executionUIData.completionTime.getOrElse(currentTime) - submissionTime - - val runningJobs = executionUIData.runningJobs.map { jobId => - <a href={jobURL(jobId)}>{jobId.toString}</a><br/> - } - val succeededJobs = executionUIData.succeededJobs.sorted.map { jobId => - <a href={jobURL(jobId)}>{jobId.toString}</a><br/> - } - val failedJobs = executionUIData.failedJobs.sorted.map { jobId => - <a href={jobURL(jobId)}>{jobId.toString}</a><br/> - } - <tr> - <td> - {executionUIData.executionId.toString} - </td> - <td> - {descriptionCell(executionUIData)} - </td> - <td sorttable_customkey={submissionTime.toString}> - {UIUtils.formatDate(submissionTime)} - </td> - <td sorttable_customkey={duration.toString}> - {UIUtils.formatDuration(duration)} - </td> - {if (showRunningJobs) { - <td> - {runningJobs} - </td> - }} - {if (showSucceededJobs) { - <td> - {succeededJobs} - </td> - }} - {if (showFailedJobs) { - <td> - {failedJobs} - </td> - }} - {detailCell(executionUIData.physicalPlanDescription)} - </tr> - } - - private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = { - val details = if (execution.details.nonEmpty) { - <span 
onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')" - class="expand-details"> - +details - </span> ++ - <div class="stage-details collapsed"> - <pre>{execution.details}</pre> - </div> - } else { - Nil - } - - val desc = { - <a href={executionURL(execution.executionId)}>{execution.description}</a> - } - - <div>{desc} {details}</div> - } - - private def detailCell(physicalPlan: String): Seq[Node] = { - val isMultiline = physicalPlan.indexOf('\n') >= 0 - val summary = StringEscapeUtils.escapeHtml4( - if (isMultiline) { - physicalPlan.substring(0, physicalPlan.indexOf('\n')) - } else { - physicalPlan - }) - val details = if (isMultiline) { - // scalastyle:off - <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" - class="expand-details"> - +details - </span> ++ - <div class="stacktrace-details collapsed"> - <pre>{physicalPlan}</pre> - </div> - // scalastyle:on - } else { - "" - } - <td>{summary}{details}</td> - } - - def toNodeSeq: Seq[Node] = { - <div> - <h4>{tableName}</h4> - {UIUtils.listingTable[SQLExecutionUIData]( - header, row(currentTime, _), executionUIDatas, id = Some(tableId))} - </div> - } - - private def jobURL(jobId: Long): String = - "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId) - - private def executionURL(executionID: Long): String = - s"${UIUtils.prependBaseUri(parent.basePath)}/${parent.prefix}/execution?id=$executionID" -} - -private[ui] class RunningExecutionTable( - parent: SQLTab, - tableName: String, - currentTime: Long, - executionUIDatas: Seq[SQLExecutionUIData]) - extends ExecutionTable( - parent, - "running-execution-table", - tableName, - currentTime, - executionUIDatas, - showRunningJobs = true, - showSucceededJobs = true, - showFailedJobs = true) { - - override protected def header: Seq[String] = - baseHeader ++ Seq("Running Jobs", "Succeeded Jobs", "Failed Jobs", "Detail") -} - -private[ui] class CompletedExecutionTable( - parent: SQLTab, - tableName: String, - currentTime: Long, - executionUIDatas: Seq[SQLExecutionUIData]) - extends ExecutionTable( - parent, - "completed-execution-table", - tableName, - currentTime, - executionUIDatas, - showRunningJobs = false, - showSucceededJobs = true, - showFailedJobs = false) { - - override protected def header: Seq[String] = baseHeader ++ Seq("Jobs", "Detail") -} - -private[ui] class FailedExecutionTable( - parent: SQLTab, - tableName: String, - currentTime: Long, - executionUIDatas: Seq[SQLExecutionUIData]) - extends ExecutionTable( - parent, - "failed-execution-table", - tableName, - currentTime, - executionUIDatas, - showRunningJobs = false, - showSucceededJobs = true, - showFailedJobs = true) { - - override protected def header: Seq[String] = - baseHeader ++ Seq("Succeeded Jobs", "Failed Jobs", "Detail") -} http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala deleted file mode 100644 index 52ddf99..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
deleted file mode 100644
index 52ddf99..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.{Node, Unparsed}
-
-import org.apache.commons.lang3.StringEscapeUtils
-
-import org.apache.spark.Logging
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-
-private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging {
-
-  private val listener = parent.listener
-
-  override def render(request: HttpServletRequest): Seq[Node] = listener.synchronized {
-    val parameterExecutionId = request.getParameter("id")
-    require(parameterExecutionId != null && parameterExecutionId.nonEmpty,
-      "Missing execution id parameter")
-
-    val executionId = parameterExecutionId.toLong
-    val content = listener.getExecution(executionId).map { executionUIData =>
-      val currentTime = System.currentTimeMillis()
-      val duration =
-        executionUIData.completionTime.getOrElse(currentTime) - executionUIData.submissionTime
-
-      val summary =
-        <div>
-          <ul class="unstyled">
-            <li>
-              <strong>Submitted Time: </strong>{UIUtils.formatDate(executionUIData.submissionTime)}
-            </li>
-            <li>
-              <strong>Duration: </strong>{UIUtils.formatDuration(duration)}
-            </li>
-            {if (executionUIData.runningJobs.nonEmpty) {
-              <li>
-                <strong>Running Jobs: </strong>
-                {executionUIData.runningJobs.sorted.map { jobId =>
-                  <a href={jobURL(jobId)}>{jobId.toString}</a><span> </span>
-                }}
-              </li>
-            }}
-            {if (executionUIData.succeededJobs.nonEmpty) {
-              <li>
-                <strong>Succeeded Jobs: </strong>
-                {executionUIData.succeededJobs.sorted.map { jobId =>
-                  <a href={jobURL(jobId)}>{jobId.toString}</a><span> </span>
-                }}
-              </li>
-            }}
-            {if (executionUIData.failedJobs.nonEmpty) {
-              <li>
-                <strong>Failed Jobs: </strong>
-                {executionUIData.failedJobs.sorted.map { jobId =>
-                  <a href={jobURL(jobId)}>{jobId.toString}</a><span> </span>
-                }}
-              </li>
-            }}
-            <li>
-              <strong>Detail: </strong><br/>
-              <pre>{executionUIData.physicalPlanDescription}</pre>
-            </li>
-          </ul>
-        </div>
-
-      val metrics = listener.getExecutionMetrics(executionId)
-
-      summary ++ planVisualization(metrics, executionUIData.physicalPlanGraph)
-    }.getOrElse {
-      <div>No information to display for query {executionId}</div>
-    }
-
-    UIUtils.headerSparkPage(s"Details for Query $executionId", content, parent, Some(5000))
-  }
-
-
-  private def planVisualizationResources: Seq[Node] = {
-    // scalastyle:off
-    <link rel="stylesheet" href={UIUtils.prependBaseUri("/static/sql/spark-sql-viz.css")} type="text/css"/>
-    <script src={UIUtils.prependBaseUri("/static/d3.min.js")}></script>
-    <script src={UIUtils.prependBaseUri("/static/dagre-d3.min.js")}></script>
-    <script src={UIUtils.prependBaseUri("/static/graphlib-dot.min.js")}></script>
-    <script src={UIUtils.prependBaseUri("/static/sql/spark-sql-viz.js")}></script>
-    // scalastyle:on
-  }
-
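render() above requires an "id" query parameter, parses it as a Long, and falls back to a placeholder when the listener no longer tracks that execution. A rough model of the lookup flow, with a plain Map standing in for the listener (hypothetical names, for illustration only):

    object ExecutionLookup {
      def describe(idParam: String, executions: Map[Long, String]): String = {
        require(idParam != null && idParam.nonEmpty, "Missing execution id parameter")
        val executionId = idParam.toLong  // throws NumberFormatException on non-numeric input
        executions.get(executionId)
          .map(desc => s"Details for Query $executionId: $desc")
          .getOrElse(s"No information to display for query $executionId")
      }

      def main(args: Array[String]): Unit = {
        val known = Map(1L -> "SELECT ...")
        println(describe("1", known))   // found: renders the details
        println(describe("42", known))  // unknown: placeholder text
      }
    }
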
-  private def planVisualization(metrics: Map[Long, Any], graph: SparkPlanGraph): Seq[Node] = {
-    val metadata = graph.nodes.flatMap { node =>
-      val nodeId = s"plan-meta-data-${node.id}"
-      <div id={nodeId}>{node.desc}</div>
-    }
-
-    <div>
-      <div id="plan-viz-graph"></div>
-      <div id="plan-viz-metadata" style="display:none">
-        <div class="dot-file">
-          {graph.makeDotFile(metrics)}
-        </div>
-        <div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
-        {metadata}
-      </div>
-      {planVisualizationResources}
-      <script>$(function(){{ renderPlanViz(); }})</script>
-    </div>
-  }
-
-  private def jobURL(jobId: Long): String =
-    "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
deleted file mode 100644
index 2fd4fc6..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.ui
-
-import scala.collection.mutable
-
-import com.google.common.annotations.VisibleForTesting
-
-import org.apache.spark.{JobExecutionStatus, Logging}
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler._
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.metric.{SQLMetricParam, SQLMetricValue}
-
-private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener with Logging {
-
-  private val retainedExecutions =
-    sqlContext.sparkContext.conf.getInt("spark.sql.ui.retainedExecutions", 1000)
-
-  private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  // Old data in the following fields must be removed in "trimExecutionsIfNecessary".
-  // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up old data
-  private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]()
-
-  /**
-   * Maintain the relation between job id and execution id so that we can get the execution id in
-   * the "onJobEnd" method.
-   */
-  private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
-
-  private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
-
-  private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  @VisibleForTesting
-  def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
-    _executionIdToData.toMap
-  }
-
-  @VisibleForTesting
-  def jobIdToExecutionId: Map[Long, Long] = synchronized {
-    _jobIdToExecutionId.toMap
-  }
-
-  @VisibleForTesting
-  def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
-    _stageIdToStageMetrics.toMap
-  }
-
-  private def trimExecutionsIfNecessary(
-      executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = {
-    if (executions.size > retainedExecutions) {
-      val toRemove = math.max(retainedExecutions / 10, 1)
-      executions.take(toRemove).foreach { execution =>
-        for (executionUIData <- _executionIdToData.remove(execution.executionId)) {
-          for (jobId <- executionUIData.jobs.keys) {
-            _jobIdToExecutionId.remove(jobId)
-          }
-          for (stageId <- executionUIData.stages) {
-            _stageIdToStageMetrics.remove(stageId)
-          }
-        }
-      }
-      executions.trimStart(toRemove)
-    }
-  }
-
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-    val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
-    if (executionIdString == null) {
-      // This is not a job created by SQL
-      return
-    }
-    val executionId = executionIdString.toLong
-    val jobId = jobStart.jobId
-    val stageIds = jobStart.stageIds
-
-    synchronized {
-      activeExecutions.get(executionId).foreach { executionUIData =>
-        executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING
-        executionUIData.stages ++= stageIds
-        stageIds.foreach(stageId =>
-          _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId = 0))
-        _jobIdToExecutionId(jobId) = executionId
-      }
-    }
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
-    val jobId = jobEnd.jobId
-    for (executionId <- _jobIdToExecutionId.get(jobId);
-         executionUIData <- _executionIdToData.get(executionId)) {
-      jobEnd.jobResult match {
-        case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED
-        case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED
-      }
-      if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) {
-        // We are the last job of this execution, so mark the execution as finished. Note that
-        // `onExecutionEnd` also does this, but currently that can be called before `onJobEnd`
-        // since these are called on different threads.
-        markExecutionFinished(executionId)
-      }
-    }
-  }
-
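onJobEnd and onExecutionEnd can arrive in either order because they are delivered on different threads, so an execution is retired only once both signals are in: a completion time has been recorded and no job is still RUNNING. A simplified, self-contained model of that invariant (hypothetical types, not the deleted listener):

    import scala.collection.mutable

    object FinishRace {
      sealed trait Status
      case object Running extends Status
      case object Succeeded extends Status

      final class Execution {
        val jobs = mutable.HashMap[Long, Status]()
        var completionTime: Option[Long] = None
        def hasRunningJobs: Boolean = jobs.values.exists(_ == Running)
        // Finished only when both signals have arrived, in either order.
        def isFinished: Boolean = completionTime.nonEmpty && !hasRunningJobs
      }

      def main(args: Array[String]): Unit = {
        val e = new Execution
        e.jobs(0L) = Running
        e.completionTime = Some(100L)  // onExecutionEnd arrives first...
        println(e.isFinished)          // false: a job is still running
        e.jobs(0L) = Succeeded         // ...then the last onJobEnd
        println(e.isFinished)          // true
      }
    }
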
-  override def onExecutorMetricsUpdate(
-      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
-    for ((taskId, stageId, stageAttemptID, metrics) <- executorMetricsUpdate.taskMetrics) {
-      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, finishTask = false)
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
-    val stageId = stageSubmitted.stageInfo.stageId
-    val stageAttemptId = stageSubmitted.stageInfo.attemptId
-    // Always override metrics for old stage attempt
-    _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    updateTaskAccumulatorValues(
-      taskEnd.taskInfo.taskId,
-      taskEnd.stageId,
-      taskEnd.stageAttemptId,
-      taskEnd.taskMetrics,
-      finishTask = true)
-  }
-
-  /**
-   * Update the accumulator values of a task with the latest metrics for this task. This is called
-   * every time we receive an executor heartbeat or when a task finishes.
-   */
-  private def updateTaskAccumulatorValues(
-      taskId: Long,
-      stageId: Int,
-      stageAttemptID: Int,
-      metrics: TaskMetrics,
-      finishTask: Boolean): Unit = {
-    if (metrics == null) {
-      return
-    }
-
-    _stageIdToStageMetrics.get(stageId) match {
-      case Some(stageMetrics) =>
-        if (stageAttemptID < stageMetrics.stageAttemptId) {
-          // A task of an old stage attempt. Because a new stage is submitted, we can ignore it.
-        } else if (stageAttemptID > stageMetrics.stageAttemptId) {
-          logWarning(s"A task should not have a higher stageAttemptID ($stageAttemptID) than " +
-            s"what we have seen (${stageMetrics.stageAttemptId})")
-        } else {
-          // TODO We don't know the attemptId. Currently, all we can do is override the
-          // accumulator updates. However, if two attempts of the same task are running, e.g.
-          // because of speculation, the accumulator updates will be overwritten by different
-          // task attempts and the results will be weird.
-          stageMetrics.taskIdToMetricUpdates.get(taskId) match {
-            case Some(taskMetrics) =>
-              if (finishTask) {
-                taskMetrics.finished = true
-                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
-              } else if (!taskMetrics.finished) {
-                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
-              } else {
-                // If a task is finished, we should not override with accumulator updates from
-                // heartbeat reports
-              }
-            case None =>
-              // TODO For now just set attemptId to 0. Fix this once we can get the attempt
-              // id from SparkListenerExecutorMetricsUpdate.
-              stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
-                attemptId = 0, finished = finishTask, metrics.accumulatorUpdates())
-          }
-        }
-      case None =>
-        // This execution and its stage have been dropped
-    }
-  }
-
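The method above implements a three-way policy: drop metrics from a stale stage attempt, keep overwriting a task's accumulator updates while the task is unfinished, and freeze them once it finishes so late heartbeats cannot clobber the final values. A condensed stand-alone model of that policy (illustrative names; plain Long values stand in for real accumulator updates):

    import scala.collection.mutable

    object UpdatePolicy {
      final case class TaskState(var finished: Boolean, var updates: Map[Long, Long])
      val tasks = mutable.HashMap[Long, TaskState]()
      var currentStageAttempt = 0

      def update(taskId: Long, stageAttempt: Int, updates: Map[Long, Long], finishTask: Boolean): Unit = {
        if (stageAttempt < currentStageAttempt) return  // stale stage attempt: ignore
        tasks.get(taskId) match {
          case Some(state) if state.finished && !finishTask =>
            ()  // late heartbeat after the task finished: keep the final values
          case Some(state) =>
            state.updates = updates
            state.finished = state.finished || finishTask
          case None =>
            tasks(taskId) = TaskState(finishTask, updates)
        }
      }

      def main(args: Array[String]): Unit = {
        update(7L, 0, Map(1L -> 10L), finishTask = false)  // heartbeat
        update(7L, 0, Map(1L -> 42L), finishTask = true)   // task end: final values
        update(7L, 0, Map(1L -> 99L), finishTask = false)  // late heartbeat: ignored
        println(tasks(7L).updates)                         // Map(1 -> 42)
      }
    }
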
-  def onExecutionStart(
-      executionId: Long,
-      description: String,
-      details: String,
-      physicalPlanDescription: String,
-      physicalPlanGraph: SparkPlanGraph,
-      time: Long): Unit = {
-    val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
-      node.metrics.map(metric => metric.accumulatorId -> metric)
-    }
-
-    val executionUIData = new SQLExecutionUIData(executionId, description, details,
-      physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time)
-    synchronized {
-      activeExecutions(executionId) = executionUIData
-      _executionIdToData(executionId) = executionUIData
-    }
-  }
-
-  def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized {
-    _executionIdToData.get(executionId).foreach { executionUIData =>
-      executionUIData.completionTime = Some(time)
-      if (!executionUIData.hasRunningJobs) {
-        // onExecutionEnd happens after all "onJobEnd"s, so we should update the execution lists.
-        markExecutionFinished(executionId)
-      } else {
-        // There are still some running jobs: onExecutionEnd happens before their "onJobEnd"s.
-        // Then we don't know whether the execution is successful, so let the last onJobEnd
-        // update the execution lists.
-      }
-    }
-  }
-
-  private def markExecutionFinished(executionId: Long): Unit = {
-    activeExecutions.remove(executionId).foreach { executionUIData =>
-      if (executionUIData.isFailed) {
-        failedExecutions += executionUIData
-        trimExecutionsIfNecessary(failedExecutions)
-      } else {
-        completedExecutions += executionUIData
-        trimExecutionsIfNecessary(completedExecutions)
-      }
-    }
-  }
-
-  def getRunningExecutions: Seq[SQLExecutionUIData] = synchronized {
-    activeExecutions.values.toSeq
-  }
-
-  def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized {
-    failedExecutions
-  }
-
-  def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized {
-    completedExecutions
-  }
-
-  def getExecution(executionId: Long): Option[SQLExecutionUIData] = synchronized {
-    _executionIdToData.get(executionId)
-  }
-
-  /**
-   * Get all accumulator updates from all tasks which belong to this execution and merge them.
-   */
-  def getExecutionMetrics(executionId: Long): Map[Long, Any] = synchronized {
-    _executionIdToData.get(executionId) match {
-      case Some(executionUIData) =>
-        val accumulatorUpdates = {
-          for (stageId <- executionUIData.stages;
-               stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
-               taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
-               accumulatorUpdate <- taskMetrics.accumulatorUpdates.toSeq) yield {
-            accumulatorUpdate
-          }
-        }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
-        mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
-          executionUIData.accumulatorMetrics(accumulatorId).metricParam).
-          mapValues(_.asInstanceOf[SQLMetricValue[_]].value)
-      case None =>
-        // This execution has been dropped
-        Map.empty
-    }
-  }
-
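getExecutionMetrics boils down to a group-by-key fold: every task-level value reported for an accumulator is combined using that metric's zero element and addInPlace function. The same shape in plain Scala, with a Long sum standing in for SQLMetricParam (a sketch, not the deleted code):

    object MergeUpdates {
      // (accumulatorId, value) pairs as reported by individual tasks.
      def merge(updates: Seq[(Long, Long)], zero: Long, addInPlace: (Long, Long) => Long): Map[Long, Long] =
        updates.groupBy(_._1).map { case (accId, values) =>
          accId -> values.map(_._2).foldLeft(zero)(addInPlace)
        }

      def main(args: Array[String]): Unit = {
        val reported = Seq(1L -> 10L, 1L -> 5L, 2L -> 7L)
        println(merge(reported, 0L, _ + _))  // Map(1 -> 15, 2 -> 7)
      }
    }
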
-  private def mergeAccumulatorUpdates(
-      accumulatorUpdates: Seq[(Long, Any)],
-      paramFunc: Long => SQLMetricParam[SQLMetricValue[Any], Any]): Map[Long, Any] = {
-    accumulatorUpdates.groupBy(_._1).map { case (accumulatorId, values) =>
-      val param = paramFunc(accumulatorId)
-      (accumulatorId,
-        values.map(_._2.asInstanceOf[SQLMetricValue[Any]]).foldLeft(param.zero)(param.addInPlace))
-    }
-  }
-
-}
-
-/**
- * Represents all necessary data for an execution that will be used in the web UI.
- */
-private[ui] class SQLExecutionUIData(
-    val executionId: Long,
-    val description: String,
-    val details: String,
-    val physicalPlanDescription: String,
-    val physicalPlanGraph: SparkPlanGraph,
-    val accumulatorMetrics: Map[Long, SQLPlanMetric],
-    val submissionTime: Long,
-    var completionTime: Option[Long] = None,
-    val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty,
-    val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()) {
-
-  /**
-   * Return whether there are running jobs in this execution.
-   */
-  def hasRunningJobs: Boolean = jobs.values.exists(_ == JobExecutionStatus.RUNNING)
-
-  /**
-   * Return whether there are any failed jobs in this execution.
-   */
-  def isFailed: Boolean = jobs.values.exists(_ == JobExecutionStatus.FAILED)
-
-  def runningJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.RUNNING }.keys.toSeq
-
-  def succeededJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.SUCCEEDED }.keys.toSeq
-
-  def failedJobs: Seq[Long] =
-    jobs.filter { case (_, status) => status == JobExecutionStatus.FAILED }.keys.toSeq
-}
-
-/**
- * Represents a metric in a SQLPlan.
- *
- * Because we cannot revert our changes for an "Accumulator", we need to maintain accumulator
- * updates for each task, so that if a task is retried, we can simply override the old updates
- * with the new updates of the new task attempt. Since we cannot add them to the accumulator,
- * we need to use "AccumulatorParam" to get the aggregated value.
- */
-private[ui] case class SQLPlanMetric(
-    name: String,
-    accumulatorId: Long,
-    metricParam: SQLMetricParam[SQLMetricValue[Any], Any])
-
-/**
- * Stores all accumulatorUpdates for all tasks in a Spark stage.
- */
-private[ui] class SQLStageMetrics(
-    val stageAttemptId: Long,
-    val taskIdToMetricUpdates: mutable.HashMap[Long, SQLTaskMetrics] = mutable.HashMap.empty)
-
-/**
- * Stores all accumulatorUpdates for a Spark task.
- */
-private[ui] class SQLTaskMetrics(
-    val attemptId: Long, // TODO not used yet
-    var finished: Boolean,
-    var accumulatorUpdates: Map[Long, Any])
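
Note that SQLExecutionUIData keeps a single jobId-to-status map and derives the running/succeeded/failed lists from it, so the pages never maintain parallel collections that could drift apart. A small illustration with a local enum standing in for JobExecutionStatus (hypothetical names, for illustration only):

    object JobProjections {
      sealed trait Status
      case object Running extends Status
      case object Succeeded extends Status
      case object Failed extends Status

      val jobs = Map(0L -> Succeeded, 1L -> Failed, 2L -> Running)

      // Each UI column is just a filtered view of the one status map.
      def byStatus(s: Status): Seq[Long] =
        jobs.collect { case (id, st) if st == s => id }.toSeq.sorted

      def main(args: Array[String]): Unit = {
        println(byStatus(Running))    // List(2)
        println(byStatus(Succeeded))  // List(0)
        println(byStatus(Failed))     // List(1)
      }
    }
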
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
deleted file mode 100644
index 3bba0af..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.ui
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.ui.{SparkUI, SparkUITab}
-
-private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
-  extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
-
-  val parent = sparkUI
-  val listener = sqlContext.listener
-
-  attachPage(new AllExecutionsPage(this))
-  attachPage(new ExecutionPage(this))
-  parent.attachTab(this)
-
-  parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, "/static/sql")
-}
-
-private[sql] object SQLTab {
-
-  private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/ui/static"
-
-  private val nextTabId = new AtomicInteger(0)
-
-  private def nextTabName: String = {
-    val nextId = nextTabId.getAndIncrement()
-    if (nextId == 0) "SQL" else s"SQL${nextId}"
-  }
-}
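
SQLTab.nextTabName hands out "SQL" for the first tab and "SQL1", "SQL2", ... for any further SQLContexts attached to the same UI; the AtomicInteger keeps the numbering safe if tabs are created concurrently. The same scheme as a self-contained snippet:

    import java.util.concurrent.atomic.AtomicInteger

    object TabNames {
      private val nextTabId = new AtomicInteger(0)

      // First caller gets the bare name, later callers get numbered suffixes.
      def nextTabName: String = {
        val nextId = nextTabId.getAndIncrement()
        if (nextId == 0) "SQL" else s"SQL$nextId"
      }

      def main(args: Array[String]): Unit = {
        println(nextTabName)  // SQL
        println(nextTabName)  // SQL1
        println(nextTabName)  // SQL2
      }
    }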