This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0398b5b [SPARK-37371][SQL] UnionExec should support columnar if all children support columnar 0398b5b is described below commit 0398b5b5ebc48cf46d18c090443c5e54b16d5b35 Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Thu Nov 18 19:15:24 2021 -0800 [SPARK-37371][SQL] UnionExec should support columnar if all children support columnar ### What changes were proposed in this pull request? This patch proposes to make `UnionExec` support columnar if all its children support it. ### Why are the changes needed? For `UnionExec`, if all its children support columnar, the union physical plan should be able to support columnar too, as it is just a union of all partitions from the children. By doing this, we can avoid unnecessary transitions between rows and columnar batches. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests, plus a new test in `DataFrameSetOperationsSuite`. Closes #34645 from viirya/SPARK-37371. Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> --- .../sql/execution/basicPhysicalOperators.scala | 7 ++++ .../spark/sql/DataFrameSetOperationsSuite.scala | 37 ++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 8e0080a..763abbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types.{LongType, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ThreadUtils 
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} @@ -697,6 +698,12 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { protected override def doExecute(): RDD[InternalRow] = sparkContext.union(children.map(_.execute())) + override def supportsColumnar: Boolean = children.forall(_.supportsColumnar) + + protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { + sparkContext.union(children.map(_.executeColumnar())) + } + override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): UnionExec = copy(children = newChildren) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index 650d878..26df517 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp} import org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion import org.apache.spark.sql.catalyst.plans.logical.Union +import org.apache.spark.sql.execution.{SparkPlan, UnionExec} +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession, SQLTestData} @@ -1355,6 +1357,41 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { Row(Row(Seq(Seq(Row("bb", null))))) :: Row(Row(Seq(Seq(Row(null, "ba"))))) :: Nil) } + + test("SPARK-37371: UnionExec should support columnar if all children support columnar") { + def checkIfColumnar( + plan: SparkPlan, + targetPlan: (SparkPlan) => Boolean, + isColumnar: Boolean): Unit = { + val target = plan.collect { + case p if targetPlan(p) => p + } + assert(target.nonEmpty) + assert(target.forall(_.supportsColumnar == 
isColumnar)) + } + + Seq(true, false).foreach { supported => + withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> supported.toString) { + val df1 = Seq(1, 2, 3).toDF("i").cache() + val df2 = Seq(4, 5, 6).toDF("j").cache() + + checkIfColumnar(df1.queryExecution.executedPlan, + _.isInstanceOf[InMemoryTableScanExec], supported) + checkIfColumnar(df2.queryExecution.executedPlan, + _.isInstanceOf[InMemoryTableScanExec], supported) + + val union = df1.union(df2) + checkIfColumnar(union.queryExecution.executedPlan, _.isInstanceOf[UnionExec], supported) + checkAnswer(union, Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) :: Row(6) :: Nil) + + val nonColumnarUnion = df1.union(Seq(7, 8, 9).toDF("k")) + checkIfColumnar(nonColumnarUnion.queryExecution.executedPlan, + _.isInstanceOf[UnionExec], false) + checkAnswer(nonColumnarUnion, + Row(1) :: Row(2) :: Row(3) :: Row(7) :: Row(8) :: Row(9) :: Nil) + } + } + } } case class UnionClass1a(a: Int, b: Long, nested: UnionClass2) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org