This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 383ae919 chore: improve fallback message when comet native shuffle is
not enabled (#445)
383ae919 is described below
commit 383ae9197422da5948a31caf21722569ef2067fe
Author: Andy Grove <[email protected]>
AuthorDate: Mon May 20 08:31:07 2024 -0600
chore: improve fallback message when comet native shuffle is not enabled
(#445)
* improve fallback message when comet native shuffle is not enabled
* update test
---
.../apache/comet/CometSparkSessionExtensions.scala | 32 +++++++++++++++++-----
.../org/apache/comet/CometExpressionSuite.scala | 6 ++--
.../scala/org/apache/spark/sql/CometTestBase.scala | 5 +++-
3 files changed, 33 insertions(+), 10 deletions(-)
diff --git
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 7c269c41..85a19f55 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -30,8 +30,7 @@ import
org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.comet._
-import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle,
CometNativeShuffle}
-import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle,
CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec,
BroadcastQueryStageExec, ShuffleQueryStageExec}
@@ -46,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{createMessage,
isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled,
isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan,
isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus,
shouldApplyRowToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage,
getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled,
isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled,
isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled,
isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
@@ -684,7 +683,8 @@ class CometSparkSessionExtensions
case s: ShuffleExchangeExec =>
val isShuffleEnabled = isCometShuffleEnabled(conf)
- val msg1 = createMessage(!isShuffleEnabled, "Native shuffle is not
enabled")
+ val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no
reason available")
+ val msg1 = createMessage(!isShuffleEnabled, s"Native shuffle is not
enabled: $reason")
val columnarShuffleEnabled = isCometColumnarShuffleEnabled(conf)
val msg2 = createMessage(
isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde
@@ -933,13 +933,31 @@ object CometSparkSessionExtensions extends Logging {
}
private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
- COMET_EXEC_SHUFFLE_ENABLED.get(conf) &&
- (conf.contains("spark.shuffle.manager") &&
conf.getConfString("spark.shuffle.manager") ==
- "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") &&
+ COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf)
&&
// TODO: AQE coalesce partitions feature causes Comet shuffle memory
leak.
// We should disable Comet shuffle when AQE coalesce partitions is
enabled.
(!conf.coalesceShufflePartitionsEnabled ||
COMET_SHUFFLE_ENFORCE_MODE_ENABLED.get())
+ private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf):
Option[String] = {
+ if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {
+ Some(s"${COMET_EXEC_SHUFFLE_ENABLED.key} is not enabled")
+ } else if (!isCometShuffleManagerEnabled(conf)) {
+ Some(s"spark.shuffle.manager is not set to
${CometShuffleManager.getClass.getName}")
+ } else if (conf.coalesceShufflePartitionsEnabled &&
!COMET_SHUFFLE_ENFORCE_MODE_ENABLED
+ .get()) {
+ Some(
+ s"${SQLConf.COALESCE_PARTITIONS_ENABLED.key} is enabled and " +
+ s"${COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key} is not enabled")
+ } else {
+ None
+ }
+ }
+
+ private def isCometShuffleManagerEnabled(conf: SQLConf) = {
+ conf.contains("spark.shuffle.manager") &&
conf.getConfString("spark.shuffle.manager") ==
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
+ }
+
private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = {
COMET_SCAN_ENABLED.get(conf)
}
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index f3fd50e9..98a2bad0 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1420,14 +1420,16 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
"extractintervalmonths is not supported")),
(
s"SELECT sum(c0), sum(c2) from $table group by c1",
- Set("Native shuffle is not enabled", "AQEShuffleRead is not
supported")),
+ Set(
+ "Native shuffle is not enabled: spark.comet.exec.shuffle.enabled
is not enabled",
+ "AQEShuffleRead is not supported")),
(
"SELECT A.c1, A.sum_c0, A.sum_c2, B.casted from "
+ s"(SELECT c1, sum(c0) as sum_c0, sum(c2) as sum_c2 from $table
group by c1) as A, "
+ s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2)
as string) as casted from $table) as B "
+ "where A.c1 = B.c1 ",
Set(
- "Native shuffle is not enabled",
+ "Native shuffle is not enabled: spark.comet.exec.shuffle.enabled
is not enabled",
"AQEShuffleRead is not supported",
"make_interval is not supported",
"BroadcastExchange is not supported",
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 112d35b1..0530d764 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -261,7 +261,10 @@ abstract class CometTestBase
}
val extendedInfo =
new
ExtendedExplainInfo().generateExtendedInfo(dfComet.queryExecution.executedPlan)
-
assert(extendedInfo.equalsIgnoreCase(expectedInfo.toSeq.sorted.mkString("\n")))
+ val expectedStr = expectedInfo.toSeq.sorted.mkString("\n")
+ if (!extendedInfo.equalsIgnoreCase(expectedStr)) {
+ fail(s"$extendedInfo != $expectedStr (case-insensitive comparison)")
+ }
}
private var _spark: SparkSession = _
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]