Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20387#discussion_r167147910

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,17 +17,130 @@
     package org.apache.spark.sql.execution.datasources.v2

    +import java.util.UUID
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    -import org.apache.spark.sql.catalyst.expressions.AttributeReference
    -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    -import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
    +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport}
    +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
    +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
    +import org.apache.spark.sql.types.StructType

     case class DataSourceV2Relation(
    -    output: Seq[AttributeReference],
    -    reader: DataSourceReader)
    -  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
    +    source: DataSourceV2,
    +    options: Map[String, String],
    +    projection: Option[Seq[AttributeReference]] = None,
    +    filters: Option[Seq[Expression]] = None,
    +    userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
    +
    +  override def simpleString: String = {
    +    s"DataSourceV2Relation(source=$sourceName, " +
    +      s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
    +      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
    +  }
    +
    +  override lazy val schema: StructType = reader.readSchema()
    +
    +  override lazy val output: Seq[AttributeReference] = {
    --- End diff --

    I pulled your code and played with it. Your PR does fix the bug, but in a hacky way. Let me explain what happens:

    1. `QueryPlan.canonicalized` is called, and every expression in `DataSourceV2Relation` is canonicalized, including `DataSourceV2Relation.projection`. This means the attributes in `projection` are all renamed to "none".
    2. `DataSourceV2Relation.output` is called, which triggers the creation of the reader and applies filter push-down and column pruning. Note that because all the attributes have been renamed to "none", we are actually pushing invalid filters and columns to the data source.
    3. `reader.schema` and `projection` are lined up to produce the actual output. Because all the names are "none", this happens to work.

    However, step 2 is pretty dangerous: Spark doesn't define the behavior of pushing invalid filters and columns, and in particular what `reader.schema` should return after invalid columns have been pushed down.

    I prefer my original fix, which puts `output` in `DataSourceV2Relation`'s constructor parameters and updates it when doing column pruning in `PushDownOperatorsToDataSource`.
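    For illustration only, a rough sketch of the shape that alternative could take (this is not the actual code from either version of the PR; it keeps only the constructor and `newInstance`, and the pruning note at the end is hypothetical):

    ```scala
    import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
    import org.apache.spark.sql.catalyst.plans.logical.LeafNode
    import org.apache.spark.sql.sources.v2.DataSourceV2
    import org.apache.spark.sql.types.StructType

    // `output` is a plain constructor parameter, fixed when the relation is built.
    // Canonicalization then rewrites it like any other expression, and column
    // pruning replaces it explicitly instead of triggering reader creation as a
    // side effect of computing `output`.
    case class DataSourceV2Relation(
        source: DataSourceV2,
        options: Map[String, String],
        output: Seq[AttributeReference],
        filters: Option[Seq[Expression]] = None,
        userSchema: Option[StructType] = None)
      extends LeafNode with MultiInstanceRelation {

      // Re-create the output attributes with fresh expression IDs, as
      // MultiInstanceRelation requires.
      override def newInstance(): DataSourceV2Relation = {
        copy(output = output.map(_.newInstance()))
      }
    }

    // PushDownOperatorsToDataSource would then prune columns by producing a new
    // relation, e.g. relation.copy(output = prunedAttributes), rather than by
    // lazily recomputing `output` from the reader.
    ```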