This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 769d76e74 fix: NativeScan count assert firing for no reason (#2850)
769d76e74 is described below

commit 769d76e74e618c0555a993322318bd32e69716bf
Author: Emily Matheys <[email protected]>
AuthorDate: Fri Dec 5 16:43:07 2025 +0200

    fix: NativeScan count assert firing for no reason (#2850)
---
 .../org/apache/spark/sql/comet/operators.scala     | 41 +++++++++++++---------
 .../spark/sql/comet/ParquetEncryptionITCase.scala  | 16 ++++++---
 2 files changed, 35 insertions(+), 22 deletions(-)
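
For context on the change below: the old code collected every leaf of the Spark plan and
asserted at most one CometNativeScanExec, but with Comet shuffle enabled an aggregate after
a UNION executes in its own native plan while collectLeaves() still reached both scans. A
rough repro sketch, with the session setup and directory names assumed (it mirrors the test
added in this commit):

    // Hypothetical sketch: two Parquet reads unioned, then aggregated. With Comet
    // shuffle enabled the aggregate sits in a separate native plan, yet the old
    // collectLeaves() walk still counted both native scans and fired the assert.
    import org.apache.spark.sql.functions
    val df1 = spark.read.parquet(parquetDir1) // CometNativeScanExec #1
    val df2 = spark.read.parquet(parquetDir2) // CometNativeScanExec #2
    val aggDf = df1.union(df2).agg(functions.sum("id"))
    aggDf.collect()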

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 6eebc53d5..28688e904 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -217,22 +217,14 @@ abstract class CometNativeExec extends CometExec {
         // TODO: support native metrics for all operators.
         val nativeMetrics = CometMetricNode.fromCometPlan(this)
 
+        // Go over all the native scans to see whether they need encryption options.
         // For each relation in a CometNativeScan generate a hadoopConf,
         // for each file path in a relation associate with hadoopConf
-        val cometNativeScans: Seq[CometNativeScanExec] = this
-          .collectLeaves()
-          .filter(_.isInstanceOf[CometNativeScanExec])
-          .map(_.asInstanceOf[CometNativeScanExec])
-        assert(
-          cometNativeScans.size <= 1,
-          "We expect one native scan in a Comet plan since we will broadcast 
one hadoopConf.")
-        // If this assumption changes in the future, you can look at the 
commit history of #2447
-        // to see how there used to be a map of relations to broadcasted confs 
in case multiple
-        // relations in a single plan. The example that came up was UNION. See 
discussion at:
-        // 
https://github.com/apache/datafusion-comet/pull/2447#discussion_r2406118264
-        val (broadcastedHadoopConfForEncryption, encryptedFilePaths) =
-          cometNativeScans.headOption.fold(
-            (None: Option[Broadcast[SerializableConfiguration]], Seq.empty[String])) { scan =>
+        // This is done per native plan, so only count scans until a Comet input is reached.
+        val encryptionOptions =
+          mutable.ArrayBuffer.empty[(Broadcast[SerializableConfiguration], Seq[String])]
+        foreachUntilCometInput(this) {
+          case scan: CometNativeScanExec =>
            // This creates a hadoopConf that brings in any SQLConf "spark.hadoop.*" configs and
            // per-relation configs since different tables might have different decryption
             // properties.
@@ -244,10 +236,25 @@ abstract class CometNativeExec extends CometExec {
               val broadcastedConf =
                 scan.relation.sparkSession.sparkContext
                   .broadcast(new SerializableConfiguration(hadoopConf))
-              (Some(broadcastedConf), scan.relation.inputFiles.toSeq)
-            } else {
-              (None, Seq.empty)
+
+              val optsTuple: (Broadcast[SerializableConfiguration], Seq[String]) =
+                (broadcastedConf, scan.relation.inputFiles.toSeq)
+              encryptionOptions += optsTuple
             }
+          case _ => // no-op
+        }
+        assert(
+          encryptionOptions.size <= 1,
+          "We expect one native scan that requires encryption reading in a 
Comet plan," +
+            " since we will broadcast one hadoopConf.")
+        // If this assumption changes in the future, you can look at the commit history of #2447
+        // to see how there used to be a map of relations to broadcasted confs in case multiple
+        // relations appear in a single plan. The example that came up was UNION. See discussion at:
+        // https://github.com/apache/datafusion-comet/pull/2447#discussion_r2406118264
+        val (broadcastedHadoopConfForEncryption, encryptedFilePaths) =
+          encryptionOptions.headOption match {
+            case Some((conf, paths)) => (Some(conf), paths)
+            case None => (None, Seq.empty)
           }
 
         def createCometExecIter(
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala
index cff21ecec..b3e6a5a42 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala
@@ -32,7 +32,7 @@ import org.apache.parquet.crypto.DecryptionPropertiesFactory
 import org.apache.parquet.crypto.keytools.{KeyToolkit, PropertiesDrivenCryptoFactory}
 import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS
 import org.apache.spark.{DebugFilesystem, SparkConf}
-import org.apache.spark.sql.{CometTestBase, SQLContext}
+import org.apache.spark.sql.{functions, CometTestBase, SQLContext}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
@@ -359,7 +359,8 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
         KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
         InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
-          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}",
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
 
         // Write first file with key1
         val inputDF1 = spark
@@ -394,11 +395,16 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
         val parquetDF2 = spark.read.parquet(parquetDir2)
 
         val unionDF = parquetDF1.union(parquetDF2)
+        // Since the union has its own executeColumnar, problems would not surface if it were the
+        // last operator. Adding another Comet aggregate after the union demonstrates the need for
+        // foreachUntilCometInput() in operators.scala: without it, we would error on multiple
+        // native scan execs despite them no longer being in the same native plan at all.
+        val aggDf = unionDF.agg(functions.sum("id"))
 
         if (CometConf.COMET_ENABLED.get(conf)) {
-          checkSparkAnswerAndOperator(unionDF)
+          checkSparkAnswerAndOperator(aggDf)
         } else {
-          checkSparkAnswer(unionDF)
+          checkSparkAnswer(aggDf)
         }
       }
     }
@@ -447,7 +453,7 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
   }
 
   protected override def sparkConf: SparkConf = {
-    val conf = new SparkConf()
+    val conf = super.sparkConf
     conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
     conf
   }
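
The traversal helper the new code calls, foreachUntilCometInput(), is defined elsewhere in
operators.scala and is not part of this diff. A minimal sketch of the idea, with the boundary
cases assumed for illustration (the real helper enumerates the concrete Comet input operators):

    import org.apache.spark.sql.execution.SparkPlan

    // Sketch only: visit each operator of the current native plan, stopping the
    // descent at Comet inputs, i.e. operators whose subtrees execute as a different
    // native plan (the Comet exchange classes are assumed to be in scope).
    def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit = {
      func(plan)
      plan match {
        case _: CometShuffleExchangeExec | _: CometBroadcastExchangeExec =>
          // Boundary reached: anything below belongs to another native plan, so a
          // CometNativeScanExec under it must not be counted here.
        case _ =>
          plan.children.foreach(foreachUntilCometInput(_)(func))
      }
    }

With this walk, the assert in CometNativeExec only sees the scans of its own native plan, so
a UNION followed by a Comet aggregate no longer trips it.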

