Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r195173700
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -32,79 +31,35 @@ import org.apache.spark.sql.types.StructType
     
     case class DataSourceV2Relation(
         source: DataSourceV2,
    +    output: Seq[AttributeReference],
         options: Map[String, String],
    -    projection: Seq[AttributeReference],
    -    filters: Option[Seq[Expression]] = None,
         userSpecifiedSchema: Option[StructType] = None)
       extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
     
       import DataSourceV2Relation._
     
    -  override def simpleString: String = "RelationV2 " + metadataString
    -
    -  override lazy val schema: StructType = reader.readSchema()
    -
    -  override lazy val output: Seq[AttributeReference] = {
     -    // use the projection attributes to avoid assigning new ids. fields that are not projected
     -    // will be assigned new ids, which is okay because they are not projected.
    -    val attrMap = projection.map(a => a.name -> a).toMap
    -    schema.map(f => attrMap.getOrElse(f.name,
    -      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
    -  }
    -
    -  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
    +  override def pushedFilters: Seq[Expression] = Seq.empty
     
    -  // postScanFilters: filters that need to be evaluated after the scan.
     -  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
     -  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
    -  lazy val (
    -      reader: DataSourceReader,
    -      postScanFilters: Seq[Expression],
    -      pushedFilters: Seq[Expression]) = {
    -    val newReader = userSpecifiedSchema match {
    -      case Some(s) =>
    -        source.asReadSupportWithSchema.createReader(s, v2Options)
    -      case _ =>
    -        source.asReadSupport.createReader(v2Options)
    -    }
    -
     -    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
    -
    -    val (postScanFilters, pushedFilters) = filters match {
    -      case Some(filterSeq) =>
    -        DataSourceV2Relation.pushFilters(newReader, filterSeq)
    -      case _ =>
    -        (Nil, Nil)
    -    }
    -    logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
    -    logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
    -
    -    (newReader, postScanFilters, pushedFilters)
    -  }
    -
    -  override def doCanonicalize(): LogicalPlan = {
    -    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
    +  override def simpleString: String = "RelationV2 " + metadataString
     
     -    // override output with canonicalized output to avoid attempting to configure a reader
    -    val canonicalOutput: Seq[AttributeReference] = this.output
    -        .map(a => QueryPlan.normalizeExprId(a, projection))
    +  lazy val v2Options: DataSourceOptions = makeV2Options(options)
     
    -    new DataSourceV2Relation(c.source, c.options, c.projection) {
    -      override lazy val output: Seq[AttributeReference] = canonicalOutput
    -    }
    +  def newReader: DataSourceReader = userSpecifiedSchema match {
    +    case Some(userSchema) =>
    +      source.asReadSupportWithSchema.createReader(userSchema, v2Options)
    +    case None =>
    +      source.asReadSupport.createReader(v2Options)
       }
     
    -  override def computeStats(): Statistics = reader match {
    +  override def computeStats(): Statistics = newReader match {
         case r: SupportsReportStatistics =>
           Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
         case _ =>
           Statistics(sizeInBytes = conf.defaultSizeInBytes)
       }
     
       override def newInstance(): DataSourceV2Relation = {
    --- End diff ---
    
    I thought that initially, but the canonicalization test was failing without this.
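    
    For context, here is a minimal, self-contained sketch (not code from this PR)
    of why the override matters: two relations over the same source get fresh
    AttributeReference exprIds, so their plans only compare equal after
    canonicalization rewrites those ids to position-based ones, roughly what
    QueryPlan.normalizeExprId does. The `normalize` helper below is a hypothetical
    stand-in, written only to illustrate the idea.
    
        import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
        import org.apache.spark.sql.types.IntegerType
    
        // Hypothetical stand-in for QueryPlan.normalizeExprId: rewrite each
        // attribute's exprId to its ordinal, so the id no longer depends on
        // when the attribute happened to be created.
        def normalize(output: Seq[AttributeReference]): Seq[AttributeReference] =
          output.zipWithIndex.map { case (attr, i) => attr.withExprId(ExprId(i)) }
    
        val a = Seq(AttributeReference("id", IntegerType)())  // gets one fresh exprId
        val b = Seq(AttributeReference("id", IntegerType)())  // gets another
        assert(a != b)                        // equal except for exprId
        assert(normalize(a) == normalize(b))  // canonical forms match
    
    Without overriding `output` with the normalized attributes, building the
    canonical relation would recompute `output`, which in the old code required
    configuring a reader (that is what the removed comment in doCanonicalize
    warns about).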

