This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2226cfaba fix: Refactor CometScanRule and fix bugs (#1483)
2226cfaba is described below

commit 2226cfaba480333c5857dc70bd792be8dca9e47c
Author: Andy Grove <[email protected]>
AuthorDate: Wed Mar 19 14:18:13 2025 -0600

    fix: Refactor CometScanRule and fix bugs (#1483)
---
 .../apache/comet/CometSparkSessionExtensions.scala | 126 ++++++++++-----------
 .../comet/exec/CometColumnarShuffleSuite.scala     |  25 ++++
 .../comet/exec/CometNativeShuffleSuite.scala       |   2 +
 3 files changed, 85 insertions(+), 68 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 800cab832..105144e55 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -21,6 +21,8 @@ package org.apache.comet
 
 import java.nio.ByteOrder
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
@@ -100,9 +102,6 @@ class CometSparkSessionExtensions
         plan
       } else {
 
-        def isDynamicPruningFilter(e: Expression): Boolean =
-          e.exists(_.isInstanceOf[PlanExpression[_]])
-
         def hasMetadataCol(plan: SparkPlan): Boolean = {
           plan.expressions.exists(_.exists {
             case a: Attribute =>
@@ -116,11 +115,9 @@ class CometSparkSessionExtensions
             withInfo(scan, "Metadata column is not supported")
             scan
 
-          case scanExec: FileSourceScanExec
-              if COMET_DPP_FALLBACK_ENABLED.get() &&
-                scanExec.partitionFilters.exists(isDynamicPruningFilter) =>
-            withInfo(scanExec, "DPP not supported")
-            scanExec
+          // data source V1
+          case scanExec: FileSourceScanExec =>
+            transformV1Scan(scanExec)
 
           // data source V2
           case scanExec: BatchScanExec
@@ -188,69 +185,62 @@ class CometSparkSessionExtensions
                 scanExec
             }
 
-          // data source V1
-          case scanExec @ FileSourceScanExec(
-                HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-                _: Seq[_],
-                requiredSchema,
-                _,
-                _,
-                _,
-                _,
-                _,
-                _)
-              if CometScanExec.isFileFormatSupported(fileFormat)
-                && CometNativeScanExec.isSchemaSupported(requiredSchema)
-                && CometNativeScanExec.isSchemaSupported(partitionSchema)
-                // TODO we only enable full native scan if COMET_EXEC_ENABLED is enabled
-                // but this is not really what we want .. we currently insert `CometScanExec`
-                // here and then it gets replaced with `CometNativeScanExec` in `CometExecRule`
-                // but that only happens if `COMET_EXEC_ENABLED` is enabled
-                && COMET_EXEC_ENABLED.get()
-                && COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
-            logInfo("Comet extension enabled for v1 full native Scan")
-            CometScanExec(scanExec, session)
+        }
+      }
+    }
 
-          // data source V1
-          case scanExec @ FileSourceScanExec(
-                HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-                _: Seq[_],
-                requiredSchema,
-                _,
-                _,
-                _,
-                _,
-                _,
-                _)
-              if CometScanExec.isFileFormatSupported(fileFormat)
-                && CometScanExec.isSchemaSupported(requiredSchema)
-                && CometScanExec.isSchemaSupported(partitionSchema) =>
-            logInfo("Comet extension enabled for v1 Scan")
-            CometScanExec(scanExec, session)
+    private def isDynamicPruningFilter(e: Expression): Boolean =
+      e.exists(_.isInstanceOf[PlanExpression[_]])
 
-          // data source v1 not supported case
-          case scanExec @ FileSourceScanExec(
-                HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
-                _: Seq[_],
-                requiredSchema,
-                _,
-                _,
-                _,
-                _,
-                _,
-                _) =>
-            val info1 = createMessage(
-              !CometScanExec.isFileFormatSupported(fileFormat),
-              s"File format $fileFormat is not supported")
-            val info2 = createMessage(
-              !CometScanExec.isSchemaSupported(requiredSchema),
-              s"Schema $requiredSchema is not supported")
-            val info3 = createMessage(
-              !CometScanExec.isSchemaSupported(partitionSchema),
-              s"Partition schema $partitionSchema is not supported")
-            withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString(","))
+    private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
+
+      if (COMET_DPP_FALLBACK_ENABLED.get() &&
+        scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
+        withInfo(scanExec, "DPP not supported")
+        return scanExec
+      }
+
+      scanExec.relation match {
+        case r: HadoopFsRelation =>
+          val fallbackReasons = new ListBuffer[String]()
+          if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
+            fallbackReasons += s"Unsupported file format ${r.fileFormat}"
+          }
+
+          val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
+          if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
+            fallbackReasons +=
+              s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
+          }
+
+          val (schemaSupported, partitionSchemaSupported) = scanImpl match {
+            case CometConf.SCAN_NATIVE_DATAFUSION =>
+              (
+                CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema),
+                CometNativeScanExec.isSchemaSupported(r.partitionSchema))
+            case CometConf.SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
+              (
+                CometScanExec.isSchemaSupported(scanExec.requiredSchema),
+                CometScanExec.isSchemaSupported(r.partitionSchema))
+          }
+
+          if (!schemaSupported) {
+            fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl"
+          }
+          if (!partitionSchemaSupported) {
+            fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
+          }
+
+          if (fallbackReasons.isEmpty) {
+            CometScanExec(scanExec, session)
+          } else {
+            withInfo(scanExec, fallbackReasons.mkString(", "))
             scanExec
-        }
+          }
+
+        case _ =>
+          withInfo(scanExec, s"Unsupported relation ${scanExec.relation}")
+          scanExec
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 1df865290..45134b8b4 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -110,6 +110,8 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("columnar shuffle on nested struct including nulls") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
         withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
ratio) {
@@ -247,6 +249,9 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("columnar shuffle on map") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+
     def genTuples[K](num: Int, keys: Seq[K]): Seq[(
         Int,
         Map[K, Boolean],
@@ -580,6 +585,9 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("columnar shuffle on array") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
         withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
ratio) {
@@ -678,6 +686,8 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("fix: Dictionary field should have distinct dict_id") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     Seq(10, 201).foreach { numPartitions =>
       withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
"2.0") {
         withParquetTable(
@@ -695,6 +705,8 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("dictionary shuffle") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     Seq(10, 201).foreach { numPartitions =>
       withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
"2.0") {
         withParquetTable((0 until 10000).map(i => (1.toString, (i + 
1).toLong)), "tbl") {
@@ -710,6 +722,8 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("dictionary shuffle: fallback to string") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     Seq(10, 201).foreach { numPartitions =>
       withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
"1000000000.0") {
         withParquetTable((0 until 10000).map(i => (1.toString, (i + 
1).toLong)), "tbl") {
@@ -725,6 +739,8 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("fix: inMemSorter should be reset after spilling") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") {
       assert(
         sql("SELECT * FROM tbl").repartition(201, $"_1").count() == 
sql("SELECT * FROM tbl")
@@ -733,6 +749,8 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("fix: native Unsafe row accessors return incorrect results") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     Seq(10, 201).foreach { numPartitions =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
@@ -854,6 +872,8 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("Columnar shuffle for large shuffle partition number") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     Seq(10, 200, 201).foreach { numPartitions =>
       withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
         val df = sql("SELECT * FROM tbl")
@@ -872,6 +892,8 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("hash-based columnar shuffle") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     Seq(10, 200, 201).foreach { numPartitions =>
       withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
         val df = sql("SELECT * FROM tbl")
@@ -900,6 +922,9 @@ abstract class CometColumnarShuffleSuite extends 
CometTestBase with AdaptiveSpar
   }
 
   test("columnar shuffle: different data type") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
index 5b58befcd..9d40a0cdf 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
@@ -57,6 +57,8 @@ class CometNativeShuffleSuite extends CometTestBase with 
AdaptiveSparkPlanHelper
   }
 
   test("native shuffle: different data type") {
+    // https://github.com/apache/datafusion-comet/issues/1538
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     Seq(true, false).foreach { execEnabled =>
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to