This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d46ff245 chore: Add `scanImpl` attribute to `CometScanExec` (#1746)
7d46ff245 is described below

commit 7d46ff245c3d95fc9460db56e52c499588689123
Author: Andy Grove <[email protected]>
AuthorDate: Tue May 20 11:58:54 2025 -0600

    chore: Add `scanImpl` attribute to `CometScanExec` (#1746)
---
 .github/workflows/miri.yml                         |  2 +-
 .../main/scala/org/apache/comet/CometConf.scala    |  5 --
 dev/diffs/3.4.3.diff                               |  2 +-
 dev/diffs/3.5.4.diff                               |  2 +-
 dev/diffs/3.5.5.diff                               |  2 +-
 dev/diffs/4.0.0-preview1.diff                      |  2 +-
 .../comet/parquet/CometParquetFileFormat.scala     |  9 ++--
 .../CometParquetPartitionReaderFactory.scala       | 12 +----
 .../apache/comet/parquet/CometParquetScan.scala    |  1 +
 .../org/apache/comet/rules/CometExecRule.scala     |  5 +-
 .../org/apache/comet/rules/CometScanRule.scala     | 44 +++++++++++-----
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 23 +++++++--
 .../spark/sql/comet/CometNativeScanExec.scala      | 24 ++-------
 .../org/apache/spark/sql/comet/CometScanExec.scala | 58 +++++++++-------------
 .../apache/comet/CometArrayExpressionSuite.scala   |  2 +-
 .../scala/org/apache/comet/CometCastSuite.scala    |  2 +-
 .../org/apache/comet/CometExpressionSuite.scala    |  6 +--
 .../org/apache/comet/CometFuzzTestSuite.scala      | 18 +++----
 .../org/apache/comet/exec/CometJoinSuite.scala     |  2 +-
 .../apache/comet/parquet/ParquetReadSuite.scala    | 32 ++++++------
 .../scala/org/apache/spark/sql/CometTestBase.scala |  2 +
 21 files changed, 125 insertions(+), 130 deletions(-)

diff --git a/.github/workflows/miri.yml b/.github/workflows/miri.yml
index 2f67c2866..923598ee3 100644
--- a/.github/workflows/miri.yml
+++ b/.github/workflows/miri.yml
@@ -52,4 +52,4 @@ jobs:
       - name: Test with Miri
         run: |
           cd native
-          MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test
+          MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --lib --bins --tests --examples
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 3f35133a5..7b01d0fa1 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -104,11 +104,6 @@ object CometConf extends ShimCometConf {
       .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
       .toLowerCase(Locale.ROOT))
 
-  def isExperimentalNativeScan: Boolean = COMET_NATIVE_SCAN_IMPL.get() match {
-    case SCAN_NATIVE_DATAFUSION | SCAN_NATIVE_ICEBERG_COMPAT => true
-    case SCAN_NATIVE_COMET => false
-  }
-
   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
       .doc(
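
The removed isExperimentalNativeScan helper collapsed the configured scan implementation into a boolean. Callers now branch on the implementation string directly; a minimal Scala sketch of an equivalent inline check, assuming only the CometConf constants referenced in the hunk above:

    import org.apache.comet.CometConf

    object ScanImplSketch {
      // Mirrors the removed helper: true for the DataFusion-based scans,
      // false for the original native_comet scan.
      def usesDataFusionBasedScan: Boolean =
        CometConf.COMET_NATIVE_SCAN_IMPL.get() match {
          case CometConf.SCAN_NATIVE_DATAFUSION | CometConf.SCAN_NATIVE_ICEBERG_COMPAT => true
          case _ => false
        }
    }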
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index eecd8cd02..03353ed2a 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -961,7 +961,7 @@ index 75eabcb96f2..36e3318ad7e 100644
                _.asInstanceOf[FileScanRDD].filePartitions.forall(
                  _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +        case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+        fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++        fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +          partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +            fs.inputRDDs().forall(
 +              _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/dev/diffs/3.5.4.diff b/dev/diffs/3.5.4.diff
index d9804b887..d4f8cdd37 100644
--- a/dev/diffs/3.5.4.diff
+++ b/dev/diffs/3.5.4.diff
@@ -1092,7 +1092,7 @@ index 260c992f1ae..b9d8e22337c 100644
                _.asInstanceOf[FileScanRDD].filePartitions.forall(
                  _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +        case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+        fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++        fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +          partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +            fs.inputRDDs().forall(
 +              _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff
index 4270037b9..1d5198f38 100644
--- a/dev/diffs/3.5.5.diff
+++ b/dev/diffs/3.5.5.diff
@@ -963,7 +963,7 @@ index 04702201f82..6cc2b01b7f3 100644
                _.asInstanceOf[FileScanRDD].filePartitions.forall(
                  _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +        case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+        fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++        fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +          partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +            fs.inputRDDs().forall(
 +              _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index 3efb4b2b7..a1c6a1422 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -1035,7 +1035,7 @@ index 68f14f13bbd..174636cefb5 100644
                _.asInstanceOf[FileScanRDD].filePartitions.forall(
                  _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +        case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+        fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++        fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +          partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +            fs.inputRDDs().forall(
 +              _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 668865e08..871ac2704 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -56,7 +56,10 @@ import org.apache.comet.vector.CometVector
  *     in [[org.apache.comet.CometSparkSessionExtensions]]
  *   - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
  */
-class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with ShimSQLConf {
+class CometParquetFileFormat(scanImpl: String)
+    extends ParquetFileFormat
+    with MetricsSupport
+    with ShimSQLConf {
   override def shortName(): String = "parquet"
   override def toString: String = "CometParquet"
   override def hashCode(): Int = getClass.hashCode()
@@ -100,8 +103,8 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
 
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
-    val nativeIcebergCompat =
-      CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+
+    val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
 
     (file: PartitionedFile) => {
       val sharedConf = broadcastedHadoopConf.value.value
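
With the constructor change above, the scan implementation is decided once by the caller and passed down, so the format no longer re-reads the session config. A hedged sketch of constructing the format (both names come from CometConf as shown elsewhere in this commit):

    import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

    import org.apache.comet.CometConf
    import org.apache.comet.parquet.CometParquetFileFormat

    object FileFormatSketch {
      // nativeIcebergCompat inside the format reduces to a string comparison
      // against the value supplied here.
      val fileFormat: ParquetFileFormat =
        new CometParquetFileFormat(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
    }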
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
index 4dc099735..69cffdd15 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
@@ -46,6 +46,7 @@ import org.apache.comet.{CometConf, CometRuntimeException}
 import org.apache.comet.shims.ShimSQLConf
 
 case class CometParquetPartitionReaderFactory(
+    usingDataFusionReader: Boolean,
     @transient sqlConf: SQLConf,
     broadcastedConf: Broadcast[SerializableConfiguration],
     readDataSchema: StructType,
@@ -71,17 +72,6 @@ case class CometParquetPartitionReaderFactory(
   // Comet specific configurations
   private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)
 
-  @transient private lazy val usingDataFusionReader: Boolean = {
-    val conf = broadcastedConf.value.value
-    conf.getBoolean(
-      CometConf.COMET_NATIVE_SCAN_ENABLED.key,
-      CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
-    conf
-      .get(
-        CometConf.COMET_NATIVE_SCAN_IMPL.key,
-        CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
-      .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
-  }
  // This is only called at executor on a Broadcast variable, so we don't want it to be
   // materialized at driver.
   @transient private lazy val preFetchEnabled = {
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
index e3cd33b41..b04d6ebc1 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
@@ -58,6 +58,7 @@ trait CometParquetScan extends FileScan with MetricsSupport {
     val broadcastedConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     CometParquetPartitionReaderFactory(
+      usingDataFusionReader = false, // this value is not used since this is a v2 scan
       sqlConf,
       broadcastedConf,
       readDataSchema,
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index da4795fe8..d325043d8 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.{CometConf, ExtendedExplainInfo}
-import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_NATIVE_SCAN_IMPL, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
+import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
 import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
@@ -154,8 +154,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
 
     plan.transformUp {
       // Fully native scan for V1
-      case scan: CometScanExec
-          if COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
+      case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
         val nativeOp = QueryPlanSerde.operator2Proto(scan).get
         CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
 
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 429a00291..eed1dfdba 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -25,14 +25,15 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
-import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, ByteType, DataType, MapType, ShortType, StructType}
 
-import org.apache.comet.CometConf
+import org.apache.comet.{CometConf, DataTypeSupport}
 import org.apache.comet.CometConf._
 import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
@@ -106,16 +107,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
           return withInfos(scanExec, fallbackReasons.toSet)
         }
 
-        val (schemaSupported, partitionSchemaSupported) = scanImpl match {
-          case CometConf.SCAN_NATIVE_DATAFUSION =>
-            (
-              CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
-              CometNativeScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
-          case CometConf.SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
-            (
-              CometScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
-              CometScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
-        }
+        val typeChecker = new CometScanTypeChecker(scanImpl)
+        val schemaSupported =
+          typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+        val partitionSchemaSupported =
+          typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
 
         if (!schemaSupported) {
           fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema} 
for $scanImpl"
@@ -125,7 +121,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
         }
 
         if (schemaSupported && partitionSchemaSupported) {
-          CometScanExec(scanExec, session)
+          // this is confusing, but we always insert a CometScanExec here, which may be replaced
+          // with a CometNativeScanExec when CometExecRule runs, depending on the scanImpl value.
+          CometScanExec(scanExec, session, scanImpl)
         } else {
           withInfos(scanExec, fallbackReasons.toSet)
         }
@@ -201,3 +199,23 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
   }
 
 }
+
+case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport {
+  override def isTypeSupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
+    dt match {
+      case ByteType | ShortType
+          if scanImpl != CometConf.SCAN_NATIVE_COMET &&
+            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
+        fallbackReasons += s"$scanImpl scan cannot read $dt when " +
+          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. 
${CometConf.COMPAT_GUIDE}."
+        false
+      case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
+        false
+      case _ =>
+        super.isTypeSupported(dt, name, fallbackReasons)
+    }
+  }
+}
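
Because CometScanTypeChecker carries the scan implementation as plain data, the per-implementation type rules can now be exercised in isolation. A hedged usage sketch (the column name "c0" is illustrative only):

    import scala.collection.mutable.ListBuffer

    import org.apache.spark.sql.types.ByteType

    import org.apache.comet.CometConf
    import org.apache.comet.rules.CometScanTypeChecker

    object TypeCheckSketch {
      def main(args: Array[String]): Unit = {
        val reasons = ListBuffer.empty[String]
        val checker = CometScanTypeChecker(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
        // ByteType falls back for non-native_comet scans unless
        // CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE is enabled; the reason
        // string is appended to `reasons`.
        val supported = checker.isTypeSupported(ByteType, "c0", reasons)
        println(s"supported=$supported reasons=$reasons")
      }
    }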
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 79a6c16c5..ad9683918 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2282,8 +2282,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     op match {
 
       // Fully native scan for V1
-      case scan: CometScanExec
-          if CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION =>
+      case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
         val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
         nativeScanBuilder.setSource(op.simpleStringWithNodeId())
 
@@ -2376,12 +2375,26 @@ object QueryPlanSerde extends Logging with CometExprShim {
         val cond = exprToProto(condition, child.output)
 
         if (cond.isDefined && childOp.nonEmpty) {
+          // We need to determine whether to use DataFusion's FilterExec or Comet's
+          // FilterExec. The difference is that DataFusion's implementation will sometimes pass
+          // batches through whereas the Comet implementation guarantees that a copy is always
+          // made, which is critical when using `native_comet` scans due to buffer re-use
+
+          // TODO this could be optimized more to stop walking the tree on hitting
+          //  certain operators such as join or aggregate which will copy batches
+          def containsNativeCometScan(plan: SparkPlan): Boolean = {
+            plan match {
+              case w: CometScanWrapper => containsNativeCometScan(w.originalPlan)
+              case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET
+              case _: CometNativeScanExec => false
+              case _ => plan.children.exists(containsNativeCometScan)
+            }
+          }
+
           val filterBuilder = OperatorOuterClass.Filter
             .newBuilder()
             .setPredicate(cond.get)
-            .setUseDatafusionFilter(
-              CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION ||
-                CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+            .setUseDatafusionFilter(!containsNativeCometScan(op))
           Some(result.setFilter(filterBuilder).build())
         } else {
           withInfo(op, condition, child)
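
A toy, self-contained mirror of the containsNativeCometScan walk may help in seeing the decision in isolation; the tree types below are illustrative stand-ins for SparkPlan, not Comet API:

    object FilterChoiceSketch {
      sealed trait Node { def children: Seq[Node] }
      case class Scan(impl: String) extends Node { val children: Seq[Node] = Nil }
      case class Op(children: Node*) extends Node

      // Same shape as the serde logic above: a native_comet scan anywhere
      // below the operator forces Comet's copying FilterExec.
      def containsNativeCometScan(n: Node): Boolean = n match {
        case Scan(impl) => impl == "native_comet"
        case other => other.children.exists(containsNativeCometScan)
      }

      def main(args: Array[String]): Unit = {
        // Filter over a native_comet scan: batch buffers are re-used, so copy.
        assert(containsNativeCometScan(Op(Scan("native_comet"))))
        // Filter over a native_datafusion scan: pass-through is safe.
        assert(!containsNativeCometScan(Op(Scan("native_datafusion"))))
      }
    }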
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 922b75e78..92b2e6a88 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -19,7 +19,6 @@
 
 package org.apache.spark.sql.comet
 
-import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.rdd.RDD
@@ -36,12 +35,12 @@ import org.apache.spark.util.collection._
 
 import com.google.common.base.Objects
 
-import org.apache.comet.{CometConf, DataTypeSupport}
+import org.apache.comet.CometConf
 import org.apache.comet.parquet.CometParquetFileFormat
 import org.apache.comet.serde.OperatorOuterClass.Operator
 
 /**
- * Comet fully native scan node for DataSource V1.
+ * Comet fully native scan node for DataSource V1 that delegates to DataFusion's DataSourceExec.
  */
 case class CometNativeScanExec(
     override val nativeOp: Operator,
@@ -184,7 +183,7 @@ case class CometNativeScanExec(
   override def inputRDDs(): Seq[RDD[InternalRow]] = originalPlan.inputRDDs()
 }
 
-object CometNativeScanExec extends DataTypeSupport {
+object CometNativeScanExec {
   def apply(
       nativeOp: Operator,
       scanExec: FileSourceScanExec,
@@ -206,7 +205,8 @@ object CometNativeScanExec extends DataTypeSupport {
     // https://github.com/apache/arrow-datafusion-comet/issues/190
     def transform(arg: Any): AnyRef = arg match {
       case _: HadoopFsRelation =>
-        scanExec.relation.copy(fileFormat = new CometParquetFileFormat)(session)
+        scanExec.relation.copy(fileFormat =
+          new CometParquetFileFormat(CometConf.SCAN_NATIVE_DATAFUSION))(session)
       case other: AnyRef => other
       case null => null
     }
@@ -229,18 +229,4 @@ object CometNativeScanExec extends DataTypeSupport {
     scanExec.logicalLink.foreach(batchScanExec.setLogicalLink)
     batchScanExec
   }
-
-  override def isTypeSupported(
-      dt: DataType,
-      name: String,
-      fallbackReasons: ListBuffer[String]): Boolean = {
-    dt match {
-      case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"${CometConf.SCAN_NATIVE_DATAFUSION} scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
-        false
-      case _ =>
-        super.isTypeSupported(dt, name, fallbackReasons)
-    }
-  }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index a4d7f4ec4..8bba2d863 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -19,7 +19,7 @@
 
 package org.apache.spark.sql.comet
 
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.HashMap
 import scala.concurrent.duration.NANOSECONDS
 import scala.reflect.ClassTag
 
@@ -44,14 +44,22 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.collection._
 
-import org.apache.comet.{CometConf, DataTypeSupport, MetricsSupport}
+import org.apache.comet.{CometConf, MetricsSupport}
 import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}
 
 /**
 * Comet physical scan node for DataSource V1. Most of the code here follow Spark's
- * [[FileSourceScanExec]],
+ * [[FileSourceScanExec]].
+ *
+ * This is a hybrid scan where the native plan will contain a `ScanExec` that reads batches of
+ * data from the JVM via JNI. The ultimate source of data may be a JVM implementation such as
+ * Spark readers, or could be the `native_comet` or `native_iceberg_compat` native scans.
+ *
+ * Note that scanImpl can only be `native_datafusion` after CometScanRule runs and before
+ * CometExecRule runs. It will never be set to `native_datafusion` at execution time.
  */
 case class CometScanExec(
+    scanImpl: String,
     @transient relation: HadoopFsRelation,
     output: Seq[Attribute],
     requiredSchema: StructType,
@@ -414,16 +422,8 @@ case class CometScanExec(
       readFile: (PartitionedFile) => Iterator[InternalRow],
       partitions: Seq[FilePartition]): RDD[InternalRow] = {
    val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
-    val usingDataFusionReader: Boolean = {
-      hadoopConf.getBoolean(
-        CometConf.COMET_NATIVE_SCAN_ENABLED.key,
-        CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
-      hadoopConf
-        .get(
-          CometConf.COMET_NATIVE_SCAN_IMPL.key,
-          CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
-        .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
-    }
+    val usingDataFusionReader: Boolean = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
+
     val prefetchEnabled = hadoopConf.getBoolean(
       CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
       CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
@@ -435,6 +435,7 @@ case class CometScanExec(
       val broadcastedConf =
        fsRelation.sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
       val partitionReaderFactory = CometParquetPartitionReaderFactory(
+        scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
         sqlConf,
         broadcastedConf,
         requiredSchema,
@@ -461,6 +462,7 @@ case class CometScanExec(
 
   override def doCanonicalize(): CometScanExec = {
     CometScanExec(
+      scanImpl,
       relation,
       output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
@@ -476,9 +478,12 @@ case class CometScanExec(
   }
 }
 
-object CometScanExec extends DataTypeSupport {
+object CometScanExec {
 
-  def apply(scanExec: FileSourceScanExec, session: SparkSession): CometScanExec = {
+  def apply(
+      scanExec: FileSourceScanExec,
+      session: SparkSession,
+      scanImpl: String): CometScanExec = {
     // TreeNode.mapProductIterator is protected method.
    def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = {
       val arr = Array.ofDim[B](product.productArity)
@@ -496,13 +501,14 @@ object CometScanExec extends DataTypeSupport {
     // https://github.com/apache/arrow-datafusion-comet/issues/190
     def transform(arg: Any): AnyRef = arg match {
       case _: HadoopFsRelation =>
-        scanExec.relation.copy(fileFormat = new CometParquetFileFormat)(session)
+        scanExec.relation.copy(fileFormat = new CometParquetFileFormat(scanImpl))(session)
       case other: AnyRef => other
       case null => null
     }
-    val newArgs = mapProductIterator(scanExec, transform(_))
+    val newArgs = mapProductIterator(scanExec, transform)
     val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
     val batchScanExec = CometScanExec(
+      scanImpl,
       wrapped.relation,
       wrapped.output,
       wrapped.requiredSchema,
@@ -523,22 +529,4 @@ object CometScanExec extends DataTypeSupport {
     fileFormat.getClass().equals(classOf[ParquetFileFormat])
   }
 
-  override def isTypeSupported(
-      dt: DataType,
-      name: String,
-      fallbackReasons: ListBuffer[String]): Boolean = {
-    dt match {
-      case ByteType | ShortType
-          if CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
-            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"${CometConf.SCAN_NATIVE_ICEBERG_COMPAT} scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
-        false
-      case _: StructType | _: ArrayType | _: MapType
-          if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
-        false
-      case _ =>
-        super.isTypeSupported(dt, name, fallbackReasons)
-    }
-  }
 }
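
End to end, the scanImpl value now travels from the rule that creates the node down into the Parquet reader factories. A hedged sketch of how a caller constructs the node with the three-argument apply introduced above:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.comet.CometScanExec
    import org.apache.spark.sql.execution.FileSourceScanExec

    import org.apache.comet.CometConf

    object ScanRuleSketch {
      // Mirrors CometScanRule: read the configured implementation once and
      // pass it explicitly instead of re-reading the config at execution time.
      def toCometScan(scanExec: FileSourceScanExec, session: SparkSession): CometScanExec =
        CometScanExec(scanExec, session, CometConf.COMET_NATIVE_SCAN_IMPL.get())
    }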
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index c58f1a14f..4803f4922 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -235,7 +235,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
 
   test("array_intersect") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!CometConf.isExperimentalNativeScan)
+    assume(!usingDataSourceExec)
     withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
 
       Seq(true, false).foreach { dictionaryEnabled =>
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index ec01c16fc..11e44251b 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -932,7 +932,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("cast StructType to StringType") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!CometConf.isExperimentalNativeScan)
+    assume(!usingDataSourceExec)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 35f8d8fc0..43fbe57c5 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1266,7 +1266,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("round") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!CometConf.isExperimentalNativeScan)
+    assume(!usingDataSourceExec)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
@@ -1496,7 +1496,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("hex") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!CometConf.isExperimentalNativeScan)
+    assume(!usingDataSourceExec)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "hex.parquet")
@@ -2725,7 +2725,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("test integral divide") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!CometConf.isExperimentalNativeScan)
+    assume(!usingDataSourceExec)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path1 = new Path(dir.toURI.toString, "test1.parquet")
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 4af73ce01..05901339b 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -81,7 +81,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1"
-    if (CometConf.isExperimentalNativeScan) {
+    if (usingDataSourceExec) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -92,7 +92,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1 LIMIT 500"
-    if (CometConf.isExperimentalNativeScan) {
+    if (usingDataSourceExec) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -106,7 +106,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       val sql = s"SELECT $col FROM t1 ORDER BY $col"
       // cannot run fully natively due to range partitioning and sort
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (CometConf.isExperimentalNativeScan) {
+      if (usingDataSourceExec) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -118,7 +118,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     for (col <- df.columns) {
       val sql = s"SELECT count(distinct $col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (CometConf.isExperimentalNativeScan) {
+      if (usingDataSourceExec) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -131,7 +131,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     val sql = s"SELECT $allCols FROM t1 ORDER BY $allCols"
     // cannot run fully natively due to range partitioning and sort
     val (_, cometPlan) = checkSparkAnswer(sql)
-    if (CometConf.isExperimentalNativeScan) {
+    if (usingDataSourceExec) {
       assert(1 == collectNativeScans(cometPlan).length)
     }
   }
@@ -143,7 +143,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       // cannot run fully natively due to range partitioning and sort
       val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (CometConf.isExperimentalNativeScan) {
+      if (usingDataSourceExec) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -156,7 +156,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       // cannot run fully native due to HashAggregate
       val sql = s"SELECT min($col), max($col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (CometConf.isExperimentalNativeScan) {
+      if (usingDataSourceExec) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -195,7 +195,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     val df = spark.read.parquet(filename)
     val df2 = df.repartition(8, df.col("c0")).sort("c1")
     df2.collect()
-    if (CometConf.isExperimentalNativeScan) {
+    if (usingDataSourceExec) {
      val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan)
       assert(1 == cometShuffles.length)
     }
@@ -209,7 +209,7 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       // cannot run fully native due to HashAggregate
       val sql = s"SELECT count(*) FROM t1 JOIN t2 ON t1.$col = t2.$col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (CometConf.isExperimentalNativeScan) {
+      if (usingDataSourceExec) {
         assert(2 == collectNativeScans(cometPlan).length)
       }
     }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index e0a873dd1..d47b4e0c1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -199,7 +199,7 @@ class CometJoinSuite extends CometTestBase {
 
   test("HashJoin struct key") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!CometConf.isExperimentalNativeScan)
+    assume(!usingDataSourceExec)
     withSQLConf(
       "spark.sql.join.forceApplyShuffledHashJoin" -> "true",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 2fbad102d..334c468c5 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -48,8 +48,8 @@ import org.apache.spark.unsafe.types.UTF8String
 import com.google.common.primitives.UnsignedLong
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometConf.SCAN_NATIVE_ICEBERG_COMPAT
 import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
+import org.apache.comet.rules.CometScanTypeChecker
 
 abstract class ParquetReadSuite extends CometTestBase {
   import testImplicits._
@@ -88,8 +88,7 @@ abstract class ParquetReadSuite extends CometTestBase {
    // for native iceberg compat, CometScanExec supports some types that native_comet does not.
    // note that native_datafusion does not use CometScanExec so we need not include that in
     // the check
-    val usingNativeIcebergCompat =
-      CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
+    val isDataFusionScan = usingDataSourceExec(conf)
     Seq(
       NullType -> false,
       BooleanType -> true,
@@ -103,22 +102,22 @@ abstract class ParquetReadSuite extends CometTestBase {
       StringType -> true,
      // Timestamp here arbitrary for picking a concrete data type to from ArrayType
       // Any other type works
-      ArrayType(TimestampType) -> usingNativeIcebergCompat,
+      ArrayType(TimestampType) -> isDataFusionScan,
       StructType(
         Seq(
           StructField("f1", DecimalType.SYSTEM_DEFAULT),
-          StructField("f2", StringType))) -> usingNativeIcebergCompat,
-      MapType(keyType = LongType, valueType = DateType) -> usingNativeIcebergCompat,
+          StructField("f2", StringType))) -> isDataFusionScan,
+      MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan,
       StructType(
-        Seq(
-          StructField("f1", ByteType),
-          StructField("f2", StringType))) -> usingNativeIcebergCompat,
-      MapType(keyType = IntegerType, valueType = BinaryType) -> usingNativeIcebergCompat)
+        Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan,
+      MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan)
       .foreach { case (dt, expected) =>
         val fallbackReasons = new ListBuffer[String]()
-        assert(CometScanExec.isTypeSupported(dt, "", fallbackReasons) == 
expected)
+        assert(
+          CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
+            .isTypeSupported(dt, "", fallbackReasons) == expected)
         // usingDataFusionParquetExec does not support CometBatchScanExec yet
-        if (!usingDataSourceExec(conf)) {
+        if (!isDataFusionScan) {
          assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected)
         }
       }
@@ -131,8 +130,7 @@ abstract class ParquetReadSuite extends CometTestBase {
 
     // Arrays support for iceberg compat native and for Parquet V1
     val cometScanExecSupported =
-      if (sys.env.get("COMET_PARQUET_SCAN_IMPL").contains(SCAN_NATIVE_ICEBERG_COMPAT) && this
-          .isInstanceOf[ParquetReadV1Suite])
+      if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite])
         Seq(true, true, true)
       else Seq(true, false, false)
 
@@ -140,7 +138,9 @@ abstract class ParquetReadSuite extends CometTestBase {
     val fallbackReasons = new ListBuffer[String]()
 
     schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) =>
-      assert(CometScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected)
+      assert(
+        CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
+          .isSchemaSupported(StructType(schema), fallbackReasons) == expected)
     }
 
    schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) =>
@@ -354,7 +354,7 @@ abstract class ParquetReadSuite extends CometTestBase {
 
   test("test multiple pages with different sizes and nulls") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!CometConf.isExperimentalNativeScan)
+    assume(!usingDataSourceExec)
     def makeRawParquetFile(
         path: Path,
         dictionaryEnabled: Boolean,
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index f69b94459..fbfdbcda6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -979,6 +979,8 @@ abstract class CometTestBase
     writer.close()
   }
 
+  def usingDataSourceExec: Boolean = usingDataSourceExec(SQLConf.get)
+
   def usingDataSourceExec(conf: SQLConf): Boolean =
+    Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION).contains(
       CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
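
The new zero-argument overload lets suites gate tests without threading a SQLConf through, which is the pattern the test changes above rely on. A hedged sketch of a hypothetical suite using it:

    import org.apache.spark.sql.CometTestBase

    class ExampleSuite extends CometTestBase {
      test("behavior specific to the native_comet scan") {
        // Skips when COMET_NATIVE_SCAN_IMPL selects native_datafusion or
        // native_iceberg_compat, i.e. a DataFusion-based scan.
        assume(!usingDataSourceExec)
        // ... test body ...
      }
    }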


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

