Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209042787
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -39,52 +36,43 @@ case class DataSourceV2ScanExec(
         @transient source: DataSourceV2,
         @transient options: Map[String, String],
         @transient pushedFilters: Seq[Expression],
    -    @transient reader: DataSourceReader)
    +    @transient readSupport: ReadSupport,
    +    @transient scanConfig: ScanConfig)
       extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
     
       override def simpleString: String = "ScanV2 " + metadataString
     
       // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
       override def equals(other: Any): Boolean = other match {
         case other: DataSourceV2ScanExec =>
    -      output == other.output && reader.getClass == other.reader.getClass && options == other.options
    +      output == other.output && readSupport.getClass == other.readSupport.getClass &&
    +        options == other.options
         case _ => false
       }
     
       override def hashCode(): Int = {
         Seq(output, source, options).hashCode()
       }
     
    -  override def outputPartitioning: physical.Partitioning = reader match {
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 =>
    -      SinglePartition
    -
    -    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 =>
    -      SinglePartition
    -
    -    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 =>
    +  override def outputPartitioning: physical.Partitioning = readSupport match {
    +    case _ if partitions.length == 1 =>
    +    case _ if partitions.length == 1 =>
           SinglePartition
     
         case s: SupportsReportPartitioning =>
           new DataSourcePartitioning(
    -        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
    +        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))
     
         case _ => super.outputPartitioning
       }
     
    -  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
    -    reader.planInputPartitions().asScala
    -  }
    +  private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
     
    -  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
    -      assert(!reader.isInstanceOf[ContinuousReader],
    -        "continuous stream reader does not support columnar read yet.")
    -      r.planBatchInputPartitions().asScala
    -  }
    +  private lazy val partitionReaderFactory = readSupport.createReaderFactory(scanConfig)
     
    -  private lazy val inputRDD: RDD[InternalRow] = reader match {
    -    case _: ContinuousReader =>
    +  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
    +    case _: ContinuousReadSupport =>
    +      assert(!partitionReaderFactory.supportColumnarReads(),
    --- End diff --
    
    Works for me. What is the issue for adding batch support?
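    For context, a minimal, self-contained Scala sketch of the dispatch pattern the new code uses. The types below are simplified stand-ins, not the actual Spark DSv2 interfaces from this PR: instead of matching on the reader's interfaces (SupportsScanColumnarBatch / enableBatchRead), the scan asks the partition reader factory whether it supports columnar reads, and continuous read support is asserted to be row-based for now.
    
        // Stand-in types only; the real interfaces live in the Spark DSv2 API.
        object ScanDispatchSketch {
          trait PartitionReaderFactory {
            // Whether this factory can produce columnar (batch) readers.
            def supportColumnarReads(): Boolean = false
          }
    
          trait ReadSupport {
            def createReaderFactory(): PartitionReaderFactory
          }
    
          // Marker for continuous-streaming reads, which do not support
          // columnar scans yet (hence the assert in the diff above).
          trait ContinuousReadSupport extends ReadSupport
    
          // Choose row-based vs. columnar scanning from the factory's
          // capability rather than from instanceof checks on the reader.
          def scanMode(readSupport: ReadSupport): String = {
            val factory = readSupport.createReaderFactory()
            readSupport match {
              case _: ContinuousReadSupport =>
                assert(!factory.supportColumnarReads(),
                  "continuous scans are row-based for now")
                "continuous row scan"
              case _ if factory.supportColumnarReads() => "columnar batch scan"
              case _ => "row scan"
            }
          }
        }
    
    The appeal of this shape is that the capability lives with the factory that actually creates the readers, so the scan exec no longer needs reader-specific instanceof checks.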

