This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 4f7187885a6 [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly
4f7187885a6 is described below

commit 4f7187885a6c6a0c944af97a493a42dabca3cc1b
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Jul 28 11:36:59 2023 +0800

    [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly

    ### What changes were proposed in this pull request?

    This is a followup of https://github.com/apache/spark/pull/41839, to set the partition
    index correctly even if it's not used for now. It also contains a few code cleanups.

    ### Why are the changes needed?

    future-proof

    ### Does this PR introduce _any_ user-facing change?

    no

    ### How was this patch tested?

    existing tests

    Closes #42185 from cloud-fan/follow.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit bf1bbc514ebf25fd8041f566dc9d13593c307931)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/execution/Columnar.scala | 41 ++++++--------
 .../spark/sql/SparkSessionExtensionSuite.scala    | 64 ++++++++++------------
 .../spark/sql/execution/SparkPlanSuite.scala      | 26 ++++-----
 3 files changed, 57 insertions(+), 74 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index a2029816c23..fc879f7e98f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -85,20 +85,16 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
   )

   override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    val numInputBatches = longMetric("numInputBatches")
-    val evaluatorFactory =
-      new ColumnarToRowEvaluatorFactory(
-        child.output,
-        numOutputRows,
-        numInputBatches)
-
+    val evaluatorFactory = new ColumnarToRowEvaluatorFactory(
+      child.output,
+      longMetric("numOutputRows"),
+      longMetric("numInputBatches"))
     if (conf.usePartitionEvaluator) {
       child.executeColumnar().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.executeColumnar().mapPartitionsInternal { batches =>
+      child.executeColumnar().mapPartitionsWithIndexInternal { (index, batches) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, batches)
+        evaluator.eval(index, batches)
       }
     }
   }
@@ -454,25 +450,20 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
   )

   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val numInputRows = longMetric("numInputRows")
-    val numOutputBatches = longMetric("numOutputBatches")
-    // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
-    // combine with some of the Arrow conversion tools we will need to unify some of the configs.
-    val numRows = conf.columnBatchSize
-    val evaluatorFactory =
-      new RowToColumnarEvaluatorFactory(
-        conf.offHeapColumnVectorEnabled,
-        numRows,
-        schema,
-        numInputRows,
-        numOutputBatches)
-
+    val evaluatorFactory = new RowToColumnarEvaluatorFactory(
+      conf.offHeapColumnVectorEnabled,
+      // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
+      // combine with some of the Arrow conversion tools we will need to unify some of the configs.
+      conf.columnBatchSize,
+      schema,
+      longMetric("numInputRows"),
+      longMetric("numOutputBatches"))
     if (conf.usePartitionEvaluator) {
       child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.execute().mapPartitionsInternal { rowIterator =>
+      child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, rowIterator)
+        evaluator.eval(index, rowIterator)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 043a3b1a7e5..d4a871c00a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -279,40 +279,36 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
     }
     withSession(extensions) { session =>
       session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
-      Seq(true, false).foreach { enableEvaluator =>
-        withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
-          assert(session.sessionState.columnarRules.contains(
-            MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
-          import session.sqlContext.implicits._
-          // perform a join to inject a broadcast exchange
-          val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
-          val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
-          val data = left.join(right, $"l1" === $"r1")
-            // repartitioning avoids having the add operation pushed up into the LocalTableScan
-            .repartition(1)
-          val df = data.selectExpr("l2 + r2")
-          // execute the plan so that the final adaptive plan is available when AQE is on
-          df.collect()
-          val found = collectPlanSteps(df.queryExecution.executedPlan).sum
-          // 1 MyBroadcastExchangeExec
-          // 1 MyShuffleExchangeExec
-          // 1 ColumnarToRowExec
-          // 2 ColumnarProjectExec
-          // 1 ReplacedRowToColumnarExec
-          // so 11121 is expected.
-          assert(found == 11121)
-
-          // Verify that we get back the expected, wrong, result
-          val result = df.collect()
-          assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
-          assert(result(1).getLong(0) == 201L)
-          assert(result(2).getLong(0) == 301L)
-
-          withTempPath { path =>
-            val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
-            assert(e.getMessage == "columnar write")
-          }
-        }
+      assert(session.sessionState.columnarRules.contains(
+        MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
+      import session.sqlContext.implicits._
+      // perform a join to inject a broadcast exchange
+      val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
+      val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
+      val data = left.join(right, $"l1" === $"r1")
+        // repartitioning avoids having the add operation pushed up into the LocalTableScan
+        .repartition(1)
+      val df = data.selectExpr("l2 + r2")
+      // execute the plan so that the final adaptive plan is available when AQE is on
+      df.collect()
+      val found = collectPlanSteps(df.queryExecution.executedPlan).sum
+      // 1 MyBroadcastExchangeExec
+      // 1 MyShuffleExchangeExec
+      // 1 ColumnarToRowExec
+      // 2 ColumnarProjectExec
+      // 1 ReplacedRowToColumnarExec
+      // so 11121 is expected.
+      assert(found == 11121)
+
+      // Verify that we get back the expected, wrong, result
+      val result = df.collect()
+      assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
+      assert(result(1).getLong(0) == 201L)
+      assert(result(2).getLong(0) == 301L)
+
+      withTempPath { path =>
+        val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
+        assert(e.getMessage == "columnar write")
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b97837fb973..b14f4a405f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -127,22 +127,18 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {

   test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
-      Seq(true, false).foreach { enable =>
-        withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enable.toString) {
-          withTempPath { path =>
-            spark.range(1).write.parquet(path.getAbsolutePath)
-            val df = spark.read.parquet(path.getAbsolutePath)
-            val columnarToRowExec =
-              df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
-            try {
-              spark.range(1).foreach { _ =>
-                columnarToRowExec.canonicalized
-                ()
-              }
-            } catch {
-              case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
-            }
+      withTempPath { path =>
+        spark.range(1).write.parquet(path.getAbsolutePath)
+        val df = spark.read.parquet(path.getAbsolutePath)
+        val columnarToRowExec =
+          df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
+        try {
+          spark.range(1).foreach { _ =>
+            columnarToRowExec.canonicalized
+            ()
           }
+        } catch {
+          case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
         }
       }
     }
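For illustration, below is a minimal, self-contained sketch (not part of this commit) of the behaviour the patch restores: the per-partition evaluator now receives the real partition index instead of a hard-coded 0. The sketch uses the public mapPartitionsWithIndex API rather than Spark's internal mapPartitionsWithIndexInternal, and the object name, eval helper, local master and partition count are all made up for the example.

import org.apache.spark.sql.SparkSession

object PartitionIndexSketch {
  // Stand-in for the evaluator created by the factories in Columnar.scala; it just tags
  // each value with the partition index it was handed.
  def eval(partitionIndex: Int, values: Iterator[Long]): Iterator[String] =
    values.map(v => s"partition=$partitionIndex value=$v")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    val rdd = spark.sparkContext.parallelize(0L until 8L, numSlices = 4)

    // Old behaviour: the partition index was dropped and 0 was passed for every partition.
    rdd.mapPartitions(values => eval(0, values)).collect().foreach(println)

    // New behaviour: the real partition index is threaded through to the evaluator,
    // mirroring the switch to mapPartitionsWithIndexInternal in this commit.
    rdd.mapPartitionsWithIndex((index, values) => eval(index, values)).collect().foreach(println)

    spark.stop()
  }
}

With four partitions, the first collect reports partition=0 for every value, while the second reports the true indices 0 through 3, which is what "set partition index correctly" refers to even though nothing consumes the index yet.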