This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 237929d6d fix: Remove fallback for maps containing complex types (#2943)
237929d6d is described below

commit 237929d6d9ecfdd9e1db4def94e24e7cc440b6da
Author: Andy Grove <[email protected]>
AuthorDate: Tue Dec 23 08:11:58 2025 -0700

    fix: Remove fallback for maps containing complex types (#2943)
---
 .../scala/org/apache/comet/DataTypeSupport.scala   | 10 +++++
 .../org/apache/comet/rules/CometScanRule.scala     | 24 +---------
 .../scala/org/apache/comet/CometFuzzTestBase.scala | 44 ++++++++++++++++---
 .../org/apache/comet/CometFuzzTestSuite.scala      | 51 +++++++++++++---------
 .../scala/org/apache/spark/sql/CometTestBase.scala |  1 -
 .../spark/sql/CometToPrettyStringSuite.scala       | 11 ++---
 .../spark/sql/CometToPrettyStringSuite.scala       | 11 ++---
 7 files changed, 86 insertions(+), 66 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index 9adf82958..9f8fc77eb 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -79,4 +79,14 @@ object DataTypeSupport {
     case _: StructType | _: ArrayType | _: MapType => true
     case _ => false
   }
+
+  def hasTemporalType(t: DataType): Boolean = t match {
+    case DataTypes.DateType | DataTypes.TimestampType | DataTypes.TimestampNTZType =>
+      true
+    case t: StructType => t.exists(f => hasTemporalType(f.dataType))
+    case t: ArrayType => hasTemporalType(t.elementType)
+    case t: MapType => hasTemporalType(t.keyType) || hasTemporalType(t.valueType)
+    case _ => false
+  }
+
 }
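
For illustration only (not part of the patch): the new DataTypeSupport.hasTemporalType helper recurses through nested types, so a timestamp buried inside an array of structs is still detected. A minimal sketch, assuming the Comet classpath; the schema below is hypothetical:

    import org.apache.spark.sql.types._
    import org.apache.comet.DataTypeSupport

    // hypothetical schema: a long id and an array of structs with a nested timestamp
    val schema = StructType(
      Seq(
        StructField("id", DataTypes.LongType),
        StructField(
          "events",
          DataTypes.createArrayType(
            DataTypes.createStructType(Array(StructField("ts", DataTypes.TimestampType)))))))

    // id -> false; events -> true (TimestampType found via the ArrayType and StructType cases)
    schema.fields.map(f => DataTypeSupport.hasTemporalType(f.dataType))
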
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 69bce7555..01e385b0a 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -592,34 +592,12 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     val partitionSchemaSupported =
       typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
 
-    def hasUnsupportedType(dataType: DataType): Boolean = {
-      dataType match {
-        case s: StructType => s.exists(field => hasUnsupportedType(field.dataType))
-        case a: ArrayType => hasUnsupportedType(a.elementType)
-        case m: MapType =>
-          // maps containing complex types are not supported
-          isComplexType(m.keyType) || isComplexType(m.valueType) ||
-          hasUnsupportedType(m.keyType) || hasUnsupportedType(m.valueType)
-        case dt if isStringCollationType(dt) => true
-        case _ => false
-      }
-    }
-
-    val knownIssues =
-      scanExec.requiredSchema.exists(field => hasUnsupportedType(field.dataType)) ||
-        partitionSchema.exists(field => hasUnsupportedType(field.dataType))
-
-    if (knownIssues) {
-      fallbackReasons += "Schema contains data types that are not supported by " +
-        s"$SCAN_NATIVE_ICEBERG_COMPAT"
-    }
-
     val cometExecEnabled = COMET_EXEC_ENABLED.get()
     if (!cometExecEnabled) {
       fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
     }
 
-    if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues &&
+    if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
       fallbackReasons.isEmpty) {
       logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
       SCAN_NATIVE_ICEBERG_COMPAT
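
For context, a hedged sketch of the kind of schema the removed knownIssues check used to reject: a map whose key or value type is itself complex. The type below is hypothetical, not from the patch; with the block above deleted, auto scan mode no longer rejects such schemas up front and relies on the remaining support checks:

    import org.apache.spark.sql.types._

    // map<string, struct<x: int>>: the old check flagged this because
    // isComplexType on the StructType value returned true
    val mapOfStruct = DataTypes.createMapType(
      DataTypes.StringType,
      DataTypes.createStructType(Array(StructField("x", DataTypes.IntegerType))))
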
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
index 1c0636780..74858ed61 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
@@ -35,12 +35,15 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.internal.SQLConf
 
-import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
+import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions}
 
 class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
 
   var filename: String = null
 
+  /** Filename for data file with deeply nested complex types */
+  var complexTypesFilename: String = null
+
   /**
   * We use Asia/Kathmandu because it has a non-zero number of minutes as the offset, so is an
   * interesting edge case. Also, this timezone tends to be different from the default system
@@ -53,18 +56,20 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
   override def beforeAll(): Unit = {
     super.beforeAll()
     val tempDir = System.getProperty("java.io.tmpdir")
-    filename = s"$tempDir/CometFuzzTestSuite_${System.currentTimeMillis()}.parquet"
     val random = new Random(42)
+    val dataGenOptions = DataGenOptions(
+      generateNegativeZero = false,
+      // override base date due to known issues with experimental scans
+      baseDate = new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime)
+
+    // generate Parquet file with primitives, structs, and arrays, but no maps
+    // and no nested complex types
+    filename = s"$tempDir/CometFuzzTestSuite_${System.currentTimeMillis()}.parquet"
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "false",
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
       val schemaGenOptions =
         SchemaGenOptions(generateArray = true, generateStruct = true)
-      val dataGenOptions = DataGenOptions(
-        generateNegativeZero = false,
-        // override base date due to known issues with experimental scans
-        baseDate =
-          new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime)
       ParquetGenerator.makeParquetFile(
         random,
         spark,
@@ -73,6 +78,30 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
         schemaGenOptions,
         dataGenOptions)
     }
+
+    // generate Parquet file with complex nested types
+    complexTypesFilename =
+      s"$tempDir/CometFuzzTestSuite_nested_${System.currentTimeMillis()}.parquet"
+    withSQLConf(
+      CometConf.COMET_ENABLED.key -> "false",
+      SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
+      val schemaGenOptions =
+        SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = true)
+      val schema = FuzzDataGenerator.generateNestedSchema(
+        random,
+        numCols = 10,
+        minDepth = 2,
+        maxDepth = 4,
+        options = schemaGenOptions)
+      ParquetGenerator.makeParquetFile(
+        random,
+        spark,
+        complexTypesFilename,
+        schema,
+        1000,
+        dataGenOptions)
+    }
+
   }
 
   protected override def afterAll(): Unit = {
@@ -84,6 +113,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
       pos: Position): Unit = {
     Seq("native", "jvm").foreach { shuffleMode =>
       Seq(
+        CometConf.SCAN_AUTO,
         CometConf.SCAN_NATIVE_COMET,
         CometConf.SCAN_NATIVE_DATAFUSION,
         CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl =>
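
Condensed sketch of the new generation path added above, with parameter values copied from the hunk; the output path is hypothetical and an active SparkSession named spark is assumed:

    import scala.util.Random

    import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions}

    val random = new Random(42)
    // minDepth = 2 forces nesting, so maps containing structs/arrays are exercised
    val schema = FuzzDataGenerator.generateNestedSchema(
      random,
      numCols = 10,
      minDepth = 2,
      maxDepth = 4,
      options = SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = true))
    ParquetGenerator.makeParquetFile(
      random,
      spark,
      "/tmp/nested_example.parquet", // hypothetical path
      schema,
      1000,
      DataGenOptions(generateNegativeZero = false))
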
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 59680bd6b..833314a5c 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.types._
 
 import org.apache.comet.DataTypeSupport.isComplexType
-import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
+import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
 import org.apache.comet.testing.FuzzDataGenerator.{doubleNaNLiteral, floatNaNLiteral}
 
 class CometFuzzTestSuite extends CometFuzzTestBase {
@@ -44,6 +44,17 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     }
   }
 
+  test("select * with deeply nested complex types") {
+    val df = spark.read.parquet(complexTypesFilename)
+    df.createOrReplaceTempView("t1")
+    val sql = "SELECT * FROM t1"
+    if (CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET) {
+      checkSparkAnswerAndOperator(sql)
+    } else {
+      checkSparkAnswer(sql)
+    }
+  }
+
   test("select * with limit") {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
@@ -179,7 +190,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
         case CometConf.SCAN_NATIVE_COMET =>
           // native_comet does not support reading complex types
           0
-        case CometConf.SCAN_NATIVE_ICEBERG_COMPAT | CometConf.SCAN_NATIVE_DATAFUSION =>
+        case _ =>
           CometConf.COMET_SHUFFLE_MODE.get() match {
             case "jvm" =>
               1
@@ -202,7 +213,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
         case CometConf.SCAN_NATIVE_COMET =>
           // native_comet does not support reading complex types
           0
-        case CometConf.SCAN_NATIVE_ICEBERG_COMPAT | CometConf.SCAN_NATIVE_DATAFUSION =>
+        case _ =>
           CometConf.COMET_SHUFFLE_MODE.get() match {
             case "jvm" =>
               1
@@ -272,12 +283,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
   }
 
   private def testParquetTemporalTypes(
-      outputTimestampType: ParquetOutputTimestampType.Value,
-      generateArray: Boolean = true,
-      generateStruct: Boolean = true): Unit = {
-
-    val schemaGenOptions =
-      SchemaGenOptions(generateArray = generateArray, generateStruct = generateStruct)
+      outputTimestampType: ParquetOutputTimestampType.Value): Unit = {
 
     val dataGenOptions = DataGenOptions(generateNegativeZero = false)
 
@@ -287,12 +293,23 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
         CometConf.COMET_ENABLED.key -> "false",
         SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
         SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
+
+        // TODO test with MapType
+        // https://github.com/apache/datafusion-comet/issues/2945
+        val schema = StructType(
+          Seq(
+            StructField("c0", DataTypes.DateType),
+            StructField("c1", DataTypes.createArrayType(DataTypes.DateType)),
+            StructField(
+              "c2",
+              DataTypes.createStructType(Array(StructField("c3", DataTypes.DateType))))))
+
         ParquetGenerator.makeParquetFile(
           random,
           spark,
           filename.toString,
+          schema,
           100,
-          schemaGenOptions,
           dataGenOptions)
       }
 
@@ -309,18 +326,10 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
 
                 val df = spark.read.parquet(filename.toString)
                 df.createOrReplaceTempView("t1")
-
-                def hasTemporalType(t: DataType): Boolean = t match {
-                  case DataTypes.DateType | DataTypes.TimestampType |
-                      DataTypes.TimestampNTZType =>
-                    true
-                  case t: StructType => t.exists(f => hasTemporalType(f.dataType))
-                  case t: ArrayType => hasTemporalType(t.elementType)
-                  case _ => false
-                }
-
                 val columns =
-                  df.schema.fields.filter(f => hasTemporalType(f.dataType)).map(_.name)
+                  df.schema.fields
+                    .filter(f => DataTypeSupport.hasTemporalType(f.dataType))
+                    .map(_.name)
 
                 for (col <- columns) {
                   checkSparkAnswer(s"SELECT $col FROM t1 ORDER BY $col")
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index bc9e521d3..8011e5e70 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -116,7 +116,6 @@ abstract class CometTestBase
       sparkPlan = dfSpark.queryExecution.executedPlan
     }
     val dfComet = datasetOfRows(spark, df.logicalPlan)
-
     if (withTol.isDefined) {
       checkAnswerWithTolerance(dfComet, expected, withTol.get)
     } else {
diff --git a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala
index 991d02014..70119f44a 100644
--- a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala
+++ b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql
 
-import scala.collection.mutable.ListBuffer
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString}
@@ -29,7 +27,6 @@ import org.apache.spark.sql.types.DataTypes
 
 import org.apache.comet.{CometConf, CometFuzzTestBase}
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
-import org.apache.comet.rules.CometScanTypeChecker
 import org.apache.comet.serde.Compatible
 
 class CometToPrettyStringSuite extends CometFuzzTestBase {
@@ -45,14 +42,14 @@ class CometToPrettyStringSuite extends CometFuzzTestBase {
       val plan = Project(Seq(prettyExpr), table)
       val analyzed = spark.sessionState.analyzer.execute(plan)
       val result: DataFrame = Dataset.ofRows(spark, analyzed)
-      CometCast.isSupported(
+      val supportLevel = CometCast.isSupported(
         field.dataType,
         DataTypes.StringType,
         Some(spark.sessionState.conf.sessionLocalTimeZone),
-        CometEvalMode.TRY) match {
+        CometEvalMode.TRY)
+      supportLevel match {
         case _: Compatible
-            if CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
-              .isTypeSupported(field.dataType, field.name, ListBuffer.empty) =>
+            if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET =>
           checkSparkAnswerAndOperator(result)
         case _ => checkSparkAnswer(result)
       }
diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
index f842e3f55..b0f40edf7 100644
--- a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
+++ b/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql
 
-import scala.collection.mutable.ListBuffer
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString}
@@ -32,7 +30,6 @@ import org.apache.spark.sql.types.DataTypes
 
 import org.apache.comet.{CometConf, CometFuzzTestBase}
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
-import org.apache.comet.rules.CometScanTypeChecker
 import org.apache.comet.serde.Compatible
 
 class CometToPrettyStringSuite extends CometFuzzTestBase {
@@ -56,14 +53,14 @@ class CometToPrettyStringSuite extends CometFuzzTestBase {
           val plan = Project(Seq(prettyExpr), table)
           val analyzed = spark.sessionState.analyzer.execute(plan)
           val result: DataFrame = Dataset.ofRows(spark, analyzed)
-          CometCast.isSupported(
+          val supportLevel = CometCast.isSupported(
             field.dataType,
             DataTypes.StringType,
             Some(spark.sessionState.conf.sessionLocalTimeZone),
-            CometEvalMode.TRY) match {
+            CometEvalMode.TRY)
+          supportLevel match {
             case _: Compatible
-                if CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
-                  .isTypeSupported(field.dataType, field.name, ListBuffer.empty) =>
+                if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET =>
               checkSparkAnswerAndOperator(result)
             case _ => checkSparkAnswer(result)
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
