Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20448#discussion_r164958102
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
    @@ -17,16 +17,57 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.catalyst.expressions.AttributeReference
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, Expression}
     import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
     import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.types.StructType
     
    +/**
    + * A logical plan representing a data source relation, which will 
eventually be planned as a
    + * data scan operator.
    + *
    + * @param output The output of this relation.
    + * @param source The instance of a data source v2 implementation.
    + * @param options The options specified for this scan, used to create the 
`DataSourceReader`.
    + * @param userSpecifiedSchema The user specified schema, used to create 
the `DataSourceReader`.
    + * @param filters The predicates which are pushed and handled by this data 
source.
    + * @param existingReader A mutable reader carrying some temporary stats 
during optimization and
    + *                       planning. It's always None before optimization, 
and does not take part in
    + *                       the equality of this plan, which means this plan 
is still immutable.
    + */
     case class DataSourceV2Relation(
    -    fullOutput: Seq[AttributeReference],
    -    reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder 
{
    +    output: Seq[AttributeReference],
    +    source: DataSourceV2,
    +    options: DataSourceOptions,
    +    userSpecifiedSchema: Option[StructType],
    +    filters: Set[Expression],
    +    existingReader: Option[DataSourceReader]) extends LeafNode with 
DataSourceV2QueryPlan {
    +
    +  override def references: AttributeSet = AttributeSet.empty
    +
    +  override def sourceClass: Class[_ <: DataSourceV2] = source.getClass
     
       override def canEqual(other: Any): Boolean = 
other.isInstanceOf[DataSourceV2Relation]
     
    +  def reader: DataSourceReader = existingReader.getOrElse {
    +    (source, userSpecifiedSchema) match {
    +      case (ds: ReadSupportWithSchema, Some(schema)) =>
    +        ds.createReader(schema, options)
    +
    +      case (ds: ReadSupport, None) =>
    +        ds.createReader(options)
    +
    +      case (ds: ReadSupport, Some(schema)) =>
    +        val reader = ds.createReader(options)
    +        // Sanity check, this should be guaranteed by 
`DataFrameReader.load`
    +        assert(reader.readSchema() == schema)
    +        reader
    +
    +      case _ => throw new IllegalStateException()
    +    }
    +  }
    +
    --- End diff --
    
    data source v2 doesn't support tables yet, so we don't have this problem 
now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to