This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new c0e36767d chore: Improve plan comet transformation log (#2564)
c0e36767d is described below
commit c0e36767d9580713366a126d990e495346f3f11e
Author: Zhen Wang <[email protected]>
AuthorDate: Mon Oct 13 23:06:44 2025 +0800
chore: Improve plan comet transformation log (#2564)
* chore: Make COMET_EXPLAIN_TRANSFORMATIONS behavior consistent
* refer to spark rule log
* fix
---
spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala | 8 ++++++--
spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala | 9 ++++++---
.../org/apache/comet/rules/EliminateRedundantTransitions.scala | 9 ++++++---
3 files changed, 18 insertions(+), 8 deletions(-)
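All three rules now share the same pattern: the log fires only when the rule actually rewrote the plan (guarded by TreeNode.fastEquals, a cheap reference-or-structural equality check), and the before/after trees are rendered with Catalyst's sideBySide helper under a "=== Applying Rule ... ===" banner, mirroring Spark's own rule log. As a minimal sketch of what sideBySide produces, here is a standalone Scala snippet that feeds it hand-written tree strings in place of SparkPlan.treeString, so it runs without a Spark session; the operator names are illustrative, not actual Comet output:

    import org.apache.spark.sql.catalyst.util.sideBySide

    object SideBySideDemo extends App {
      // Stand-ins for plan.treeString and newPlan.treeString; the rules in
      // this commit pass the physical plan trees before and after the rewrite.
      val before = "ProjectExec\n+- FilterExec\n   +- FileSourceScanExec"
      val after = "CometProjectExec\n+- CometFilterExec\n   +- CometScanExec"

      // sideBySide pads the left tree to a fixed width and prefixes each
      // differing line with "!", so changed operators stand out in the log.
      println(sideBySide(before, after).mkString("\n"))
    }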
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index f572417bd..df5c82d02 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.comet._
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
import org.apache.spark.sql.execution._
@@ -613,8 +614,11 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
val newPlan = _apply(plan)
- if (showTransformations) {
- logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
+ if (showTransformations && !newPlan.fastEquals(plan)) {
+ logInfo(s"""
+ |=== Applying Rule $ruleName ===
+ |${sideBySide(plan.treeString, newPlan.treeString).mkString("\n")}
+ |""".stripMargin)
}
newPlan
}
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 950d0e9d3..1b38b8518 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, PlanExpression}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
+import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -60,8 +60,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
override def apply(plan: SparkPlan): SparkPlan = {
val newPlan = _apply(plan)
- if (showTransformations) {
- logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
+ if (showTransformations && !newPlan.fastEquals(plan)) {
+ logInfo(s"""
+ |=== Applying Rule $ruleName ===
+ |${sideBySide(plan.treeString, newPlan.treeString).mkString("\n")}
+ |""".stripMargin)
}
newPlan
}
diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index a1a96d321..7c92b07bc 100644
--- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -21,6 +21,7 @@ package org.apache.comet.rules
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
@@ -56,9 +57,11 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
override def apply(plan: SparkPlan): SparkPlan = {
val newPlan = _apply(plan)
- if (showTransformations) {
- // scalastyle:off println
- System.err.println(s"EliminateRedundantTransitions:\nINPUT: $plan\nOUTPUT: $newPlan")
+ if (showTransformations && !newPlan.fastEquals(plan)) {
+ logInfo(s"""
+ |=== Applying Rule $ruleName ===
+ |${sideBySide(plan.treeString, newPlan.treeString).mkString("\n")}
+ |""".stripMargin)
}
newPlan
}
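For completeness, a hedged sketch of enabling the log from a Spark session. COMET_EXPLAIN_TRANSFORMATIONS is named in the commit message, but treating it as a CometConf entry with a .key field is an assumption here, not something this patch shows:

    import org.apache.comet.CometConf
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    // Assumption: COMET_EXPLAIN_TRANSFORMATIONS is a ConfigEntry-style value
    // in CometConf; its exact key string does not appear in this commit.
    spark.conf.set(CometConf.COMET_EXPLAIN_TRANSFORMATIONS.key, "true")

Note that EliminateRedundantTransitions previously wrote to stderr via System.err.println; after this change all three rules go through logInfo, so the output lands in the regular log at INFO level.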
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]