This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new d09baeced fix: Pass all Comet configs to native plan (#2801)
d09baeced is described below

commit d09baecedbc6bbd3f5be210c6d7b1c3f71610234
Author: Andy Grove <[email protected]>
AuthorDate: Wed Nov 19 17:24:41 2025 -0700

    fix: Pass all Comet configs to native plan (#2801)
---
 .../scala/org/apache/comet/CometExecIterator.scala | 18 +++++++++++-----
 .../org/apache/comet/exec/CometExecSuite.scala     | 24 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 6 deletions(-)

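Context for the change: the removed code read Comet configs from the immutable
SparkConf, so a Comet setting applied after session startup via
spark.conf.set(...) never reached the native plan. The replacement reads
SQLConf.get.getAllConfs, which layers session-level overrides on top of the
startup configuration. A minimal sketch of the distinction (an illustration,
not part of this commit; it assumes an active SparkSession named `spark`):

    import org.apache.comet.CometConf
    import org.apache.spark.sql.internal.SQLConf

    // Apply a Comet setting at runtime, after the session has started.
    spark.conf.set(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key, "true")

    // Old path: the SparkConf captured at startup does not reflect the
    // runtime override above.
    val fromSparkConf = spark.sparkContext.getConf.getAll
      .filter(_._1.startsWith(CometConf.COMET_PREFIX))

    // New path: SQLConf merges session-level overrides over the startup
    // configuration, so the override is included.
    val fromSQLConf = SQLConf.get.getAllConfs
      .filter { case (k, _) => k.startsWith(CometConf.COMET_PREFIX) }
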
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index a680cbf59..5dae41642 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -30,6 +30,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.comet.CometMetricNode
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized._
 import org.apache.spark.util.SerializableConfiguration
 
@@ -87,11 +88,7 @@ class CometExecIterator(
     val localDiskDirs = SparkEnv.get.blockManager.getLocalDiskDirs
 
     // serialize Comet related Spark configs in protobuf format
-    val builder = ConfigMap.newBuilder()
-    conf.getAll.filter(_._1.startsWith(CometConf.COMET_PREFIX)).foreach { case (k, v) =>
-      builder.putEntries(k, v)
-    }
-    val protobufSparkConfigs = builder.build().toByteArray
+    val protobufSparkConfigs = CometExecIterator.serializeCometSQLConfs()
 
     // Create keyUnwrapper if encryption is enabled
     val keyUnwrapper = if (encryptedFilePaths.nonEmpty) {
@@ -265,6 +262,17 @@ class CometExecIterator(
 
 object CometExecIterator extends Logging {
 
+  private def cometSqlConfs: Map[String, String] =
+    SQLConf.get.getAllConfs.filter(_._1.startsWith(CometConf.COMET_PREFIX))
+
+  def serializeCometSQLConfs(): Array[Byte] = {
+    val builder = ConfigMap.newBuilder()
+    cometSqlConfs.foreach { case (k, v) =>
+      builder.putEntries(k, v)
+    }
+    builder.build().toByteArray
+  }
+
   def getMemoryConfig(conf: SparkConf): MemoryConfig = {
     val numCores = numDriverOrExecutorCores(conf)
     val coresPerTask = conf.get("spark.task.cpus", "1").toInt
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 64d7663c0..9f9df73a9 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -47,8 +47,9 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.unsafe.types.UTF8String
 
-import org.apache.comet.{CometConf, ExtendedExplainInfo}
+import org.apache.comet.{CometConf, CometExecIterator, ExtendedExplainInfo}
 import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus}
+import org.apache.comet.serde.Config.ConfigMap
 import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
 
 class CometExecSuite extends CometTestBase {
@@ -66,6 +67,27 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("SQLConf serde") {
+
+    def roundtrip = {
+      val protobuf = CometExecIterator.serializeCometSQLConfs()
+      ConfigMap.parseFrom(protobuf)
+    }
+
+    // test not setting the config
+    val deserialized: ConfigMap = roundtrip
+    assert(null == deserialized.getEntriesMap.get(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key))
+
+    // test explicitly setting the config
+    for (value <- Seq("true", "false")) {
+      withSQLConf(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> value) {
+        val deserialized: ConfigMap = roundtrip
+        assert(
+          value == deserialized.getEntriesMap.get(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key))
+      }
+    }
+  }
+
   test("TopK operator should return correct results on dictionary column with 
nulls") {
     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       withTable("test_data") {

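For reference, a standalone round-trip matching what the new test asserts (a
sketch, not part of this commit; it assumes an active SparkSession named
`spark` with the Comet classes on the classpath):

    import org.apache.comet.{CometConf, CometExecIterator}
    import org.apache.comet.serde.Config.ConfigMap

    // Set a Comet config at runtime, serialize all Comet SQL configs to
    // protobuf, then parse them back.
    spark.conf.set(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key, "true")
    val bytes: Array[Byte] = CometExecIterator.serializeCometSQLConfs()
    val parsed: ConfigMap = ConfigMap.parseFrom(bytes)

    // getEntriesMap returns a java.util.Map, so a key that was never set
    // comes back as null rather than None.
    assert("true" == parsed.getEntriesMap.get(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key))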
