Repository: spark
Updated Branches:
  refs/heads/master 8f225e055 -> 1737d45e0


[SPARK-24478][SQL][FOLLOWUP] Move projection and filter push down to physical 
conversion

## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/21503, to completely 
move operator pushdown to the planner rule.

The code is mostly from https://github.com/apache/spark/pull/21319

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenc...@databricks.com>

Closes #21574 from cloud-fan/followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1737d45e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1737d45e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1737d45e

Branch: refs/heads/master
Commit: 1737d45e08a5f1fb78515b14321721d7197b443a
Parents: 8f225e0
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Mon Jun 18 20:15:01 2018 -0700
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Jun 18 20:15:01 2018 -0700

----------------------------------------------------------------------
 .../v2/reader/SupportsReportStatistics.java     |   7 +-
 .../datasources/v2/DataSourceV2Relation.scala   | 109 ++++------------
 .../datasources/v2/DataSourceV2Strategy.scala   | 124 ++++++++++++++-----
 3 files changed, 123 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1737d45e/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
index a79080a..9263964 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
@@ -23,10 +23,9 @@ import org.apache.spark.annotation.InterfaceStability;
  * A mix in interface for {@link DataSourceReader}. Data source readers can 
implement this
  * interface to report statistics to Spark.
  *
- * Statistics are reported to the optimizer before a projection or any filters 
are pushed to the
- * DataSourceReader. Implementations that return more accurate statistics 
based on projection and
- * filters will not improve query performance until the planner can push 
operators before getting
- * stats.
+ * Statistics are reported to the optimizer before any operator is pushed to 
the DataSourceReader.
+ * Implementations that return more accurate statistics based on pushed 
operators will not improve
+ * query performance until the planner can push operators before getting stats.
  */
 @InterfaceStability.Evolving
 public interface SupportsReportStatistics extends DataSourceReader {

http://git-wip-us.apache.org/repos/asf/spark/blob/1737d45e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index e08af21..7613eb2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsReportStatistics}
 import org.apache.spark.sql.types.StructType
 
+/**
+ * A logical plan representing a data source v2 scan.
+ *
+ * @param source An instance of a [[DataSourceV2]] implementation.
+ * @param options The options for this scan. Used to create fresh 
[[DataSourceReader]].
+ * @param userSpecifiedSchema The user-specified schema for this scan. Used to 
create fresh
+ *                            [[DataSourceReader]].
+ */
 case class DataSourceV2Relation(
     source: DataSourceV2,
     output: Seq[AttributeReference],
     options: Map[String, String],
-    userSpecifiedSchema: Option[StructType] = None)
+    userSpecifiedSchema: Option[StructType])
   extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
 
   import DataSourceV2Relation._
@@ -42,14 +49,7 @@ case class DataSourceV2Relation(
 
   override def simpleString: String = "RelationV2 " + metadataString
 
-  lazy val v2Options: DataSourceOptions = makeV2Options(options)
-
-  def newReader: DataSourceReader = userSpecifiedSchema match {
-    case Some(userSchema) =>
-      source.asReadSupportWithSchema.createReader(userSchema, v2Options)
-    case None =>
-      source.asReadSupport.createReader(v2Options)
-  }
+  def newReader(): DataSourceReader = source.createReader(options, 
userSpecifiedSchema)
 
   override def computeStats(): Statistics = newReader match {
     case r: SupportsReportStatistics =>
@@ -139,83 +139,26 @@ object DataSourceV2Relation {
           source.getClass.getSimpleName
       }
     }
-  }
-
-  private def makeV2Options(options: Map[String, String]): DataSourceOptions = 
{
-    new DataSourceOptions(options.asJava)
-  }
 
-  private def schema(
-      source: DataSourceV2,
-      v2Options: DataSourceOptions,
-      userSchema: Option[StructType]): StructType = {
-    val reader = userSchema match {
-      case Some(s) =>
-        source.asReadSupportWithSchema.createReader(s, v2Options)
-      case _ =>
-        source.asReadSupport.createReader(v2Options)
+    def createReader(
+        options: Map[String, String],
+        userSpecifiedSchema: Option[StructType]): DataSourceReader = {
+      val v2Options = new DataSourceOptions(options.asJava)
+      userSpecifiedSchema match {
+        case Some(s) =>
+          asReadSupportWithSchema.createReader(s, v2Options)
+        case _ =>
+          asReadSupport.createReader(v2Options)
+      }
     }
-    reader.readSchema()
   }
 
   def create(
       source: DataSourceV2,
       options: Map[String, String],
-      userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
-    val output = schema(source, makeV2Options(options), 
userSpecifiedSchema).toAttributes
-    DataSourceV2Relation(source, output, options, userSpecifiedSchema)
-  }
-
-  def pushRequiredColumns(
-      relation: DataSourceV2Relation,
-      reader: DataSourceReader,
-      struct: StructType): Seq[AttributeReference] = {
-    reader match {
-      case projectionSupport: SupportsPushDownRequiredColumns =>
-        projectionSupport.pruneColumns(struct)
-        // return the output columns from the relation that were projected
-        val attrMap = relation.output.map(a => a.name -> a).toMap
-        projectionSupport.readSchema().map(f => attrMap(f.name))
-      case _ =>
-        relation.output
-    }
-  }
-
-  def pushFilters(
-      reader: DataSourceReader,
-      filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
-    reader match {
-      case r: SupportsPushDownCatalystFilters =>
-        val postScanFilters = r.pushCatalystFilters(filters.toArray)
-        val pushedFilters = r.pushedCatalystFilters()
-        (postScanFilters, pushedFilters)
-
-      case r: SupportsPushDownFilters =>
-        // A map from translated data source filters to original catalyst 
filter expressions.
-        val translatedFilterToExpr = 
scala.collection.mutable.HashMap.empty[Filter, Expression]
-        // Catalyst filter expression that can't be translated to data source 
filters.
-        val untranslatableExprs = 
scala.collection.mutable.ArrayBuffer.empty[Expression]
-
-        for (filterExpr <- filters) {
-          val translated = DataSourceStrategy.translateFilter(filterExpr)
-          if (translated.isDefined) {
-            translatedFilterToExpr(translated.get) = filterExpr
-          } else {
-            untranslatableExprs += filterExpr
-          }
-        }
-
-        // Data source filters that need to be evaluated again after scanning. 
which means
-        // the data source cannot guarantee the rows returned can pass these 
filters.
-        // As a result we must return it so Spark can plan an extra filter 
operator.
-        val postScanFilters =
-        
r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
-        // The filters which are marked as pushed to this data source
-        val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
-
-        (untranslatableExprs ++ postScanFilters, pushedFilters)
-
-      case _ => (filters, Nil)
-    }
+      userSpecifiedSchema: Option[StructType]): DataSourceV2Relation = {
+    val reader = source.createReader(options, userSpecifiedSchema)
+    DataSourceV2Relation(
+      source, reader.readSchema().toAttributes, options, userSpecifiedSchema)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1737d45e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 8bf858c..182aa29 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -17,51 +17,115 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.{execution, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet}
+import scala.collection.mutable
+
+import org.apache.spark.sql.{sources, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource,
 WriteToContinuousDataSourceExec}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns}
 
 object DataSourceV2Strategy extends Strategy {
-  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
-      val projectSet = AttributeSet(project.flatMap(_.references))
-      val filterSet = AttributeSet(filters.flatMap(_.references))
-
-      val projection = if (filterSet.subsetOf(projectSet) &&
-          AttributeSet(relation.output) == projectSet) {
-        // When the required projection contains all of the filter columns and 
column pruning alone
-        // can produce the required projection, push the required projection.
-        // A final projection may still be needed if the data source produces 
a different column
-        // order or if it cannot prune all of the nested columns.
-        relation.output
-      } else {
-        // When there are filter columns not already in the required 
projection or when the required
-        // projection is more complicated than column pruning, base column 
pruning on the set of
-        // all columns needed by both.
-        (projectSet ++ filterSet).toSeq
-      }
 
-      val reader = relation.newReader
+  /**
+   * Pushes down filters to the data source reader
+   *
+   * @return pushed filter and post-scan filters.
+   */
+  private def pushFilters(
+      reader: DataSourceReader,
+      filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+    reader match {
+      case r: SupportsPushDownCatalystFilters =>
+        val postScanFilters = r.pushCatalystFilters(filters.toArray)
+        val pushedFilters = r.pushedCatalystFilters()
+        (pushedFilters, postScanFilters)
+
+      case r: SupportsPushDownFilters =>
+        // A map from translated data source filters to original catalyst 
filter expressions.
+        val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, 
Expression]
+        // Catalyst filter expression that can't be translated to data source 
filters.
+        val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
+
+        for (filterExpr <- filters) {
+          val translated = DataSourceStrategy.translateFilter(filterExpr)
+          if (translated.isDefined) {
+            translatedFilterToExpr(translated.get) = filterExpr
+          } else {
+            untranslatableExprs += filterExpr
+          }
+        }
+
+        // Data source filters that need to be evaluated again after scanning. 
which means
+        // the data source cannot guarantee the rows returned can pass these 
filters.
+        // As a result we must return it so Spark can plan an extra filter 
operator.
+        val postScanFilters = 
r.pushFilters(translatedFilterToExpr.keys.toArray)
+          .map(translatedFilterToExpr)
+        // The filters which are marked as pushed to this data source
+        val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
+        (pushedFilters, untranslatableExprs ++ postScanFilters)
+
+      case _ => (Nil, filters)
+    }
+  }
 
-      val output = DataSourceV2Relation.pushRequiredColumns(relation, reader,
-        projection.asInstanceOf[Seq[AttributeReference]].toStructType)
+  /**
+   * Applies column pruning to the data source, w.r.t. the references of the 
given expressions.
+   *
+   * @return new output attributes after column pruning.
+   */
+  // TODO: nested column pruning.
+  private def pruneColumns(
+      reader: DataSourceReader,
+      relation: DataSourceV2Relation,
+      exprs: Seq[Expression]): Seq[AttributeReference] = {
+    reader match {
+      case r: SupportsPushDownRequiredColumns =>
+        val requiredColumns = AttributeSet(exprs.flatMap(_.references))
+        val neededOutput = relation.output.filter(requiredColumns.contains)
+        if (neededOutput != relation.output) {
+          r.pruneColumns(neededOutput.toStructType)
+          val nameToAttr = 
relation.output.map(_.name).zip(relation.output).toMap
+          r.readSchema().toAttributes.map {
+            // We have to keep the attribute id during transformation.
+            a => a.withExprId(nameToAttr(a.name).exprId)
+          }
+        } else {
+          relation.output
+        }
+
+      case _ => relation.output
+    }
+  }
 
-      val (postScanFilters, pushedFilters) = 
DataSourceV2Relation.pushFilters(reader, filters)
 
-      logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
-      logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val reader = relation.newReader()
+      // `pushedFilters` will be pushed down and evaluated in the underlying 
data sources.
+      // `postScanFilters` need to be evaluated after the scan.
+      // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet 
row group filter.
+      val (pushedFilters, postScanFilters) = pushFilters(reader, filters)
+      val output = pruneColumns(reader, relation, project ++ postScanFilters)
+      logInfo(
+        s"""
+           |Pushing operators to ${relation.source.getClass}
+           |Pushed Filters: ${pushedFilters.mkString(", ")}
+           |Post-Scan Filters: ${postScanFilters.mkString(",")}
+           |Output: ${output.mkString(", ")}
+         """.stripMargin)
 
       val scan = DataSourceV2ScanExec(
         output, relation.source, relation.options, pushedFilters, reader)
 
-      val filter = postScanFilters.reduceLeftOption(And)
-      val withFilter = filter.map(execution.FilterExec(_, 
scan)).getOrElse(scan)
+      val filterCondition = postScanFilters.reduceLeftOption(And)
+      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
 
       val withProjection = if (withFilter.output != project) {
-        execution.ProjectExec(project, withFilter)
+        ProjectExec(project, withFilter)
       } else {
         withFilter
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to