GitHub user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20153#discussion_r161034623
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -37,40 +35,58 @@ import org.apache.spark.sql.types.StructType
      */
     case class DataSourceV2ScanExec(
         fullOutput: Seq[AttributeReference],
    -    @transient reader: DataSourceV2Reader) extends LeafExecNode with DataSourceReaderHolder {
    +    @transient reader: DataSourceV2Reader)
    +  extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
     
       override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
     
    -  override def references: AttributeSet = AttributeSet.empty
    +  override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
     
    -  override lazy val metrics = Map(
    -    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
    +  private lazy val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
    +    case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
    +    case _ =>
    +      reader.createReadTasks().asScala.map {
    +        new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
    +      }.asJava
    +  }
     
    -  override protected def doExecute(): RDD[InternalRow] = {
    -    val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
    -      case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
    -      case _ =>
    -        reader.createReadTasks().asScala.map {
    -          new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
    -        }.asJava
    -    }
    +  private lazy val inputRDD: RDD[InternalRow] = reader match {
    +    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
    +      assert(!reader.isInstanceOf[ContinuousReader],
    +        "continuous stream reader does not support columnar read yet.")
    +      new DataSourceRDD(sparkContext, r.createBatchReadTasks()).asInstanceOf[RDD[InternalRow]]
    +
    +    case _ =>
    --- End diff --
    
    we can combine the nested case clauses with the outer match, like:
    ```
    reader match {
        case r: SupportsScanColumnarBatch if r.enableBatchRead() => ......
        case _: ContinuousReader => ......
        case _ => ......
    }
    ```
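
    To make the intent concrete, here is a minimal, self-contained sketch of just the flattening. The classes below are toy stand-ins rather than the real Spark reader interfaces, and the branch bodies are placeholders; the point is only that hoisting the nested cases into the outer match keeps the dispatch behavior identical:
    ```
    // Toy stand-ins for the reader hierarchy, only to show the match shape;
    // these are NOT the actual Spark classes from this PR.
    sealed trait Reader
    final class BatchReader(val enableBatchRead: Boolean) extends Reader
    final class ContinuousReader extends Reader
    final class RowReader extends Reader

    object FlattenMatchExample {
      // Before: the continuous-vs-row distinction sits in a nested match under `case _`.
      def describeNested(reader: Reader): String = reader match {
        case r: BatchReader if r.enableBatchRead => "columnar batch scan"
        case other =>
          other match {
            case _: ContinuousReader => "continuous row scan"
            case _ => "plain row scan"
          }
      }

      // After: the nested cases are hoisted into the outer match, as suggested above.
      def describeFlat(reader: Reader): String = reader match {
        case r: BatchReader if r.enableBatchRead => "columnar batch scan"
        case _: ContinuousReader => "continuous row scan"
        case _ => "plain row scan"
      }

      def main(args: Array[String]): Unit = {
        val readers = Seq(new BatchReader(true), new BatchReader(false), new ContinuousReader, new RowReader)
        // Both shapes produce the same result for every reader type.
        assert(readers.forall(r => describeNested(r) == describeFlat(r)))
        println("flattened match behaves the same as the nested one")
      }
    }
    ```
    Flattening also drops one level of nesting in `inputRDD`, which keeps the columnar, continuous, and plain row-based paths visually parallel.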


---
