Repository: spark
Updated Branches:
  refs/heads/master 99f3c8277 -> 8e7b56f3d
Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader" This reverts commit bba5d7999f7b3ae9d816ea552ba9378fea1615a6. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e7b56f3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e7b56f3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e7b56f3 Branch: refs/heads/master Commit: 8e7b56f3d4917692d3ff44d91aa264738a6fc2ed Parents: 99f3c82 Author: Cheng Lian <l...@databricks.com> Authored: Fri Jun 10 20:41:48 2016 -0700 Committer: Cheng Lian <l...@databricks.com> Committed: Fri Jun 10 20:41:48 2016 -0700 ---------------------------------------------------------------------- .../catalyst/expressions/namedExpressions.scala | 8 --- .../datasources/FileSourceStrategy.scala | 9 +-- .../datasources/parquet/ParquetFileFormat.scala | 61 ++++++++++++++++++-- 3 files changed, 57 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index c06a1ea..306a99d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -292,14 +292,6 @@ case class AttributeReference( } } - def withMetadata(newMetadata: Metadata): AttributeReference = { - if (metadata == newMetadata) { - this - } else { - AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated) - } - } - override protected final def otherCopyArgs: Seq[AnyRef] = { exprId :: qualifier :: isGenerated :: Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 7fc842f..13a86bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -84,14 +84,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") val dataColumns = - l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c => - files.dataSchema.find(_.name == c.name).map { f => - c match { - case a: AttributeReference => a.withMetadata(f.metadata) - case _ => c - } - }.getOrElse(c) - } + l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver) // Partition keys are not available in the statistics of the files. 
      val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index bc4a9de..3735c94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,11 +357,6 @@ private[sql] class ParquetFileFormat
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
-      // Try to push down filters when filter push-down is enabled.
-      // Notice: This push-down is RowGroups level, not individual records.
-      pushed.foreach {
-        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
-      }
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
         vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -597,6 +592,62 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
+  /** This closure sets various Parquet configurations at both driver side and executor side. */
+  private[parquet] def initializeLocalJobFunc(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      dataSchema: StructType,
+      parquetBlockSize: Long,
+      useMetadataCache: Boolean,
+      parquetFilterPushDown: Boolean,
+      assumeBinaryIsString: Boolean,
+      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
+    val conf = job.getConfiguration
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+
+    // Try to push down filters when filter push-down is enabled.
+    if (parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+    })
+
+    conf.set(
+      CatalystWriteSupport.SPARK_ROW_SCHEMA,
+      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
+
+    // Sets flags for `CatalystSchemaConverter`
+    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
+    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
+
+    overrideMinSplitSize(parquetBlockSize, conf)
+  }
+
+  /** This closure sets input paths at the driver side. */
+  private[parquet] def initializeDriverSideJobFunc(
+      inputFiles: Array[FileStatus],
+      parquetBlockSize: Long)(job: Job): Unit = {
+    // We side the input paths at the driver side.
+ logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}") + if (inputFiles.nonEmpty) { + FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*) + } + + overrideMinSplitSize(parquetBlockSize, job.getConfiguration) + } + private[parquet] def readSchema( footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org