This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 4ede2144 chore: Add config for enabling SMJ with join condition (#937)
4ede2144 is described below
commit 4ede2144316eccbe562078066ef9be8ca1deeeae
Author: Andy Grove <[email protected]>
AuthorDate: Mon Sep 16 11:31:54 2024 -0600
chore: Add config for enabling SMJ with join condition (#937)
* Add config for enabling SMJ with join condition
* Update common/src/main/scala/org/apache/comet/CometConf.scala
Co-authored-by: Oleks V <[email protected]>
* Update docs/source/user-guide/configs.md
Co-authored-by: Oleks V <[email protected]>
* enable config in stability suite
---------
Co-authored-by: Oleks V <[email protected]>
---
common/src/main/scala/org/apache/comet/CometConf.scala | 6 ++++++
docs/source/user-guide/configs.md | 1 +
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 7 +++++++
spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala | 1 +
spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala | 1 +
.../scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala | 1 +
6 files changed, 17 insertions(+)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 8828b70f..03b7a2a4 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -145,6 +145,12 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
+ val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
+ conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
+ .doc("Experimental support for Sort Merge Join with filter")
+ .booleanConf
+ .createWithDefault(false)
+
val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(
"stddev",
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 1b5fe736..ff2db342 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -54,6 +54,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
+| spark.comet.exec.sortMergeJoinWithJoinFilter.enabled | Experimental support for Sort Merge Join with filter | false |
| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true |
| spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true |
| spark.comet.exec.union.enabled | Whether to enable union by default. | true |
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index dbc3a1d8..50d92165 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2961,6 +2961,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}
}
+ if (join.condition.isDefined &&
+ !CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED
+ .get(conf)) {
+ withInfo(join, join.condition.get)
+ return None
+ }
+
val condition = join.condition.map { cond =>
val condProto = exprToProto(cond, join.left.output ++ join.right.output)
if (condProto.isEmpty) {
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index 1bd0e1b7..d787a9b1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -340,6 +340,7 @@ class CometJoinSuite extends CometTestBase {
test("SortMergeJoin with join filter") {
withSQLConf(
+ CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index d49095e2..1709cce6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -80,6 +80,7 @@ abstract class CometTestBase
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
+ conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
conf
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 83cc8982..a553e61c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -262,6 +262,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
"spark.sql.readSideCharPadding" -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]