This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 507e475b feat: Add COMET_SHUFFLE_MODE config to control Comet shuffle
mode (#460)
507e475b is described below
commit 507e475bd06fbb9cc738d61473a8341caa6daf74
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu May 23 14:07:43 2024 -0700
feat: Add COMET_SHUFFLE_MODE config to control Comet shuffle mode (#460)
---
.../main/scala/org/apache/comet/CometConf.scala | 20 +++---
docs/source/user-guide/configs.md | 2 +-
docs/source/user-guide/tuning.md | 22 ++++---
.../apache/comet/CometSparkSessionExtensions.scala | 36 +++++++----
.../org/apache/comet/CometExpressionSuite.scala | 4 +-
.../apache/comet/exec/CometAggregateSuite.scala | 75 ++++++++++------------
.../comet/exec/CometColumnarShuffleSuite.scala | 10 +--
.../org/apache/comet/exec/CometExecSuite.scala | 23 +++----
.../comet/exec/CometNativeShuffleSuite.scala | 2 +-
.../org/apache/spark/sql/CometTPCHQuerySuite.scala | 2 +-
.../spark/sql/benchmark/CometExecBenchmark.scala | 2 +-
.../sql/benchmark/CometShuffleBenchmark.scala | 19 +++---
.../org/apache/comet/exec/CometExec3_4Suite.scala | 4 +-
13 files changed, 119 insertions(+), 102 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 463de90c..5aee02f1 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -19,6 +19,7 @@
package org.apache.comet
+import java.util.Locale
import java.util.concurrent.TimeUnit
import scala.collection.mutable.ListBuffer
@@ -131,14 +132,17 @@ object CometConf {
.booleanConf
.createWithDefault(false)
- val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
- conf("spark.comet.columnar.shuffle.enabled")
- .doc(
- "Whether to enable Arrow-based columnar shuffle for Comet and Spark
regular operators. " +
- "If this is enabled, Comet prefers columnar shuffle than native
shuffle. " +
- "By default, this config is true.")
- .booleanConf
- .createWithDefault(true)
+ val COMET_SHUFFLE_MODE: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
+ .doc("The mode of Comet shuffle. This config is only effective if Comet
shuffle " +
+ "is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
+ "'native' is for native shuffle which has best performance in general. "
+
+ "'jvm' is for jvm-based columnar shuffle which has higher coverage than
native shuffle. " +
+ "'auto' is for Comet to choose the best shuffle mode based on the query
plan. " +
+ "By default, this config is 'jvm'.")
+ .stringConf
+ .transform(_.toLowerCase(Locale.ROOT))
+ .checkValues(Set("native", "jvm", "auto"))
+ .createWithDefault("jvm")
val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.shuffle.enforceMode.enabled")
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index 0204b0c5..eb349b34 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -29,7 +29,6 @@ Comet provides the following configuration settings.
| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous
shuffle for Arrow-based shuffle. By default, this config is false. | false |
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of
threads on an executor used for Comet async columnar shuffle. By default, this
config is 100. This is the upper bound of total number of shuffle threads per
executor. In other words, if the number of cores * the number of shuffle
threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than
this config. Comet will use this config as the number of shuffle threads per
executor instead. | 100 |
| spark.comet.columnar.shuffle.async.thread.num | Number of threads used for
Comet async columnar shuffle per shuffle task. By default, this config is 3.
Note that more threads means more memory requirement to buffer shuffle data
before flushing to disk. Also, more threads may not always improve performance,
and should be set based on the number of cores available. | 3 |
-| spark.comet.columnar.shuffle.enabled | Whether to enable Arrow-based
columnar shuffle for Comet and Spark regular operators. If this is enabled,
Comet prefers columnar shuffle than native shuffle. By default, this config is
true. | true |
| spark.comet.columnar.shuffle.memory.factor | Fraction of Comet memory to be
allocated per executor process for Comet shuffle. Comet memory size is
specified by `spark.comet.memoryOverhead` or calculated by
`spark.comet.memory.overhead.factor` * `spark.executor.memory`. By default,
this config is 1.0. | 1.0 |
| spark.comet.debug.enabled | Whether to enable debug mode for Comet. By
default, this config is false. When enabled, Comet will do additional checks
for debugging purpose. For example, validating array when importing arrays from
JVM at native side. Note that these checks may be expensive in performance and
should only be enabled for debugging purpose. | false |
| spark.comet.enabled | Whether to enable Comet extension for Spark. When this
is turned on, Spark will use Comet to read Parquet data source. Note that to
enable native vectorized execution, both this config and
'spark.comet.exec.enabled' need to be enabled. By default, this config is the
value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
@@ -39,6 +38,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory
overhead that the native memory manager can use for execution. The purpose of
this config is to set aside memory for untracked data structures, as well as
imprecise size estimation during memory acquisition. Default value is 0.7. |
0.7 |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to
compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle.
By default, this config is false. Note that this requires setting
'spark.shuffle.manager' to
'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'.
'spark.shuffle.manager' must be set before starting the Spark application and
cannot be changed during the application. | false |
+| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is
only effective if Comet shuffle is enabled. Available modes are 'native',
'jvm', and 'auto'. 'native' is for native shuffle which has best performance in
general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than
native shuffle. 'auto' is for Comet to choose the best shuffle mode based on
the query plan. By default, this config is 'jvm'. | jvm |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet
will provide logging explaining the reason(s) why a query stage cannot be
executed natively. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be
allocated as additional non-heap memory per executor process for Comet. Default
value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be
allocated per executor process for Comet, in MiB. | 402653184b |
diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index 01fa7bdb..5a3100bd 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -39,22 +39,26 @@ It must be set before the Spark context is created. You can
enable or disable Co
at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
Once it is disabled, Comet will fallback to the default Spark shuffle manager.
-### Columnar Shuffle
+### Shuffle Mode
-By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses
columnar shuffle
+Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto
Mode.
+
+#### Columnar Shuffle
+
+By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses
JVM-based columnar shuffle
to improve the performance of shuffle operations. Columnar shuffle supports
HashPartitioning,
-RoundRobinPartitioning, RangePartitioning and SinglePartitioning.
+RoundRobinPartitioning, RangePartitioning and SinglePartitioning. This mode
has the highest
+query coverage.
-Columnar shuffle can be disabled by setting
`spark.comet.columnar.shuffle.enabled` to `false`.
+Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to
`jvm`.
-### Native Shuffle
+#### Native Shuffle
Comet also provides a fully native shuffle implementation that can be used to
improve the performance.
-To enable native shuffle, just disable `spark.comet.columnar.shuffle.enabled`.
+To enable native shuffle, just set `spark.comet.exec.shuffle.mode` to `native`.
Native shuffle only supports HashPartitioning and SinglePartitioning.
+#### Auto Mode
-
-
-
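+Setting `spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best
shuffle mode based on the query plan: Comet tries native shuffle first and falls back to
JVM-based columnar shuffle when native shuffle cannot be used for the exchange.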
diff --git
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 85a19f55..168d2bb5 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{createMessage,
getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled,
isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled,
isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled,
isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage,
getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled,
isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode,
isCometNativeShuffleMode, isCometOperatorEnabled, isCometScan,
isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus,
shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
@@ -197,7 +197,7 @@ class CometSparkSessionExtensions
private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case s: ShuffleExchangeExec
- if isCometPlan(s.child) && !isCometColumnarShuffleEnabled(conf) &&
+ if isCometPlan(s.child) && isCometNativeShuffleMode(conf) &&
QueryPlanSerde.supportPartitioning(s.child.output,
s.outputPartitioning)._1 =>
logInfo("Comet extension enabled for Native Shuffle")
@@ -209,8 +209,8 @@ class CometSparkSessionExtensions
// Columnar shuffle for regular Spark operators (not Comet) and Comet
operators
// (if configured)
case s: ShuffleExchangeExec
- if (!s.child.supportsColumnar || isCometPlan(
- s.child)) && isCometColumnarShuffleEnabled(conf) &&
+ if (!s.child.supportsColumnar || isCometPlan(s.child)) &&
isCometJVMShuffleMode(
+ conf) &&
QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 &&
!isShuffleOperator(s.child) =>
logInfo("Comet extension enabled for JVM Columnar Shuffle")
@@ -641,7 +641,7 @@ class CometSparkSessionExtensions
// Native shuffle for Comet operators
case s: ShuffleExchangeExec
if isCometShuffleEnabled(conf) &&
- !isCometColumnarShuffleEnabled(conf) &&
+ isCometNativeShuffleMode(conf) &&
QueryPlanSerde.supportPartitioning(s.child.output,
s.outputPartitioning)._1 =>
logInfo("Comet extension enabled for Native Shuffle")
@@ -662,7 +662,7 @@ class CometSparkSessionExtensions
// If the child of ShuffleExchangeExec is also a ShuffleExchangeExec,
we should not
// convert it to CometColumnarShuffle,
case s: ShuffleExchangeExec
- if isCometShuffleEnabled(conf) &&
isCometColumnarShuffleEnabled(conf) &&
+ if isCometShuffleEnabled(conf) && isCometJVMShuffleMode(conf) &&
QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 &&
!isShuffleOperator(s.child) =>
logInfo("Comet extension enabled for JVM Columnar Shuffle")
@@ -684,19 +684,19 @@ class CometSparkSessionExtensions
case s: ShuffleExchangeExec =>
val isShuffleEnabled = isCometShuffleEnabled(conf)
val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no
reason available")
- val msg1 = createMessage(!isShuffleEnabled, s"Native shuffle is not
enabled: $reason")
- val columnarShuffleEnabled = isCometColumnarShuffleEnabled(conf)
+ val msg1 = createMessage(!isShuffleEnabled, s"Comet shuffle is not
enabled: $reason")
+ val columnarShuffleEnabled = isCometJVMShuffleMode(conf)
val msg2 = createMessage(
isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde
.supportPartitioning(s.child.output, s.outputPartitioning)
._1,
- "Shuffle: " +
+ "Native shuffle: " +
s"${QueryPlanSerde.supportPartitioning(s.child.output,
s.outputPartitioning)._2}")
val msg3 = createMessage(
isShuffleEnabled && columnarShuffleEnabled && !QueryPlanSerde
.supportPartitioningTypes(s.child.output)
._1,
- s"Columnar shuffle:
${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}")
+ s"JVM shuffle:
${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}")
withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(","))
s
@@ -966,8 +966,20 @@ object CometSparkSessionExtensions extends Logging {
COMET_EXEC_ENABLED.get(conf)
}
- private[comet] def isCometColumnarShuffleEnabled(conf: SQLConf): Boolean = {
- COMET_COLUMNAR_SHUFFLE_ENABLED.get(conf)
+ private[comet] def isCometNativeShuffleMode(conf: SQLConf): Boolean = {
+ COMET_SHUFFLE_MODE.get(conf) match {
+ case "native" => true
+ case "auto" => true
+ case _ => false
+ }
+ }
+
+ private[comet] def isCometJVMShuffleMode(conf: SQLConf): Boolean = {
+ COMET_SHUFFLE_MODE.get(conf) match {
+ case "jvm" => true
+ case "auto" => true
+ case _ => false
+ }
}
private[comet] def isCometAllOperatorEnabled(conf: SQLConf): Boolean = {
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 98a2bad0..6ca4baf6 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1421,7 +1421,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
(
s"SELECT sum(c0), sum(c2) from $table group by c1",
Set(
- "Native shuffle is not enabled: spark.comet.exec.shuffle.enabled
is not enabled",
+ "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled
is not enabled",
"AQEShuffleRead is not supported")),
(
"SELECT A.c1, A.sum_c0, A.sum_c2, B.casted from "
@@ -1429,7 +1429,7 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
+ s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2)
as string) as casted from $table) as B "
+ "where A.c1 = B.c1 ",
Set(
- "Native shuffle is not enabled: spark.comet.exec.shuffle.enabled
is not enabled",
+ "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled
is not enabled",
"AQEShuffleRead is not supported",
"make_interval is not supported",
"BroadcastExchange is not supported",
diff --git
a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index d36534ee..ca7bc7df 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -44,7 +44,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val df1 = sql("SELECT count(DISTINCT 2), count(DISTINCT 2,3)")
checkSparkAnswer(df1)
@@ -57,7 +57,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
checkSparkAnswer(sql("""
|SELECT
| lag(123, 100, 321) OVER (ORDER BY id) as lag,
@@ -78,7 +78,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val df1 = Seq(
("a", "b", "c"),
("a", "b", "c"),
@@ -99,7 +99,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val df = sql("SELECT LAST(n) FROM lowerCaseData")
checkSparkAnswer(df)
}
@@ -114,7 +114,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val df = sql("select sum(a), avg(a) from allNulls")
checkSparkAnswer(df)
}
@@ -125,7 +125,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test")
makeParquetFile(path, 10000, 10, false)
@@ -141,7 +141,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test")
@@ -160,7 +160,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
sql(
"CREATE TABLE lineitem(l_extendedprice DOUBLE, l_quantity DOUBLE,
l_partkey STRING) USING PARQUET")
@@ -197,7 +197,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> EliminateSorts.ruleName,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test")
@@ -216,7 +216,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
withTable(table) {
sql(s"CREATE TABLE $table(col DECIMAL(5, 2)) USING PARQUET")
sql(s"INSERT INTO TABLE $table VALUES (CAST(12345.01 AS DECIMAL(5,
2)))")
@@ -316,7 +316,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
Seq(true, false).foreach { dictionaryEnabled =>
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key ->
nativeShuffleEnabled.toString,
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
withParquetTable(
(0 until 100).map(i => (i, (i % 10).toString)),
"tbl",
@@ -497,7 +497,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
Seq(true, false).foreach { nativeShuffleEnabled =>
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key ->
nativeShuffleEnabled.toString,
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test")
makeParquetFile(path, 1000, 20, dictionaryEnabled)
@@ -686,7 +686,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("test final count") {
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
Seq(false, true).foreach { dictionaryEnabled =>
withParquetTable((0 until 5).map(i => (i, i % 2)), "tbl",
dictionaryEnabled) {
checkSparkAnswerAndNumOfAggregates("SELECT _2, COUNT(_1) FROM tbl
GROUP BY _2", 2)
@@ -703,7 +703,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("test final min/max") {
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
Seq(true, false).foreach { dictionaryEnabled =>
withParquetTable((0 until 5).map(i => (i, i % 2)), "tbl",
dictionaryEnabled) {
checkSparkAnswerAndNumOfAggregates(
@@ -724,7 +724,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("test final min/max/count with result expressions") {
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
Seq(true, false).foreach { dictionaryEnabled =>
withParquetTable((0 until 5).map(i => (i, i % 2)), "tbl",
dictionaryEnabled) {
checkSparkAnswerAndNumOfAggregates(
@@ -759,7 +759,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("test final sum") {
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
Seq(false, true).foreach { dictionaryEnabled =>
withParquetTable((0L until 5L).map(i => (i, i % 2)), "tbl",
dictionaryEnabled) {
checkSparkAnswerAndNumOfAggregates(
@@ -780,7 +780,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("test final avg") {
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
Seq(true, false).foreach { dictionaryEnabled =>
withParquetTable(
(0 until 5).map(i => (i.toDouble, i.toDouble % 2)),
@@ -805,7 +805,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
Seq(true, false).foreach { dictionaryEnabled =>
withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString)
{
val table = "t1"
@@ -850,7 +850,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("avg null handling") {
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
val table = "t1"
withTable(table) {
sql(s"create table $table(a double, b double) using parquet")
@@ -872,7 +872,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
Seq(true, false).foreach { nativeShuffleEnabled =>
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key ->
nativeShuffleEnabled.toString,
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
+ CometConf.COMET_SHUFFLE_MODE.key -> "native",
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test")
@@ -912,11 +912,11 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("distinct") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
- Seq(true, false).foreach { cometColumnShuffleEnabled =>
- withSQLConf(
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key ->
cometColumnShuffleEnabled.toString) {
+ Seq("native", "jvm").foreach { cometShuffleMode =>
+ withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
+ val cometColumnShuffleEnabled = cometShuffleMode == "jvm"
val table = "test"
withTable(table) {
sql(s"create table $table(col1 int, col2 int, col3 int) using
parquet")
@@ -970,7 +970,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
val table = "test"
@@ -1016,9 +1016,8 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("test bool_and/bool_or") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
- Seq(true, false).foreach { cometColumnShuffleEnabled =>
- withSQLConf(
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key ->
cometColumnShuffleEnabled.toString) {
+ Seq("native", "jvm").foreach { cometShuffleMode =>
+ withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
val table = "test"
@@ -1043,7 +1042,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("bitwise aggregate") {
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
val table = "test"
@@ -1092,9 +1091,8 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("covar_pop and covar_samp") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
- Seq(true, false).foreach { cometColumnShuffleEnabled =>
- withSQLConf(
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key ->
cometColumnShuffleEnabled.toString) {
+ Seq("native", "jvm").foreach { cometShuffleMode =>
+ withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
val table = "test"
@@ -1131,9 +1129,8 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("var_pop and var_samp") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
- Seq(true, false).foreach { cometColumnShuffleEnabled =>
- withSQLConf(
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key ->
cometColumnShuffleEnabled.toString) {
+ Seq("native", "jvm").foreach { cometShuffleMode =>
+ withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
Seq(true, false).foreach { nullOnDivideByZero =>
@@ -1171,9 +1168,8 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("stddev_pop and stddev_samp") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
- Seq(true, false).foreach { cometColumnShuffleEnabled =>
- withSQLConf(
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key ->
cometColumnShuffleEnabled.toString) {
+ Seq("native", "jvm").foreach { cometShuffleMode =>
+ withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
Seq(true, false).foreach { nullOnDivideByZero =>
@@ -1214,9 +1210,8 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("correlation") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
- Seq(true, false).foreach { cometColumnShuffleEnabled =>
- withSQLConf(
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key ->
cometColumnShuffleEnabled.toString) {
+ Seq("jvm", "native").foreach { cometShuffleMode =>
+ withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
Seq(true, false).foreach { nullOnDivideByZero =>
diff --git
a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 600f9c44..c38be7c4 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -54,7 +54,7 @@ abstract class CometColumnarShuffleSuite extends
CometTestBase with AdaptiveSpar
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key ->
asyncShuffleEnable.toString,
CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key ->
numElementsForceSpillThreshold.toString,
CometConf.COMET_EXEC_ENABLED.key -> "false",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") {
testFun
@@ -963,7 +963,7 @@ class CometShuffleSuite extends CometColumnarShuffleSuite {
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
val df = sql("SELECT * FROM tbl_a")
val shuffled = df
@@ -983,7 +983,7 @@ class CometShuffleSuite extends CometColumnarShuffleSuite {
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") {
val df = sql("SELECT * FROM tbl_a")
@@ -1016,7 +1016,7 @@ class DisableAQECometShuffleSuite extends
CometColumnarShuffleSuite {
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") {
val df = sql("SELECT * FROM tbl_a")
@@ -1061,7 +1061,7 @@ class CometShuffleEncryptionSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key ->
asyncEnabled.toString) {
readParquetFile(path.toString) { df =>
val shuffled = df
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index c5fef022..7c19890d 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -134,14 +134,15 @@ class CometExecSuite extends CometTestBase {
.toDF("c1", "c2")
.createOrReplaceTempView("v")
- Seq(true, false).foreach { columnarShuffle =>
+ Seq("native", "jvm").foreach { columnarShuffleMode =>
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key ->
columnarShuffle.toString) {
+ CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffleMode) {
val df = sql("SELECT * FROM v where c1 = 1 order by c1, c2")
val shuffle = find(df.queryExecution.executedPlan) {
- case _: CometShuffleExchangeExec if columnarShuffle => true
- case _: ShuffleExchangeExec if !columnarShuffle => true
+ case _: CometShuffleExchangeExec if
columnarShuffleMode.equalsIgnoreCase("jvm") =>
+ true
+ case _: ShuffleExchangeExec if
!columnarShuffleMode.equalsIgnoreCase("jvm") => true
case _ => false
}.get
assert(shuffle.logicalLink.isEmpty)
@@ -179,7 +180,7 @@ class CometExecSuite extends CometTestBase {
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled,
// `REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION` is a new config in
Spark 3.3+.
"spark.sql.requireAllClusterKeysForDistribution" -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val df =
Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1",
"key2", "value")
val windowSpec = Window.partitionBy("key1", "key2").orderBy("value")
@@ -318,7 +319,7 @@ class CometExecSuite extends CometTestBase {
dataTypes.map { subqueryType =>
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
var column1 = s"CAST(max(_1) AS $subqueryType)"
@@ -499,7 +500,7 @@ class CometExecSuite extends CometTestBase {
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
withTable(tableName, dim) {
sql(
@@ -716,7 +717,7 @@ class CometExecSuite extends CometTestBase {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
val df = sql("SELECT * FROM tbl").sort($"_1".desc)
checkSparkAnswerAndOperator(df)
@@ -764,10 +765,10 @@ class CometExecSuite extends CometTestBase {
}
test("limit") {
- Seq("true", "false").foreach { columnarShuffle =>
+ Seq("native", "jvm").foreach { columnarShuffleMode =>
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle) {
+ CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffleMode) {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
val df = sql("SELECT * FROM tbl_a")
.repartition(10, $"_1")
@@ -1411,7 +1412,7 @@ class CometExecSuite extends CometTestBase {
Seq("true", "false").foreach(aqe => {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqe,
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> "false") {
spark
.range(1000)
diff --git
a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
index d48ba183..d17e4abf 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
@@ -37,7 +37,7 @@ class CometNativeShuffleSuite extends CometTestBase with
AdaptiveSparkPlanHelper
super.test(testName, testTags: _*) {
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
+ CometConf.COMET_SHUFFLE_MODE.key -> "native",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
testFun
}
diff --git
a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
index e8aac261..1abe5fae 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
@@ -90,7 +90,7 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase
with SQLQueryTestH
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
- conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true")
+ conf.set(CometConf.COMET_SHUFFLE_MODE.key, "jvm")
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
}
diff --git
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
index d6020ac6..bf4bfdbe 100644
---
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
+++
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
@@ -131,7 +131,7 @@ object CometExecBenchmark extends CometBenchmarkBase {
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
spark.sql(
"SELECT (SELECT max(col1) AS parquetV1Table FROM parquetV1Table)
AS a, " +
"col2, col3 FROM parquetV1Table")
diff --git
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala
index 86557281..30a2823c 100644
---
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala
+++
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala
@@ -106,7 +106,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") {
spark
.sql(
@@ -165,7 +165,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") {
spark
.sql(
@@ -222,7 +222,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") {
spark
.sql("select c1 from parquetV1Table")
@@ -238,7 +238,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") {
spark
.sql("select c1 from parquetV1Table")
@@ -254,7 +254,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key ->
"1000000000.0",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") {
spark
.sql("select c1 from parquetV1Table")
@@ -321,7 +321,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") {
spark
.sql("select c1 from parquetV1Table")
@@ -336,7 +336,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "true") {
spark
.sql("select c1 from parquetV1Table")
@@ -409,7 +409,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
spark
.sql(s"select $columns from parquetV1Table")
.repartition(partitionNum, Column("c1"))
@@ -422,7 +422,8 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "native") {
spark
.sql(s"select $columns from parquetV1Table")
.repartition(partitionNum, Column("c1"))
diff --git
a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala
b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala
index 32b76d9b..019b4f03 100644
--- a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala
+++ b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala
@@ -43,7 +43,7 @@ class CometExec3_4Suite extends CometTestBase {
// The syntax is only supported by Spark 3.4+.
test("subquery limit: limit with offset should return correct results") {
- withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
withTable("t1", "t2") {
val table1 =
"""create temporary view t1 as select * from values
@@ -95,7 +95,7 @@ class CometExec3_4Suite extends CometTestBase {
// Dataset.offset API is not available before Spark 3.4
test("offset") {
- withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+ withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
checkSparkAnswer(testData.offset(90))
checkSparkAnswer(arrayData.toDF().offset(99))
checkSparkAnswer(mapData.toDF().offset(99))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]