Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r209042787 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -39,52 +36,43 @@ case class DataSourceV2ScanExec( @transient source: DataSourceV2, @transient options: Map[String, String], @transient pushedFilters: Seq[Expression], - @transient reader: DataSourceReader) + @transient readSupport: ReadSupport, + @transient scanConfig: ScanConfig) extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { override def simpleString: String = "ScanV2 " + metadataString // TODO: unify the equal/hashCode implementation for all data source v2 query plans. override def equals(other: Any): Boolean = other match { case other: DataSourceV2ScanExec => - output == other.output && reader.getClass == other.reader.getClass && options == other.options + output == other.output && readSupport.getClass == other.readSupport.getClass && + options == other.options case _ => false } override def hashCode(): Int = { Seq(output, source, options).hashCode() } - override def outputPartitioning: physical.Partitioning = reader match { - case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 => - SinglePartition - - case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 => - SinglePartition - - case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 => + override def outputPartitioning: physical.Partitioning = readSupport match { + case _ if partitions.length == 1 => SinglePartition case s: SupportsReportPartitioning => new DataSourcePartitioning( - s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name))) + s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name))) case _ => super.outputPartitioning } - private lazy val partitions: Seq[InputPartition[InternalRow]] = { - reader.planInputPartitions().asScala - } + private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig) - private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match { - case r: SupportsScanColumnarBatch if r.enableBatchRead() => - assert(!reader.isInstanceOf[ContinuousReader], - "continuous stream reader does not support columnar read yet.") - r.planBatchInputPartitions().asScala - } + private lazy val partitionReaderFactory = readSupport.createReaderFactory(scanConfig) - private lazy val inputRDD: RDD[InternalRow] = reader match { - case _: ContinuousReader => + private lazy val inputRDD: RDD[InternalRow] = readSupport match { + case _: ContinuousReadSupport => + assert(!partitionReaderFactory.supportColumnarReads(), --- End diff -- Works for me. What is the issue to add batch support?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org