Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21503#discussion_r194875888 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -17,15 +17,56 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.Strategy +import org.apache.spark.sql.{execution, Strategy} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec} object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case r: DataSourceV2Relation => - DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil + case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => + val projectSet = AttributeSet(project.flatMap(_.references)) + val filterSet = AttributeSet(filters.flatMap(_.references)) + + val projection = if (filterSet.subsetOf(projectSet) && + AttributeSet(relation.output) == projectSet) { + // When the required projection contains all of the filter columns and column pruning alone + // can produce the required projection, push the required projection. + // A final projection may still be needed if the data source produces a different column + // order or if it cannot prune all of the nested columns. + relation.output + } else { + // When there are filter columns not already in the required projection or when the required + // projection is more complicated than column pruning, base column pruning on the set of + // all columns needed by both. + (projectSet ++ filterSet).toSeq + } + + val reader = relation.newReader --- End diff -- I don't mind either option #1 or #2. #2 is basically what happens for non-v2 data sources right now. Plus, both should be temporary. I think it is a bad idea to continue with hacky code that uses the reader in the logical plan. It is much cleaner otherwise and we've spend too much time making sure that everything still works. The main example that comes to mind is setting the requested projection and finding out what output is using pushdown. I think hacks are slowing progress on the v2 sources.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org