ulysses-you commented on a change in pull request #33140:
URL: https://github.com/apache/spark/pull/33140#discussion_r662763350



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -441,52 +455,60 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
   )
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
-    val numInputRows = longMetric("numInputRows")
-    val numOutputBatches = longMetric("numOutputBatches")
-    // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
-    // combine with some of the Arrow conversion tools we will need to unify some of the configs.
-    val numRows = conf.columnBatchSize
-    // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
-    // plan (this) in the closure.
-    val localSchema = this.schema
-    child.execute().mapPartitionsInternal { rowIterator =>
-      if (rowIterator.hasNext) {
-        new Iterator[ColumnarBatch] {
-          private val converters = new RowToColumnConverter(localSchema)
-          private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
-            OffHeapColumnVector.allocateColumns(numRows, localSchema)
-          } else {
-            OnHeapColumnVector.allocateColumns(numRows, localSchema)
-          }
-          private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
-
-          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-            cb.close()
-          }
-
-          override def hasNext: Boolean = {
-            rowIterator.hasNext
-          }
-
-          override def next(): ColumnarBatch = {
-            cb.setNumRows(0)
-            vectors.foreach(_.reset())
-            var rowCount = 0
-            while (rowCount < numRows && rowIterator.hasNext) {
-              val row = rowIterator.next()
-              converters.convert(row, vectors.toArray)
-              rowCount += 1
+    child match {
+      case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() =>

Review comment:
       Should this be `&& !a.isSubquery`? If you only care about the write path, I think you don't need to do this for subqueries.
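
       For illustration only, a minimal sketch of the guard this comment suggests, assuming the PR's `finalPlanSupportsColumnar()` method and the existing `isSubquery` flag on `AdaptiveSparkPlanExec`; `convertRowsToBatches()` is a hypothetical stand-in for the row-to-columnar iterator logic shown above:

```scala
// Sketch, not the PR's actual code. Assumes the usual imports, e.g.
// org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
  child match {
    // Use the adaptive plan's columnar output directly, but only for the
    // main query: for subqueries the write-path optimization is irrelevant,
    // so the fast path can be skipped there.
    case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() && !a.isSubquery =>
      a.executeColumnar()
    // Otherwise fall back to the existing row-to-columnar conversion.
    case _ =>
      convertRowsToBatches() // hypothetical helper wrapping the iterator above
  }
}
```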



