This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch branch-0.10
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/branch-0.10 by this push:
new 63fd9acce fix: [branch-0.10] Avoid spark plan execution cache preventing CometBatchRDD numPartitions change (#2420) (#2503)
63fd9acce is described below
commit 63fd9acceac938f976c3ba5e4a8ec1c911c21e2f
Author: Andy Grove <[email protected]>
AuthorDate: Tue Sep 30 09:57:37 2025 -0600
fix: [branch-0.10] Avoid spark plan execution cache preventing CometBatchRDD numPartitions change (#2420) (#2503)
* fix merge conflicts
* trigger build
---------
Co-authored-by: Zhen Wang <[email protected]>
---
.../sql/comet/CometBroadcastExchangeExec.scala | 24 ++++++++++++++--------
.../org/apache/spark/sql/comet/operators.scala | 8 ++++----
2 files changed, 19 insertions(+), 13 deletions(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 21b395982..95770592f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -26,7 +26,7 @@ import scala.concurrent.{ExecutionContext, Promise}
import scala.concurrent.duration.NANOSECONDS
import scala.util.control.NonFatal
-import org.apache.spark.{broadcast, Partition, SparkContext, TaskContext}
+import org.apache.spark.{broadcast, Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -102,14 +102,8 @@ case class CometBroadcastExchangeExec(
@transient
private lazy val maxBroadcastRows = 512000000
- private var numPartitions: Option[Int] = None
-
- def setNumPartitions(numPartitions: Int): CometBroadcastExchangeExec = {
- this.numPartitions = Some(numPartitions)
- this
- }
def getNumPartitions(): Int = {
- numPartitions.getOrElse(child.executeColumnar().getNumPartitions)
+ child.executeColumnar().getNumPartitions
}
@transient
@@ -224,6 +218,18 @@ case class CometBroadcastExchangeExec(
new CometBatchRDD(sparkContext, getNumPartitions(), broadcasted)
}
+ // After https://issues.apache.org/jira/browse/SPARK-48195, Spark plan will cache created RDD.
+ // Since we may change the number of partitions in CometBatchRDD,
+ // we need a method that always creates a new CometBatchRDD.
+ def executeColumnar(numPartitions: Int): RDD[ColumnarBatch] = {
+ if (isCanonicalizedPlan) {
+ throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
+ }
+
+ val broadcasted = executeBroadcast[Array[ChunkedByteBuffer]]()
+ new CometBatchRDD(sparkContext, numPartitions, broadcasted)
+ }
+
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
try {
relationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]]
@@ -276,7 +282,7 @@ object CometBroadcastExchangeExec {
*/
class CometBatchRDD(
sc: SparkContext,
- numPartitions: Int,
+ val numPartitions: Int,
value: broadcast.Broadcast[Array[ChunkedByteBuffer]])
extends RDD[ColumnarBatch](sc, Nil) {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 593f4f3a4..40da23a18 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -272,16 +272,16 @@ abstract class CometNativeExec extends CometExec {
sparkPlans.zipWithIndex.foreach { case (plan, idx) =>
plan match {
case c: CometBroadcastExchangeExec =>
- inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
+ inputs += c.executeColumnar(firstNonBroadcastPlanNumPartitions)
case BroadcastQueryStageExec(_, c: CometBroadcastExchangeExec, _) =>
- inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
+ inputs += c.executeColumnar(firstNonBroadcastPlanNumPartitions)
case ReusedExchangeExec(_, c: CometBroadcastExchangeExec) =>
- inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
+ inputs += c.executeColumnar(firstNonBroadcastPlanNumPartitions)
case BroadcastQueryStageExec(
_,
ReusedExchangeExec(_, c: CometBroadcastExchangeExec),
_) =>
- inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
+ inputs += c.executeColumnar(firstNonBroadcastPlanNumPartitions)
case _: CometNativeExec =>
// no-op
case _ if idx == firstNonBroadcastPlan.get._2 =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]