This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a000abba refactor: rename scan.allowIncompatible to scan.unsignedSmallIntSafetyCheck (#3238)
1a000abba is described below

commit 1a000abba11ce668662485b63ec55e3a177aa5d5
Author: Andy Grove <[email protected]>
AuthorDate: Sun Jan 25 15:34:09 2026 -0700

    refactor: rename scan.allowIncompatible to scan.unsignedSmallIntSafetyCheck (#3238)
    
    * refactor: rename scan.allowIncompatible to scan.unsignedSmallIntSafetyCheck
    
    This change renames `spark.comet.scan.allowIncompatible` to
    `spark.comet.scan.unsignedSmallIntSafetyCheck` and changes its default
    to `true` (enabled by default).
    
    The key change is that ByteType is removed from the safety check entirely,
    leaving only ShortType subject to fallback behavior.
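    
    A condensed Scala sketch of the new rule (the real change is in the
    CometScanRule.scala hunk below; `scanTypeAllowed`, `isNativeCometScan`, and
    `checkEnabled` are illustrative names, not Comet APIs):
    
    ```scala
    import org.apache.spark.sql.types._
    
    def scanTypeAllowed(dt: DataType, isNativeCometScan: Boolean, checkEnabled: Boolean): Boolean =
      dt match {
        case ShortType if !isNativeCometScan && checkEnabled =>
          false // fall back to Spark: the column may hold unsigned UINT_8 data
        case _ => true // ByteType no longer triggers a fallback (other type checks omitted)
      }
    ```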
    
    ## Why ByteType is Safe
    
    ByteType columns are always safe for native execution because:
    
    1. **Parquet type mapping**: Spark's ByteType can only originate from signed
       INT8 in Parquet. The unsigned 8-bit Parquet logical type (UINT_8) never
       maps to ByteType.
    
    2. **UINT_8 maps to ShortType**: When Parquet files contain unsigned UINT_8
       columns, Spark maps them to ShortType (16-bit), not ByteType. This is
       because UINT_8 values (0-255) exceed the signed byte range (-128 to 127).
    
    3. **Truncation preserves signed values**: When storing signed INT8 in 8
       bits, the truncation from any wider representation preserves the correct
       signed value due to two's complement representation (see the sketch
       below).
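    
    A minimal Scala sketch (illustrative only, not Comet code) of why the 8-bit
    truncation in point 3 is lossless for values that originated as signed INT8:
    
    ```scala
    // Any value in the signed-byte range survives a round trip through Byte:
    val v: Int = -100      // a signed INT8 value held in a wider representation
    val b: Byte = v.toByte // truncate to 8 bits (two's complement)
    assert(b.toInt == v)   // widening back recovers the original value
    ```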
    
    ## Why ShortType Needs the Safety Check
    
    ShortType columns may be problematic because:
    
    1. **Ambiguous origin**: ShortType can come from either signed INT16 (safe)
       or unsigned UINT_8 (potentially incompatible).
    
    2. **Different reader behavior**: Arrow-based readers like DataFusion respect
       the unsigned UINT_8 logical type and read data as unsigned, while Spark
       ignores the logical type and reads as signed. This can produce different
       results for values 128-255 (see the sketch after this list).
    
    3. **No metadata available**: At scan time, Comet cannot determine whether a
       ShortType column originated from INT16 or UINT_8, so the safety check
       conservatively falls back to Spark for all ShortType columns.
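    
    A small Scala sketch (illustrative only, not Comet code) of the divergence
    described in point 2, for the unsigned value 200 stored in 8 bits:
    
    ```scala
    val bits: Byte = 0xC8.toByte                    // physical bits for unsigned 200
    val signedRead: Short = bits.toShort            // Spark-style signed read: -56
    val unsignedRead: Short = (bits & 0xFF).toShort // Arrow-style unsigned read: 200
    assert(signedRead == -56 && unsignedRead == 200)
    ```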
    
    Users who know their data does not contain unsigned UINT_8 columns can
    disable the safety check with `spark.comet.scan.unsignedSmallIntSafetyCheck=false`.
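    
    For example, at the session level (a usage sketch; the key is the one
    introduced by this change):
    
    ```scala
    // Allow native scans of ShortType columns when no UINT_8 data is present
    spark.conf.set("spark.comet.scan.unsignedSmallIntSafetyCheck", "false")
    ```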
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    * format
    
    * rename
    
    * rename
    
    * Fix clippy warnings for Rust 1.93
    
    - Use local `root_op` variable instead of unwrapping `exec_context.root_op`
    - Replace `is_some()` + `unwrap()` pattern with `if let Some(...)`
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 common/src/main/scala/org/apache/comet/CometConf.scala    | 15 ++++++++++-----
 docs/source/contributor-guide/parquet_scans.md            | 13 +++++++------
 .../main/scala/org/apache/comet/rules/CometScanRule.scala | 10 ++++++----
 .../test/scala/org/apache/comet/CometFuzzTestBase.scala   |  2 +-
 .../apache/comet/parquet/CometParquetWriterSuite.scala    |  4 ++--
 .../scala/org/apache/comet/parquet/ParquetReadSuite.scala |  2 +-
 .../scala/org/apache/comet/rules/CometScanRuleSuite.scala | 14 +++++++-------
 .../test/scala/org/apache/spark/sql/CometTestBase.scala   |  6 +++---
 8 files changed, 37 insertions(+), 29 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index ee1093a88..1c51d608d 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -766,13 +766,18 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
-    conf("spark.comet.scan.allowIncompatible")
+  val COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK: ConfigEntry[Boolean] =
+    conf("spark.comet.scan.unsignedSmallIntSafetyCheck")
       .category(CATEGORY_SCAN)
-      .doc("Some Comet scan implementations are not currently fully compatible 
with Spark for " +
-        s"all datatypes. Set this config to true to allow them anyway. 
$COMPAT_GUIDE.")
+      .doc(
+        "Parquet files may contain unsigned 8-bit integers (UINT_8) which Spark maps to " +
+          "ShortType. When this config is true (default), Comet falls back to Spark for " +
+          "ShortType columns because we cannot distinguish signed INT16 (safe) from unsigned " +
+          "UINT_8 (may produce different results). Set to false to allow native execution of " +
+          "ShortType columns if you know your data does not contain unsigned UINT_8 columns " +
+          s"from improperly encoded Parquet files. $COMPAT_GUIDE.")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
     conf("spark.comet.exec.strictFloatingPoint")
diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md
index 48fda7dc3..3dcb78e87 100644
--- a/docs/source/contributor-guide/parquet_scans.md
+++ b/docs/source/contributor-guide/parquet_scans.md
@@ -42,12 +42,13 @@ implementation:
 
 The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:
 
-- When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
-  or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
-  logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
-  rather than signed. By default, Comet will fall back to Spark's native scan when scanning Parquet files containing
-  `byte` or `short` types (regardless of the logical type). This behavior can be disabled by setting
-  `spark.comet.scan.allowIncompatible=true`.
+- When reading Parquet files written by systems other than Spark that contain columns with the logical type `UINT_8`
+  (unsigned 8-bit integers), Comet may produce different results than Spark. Spark maps `UINT_8` to `ShortType`, but
+  Comet's Arrow-based readers respect the unsigned type and read the data as unsigned rather than signed. Since Comet
+  cannot distinguish `ShortType` columns that came from `UINT_8` versus signed `INT16`, by default Comet falls back to
+  Spark when scanning Parquet files containing `ShortType` columns. This behavior can be disabled by setting
+  `spark.comet.scan.unsignedSmallIntSafetyCheck=false`. Note that `ByteType` columns are always safe because they can
+  only come from signed `INT8`, where truncation preserves the signed value.
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
 
 The `native_datafusion` scan has some additional limitations:
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index bfcf25074..4291e3fb6 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -721,11 +721,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
       name: String,
       fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
-      case ByteType | ShortType
+      case ShortType
           if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"$scanImpl scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. 
${CometConf.COMPAT_GUIDE}."
+            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
+        fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 
correctly for $dt. " +
+          s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false 
to allow " +
+          "native execution if your data does not contain unsigned small 
integers. " +
+          CometConf.COMPAT_GUIDE
         false
       case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
         false
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
index 74858ed61..5c5251b5e 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
@@ -120,7 +120,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
         super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
           withSQLConf(
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
-            CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
             CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
             testFun
           }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index c4856c3cc..1d63b9d3a 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -459,8 +459,8 @@ class CometParquetWriterSuite extends CometTestBase {
         CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
         // explicitly set scan impl to override CI defaults
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
-        // COMET_SCAN_ALLOW_INCOMPATIBLE is needed because input data contains byte/short types
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+        // Disable unsigned small int safety check for ShortType columns
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
        // use a different timezone to make sure that timezone handling works with nested types
         SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax") {
 
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index b028a70dc..a05bb7c39 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1904,7 +1904,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
       val rows = 1000
       withSQLConf(
        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
         makeParquetFileAllPrimitiveTypes(
           path,
           dictionaryEnabled = false,
diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
index 7f54d0b7c..d0dfbbb09 100644
--- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
@@ -140,21 +140,21 @@ class CometScanRuleSuite extends CometTestBase {
     }
   }
 
-  test("CometScanRule should fallback to Spark for unsupported data types in 
v1 scan") {
+  test("CometScanRule should fallback to Spark for ShortType when safety check 
enabled") {
     withTempPath { path =>
-      // Create test data with unsupported types (e.g., BinaryType, CalendarIntervalType)
+      // Create test data with ShortType which may be from unsigned UINT_8
+      // Create test data with ShortType which may be from unsigned UINT_8
       import org.apache.spark.sql.types._
       val unsupportedSchema = new StructType(
         Array(
           StructField("id", DataTypes.IntegerType, nullable = true),
           StructField(
             "value",
-            DataTypes.ByteType,
+            DataTypes.ShortType,
             nullable = true
-          ), // Unsupported in some scan modes
+          ), // May be from unsigned UINT_8
           StructField("name", DataTypes.StringType, nullable = true)))
 
-      val testData = Seq(Row(1, 1.toByte, "test1"), Row(2, -1.toByte, "test2"))
+      val testData = Seq(Row(1, 1.toShort, "test1"), Row(2, -1.toShort, "test2"))
 
      val df = spark.createDataFrame(spark.sparkContext.parallelize(testData), unsupportedSchema)
       df.write.parquet(path.toString)
@@ -167,10 +167,10 @@ class CometScanRuleSuite extends CometTestBase {
 
         withSQLConf(
          CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-          CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+          CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
           val transformedPlan = applyCometScanRule(sparkPlan)
 
-          // Should fallback to Spark due to unsupported ByteType in schema
+          // Should fallback to Spark due to ShortType (may be from unsigned UINT_8)
          assert(countOperators(transformedPlan, classOf[FileSourceScanExec]) == 1)
           assert(countOperators(transformedPlan, classOf[CometScanExec]) == 0)
         }
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 81ac72247..89249240c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -83,7 +83,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
+    conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
     // SortOrder is incompatible for mixed zero and negative zero floating point values, but
@@ -1113,7 +1113,7 @@ abstract class CometTestBase
    *         |""".stripMargin,
    *       "select arr from tbl",
    *       sqlConf = Seq(
-   *         CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false",
+   *         CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true",
    *         "spark.comet.explainFallback.enabled" -> "false"
    *       ),
    *       debugCometDF = df => {
@@ -1275,6 +1275,6 @@ abstract class CometTestBase
 
   def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
     usingDataSourceExec(conf) &&
-    !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
+    CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
