Github user cloud-fan commented on a diff in the pull request:
    --- Diff: 
    @@ -17,17 +17,130 @@
     package org.apache.spark.sql.execution.datasources.v2
    +import java.util.UUID
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    -import org.apache.spark.sql.catalyst.expressions.AttributeReference
    -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    -import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
    +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
    +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
    +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
    +import org.apache.spark.sql.types.StructType
     case class DataSourceV2Relation(
    -    output: Seq[AttributeReference],
    -    reader: DataSourceReader)
    -  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
    +    source: DataSourceV2,
    +    options: Map[String, String],
    +    projection: Option[Seq[AttributeReference]] = None,
    +    filters: Option[Seq[Expression]] = None,
    +    userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
    +  override def simpleString: String = {
    +    s"DataSourceV2Relation(source=$sourceName, " +
    +      s"schema=[${ => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
    +      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
    +  }
    +  override lazy val schema: StructType = reader.readSchema()
    +  override lazy val output: Seq[AttributeReference] = {
    +    projection match {
    +      case Some(attrs) =>
    +        // use the projection attributes to avoid assigning new ids. 
fields that are not projected
    +        // will be assigned new ids, which is okay because they are not 
    +        val attrMap = => -> a).toMap
    + => attrMap.getOrElse(,
    +          AttributeReference(, f.dataType, f.nullable, 
    +      case _ =>
    +        schema.toAttributes
    +    }
    +  }
    +  private lazy val v2Options: DataSourceOptions = {
    +    // ensure path and table options are set correctly
    +    val updatedOptions = new mutable.HashMap[String, String]
    +    updatedOptions ++= options
    +    new DataSourceOptions(options.asJava)
    +  }
    +  private val sourceName: String = {
    +    source match {
    +      case registered: DataSourceRegister =>
    +        registered.shortName()
    +      case _ =>
    +        source.getClass.getSimpleName
    +    }
    +  }
    +  lazy val (
    +      reader: DataSourceReader,
    +      unsupportedFilters: Seq[Expression],
    +      pushedFilters: Seq[Expression]) = {
    +    val newReader = userSchema match {
    +      case Some(s) =>
    +        asReadSupportWithSchema.createReader(s, v2Options)
    +      case _ =>
    +        asReadSupport.createReader(v2Options)
    +    }
    +    projection.foreach { attrs =>
    +      DataSourceV2Relation.pushRequiredColumns(newReader, 
    +    }
    +    val (remainingFilters, pushedFilters) = filters match {
    +      case Some(filterSeq) =>
    +        DataSourceV2Relation.pushFilters(newReader, filterSeq)
    +      case _ =>
    +        (Nil, Nil)
    +    }
    +    (newReader, remainingFilters, pushedFilters)
    +  }
    -  override def canEqual(other: Any): Boolean = 
    +  def writer(dfSchema: StructType, mode: SaveMode): 
Option[DataSourceWriter] = {
    --- End diff --
    I think we should avoid adding code that is unused today, even if it may be needed in the future. 
The streaming data source v2 API was a bad example of this, as you already pointed out. 
I hope we don't make the same mistake again.


To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to