mbutrovich commented on PR #4393:
URL: 
https://github.com/apache/datafusion-comet/pull/4393#issuecomment-4518302631

   ## Failure counts
   
   | Log archive | Job set | Failures | Notes |
   |---|---|---|---|
   | `logs_70206724473` | Comet (Linux, Spark 3.4–4.2) | 5 | All same test |
   | `logs_70206740518` | Comet (macOS, Spark 4.0–4.2) | 3 | Same test; no 
platform-specific issue |
   | `logs_70206737291` | Upstream Spark SQL (3.4.3–4.1.1) | ~691 reported + 5 
jobs hit 6h timeout/OOM | True total likely 800–1000 |
   | `logs_70206724472` | Iceberg integration (Spark 3.4/3.5 × Iceberg 
1.8/1.9/1.10) | 220 | Silent assertion mismatches; no Comet stack frames |
   
   Clean: `catalyst` modules, all `iceberg-spark-runtime` runs, TPC-DS, TPC-H, 
Rust tests, lint, native builds, `sql_hive-2`.
   
   ## Root-cause buckets (ranked by blast radius)
   
   ### B1 — `NullType` rejected by `Utils.toArrowType` (~600 failures, ~75–85% 
of Spark SQL)
   Stack identical across occurrences:
   ```
   java.lang.UnsupportedOperationException: Unsupported data type: NullType
     at org.apache.spark.sql.comet.util.Utils$.toArrowType(Utils.scala:155)
     at CometArrowConverters$ArrowBatchIterBase.<init>(54)
     at CometLocalTableScanExec.doExecuteColumnar(78)
   ```
   Triggers any time `Seq(...).toDF` / `values (..., null)` yields a `void` 
column, including nested `MapType(_, NullType)` (e.g. `DatasetPrimitiveSuite 
"special floating point values"`).
   
   Fix options:
   - (a) Reject schemas containing `NullType` (including nested) in the rule 
that builds `CometLocalTableScanExec`, so plan falls back. Smallest blast 
radius.
   - (b) Wire `NullType → Arrow Null` in `Utils.toArrowType` (+ corresponding 
`ArrowWriter` case).
   
   Refs:
   - `spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:155`
   - 
`spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala:54`
   - 
`spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala:66-81`
   
   ### B2 — `TimeType` rejected (~25 failures, Spark 4.1.x only)
   ```
   SparkException: not support type: TimeType(6)
     at ArrowWriter$.createFieldWriter(ArrowWriters.scala:89)
   ```
   Hits Spark 4.1 TIME-type tests (`SPARK-51402`, `SPARK-52883`, `SPARK-53929`, 
`SPARK-53109`, `SPARK-53107`, `SPARK-53108`, `SPARK-52626`, `SPARK-52660`, 
etc.). Same fix shape as B1; fallback is the immediate move since the rest of 
Comet does not yet support TIME.
   
   Refs:
   - 
`spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala:89`
   
   ### B3 — Nested-type nullability mismatch (~20 Spark SQL + likely root cause 
of all 220 Iceberg failures)
   DataFusion error shape:
   ```
   Incorrect datatype for StructArray field "nested",
     expected Struct("a": Int32, "b": Int64),
     got      Struct("a": non-null Int32, "b": non-null Int64)
   ```
   `CometArrowConverters` derives child nullability from the Spark schema; 
downstream native operators were planned against schemas with different 
child-nullability inference. One side has to normalize.
   
   **Iceberg connection:**
   - Every failing Iceberg test seeds rows via `spark.createDataFrame(input, 
Employee.class)` (`TestDelete.java:1526`) — a `LocalRelation` that now becomes 
`CometLocalTableScanExec`.
   - Failures concentrate on the conjunction `branch=test, 
distributionMode=none, fanout=false, vectorized=true` (other parameterizations 
of the same test methods pass).
   - Symptoms are silent: assertion failures like `expected:<199> but 
was:<200>`, `[Snapshot property added-data-files has unexpected value]`, `View 
should have correct data: expected:<2> but was:<0>`. No exception, no Comet 
stack frame.
   - `CometLocalTableScanExec.isFfiSafe = false` (line 110) due to array reuse 
in `CometArrowConverters`. Safe for Comet-native consumers (the native side 
copies based on the proto flag). Unsafe for non-Comet consumers that buffer 
batches across `next()` calls. The Iceberg branch-write path likely buffers, 
which combined with B3-style nullability mismatches could produce the silent 
loss.
   
   Investigate first:
   - Run `TestCopyOnWriteDelete.testSkewDelete` with the failing 
parameterization locally; dump physical plan and per-batch row counts.
   - Determine whether Iceberg's DSv2 columnar branch write honors 
`isFfiSafe=false`, or whether the gap is purely nullability.
   
   Refs:
   - 
`spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala:48-81,110`
   - 
`spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala:44,68`
   - Iceberg `spark-extensions` `TestDelete.append(String, Employee...)` 
(`TestDelete.java:1520-1528`) and 
`SparkRowLevelOperationsTestBase.createAndInitTable` / `append` / 
`commitTarget` (`SparkRowLevelOperationsTestBase.java:282,294,433`)
   
   ### B4 — Stale `CometWindowExecSuite` assertion (8 failures: Comet's own 
suites, all platforms)
   Test: `window function: partition and order expressions`, all 5 Spark 
versions on Linux + 3 on macOS. Inverted assertion:
   ```scala
   // CometWindowExecSuite.scala:111-119
   } else {
     // we fall back to Spark for shuffle because we do not support
     // native shuffle with a LocalTableScan input, and we do not fall
     // back to Comet columnar shuffle due to
     // https://github.com/apache/datafusion-comet/issues/1248
     assert(cometShuffles.isEmpty)
   }
   ```
   The premise is no longer true; native shuffle now composes with the new 
local scan.
   
   Action: update the assertion to `cometShuffles.length == 1`, drop the 
comment, re-evaluate whether #1248 is still relevant. Grep for siblings: 
`"LocalTableScan input"`, `"issues/1248"`, `"fall back to Spark for shuffle"`.
   
   Refs:
   - 
`spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala:108-119`
   
   ### B5 — Plan-shape / explain / metrics assertions in upstream Spark SQL 
(~40 failures)
   Tests grep the physical plan for Spark-only nodes or check 
`WholeStageCodegen` / SQL metrics that Comet doesn't emit. Examples:
   - `ExplainSuite "Support ExplainMode in Dataset.explain"`
   - `SQLAppStatusListenerSuite "SPARK-29894 test Codegen Stage Id"`, 
`"SPARK-32615,SPARK-33016"`
   - `BatchEvalPythonExecSuite "Python UDF: push down deterministic FilterExec 
predicates"`
   - `JoinHintSuite "broadcast"`, `"shuffle-replicate-nl"`
   - `DataFrameSetOperationsSuite SPARK-37371`
   - `OptimizeLocalRelationsSuite SPARK-25860 / SPARK-33847`
   
   Not Comet defects. Either extend the upstream-test skip list or add per-test 
config disable for `localTableScan`.
   
   ### B6 — Subquery not found when plan root is `CometLocalTableScan` (~8 
failures)
   ```
   CometRuntimeException: Subquery NNN not found for plan MMM
   ```
   Hits `CTEHintSuite "subquery in repartition"`, `SPARK-36447`, `CTE Predicate 
push-down and column pruning`. Real Comet bug — subquery registration path 
doesn't handle the new root operator.
   
   ### B7 — Long tail (~20 failures total)
   - ULP-level float math diffs in `MathFunctionsSuite` (`asinh`, `acosh`, 
`cosh`, `tan`, `cot`, `cbrt`, `pow`, `atan2`, etc.). Known JVM-libm vs 
Rust-libm gap; no Comet carve-out for upstream suite.
   - `bit_length` / `octet_length` on `BinaryType` rejected by DataFusion's 
string-only UDF (`SPARK-36751`).
   - `null IN ()` returns `false` in Comet, should be `null` (`EmptyInSuite`).
   - `RuntimeNullChecksV2Writes` expects `SparkRuntimeException`, gets 
`SparkException` (wrapper depth).
   - `to_binary`/`unhex` error class differs from Spark's 
`CONVERSION_INVALID_INPUT`.
   - `collect_set` element order differs.
   - 5 Spark SQL shards hit GitHub Actions 6h wall clock / exit 137 — likely 
cascade from B1 stack-trace volume; should resolve once B1 is fixed.
   
   ## Suggested fix order
   
   | Order | Action | Eliminates | Risk |
   |---|---|---|---|
   | 1 | B1 fallback (NullType in schema) | ~600 + likely unblocks the 5 
timed-out shards | Low |
   | 2 | B2 fallback (TimeType in schema) | ~25 | Low |
   | 3 | B3 investigation + fix (Iceberg silent corruption) | 220 + ~20 Spark 
SQL | Medium (data correctness) |
   | 4 | B4 stale assertion update + sibling grep | 8 | Trivial |
   | 5 | B6 subquery registration | ~8 | Low-medium |
   | 6 | B5 skip list (or per-test config) | ~40 | Low |
   | 7 | B7 individually | ~20 | Varies |
   
   After (1)–(2), failure count should drop from ~1,200 to ~250 and most 
remaining failures will be the substantive ones.
   
   ## Open questions
   
   - B3: does Iceberg's columnar DSv2 branch-write path honor 
`setArrowFfiSafe(false)`, or does the contract only apply to native consumers? 
If only native, `CometLocalTableScanExec` needs to copy batches before exposing 
them to non-Comet consumers (or be disallowed as a direct columnar input to 
non-Comet writers).
   - B5: skip list vs per-test disable — preference? A skip list is easier to 
maintain; per-test config keeps Comet exercised.
   - Whether to keep the default flip in PR #4393 and land fixes incrementally, 
or to gate the flip behind B1–B3 landing first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to