Repository: spark
Updated Branches:
  refs/heads/master 99f3c8277 -> 8e7b56f3d


Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader"

This reverts commit bba5d7999f7b3ae9d816ea552ba9378fea1615a6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e7b56f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e7b56f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e7b56f3

Branch: refs/heads/master
Commit: 8e7b56f3d4917692d3ff44d91aa264738a6fc2ed
Parents: 99f3c82
Author: Cheng Lian <l...@databricks.com>
Authored: Fri Jun 10 20:41:48 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Jun 10 20:41:48 2016 -0700

----------------------------------------------------------------------
 .../catalyst/expressions/namedExpressions.scala |  8 ---
 .../datasources/FileSourceStrategy.scala        |  9 +--
 .../datasources/parquet/ParquetFileFormat.scala | 61 ++++++++++++++++++--
 3 files changed, 57 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index c06a1ea..306a99d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,14 +292,6 @@ case class AttributeReference(
     }
   }
 
-  def withMetadata(newMetadata: Metadata): AttributeReference = {
-    if (metadata == newMetadata) {
-      this
-    } else {
-      AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated)
-    }
-  }
-
   override protected final def otherCopyArgs: Seq[AnyRef] = {
     exprId :: qualifier :: isGenerated :: Nil
   }
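
For context only (not part of this commit): a minimal sketch of how the removed `withMetadata` helper was used, assuming Spark 2.0-era catalyst APIs; the attribute name and metadata key below are illustrative.

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.{IntegerType, MetadataBuilder}

    val attr = AttributeReference("id", IntegerType)()
    val meta = new MetadataBuilder().putString("comment", "surrogate key").build()
    // Prior to this revert, `attr.withMetadata(meta)` returned `attr` unchanged when
    // the metadata was equal, and otherwise a copy carrying the new metadata while
    // preserving exprId, qualifier and isGenerated.
    // val annotated = attr.withMetadata(meta)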

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 7fc842f..13a86bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,14 +84,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
-        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
-          files.dataSchema.find(_.name == c.name).map { f =>
-            c match {
-              case a: AttributeReference => a.withMetadata(f.metadata)
-              case _ => c
-            }
-          }.getOrElse(c)
-        }
+        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
 
       // Partition keys are not available in the statistics of the files.
       val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
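
For context only (not part of this commit): the mapping removed above matched each resolved column against `files.dataSchema` by name so the field's metadata could be attached to the attribute (via the `withMetadata` helper also removed in this revert); after the revert the plain result of `l.resolve(...)` is used. A simplified, standalone sketch of that by-name lookup, with illustrative names:

    import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StringType, StructField, StructType}

    val dataSchema = StructType(Seq(
      StructField("name", StringType, nullable = true,
        new MetadataBuilder().putString("comment", "customer name").build())))

    // For each column, look up the schema field by name and take its metadata;
    // columns absent from the schema fall back to empty metadata via getOrElse.
    def metadataFor(columns: Seq[String]): Seq[(String, Metadata)] =
      columns.map { c =>
        dataSchema.find(_.name == c).map(f => c -> f.metadata).getOrElse(c -> Metadata.empty)
      }

    metadataFor(Seq("name", "unknown"))
    // "name" carries the comment metadata; "unknown" gets Metadata.empty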

http://git-wip-us.apache.org/repos/asf/spark/blob/8e7b56f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index bc4a9de..3735c94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,11 +357,6 @@ private[sql] class ParquetFileFormat
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
-      // Try to push down filters when filter push-down is enabled.
-      // Notice: This push-down is RowGroups level, not individual records.
-      pushed.foreach {
-        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
-      }
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
         vectorizedReader.initialize(split, hadoopAttemptContext)
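
For context only (not part of this commit): the removed lines handed the converted predicate to parquet-mr through the task attempt's Hadoop configuration, which is what enables row-group level skipping based on column statistics. A minimal sketch of that mechanism; the column name and threshold are illustrative.

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.hadoop.ParquetInputFormat

    // Build a predicate equivalent to `age > 21` and register it on the Hadoop
    // configuration; parquet-mr can then skip whole row groups whose min/max
    // statistics for `age` rule out any match.
    val conf = new Configuration()
    val predicate = FilterApi.gt(FilterApi.intColumn("age"), Integer.valueOf(21))
    ParquetInputFormat.setFilterPredicate(conf, predicate)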
@@ -597,6 +592,62 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
+  /** This closure sets various Parquet configurations at both driver side and executor side. */
+  private[parquet] def initializeLocalJobFunc(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      dataSchema: StructType,
+      parquetBlockSize: Long,
+      useMetadataCache: Boolean,
+      parquetFilterPushDown: Boolean,
+      assumeBinaryIsString: Boolean,
+      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
+    val conf = job.getConfiguration
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+
+    // Try to push down filters when filter push-down is enabled.
+    if (parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+    })
+
+    conf.set(
+      CatalystWriteSupport.SPARK_ROW_SCHEMA,
+      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
+
+    // Sets flags for `CatalystSchemaConverter`
+    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
+    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
+
+    overrideMinSplitSize(parquetBlockSize, conf)
+  }
+
+  /** This closure sets input paths at the driver side. */
+  private[parquet] def initializeDriverSideJobFunc(
+      inputFiles: Array[FileStatus],
+      parquetBlockSize: Long)(job: Job): Unit = {
+    // We set the input paths at the driver side.
+    logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
+    if (inputFiles.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
+    }
+
+    overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
+  }
+
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
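
For context only (not part of this commit): the re-added helpers are curried so that everything except the `Job` is captured up front and the resulting `Job => Unit` closure is applied when the Hadoop job is built. The filter-conversion step in `initializeLocalJobFunc` can be sketched in isolation as below; this assumes code living alongside the package-private `ParquetFilters` object, and the schema and predicates are illustrative.

    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
    import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val dataSchema = StructType(Seq(StructField("age", IntegerType)))
    val filters: Array[Filter] = Array(IsNotNull("age"), GreaterThan("age", 21))

    // Filters with no Parquet equivalent are dropped by flatMap (createFilter
    // returns an Option); the surviving predicates are AND-ed together.
    val pushed = filters
      .flatMap(ParquetFilters.createFilter(dataSchema, _))
      .reduceOption(FilterApi.and)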
 

