mbutrovich commented on PR #4232: URL: https://github.com/apache/datafusion-comet/pull/4232#issuecomment-4397085601
Follow-up on the generic dispatch comment above. The `MutableProjection` generic dispatcher still pays per-row Scala bookkeeping. This variant compiles the whole per-batch loop, Arrow reads and writes included, into one Janino-compiled method. It matches the hand-coded `RegExpLikeUDF` on all five benchmark patterns, closes the `multi_class` gap the `MutableProjection` dispatcher showed, and keeps a single dispatcher class for any scalar Spark `Expression`.

Branch: https://github.com/mbutrovich/datafusion-comet/tree/jvm-udf-generic-dispatcher

Core files:

- [`CometCodegenDispatchUDF.scala`](https://github.com/mbutrovich/datafusion-comet/blob/jvm-udf-generic-dispatcher/common/src/main/scala/org/apache/comet/udf/CometCodegenDispatchUDF.scala)
- [`CometBatchKernelCodegen.scala`](https://github.com/mbutrovich/datafusion-comet/blob/jvm-udf-generic-dispatcher/common/src/main/scala/org/apache/comet/udf/CometBatchKernelCodegen.scala)
- [`ArrowBackedRow.scala`](https://github.com/mbutrovich/datafusion-comet/blob/jvm-udf-generic-dispatcher/common/src/main/scala/org/apache/comet/udf/ArrowBackedRow.scala)

## What it does

Plan-time wiring is identical to the `MutableProjection` dispatcher:

- bind the Spark `Expression`;
- register it in `CometLambdaRegistry`;
- pass the registry key as arg 0 of the `JvmScalarUdf` proto;
- pass the data columns as args 1..N.

At execute time, on a cache miss for a registry key, `CometBatchKernelCodegen` emits a specialized `CometBatchKernel` subclass. Its `process` method looks like this:

```java
// Template: the <...> placeholders are filled in by CometBatchKernelCodegen.
void process(VarCharVector[] inputs, BitVector output, int numRows) {
  ArrowBackedRow row = new ArrowBackedRow(inputs);
  for (int i = 0; i < numRows; i++) {
    row.setRowIdx(i);                            // point the row view at Arrow row i
    <inlined expr.genCode(ctx) output>
    if (<result.isNull>) output.setNull(i);
    else output.set(i, <result.value> ? 1 : 0);  // BitVector stores booleans as 0/1
  }
}
```

`ArrowBackedRow extends InternalRow` through a `CometInternalRow` shim. `BoundReference.genCode` emits `row.getUTF8String(ord)`, which reads the Arrow vector directly at the current row index.
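The execute-time path above can be modeled in plain Java without Arrow or Spark on the classpath.

Every name in this sketch is hypothetical: `ColumnRowCursor`, `BatchKernel`, `RLikeKernel` and `KernelCache` are not the Comet classes. The stand-ins are:

- `String[]` columns, with `null` for SQL NULL, in place of `VarCharVector`;
- a `Boolean[]` in place of the `BitVector` output;
- `RLikeKernel`, written by hand to mirror the loop shape codegen would emit, rather than produced by Janino.

It illustrates two properties. First, the row object is a cursor that reads the backing column at the current index. Second, a kernel is built once per registry key and reused for later batches.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;

// Hypothetical model of the execute-time path; none of these are Comet classes.

// Stand-in for ArrowBackedRow: a cursor over column arrays (null means SQL NULL).
final class ColumnRowCursor {
  private final String[][] columns;
  private int rowIdx;

  ColumnRowCursor(String[][] columns) { this.columns = columns; }

  void setRowIdx(int i) { rowIdx = i; }

  boolean isNullAt(int ordinal) { return columns[ordinal][rowIdx] == null; }

  // Reads the backing column at the current row; no per-row row object is filled.
  String getString(int ordinal) { return columns[ordinal][rowIdx]; }
}

// Stand-in for CometBatchKernel: one call processes a whole batch.
interface BatchKernel {
  void process(String[][] inputs, Boolean[] output, int numRows);
}

// Hand-written in the shape codegen would emit for `col0 RLIKE regex`.
final class RLikeKernel implements BatchKernel {
  private final Pattern pattern;

  RLikeKernel(String regex) { this.pattern = Pattern.compile(regex); }

  @Override
  public void process(String[][] inputs, Boolean[] output, int numRows) {
    ColumnRowCursor row = new ColumnRowCursor(inputs);
    for (int i = 0; i < numRows; i++) {
      row.setRowIdx(i);
      // "Inlined expression": null-propagating substring match via Matcher.find.
      boolean isNull = row.isNullAt(0);
      boolean value = !isNull && pattern.matcher(row.getString(0)).find();
      if (isNull) output[i] = null;
      else output[i] = value;
    }
  }
}

// Builds at most one kernel per registry key and reuses it for later batches.
final class KernelCache {
  private final Map<String, BatchKernel> kernels = new ConcurrentHashMap<>();
  private final AtomicInteger compiles = new AtomicInteger();
  private final Function<String, BatchKernel> compiler;

  KernelCache(Function<String, BatchKernel> compiler) { this.compiler = compiler; }

  BatchKernel get(String registryKey) {
    return kernels.computeIfAbsent(registryKey, key -> {
      compiles.incrementAndGet(); // runs only on a cache miss
      return compiler.apply(key);
    });
  }

  int compileCount() { return compiles.get(); }
}
```

In the real dispatcher, `compiler` would correspond to the Janino compile of the generated source. The loop body would come from the expression's `doGenCode` rather than a hard-coded `Pattern`.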
The generated loop avoids three per-row costs of the `MutableProjection` dispatcher:

- no `SpecificInternalRow.update`;
- no `MutableProjection.apply` virtual call;
- no intermediate `InternalRow` between expression evaluation and the Arrow write.

There is one Janino compile per expression tree, cached by registry key.

## Three approaches side by side

| Aspect | Hand-coded `CometUDF` (this PR) | Generic `MutableProjection` | Arrow-direct codegen |
|---|---|---|---|
| Classes per expression | One | Zero | Zero |
| Per-row batch loop | Hand-written Scala | Interpreted Scala | Compiled Java |
| Arrow read and write | Hand-written | Interpreted Scala | Compiled Java |
| Expression evaluation | Hand-written | Compiled (projection codegen) | Compiled (`doGenCode`), inlined into the fused loop |
| Composed expression trees | No, without native support for children | Yes | Yes |
| Adding a new scalar expression | New UDF class + serde branch | Free in the supported type set | Free in the supported type set |
| Current input types | Per UDF | StringType | StringType |
| Current output types | Per UDF | BooleanType | BooleanType |
| Widening inputs | Per UDF | Add `ColumnReader` cases | Add getters to `ArrowBackedRow` |
| Widening outputs | Per UDF | Dispatch in `evaluate` | Dispatch in codegen template |
| Cluster mode today | Works | Local only | Local only |
| Aggregates, windows, generators | Requires bridge change | Same | Same |

Both dispatcher paths share the same architectural limits:

- local-mode registry;
- scalar-shape one-in-one-out;
- no `Nondeterministic.initialize`;
- never-freed registry entries;
- per-row `UTF8String.fromBytes` copy.

None of them prevents benchmarking, and all can be addressed incrementally without reshaping either dispatcher.

## Benchmark numbers

Per-row time in nanoseconds, lower is better. Measured on Apple M3 Max, OpenJDK 11.0.30+7-LTS, macOS 26.4.1. The numbers come from `CometRegExpBenchmark`, extended with a sixth result column for the codegen variant.
| Pattern | Spark | Comet (Scan) | Comet (Exec, native Rust) | Comet (Exec, JVM hand-coded) | Comet (Exec, JVM generic) | Comet (Exec, JVM codegen) |
|---|---|---|---|---|---|---|
| character_class `[0-9]+` | 11468.3 | 10554.4 | 4903.0 | 4181.6 | 4365.6 | 4108.0 |
| anchored `^[0-9]` | 8562.0 | 8548.7 | 3306.3 | 3263.2 | 3355.7 | 3137.7 |
| alternation `abc\|def\|ghi` | 11060.1 | 11167.9 | 5778.0 | 5754.4 | 5847.5 | 5769.7 |
| multi_class `[a-zA-Z][0-9]+` | 9255.0 | 9469.0 | 4179.3 | 4401.4 | 5323.0 | 4152.2 |
| repetition `(ab){2,}` | 8930.1 | 8846.0 | 3932.4 | 3860.3 | 3937.4 | 3879.9 |

Reading the three JVM columns:

- **vs. hand-coded `RegExpLikeUDF`**: codegen is within noise on all five patterns. It is at most 0.5% slower (`repetition`) and up to 5.7% faster (`multi_class`). No regression.
- **vs. `MutableProjection` generic dispatch**: codegen matches on four patterns. It is faster on `multi_class`, at 4152 vs 5323 ns per row (about 22% lower). On that pattern the Scala-side per-row update loop trailed the hand-coded path, and fusing the Arrow read and write into the compiled loop closes the gap.

Regex matching dominates at this cardinality, so per-row dispatcher bookkeeping barely moves the needle when matches are expensive. The codegen variant's lead over the `MutableProjection` dispatcher should widen where per-row dispatch overhead is large relative to regex work, for example:

- short subjects;
- cheap patterns;
- composed expression trees with several children.

This benchmark does not stress that regime.

## Limitations

Carried over from the `MutableProjection` variant:

- `CometLambdaRegistry` is JVM-local. Cluster mode needs serialized-expression bytes in the proto.
- Scalar shape only. Aggregates, windows, and generators need a different bridge.
- `Nondeterministic.initialize(partitionIndex)` is not wired.
- Registry entries are never removed.
- Per-row `UTF8String.fromBytes(vec.get(i))` copies each value into a fresh `byte[]`. The hand-coded UDF does the same, so the A/B comparison is fair. Both paths would improve with `UTF8String.fromAddress`, which wraps existing memory instead of copying it.
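The per-row copy in the last bullet can be illustrated with a JDK-only analogy. The sketch stores a direct `ByteBuffer` of concatenated UTF-8 values plus an offsets array, which is roughly the data and offsets layout of an Arrow variable-width column.

`StringColumn`, `copyRow` and `viewRow` are hypothetical names, not Arrow's `VarCharVector` or Spark's `UTF8String` API:

- `copyRow` allocates a fresh `byte[]` per row, as `vec.get(i)` does before `UTF8String.fromBytes` wraps it.
- `viewRow` returns a slice over the same memory, the no-copy property `UTF8String.fromAddress` would provide.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical analogy of an Arrow-style string column; not the Arrow or Spark API.
final class StringColumn {
  final ByteBuffer data; // concatenated UTF-8 bytes for all rows (off-heap)
  final int[] offsets;   // row i spans [offsets[i], offsets[i + 1])

  StringColumn(String... values) {
    offsets = new int[values.length + 1];
    byte[][] encoded = new byte[values.length][];
    int total = 0;
    for (int i = 0; i < values.length; i++) {
      encoded[i] = values[i].getBytes(StandardCharsets.UTF_8);
      total += encoded[i].length;
      offsets[i + 1] = total;
    }
    data = ByteBuffer.allocateDirect(total);
    for (byte[] e : encoded) data.put(e);
    data.flip(); // position 0, limit = total bytes written
  }

  // Like UTF8String.fromBytes(vec.get(i)): one allocation plus a byte copy per row.
  byte[] copyRow(int i) {
    byte[] out = new byte[offsets[i + 1] - offsets[i]];
    ByteBuffer dup = data.duplicate();
    dup.position(offsets[i]);
    dup.get(out);
    return out;
  }

  // Like UTF8String.fromAddress: a view over the same memory, no bytes copied.
  ByteBuffer viewRow(int i) {
    ByteBuffer dup = data.duplicate();
    dup.position(offsets[i]).limit(offsets[i + 1]);
    return dup.slice();
  }
}
```

Both accessors expose the same bytes. The only difference is whether each row pays for an allocation and a copy.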
Specific to the codegen variant:

- `ArrowBackedRow` implements only `isNullAt` and `getUTF8String`. Other getters inherit the `CometInternalRow` shim's throwing defaults until widened.
- The codegen template's output write is hardcoded to `BitVector` and boolean. New output types are additive cases.
- Dictionary-encoded Arrow inputs are not handled.

Widening is local:

- one getter in `ArrowBackedRow` per new input Arrow type;
- one case in the codegen template per new output Spark type.

Expression evaluation comes from Spark's `doGenCode`, so composition is free.

## On #4239

The follow-up PR #4239 adds hand-coded UDFs covering the remaining regex expressions on this framework. LLM-assisted authoring makes that work cheap at write time. Every class is still code to review, test against Spark semantics across versions, and maintain as Spark evolves. The generic dispatcher covers the same surface with zero per-expression code.

It is worth weighing the two directions before merging. Committing to the hand-coded catalogue makes a later switch to the dispatcher path harder to justify.

## Bottom line

The Arrow-direct codegen dispatcher:

- matches hand-coded performance;
- closes the `MutableProjection` dispatcher's `multi_class` gap;
- costs zero Scala per new scalar expression.

It is the shape to reach for before writing a new `CometUDF`. Keep hand-coded UDFs only where the dispatcher's remaining gaps matter.

Thanks again @andygrove, this is a very exciting solution!
