This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e1f3f22  [SPARK-37221][SQL][FOLLOWUP] Add toRowBased to SparkPlan
e1f3f22 is described below

commit e1f3f22c3dabfea27880e02cbb5df6533c875795
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Tue Nov 9 19:11:44 2021 -0800

    [SPARK-37221][SQL][FOLLOWUP] Add toRowBased to SparkPlan

    ### What changes were proposed in this pull request?

    This is a follow-up of #34499. Instead of adding `ColumnarToRowExec` inside
    `getByteArrayRdd`, this patch adds a `toRowBased` API to `SparkPlan` so that
    callers can explicitly request columnar-to-row conversion.

    ### Why are the changes needed?

    To make the conversion selectable by the caller rather than hard-wired into
    `getByteArrayRdd`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    Closes #34538 from viirya/columnar-followup.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
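For readers skimming the patch below, here is a minimal, self-contained Scala
sketch of the pattern `toRowBased` implements. The `Plan`, `Scan`,
`ColumnarScan`, and `ColumnarToRow` types are hypothetical stand-ins for the
real `SparkPlan` hierarchy, not Spark's API:

    // Hypothetical stand-in types; the real method lives on SparkPlan and
    // wraps the plan in ColumnarToRowExec.
    sealed trait Plan {
      def supportsColumnar: Boolean = false
      // Wrap in a conversion node only when the output is columnar;
      // row-based plans pass through unchanged, so the call is always safe.
      def toRowBased: Plan = if (supportsColumnar) ColumnarToRow(this) else this
    }
    final case class Scan(name: String) extends Plan
    final case class ColumnarScan(name: String) extends Plan {
      override def supportsColumnar: Boolean = true
    }
    final case class ColumnarToRow(child: Plan) extends Plan

    object ToRowBasedDemo extends App {
      assert(Scan("rows").toRowBased == Scan("rows"))
      assert(ColumnarScan("cols").toRowBased == ColumnarToRow(ColumnarScan("cols")))
      println("columnar plan wrapped once; row plan unchanged")
    }

The key property, mirrored from the patch, is idempotent safety: calling
toRowBased on a row-based plan is a no-op, so callers need not check
supportsColumnar themselves.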
---
 .../scala/org/apache/spark/sql/execution/SparkPlan.scala     | 12 ++++++------
 .../org/apache/spark/sql/execution/SparkPlanSuite.scala      |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ea3b133..5c4266d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -313,6 +313,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
+   * Converts the output of this plan to row-based if it is a columnar plan.
+   */
+  def toRowBased: SparkPlan = if (supportsColumnar) ColumnarToRowExec(this) else this
+
+  /**
    * Packing the UnsafeRows into byte array for faster serialization.
    * The byte arrays are in the following format:
    * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
@@ -322,12 +327,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   private def getByteArrayRdd(
       n: Int = -1, takeFromEnd: Boolean = false): RDD[(Long, Array[Byte])] = {
-    val rdd = if (supportsColumnar) {
-      ColumnarToRowExec(this).execute()
-    } else {
-      execute()
-    }
-    rdd.mapPartitionsInternal { iter =>
+    execute().mapPartitionsInternal { iter =>
       var count = 0
       val buffer = new Array[Byte](4 << 10) // 4K
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index c9bbee2..bc4dfcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -116,12 +116,12 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-37221: The collect-like API in SparkPlan should support columnar output") {
-    val emptyResults = ColumnarOp(LocalTableScanExec(Nil, Nil)).executeCollect()
+    val emptyResults = ColumnarOp(LocalTableScanExec(Nil, Nil)).toRowBased.executeCollect()
     assert(emptyResults.isEmpty)
     val relation = LocalTableScanExec(
       Seq(AttributeReference("val", IntegerType)()),
       Seq(InternalRow(1)))
-    val nonEmpty = ColumnarOp(relation).executeCollect()
+    val nonEmpty = ColumnarOp(relation).toRowBased.executeCollect()
     assert(nonEmpty === relation.executeCollect())
   }
 }
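The net effect of the diff is that getByteArrayRdd no longer decides on the
conversion itself; each caller that needs rows opts in by calling toRowBased
first, as the updated test does. A hedged usage sketch against the real API
(assumes a Spark build that contains this commit; queryExecution.executedPlan
and executeCollect are developer-facing internals, so this is illustrative
only, not recommended application code):

    import org.apache.spark.sql.SparkSession

    object ToRowBasedUsage extends App {
      val spark = SparkSession.builder().master("local[1]").getOrCreate()
      val df = spark.range(3).toDF("id")
      // executedPlan is the physical SparkPlan; toRowBased is a no-op when it
      // already produces rows, and wraps it in ColumnarToRowExec when columnar.
      val plan = df.queryExecution.executedPlan
      val rows = plan.toRowBased.executeCollect() // Array[InternalRow]
      println(rows.map(_.getLong(0)).mkString(", "))
      spark.stop()
    }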