ulysses-you commented on a change in pull request #33140: URL: https://github.com/apache/spark/pull/33140#discussion_r662763350
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala ########## @@ -441,52 +455,60 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition { ) override def doExecuteColumnar(): RDD[ColumnarBatch] = { - val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled - val numInputRows = longMetric("numInputRows") - val numOutputBatches = longMetric("numOutputBatches") - // Instead of creating a new config we are reusing columnBatchSize. In the future if we do - // combine with some of the Arrow conversion tools we will need to unify some of the configs. - val numRows = conf.columnBatchSize - // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire - // plan (this) in the closure. - val localSchema = this.schema - child.execute().mapPartitionsInternal { rowIterator => - if (rowIterator.hasNext) { - new Iterator[ColumnarBatch] { - private val converters = new RowToColumnConverter(localSchema) - private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { - OffHeapColumnVector.allocateColumns(numRows, localSchema) - } else { - OnHeapColumnVector.allocateColumns(numRows, localSchema) - } - private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) - - TaskContext.get().addTaskCompletionListener[Unit] { _ => - cb.close() - } - - override def hasNext: Boolean = { - rowIterator.hasNext - } - - override def next(): ColumnarBatch = { - cb.setNumRows(0) - vectors.foreach(_.reset()) - var rowCount = 0 - while (rowCount < numRows && rowIterator.hasNext) { - val row = rowIterator.next() - converters.convert(row, vectors.toArray) - rowCount += 1 + child match { + case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() => Review comment: Should this guard also include `&& !a.isSubquery`? If you only care about the write path, then I don't think you need to do this in a subquery. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org