mbutrovich commented on PR #4393:

URL: https://github.com/apache/datafusion-comet/pull/4393#issuecomment-4557800295
## Branch status

JVM and native input now use the canonical Arrow C Stream Interface: `Data.exportArrayStream` on the JVM, `ArrowArrayStreamReader` on the native side. The bespoke `CometBatchIterator` JNI machinery and the `arrow_ffi_safe` flag are gone. Closes #3770.

Net diff vs main: ~1070 insertions, ~660 deletions across 35 files.

JVM-side input is unified behind three `ArrowReader` subclasses, replacing the per-shape conversion paths that funneled into `CometBatchIterator`:

- `RowArrowReader` for `InternalRow` input
- `SparkColumnarArrowReader` for non-Comet `ColumnarBatch` input
- `ColumnarBatchArrowReader` for Comet `ColumnarBatch` input

`CometExecRDD`, `CometExecIterator`, and `operators.scala` route input slots from the protobuf (`findShuffleScanIndices`) instead of a conf flag, so the JVM and native sides agree on which slot is a `ShuffleScan` and which is a `Scan`. Allocator, reader, and stream lifecycles are bound to the `TaskContext`, with rollback if setup fails partway through.

Native side: `ScanExec.input_source` is an `Option<Arc<Mutex<ArrowArrayStreamReader>>>`. The `arrow_ffi_safe`-gated deep-copy branch is removed; `copy_or_unpack_array` is preserved as the boundary that strips dictionary encoding before downstream DataFusion operators see it.

## Bucket status

| Bucket | Status |
|---|---|
| B1 NullType end-to-end | Landed |
| B2 TimeType fallback via `DataTypeSupport` | Landed |
| B3b Iceberg silent corruption | Resolved (consumer-owned reader lifecycle) |
| B4 Stale window-suite assertion | Landed |
| B6 Subquery under `CometLocalTableScan` | Expected to resolve transitively via B1; pending CI |
| B3a Nested-type nullability residual | Deferred |
| B5 Plan-shape / explain / metrics | Deferred; needs skip-list discussion |
| B7 Long tail | Deferred; individual fixes |

## Recent fixes since the last CI sweep

### Dictionary-encoded `ColumnarBatch` input

Native `HashAggregate`'s row converter emits `Dictionary<Int32, T>` columns for string group keys.
`CometColumnarShuffle` round-trips them, so the next stage's input is a `CometDictionaryVector`. `ColumnarBatchArrowReader` builds its stable VSR (`VectorSchemaRoot`) from the logical (non-dictionary) Spark schema, so the unload/load step then trips inside `VectorLoader.loadBuffers` with `no more buffers for field ...: Utf8. Expected 3` before any data reaches the C Stream.

Fix: decode dictionary source columns via `DictionaryEncoder.decode` before `VectorUnloader`. Native still unpacks downstream via `copy_or_unpack_array`, so end-to-end semantics are unchanged.

Caught by `CometAggregateSuite "multiple column distinct count"`.

### Shaded `ArrayStreamExporter` `IllegalAccessError`

`org.apache.arrow.c.ArrayStreamExporter$ExportedArrayStreamPrivateData` is already excluded from relocation because Arrow's JNI looks it up by its literal class name (`jni_wrapper.cc:341`). The outer `ArrayStreamExporter` was being shaded, which split the outer and inner classes across packages and broke package-private constructor access.

Fix: also exclude the outer class from relocation. It is a package-private `final class` and an implementation detail, so there is no risk of a public-API clash.

Caught by `ParquetEncryptionITCase`.

## In-flight

Latest commit pushed; CI sweep running. Will update once it settles.

## Deferred (won't block this PR)

- **B3a, nested struct nullability residual.** Reproducer: `DataFrameSetOperationsSuite "SPARK-35756: unionByName support struct having same col names but different sequence"`. `unionByName` reorders the right side via `named_struct(...)`, so its children become nullable; the left side passes through, so its children stay non-null; and DataFusion strictly validates Union/Project schemas. It only reproduces under the `spark.plugins=...CometPlugin` path; there is no `CometTestBase` reproducer yet.
- **B5, plan-shape / explain / metrics assertions** in upstream Spark SQL. ~40 tests grep for Spark-only nodes (`WholeStageCodegen`, `FilterExec`, `BroadcastHashJoinExec`) or for codegen-stage IDs that Comet operators don't emit.
  Resolving these needs an upstream-test skip list or a per-test `spark.comet.exec.localTableScan.enabled=false` override.
- **B7, long tail.** ULP-level float math (asinh / acosh / cosh / tan / cot / cbrt / pow / atan2), `bit_length`/`octet_length` on `BinaryType`, `null IN ()` returning `false` instead of `null`, `RuntimeNullChecksV2Writes` error-class wrapper depth, `to_binary`/`unhex` error class, `collect_set` ordering. Each is a separate fix.

## Splittable into separate PRs

Once CI is clean, these can peel off independently from the `localTableScan` default flip:

1. **NullType end-to-end** (B1): `Utils.scala`, `CometShuffleExchangeExec`, `native/shuffle/src/spark_unsafe/row.rs`, three test suites.
2. **TimeType fallback via `DataTypeSupport`** (B2): a self-contained type-check addition on `CometLocalTableScanExec`.
3. **Stale window-suite assertion fix** (B4): a one-line test fix.
4. **Arrow C Stream Interface input + close of #3770**: the bulk of this PR. It doesn't strictly require the default flip, but it unblocks it.
5. **Default flip to `localTableScan.enabled = true`**: lands last, after the B3a residuals are resolved or accepted.
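
The slot-routing change under "Branch status", which derives `ShuffleScan` vs `Scan` slots from the serialized plan rather than from a conf flag, can be pictured with a small Rust sketch. `PlanNode`, `LeafInput`, and `find_shuffle_scan_indices` are hypothetical stand-ins, not Comet's protobuf types or the real `findShuffleScanIndices`. Numbering slots in depth-first leaf order is also an assumption made for illustration. The point is that two sides running the same pure function over the same plan cannot disagree the way they can when each reads a separately configured flag.

```rust
/// Hypothetical leaf input kinds (stand-ins for `Scan` / `ShuffleScan` operators).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeafInput {
    Scan,
    ShuffleScan,
}

/// Hypothetical, simplified plan tree: inner operators have children, leaves are inputs.
pub enum PlanNode {
    Leaf(LeafInput),
    Op(Vec<PlanNode>),
}

/// Collect leaf inputs in depth-first, left-to-right order.
fn collect_leaves(node: &PlanNode, out: &mut Vec<LeafInput>) {
    match node {
        PlanNode::Leaf(kind) => out.push(*kind),
        PlanNode::Op(children) => {
            for child in children {
                collect_leaves(child, out);
            }
        }
    }
}

/// Return the input-slot indices that are shuffle scans.
/// Assumption (hypothetical): slot `i` is the `i`-th leaf in depth-first order.
pub fn find_shuffle_scan_indices(plan: &PlanNode) -> Vec<usize> {
    let mut leaves = Vec::new();
    collect_leaves(plan, &mut leaves);
    leaves
        .iter()
        .enumerate()
        .filter(|(_, kind)| **kind == LeafInput::ShuffleScan)
        .map(|(slot, _)| slot)
        .collect()
}
```

For a plan whose leaves are `Scan`, `ShuffleScan`, `ShuffleScan` in depth-first order, the function returns slots `[1, 2]` no matter which side calls it.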
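
The lifecycle rule from "Branch status" (allocator, reader, and stream bound to the `TaskContext`, with rollback if setup fails partway through) follows the usual pattern of acquiring in order and releasing in reverse. Below is a minimal Rust sketch of that pattern, using hypothetical `Resource`, `SetupGuard`, `setup`, and `close_all` names. A drop guard closes whatever was already opened when a later step fails. On success, ownership passes to the caller, and `close_all` stands in for a task-completion callback. This is not Comet's Scala code, which binds these lifecycles to Spark's `TaskContext`.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared log of close events, so the close order can be observed.
pub type CloseLog = Rc<RefCell<Vec<String>>>;

/// Hypothetical stand-in for an allocator, reader, or exported stream.
pub struct Resource {
    name: String,
    log: CloseLog,
}

impl Drop for Resource {
    // "Closing" a resource just records its name.
    fn drop(&mut self) {
        self.log.borrow_mut().push(self.name.clone());
    }
}

/// Close resources in reverse acquisition order
/// (dropping a `Vec` directly would drop front to back).
pub fn close_all(mut resources: Vec<Resource>) {
    while let Some(r) = resources.pop() {
        drop(r);
    }
}

/// Rollback guard: anything still held when it is dropped gets closed in reverse order.
struct SetupGuard {
    acquired: Vec<Resource>,
}

impl Drop for SetupGuard {
    fn drop(&mut self) {
        close_all(std::mem::take(&mut self.acquired));
    }
}

/// Open `names` in order; `fail_at` simulates a failure while opening that step.
pub fn setup(names: &[&str], fail_at: Option<usize>, log: &CloseLog) -> Result<Vec<Resource>, String> {
    let mut guard = SetupGuard { acquired: Vec::new() };
    for (i, name) in names.iter().enumerate() {
        if fail_at == Some(i) {
            // Returning drops `guard`, which rolls back the steps that already succeeded.
            return Err(format!("failed to open {}", name));
        }
        guard.acquired.push(Resource { name: name.to_string(), log: Rc::clone(log) });
    }
    // Success: move the resources out so the guard has nothing left to close.
    Ok(std::mem::take(&mut guard.acquired))
}
```

If the third step fails, the log shows `reader` and then `allocator` being closed. On success, nothing closes until the caller hands the resources to `close_all`.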
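
The fix under "Dictionary-encoded `ColumnarBatch` input" can be modeled without Arrow. In the Arrow columnar format, a variable-length `Utf8` column carries three buffers (validity, offsets, data). A dictionary-encoded column's own buffers are those of its `Int32` keys (validity, values), which is one plausible reading of why loading it into a `Utf8` field fails with `Expected 3`. The Rust sketch below uses hypothetical `Layout`, `buffer_count`, `DictColumn`, and `decode` names. It is not the Arrow Java `DictionaryEncoder.decode` API or Comet's `copy_or_unpack_array`. It only illustrates the same idea: replace each key with its dictionary value before the data crosses the boundary.

```rust
/// Physical layouts involved in the failure (buffer counts per the Arrow columnar format).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Layout {
    /// Fixed-width dictionary keys: validity bitmap + values buffer.
    Int32,
    /// Variable-length strings: validity bitmap + offsets + data.
    Utf8,
}

pub fn buffer_count(layout: Layout) -> usize {
    match layout {
        Layout::Int32 => 2,
        Layout::Utf8 => 3,
    }
}

/// Hypothetical dictionary-encoded string column: each row is a key into
/// `values`, or `None` for a null row.
pub struct DictColumn {
    pub keys: Vec<Option<i32>>,
    pub values: Vec<String>,
}

/// Materialize the plain (non-dictionary) column that a `Utf8` schema expects.
/// Negative or out-of-range keys are reported as errors instead of panicking.
pub fn decode(col: &DictColumn) -> Result<Vec<Option<String>>, String> {
    col.keys
        .iter()
        .map(|key| match key {
            None => Ok(None),
            Some(k) if *k >= 0 => col
                .values
                .get(*k as usize)
                .map(|v| Some(v.clone()))
                .ok_or_else(|| format!("dictionary key {} out of range", k)),
            Some(k) => Err(format!("negative dictionary key {}", k)),
        })
        .collect()
}
```

Decoding keys `[1, null, 0, 1]` against the dictionary `["a", "b"]` yields `["b", null, "a", "b"]`, a column whose layout matches the logical `Utf8` field.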
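
For the deferred B3a item, a small Rust model shows why a strict schema comparison rejects the `unionByName` case and what a nullability-widening merge would accept instead. `Field`, `widen`, and the rule of OR-ing nullability are illustrative assumptions. They are not DataFusion's schema types, and they are not the fix Comet will adopt (B3a is still open).

```rust
/// Hypothetical, simplified schema field: name, type tag, nullability, nested children.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: &'static str,
    pub nullable: bool,
    pub children: Vec<Field>,
}

/// Merge two fields that have the same shape (names, types, child order),
/// making the result nullable wherever either input is nullable.
/// Returns `None` if the shapes differ, since nullability alone cannot reconcile them.
pub fn widen(a: &Field, b: &Field) -> Option<Field> {
    if a.name != b.name || a.data_type != b.data_type || a.children.len() != b.children.len() {
        return None;
    }
    let children = a
        .children
        .iter()
        .zip(&b.children)
        .map(|(x, y)| widen(x, y))
        .collect::<Option<Vec<_>>>()?;
    Some(Field {
        name: a.name.clone(),
        data_type: a.data_type,
        nullable: a.nullable || b.nullable,
        children,
    })
}
```

A struct whose children are non-null on the left and nullable on the right fails a derived `==` check but widens to a struct with nullable children. Fields whose names or types differ still do not merge.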
