This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new d09baeced fix: Pass all Comet configs to native plan (#2801)
d09baeced is described below
commit d09baecedbc6bbd3f5be210c6d7b1c3f71610234
Author: Andy Grove <[email protected]>
AuthorDate: Wed Nov 19 17:24:41 2025 -0700
fix: Pass all Comet configs to native plan (#2801)
---
.../scala/org/apache/comet/CometExecIterator.scala | 18 +++++++++++-----
.../org/apache/comet/exec/CometExecSuite.scala | 24 +++++++++++++++++++++-
2 files changed, 36 insertions(+), 6 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index a680cbf59..5dae41642 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -30,6 +30,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.comet.CometMetricNode
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized._
import org.apache.spark.util.SerializableConfiguration
@@ -87,11 +88,7 @@ class CometExecIterator(
val localDiskDirs = SparkEnv.get.blockManager.getLocalDiskDirs
// serialize Comet related Spark configs in protobuf format
- val builder = ConfigMap.newBuilder()
- conf.getAll.filter(_._1.startsWith(CometConf.COMET_PREFIX)).foreach { case (k, v) =>
- builder.putEntries(k, v)
- }
- val protobufSparkConfigs = builder.build().toByteArray
+ val protobufSparkConfigs = CometExecIterator.serializeCometSQLConfs()
// Create keyUnwrapper if encryption is enabled
val keyUnwrapper = if (encryptedFilePaths.nonEmpty) {
@@ -265,6 +262,17 @@ class CometExecIterator(
object CometExecIterator extends Logging {
+ private def cometSqlConfs: Map[String, String] =
+ SQLConf.get.getAllConfs.filter(_._1.startsWith(CometConf.COMET_PREFIX))
+
+ def serializeCometSQLConfs(): Array[Byte] = {
+ val builder = ConfigMap.newBuilder()
+ cometSqlConfs.foreach { case (k, v) =>
+ builder.putEntries(k, v)
+ }
+ builder.build().toByteArray
+ }
+
def getMemoryConfig(conf: SparkConf): MemoryConfig = {
val numCores = numDriverOrExecutorCores(conf)
val coresPerTask = conf.get("spark.task.cpus", "1").toInt
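
For illustration, a minimal round-trip sketch of the new helper (a hypothetical standalone snippet, mirroring the test added below; ConfigMap is the generated protobuf class imported from org.apache.comet.serde.Config):

    import org.apache.comet.{CometConf, CometExecIterator}
    import org.apache.comet.serde.Config.ConfigMap

    // Serialize the current session's Comet SQL configs to protobuf bytes ...
    val bytes: Array[Byte] = CometExecIterator.serializeCometSQLConfs()
    // ... and parse them back into a ConfigMap, as the native side would.
    val parsed: ConfigMap = ConfigMap.parseFrom(bytes)

    // Only keys with the Comet prefix survive the SQLConf filter.
    val keys = parsed.getEntriesMap.keySet().iterator()
    while (keys.hasNext) {
      assert(keys.next().startsWith(CometConf.COMET_PREFIX))
    }
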
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 64d7663c0..9f9df73a9 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -47,8 +47,9 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
import org.apache.spark.unsafe.types.UTF8String
-import org.apache.comet.{CometConf, ExtendedExplainInfo}
+import org.apache.comet.{CometConf, CometExecIterator, ExtendedExplainInfo}
import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus}
+import org.apache.comet.serde.Config.ConfigMap
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
class CometExecSuite extends CometTestBase {
@@ -66,6 +67,27 @@ class CometExecSuite extends CometTestBase {
}
}
+ test("SQLConf serde") {
+
+ def roundtrip = {
+ val protobuf = CometExecIterator.serializeCometSQLConfs()
+ ConfigMap.parseFrom(protobuf)
+ }
+
+ // test not setting the config
+ val deserialized: ConfigMap = roundtrip
+ assert(null == deserialized.getEntriesMap.get(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key))
+
+ // test explicitly setting the config
+ for (value <- Seq("true", "false")) {
+ withSQLConf(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> value) {
+ val deserialized: ConfigMap = roundtrip
+ assert(
+ value == deserialized.getEntriesMap.get(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key))
+ }
+ }
+ }
+
test("TopK operator should return correct results on dictionary column with
nulls") {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
withTable("test_data") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]