Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166997697 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( - output: Seq[AttributeReference], - reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { + source: DataSourceV2, + options: Map[String, String], + projection: Option[Seq[AttributeReference]] = None, + filters: Option[Seq[Expression]] = None, + userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { + s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { + projection match { + case Some(attrs) => + // use the projection attributes to avoid assigning new ids. fields that are not projected + // will be assigned new ids, which is okay because they are not projected. + val attrMap = attrs.map(a => a.name -> a).toMap + schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => + schema.toAttributes + } + } + + private lazy val v2Options: DataSourceOptions = { + // ensure path and table options are set correctly + val updatedOptions = new mutable.HashMap[String, String] + updatedOptions ++= options + + new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { + source match { + case registered: DataSourceRegister => + registered.shortName() + case _ => + source.getClass.getSimpleName + } + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { + val newReader = userSchema match { + case Some(s) => + asReadSupportWithSchema.createReader(s, v2Options) + case _ => + asReadSupport.createReader(v2Options) + } + + projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) + } + + val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => + DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => + (Nil, Nil) + } + + (newReader, remainingFilters, pushedFilters) + } - override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] + def writer(dfSchema: StructType, mode: SaveMode): Option[DataSourceWriter] = { --- End diff -- No, it isn't. But a relation should be able to return a writer. This is going to be needed as we improve the logical plans used by v2.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org