mbutrovich commented on PR #4393:
URL:
https://github.com/apache/datafusion-comet/pull/4393#issuecomment-4518302631
## Failure counts
| Log archive | Job set | Failures | Notes |
|---|---|---|---|
| `logs_70206724473` | Comet (Linux, Spark 3.4–4.2) | 5 | All same test |
| `logs_70206740518` | Comet (macOS, Spark 4.0–4.2) | 3 | Same test; no
platform-specific issue |
| `logs_70206737291` | Upstream Spark SQL (3.4.3–4.1.1) | ~691 reported + 5
jobs hit 6h timeout/OOM | True total likely 800–1000 |
| `logs_70206724472` | Iceberg integration (Spark 3.4/3.5 × Iceberg
1.8/1.9/1.10) | 220 | Silent assertion mismatches; no Comet stack frames |
Clean: `catalyst` modules, all `iceberg-spark-runtime` runs, TPC-DS, TPC-H,
Rust tests, lint, native builds, `sql_hive-2`.
## Root-cause buckets (ranked by blast radius)
### B1 — `NullType` rejected by `Utils.toArrowType` (~600 failures, ~75–85%
of Spark SQL)
Stack identical across occurrences:
```
java.lang.UnsupportedOperationException: Unsupported data type: NullType
at org.apache.spark.sql.comet.util.Utils$.toArrowType(Utils.scala:155)
at CometArrowConverters$ArrowBatchIterBase.<init>(54)
at CometLocalTableScanExec.doExecuteColumnar(78)
```
Triggers any time `Seq(...).toDF` / `values (..., null)` yields a `void`
column, including nested `MapType(_, NullType)` (e.g. `DatasetPrimitiveSuite
"special floating point values"`).
Fix options:
- (a) Reject schemas containing `NullType` (including nested) in the rule
that builds `CometLocalTableScanExec`, so plan falls back. Smallest blast
radius.
- (b) Wire `NullType → Arrow Null` in `Utils.toArrowType` (+ corresponding
`ArrowWriter` case).
Refs:
- `spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:155`
-
`spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala:54`
-
`spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala:66-81`
### B2 — `TimeType` rejected (~25 failures, Spark 4.1.x only)
```
SparkException: not support type: TimeType(6)
at ArrowWriter$.createFieldWriter(ArrowWriters.scala:89)
```
Hits Spark 4.1 TIME-type tests (`SPARK-51402`, `SPARK-52883`, `SPARK-53929`,
`SPARK-53109`, `SPARK-53107`, `SPARK-53108`, `SPARK-52626`, `SPARK-52660`,
etc.). Same fix shape as B1; fallback is the immediate move since the rest of
Comet does not yet support TIME.
Refs:
-
`spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala:89`
### B3 — Nested-type nullability mismatch (~20 Spark SQL + likely root cause
of all 220 Iceberg failures)
DataFusion error shape:
```
Incorrect datatype for StructArray field "nested",
expected Struct("a": Int32, "b": Int64),
got Struct("a": non-null Int32, "b": non-null Int64)
```
`CometArrowConverters` derives child nullability from the Spark schema;
downstream native operators were planned against schemas with different
child-nullability inference. One side has to normalize.
**Iceberg connection:**
- Every failing Iceberg test seeds rows via `spark.createDataFrame(input,
Employee.class)` (`TestDelete.java:1526`) — a `LocalRelation` that now becomes
`CometLocalTableScanExec`.
- Failures concentrate on the conjunction `branch=test,
distributionMode=none, fanout=false, vectorized=true` (other parameterizations
of the same test methods pass).
- Symptoms are silent: assertion failures like `expected:<199> but
was:<200>`, `[Snapshot property added-data-files has unexpected value]`, `View
should have correct data: expected:<2> but was:<0>`. No exception, no Comet
stack frame.
- `CometLocalTableScanExec.isFfiSafe = false` (line 110) due to array reuse
in `CometArrowConverters`. Safe for Comet-native consumers (the native side
copies based on the proto flag). Unsafe for non-Comet consumers that buffer
batches across `next()` calls. The Iceberg branch-write path likely buffers,
which combined with B3-style nullability mismatches could produce the silent
loss.
Investigate first:
- Run `TestCopyOnWriteDelete.testSkewDelete` with the failing
parameterization locally; dump physical plan and per-batch row counts.
- Determine whether Iceberg's DSv2 columnar branch write honors
`isFfiSafe=false`, or whether the gap is purely nullability.
Refs:
-
`spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala:48-81,110`
-
`spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala:44,68`
- Iceberg `spark-extensions` `TestDelete.append(String, Employee...)`
(`TestDelete.java:1520-1528`) and
`SparkRowLevelOperationsTestBase.createAndInitTable` / `append` /
`commitTarget` (`SparkRowLevelOperationsTestBase.java:282,294,433`)
### B4 — Stale `CometWindowExecSuite` assertion (8 failures: Comet's own
suites, all platforms)
Test: `window function: partition and order expressions`, all 5 Spark
versions on Linux + 3 on macOS. Inverted assertion:
```scala
// CometWindowExecSuite.scala:111-119
} else {
// we fall back to Spark for shuffle because we do not support
// native shuffle with a LocalTableScan input, and we do not fall
// back to Comet columnar shuffle due to
// https://github.com/apache/datafusion-comet/issues/1248
assert(cometShuffles.isEmpty)
}
```
The premise is no longer true; native shuffle now composes with the new
local scan.
Action: update the assertion to `cometShuffles.length == 1`, drop the
comment, re-evaluate whether #1248 is still relevant. Grep for siblings:
`"LocalTableScan input"`, `"issues/1248"`, `"fall back to Spark for shuffle"`.
Refs:
-
`spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala:108-119`
### B5 — Plan-shape / explain / metrics assertions in upstream Spark SQL
(~40 failures)
Tests grep the physical plan for Spark-only nodes or check
`WholeStageCodegen` / SQL metrics that Comet doesn't emit. Examples:
- `ExplainSuite "Support ExplainMode in Dataset.explain"`
- `SQLAppStatusListenerSuite "SPARK-29894 test Codegen Stage Id"`,
`"SPARK-32615,SPARK-33016"`
- `BatchEvalPythonExecSuite "Python UDF: push down deterministic FilterExec
predicates"`
- `JoinHintSuite "broadcast"`, `"shuffle-replicate-nl"`
- `DataFrameSetOperationsSuite SPARK-37371`
- `OptimizeLocalRelationsSuite SPARK-25860 / SPARK-33847`
Not Comet defects. Either extend the upstream-test skip list or add per-test
config disable for `localTableScan`.
### B6 — Subquery not found when plan root is `CometLocalTableScan` (~8
failures)
```
CometRuntimeException: Subquery NNN not found for plan MMM
```
Hits `CTEHintSuite "subquery in repartition"`, `SPARK-36447`, `CTE Predicate
push-down and column pruning`. Real Comet bug — subquery registration path
doesn't handle the new root operator.
### B7 — Long tail (~20 failures total)
- ULP-level float math diffs in `MathFunctionsSuite` (`asinh`, `acosh`,
`cosh`, `tan`, `cot`, `cbrt`, `pow`, `atan2`, etc.). Known JVM-libm vs
Rust-libm gap; no Comet carve-out for upstream suite.
- `bit_length` / `octet_length` on `BinaryType` rejected by DataFusion's
string-only UDF (`SPARK-36751`).
- `null IN ()` returns `false` in Comet, should be `null` (`EmptyInSuite`).
- `RuntimeNullChecksV2Writes` expects `SparkRuntimeException`, gets
`SparkException` (wrapper depth).
- `to_binary`/`unhex` error class differs from Spark's
`CONVERSION_INVALID_INPUT`.
- `collect_set` element order differs.
- 5 Spark SQL shards hit GitHub Actions 6h wall clock / exit 137 — likely
cascade from B1 stack-trace volume; should resolve once B1 is fixed.
## Suggested fix order
| Order | Action | Eliminates | Risk |
|---|---|---|---|
| 1 | B1 fallback (NullType in schema) | ~600 + likely unblocks the 5
timed-out shards | Low |
| 2 | B2 fallback (TimeType in schema) | ~25 | Low |
| 3 | B3 investigation + fix (Iceberg silent corruption) | 220 + ~20 Spark
SQL | Medium (data correctness) |
| 4 | B4 stale assertion update + sibling grep | 8 | Trivial |
| 5 | B6 subquery registration | ~8 | Low-medium |
| 6 | B5 skip list (or per-test config) | ~40 | Low |
| 7 | B7 individually | ~20 | Varies |
After (1)–(2), failure count should drop from ~1,200 to ~250 and most
remaining failures will be the substantive ones.
## Open questions
- B3: does Iceberg's columnar DSv2 branch-write path honor
`setArrowFfiSafe(false)`, or does the contract only apply to native consumers?
If only native, `CometLocalTableScanExec` needs to copy batches before exposing
them to non-Comet consumers (or be disallowed as a direct columnar input to
non-Comet writers).
- B5: skip list vs per-test disable — preference? A skip list is easier to
maintain; per-test config keeps Comet exercised.
- Whether to keep the default flip in PR #4393 and land fixes incrementally,
or to gate the flip behind B1–B3 landing first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]