andygrove opened a new pull request, #4305:
URL: https://github.com/apache/datafusion-comet/pull/4305

   ## Which issue does this PR close?
   
   Closes #.
   
   ## Rationale for this change
   
   The native Rust JSON expressions in Comet have known compatibility gaps with 
Spark and feature restrictions (e.g., `from_json` only supports PERMISSIVE mode 
with simple schemas; `to_json` does not handle map/array at the top level; 
`get_json_object` differs from Spark on certain path expressions). Routing 
through Spark's own expression classes via the JVM UDF framework (introduced in 
#4232 and exercised for regexp in #4239) guarantees byte-exact compatibility, 
at the cost of a JNI roundtrip per batch. This is opt-in via a new config and 
the existing native path remains the default.
   
   ## What changes are included in this PR?
   
   - New config `spark.comet.exec.json.engine` (`rust` default, `java` 
experimental).
   - Three new `CometUDF` implementations under 
`common/src/main/scala/org/apache/comet/udf/`: `GetJsonObjectUDF`, 
`FromJsonUDF`, `ToJsonUDF`.
   - Each UDF builds a fresh Spark expression per `evaluate` call. Spark's JSON 
evaluators (`GetJsonObjectEvaluator`, `StructsToJsonEvaluator`, 
`JsonToStructsEvaluator`) hold mutable per-row state, and the JVM UDF framework 
shares one UDF instance across native worker threads, so an expression cached 
on the UDF and reused across threads would race on its evaluator state.
   - `from_json` and `to_json` use a serde-side `CometLambdaRegistry` to pass 
the configured Spark expression (schema, options, timezone) to the UDF. The 
serde rebinds the child to `BoundReference(0)` so the UDF can call `eval(row)` 
against a single-column wrapper row.
   - Serde dispatch in three existing serde objects: `CometGetJsonObject` 
(`strings.scala`), `CometStructsToJson` and `CometJsonToStructs` 
(`structs.scala`) branch on the engine.
   - Compatibility doc `docs/source/user-guide/latest/compatibility/json.md` 
and updated expressions table.
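
   The fresh-expression-per-`evaluate` point above can be illustrated with a small, dependency-free sketch. Everything here is a hypothetical stand-in: `StatefulEvaluator` only mimics an evaluator with mutable per-row state, and `FreshPerCallUdf` is not the `CometUDF` interface. The sketch assumes only that one UDF object is shared by several threads, as described above.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical stand-in for an evaluator that keeps mutable per-row state.
final class StatefulEvaluator {
  private val scratch = new StringBuilder // mutable state: unsafe to share across threads

  def eval(input: String): String = {
    scratch.setLength(0)
    scratch.append(input).append("!")
    scratch.toString
  }
}

// Hypothetical UDF object shared by all threads. It holds no evaluator field;
// each call builds its own evaluator, so no mutable state crosses threads.
object FreshPerCallUdf {
  def evaluate(batch: Seq[String]): Seq[String] = {
    val evaluator = new StatefulEvaluator // created per call, never escapes this method
    batch.map(evaluator.eval)
  }
}

object FreshPerCallDemo {
  // Runs several batches concurrently through the shared UDF object and
  // reports whether every batch produced the expected output.
  def run(): Boolean = {
    val pool = Executors.newFixedThreadPool(4)
    try {
      val tasks = (0 until 8).map { t =>
        pool.submit(new Callable[Boolean] {
          override def call(): Boolean = {
            val batch = (0 until 1000).map(i => s"t$t-$i")
            FreshPerCallUdf.evaluate(batch) == batch.map(_ + "!")
          }
        })
      }
      tasks.forall(_.get())
    } finally {
      pool.shutdown()
    }
  }
}
```

   The trade-off in this sketch is one allocation per batch in exchange for not needing locks or thread-local caches. Whether the real UDFs pay a comparable cost depends on how expensive the Spark expression is to construct.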
   
   `json_array_length` and `json_object_keys` are intentionally out of scope. 
Both are `RuntimeReplaceable` in Spark 4.x and Catalyst's `ReplaceExpressions` 
rewrites them to `StaticInvoke` before Comet sees the plan, so 
`classOf[LengthOfJsonArray]` / `classOf[JsonObjectKeys]` registrations never 
match. Adding support requires recognizing the rewritten `StaticInvoke` form in 
Comet's serde dispatch and is left to a follow-up.
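
   A toy model of why a `classOf[...]` registration stops matching after the rewrite. The types below are hypothetical stand-ins, not Catalyst classes, and the `"JsonUtils"` / `"lengthOfJsonArray"` names are made up for illustration. The last method sketches one possible shape for the follow-up, not a committed design.

```scala
// Toy expression tree: hypothetical stand-ins, not Spark's Catalyst classes.
sealed trait Expr
final case class Literal(value: String) extends Expr
final case class LengthOfJsonArray(child: Expr) extends Expr
final case class StaticInvoke(owner: String, function: String, args: Seq[Expr]) extends Expr

object ToyDispatch {
  // Class-keyed registration, analogous to registering a serde by classOf[...].
  val handlers: Map[Class[_], String] =
    Map(classOf[LengthOfJsonArray] -> "json_array_length")

  // Stand-in for the optimizer rule that rewrites the expression
  // before dispatch ever sees it.
  def replaceExpressions(e: Expr): Expr = e match {
    case LengthOfJsonArray(child) =>
      StaticInvoke("JsonUtils", "lengthOfJsonArray", Seq(child)) // hypothetical names
    case other => other
  }

  // Lookup keyed purely on the runtime class of the node.
  def lookup(e: Expr): Option[String] = handlers.get(e.getClass)

  // Possible follow-up: also recognize the rewritten form by owner and function name.
  def lookupWithStaticInvoke(e: Expr): Option[String] = e match {
    case StaticInvoke("JsonUtils", "lengthOfJsonArray", _) => Some("json_array_length")
    case other => lookup(other)
  }
}
```

   In this model the original node resolves through `lookup`, the rewritten node does not, and only the pattern match on the `StaticInvoke` form recovers it.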
   
   This PR was scaffolded with the project's brainstorming, writing-plans, and 
subagent-driven-development skills.
   
   ## How are these changes tested?
   
   - Three unit suites under `spark/src/test/scala/org/apache/comet/udf/` 
covering valid input, null input, empty input vectors, malformed JSON, and (for 
`from_json`) PERMISSIVE mode all-null struct output.
   - `CometJsonJvmSuite` integration suite extending `CometTestBase` that flips 
`spark.comet.exec.json.engine=java` and runs the three expressions through 
`checkSparkAnswerAndOperator`, asserting both Comet execution and a byte-exact 
match with Spark.
   - Existing `CometJsonExpressionSuite` runs unchanged on default 
`engine=rust` to confirm no regression.
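
   For readers who want to try the opt-in path outside the test suites, a minimal sketch follows. It assumes Spark is on the classpath and that the session is already set up to load Comet (jar and plugin configuration are not shown here). The only name taken from this PR is the `spark.comet.exec.json.engine` key and its `java` value; the object name and app name are illustrative. Without Comet loaded, the setting is an unrecognized key and the query runs on plain Spark.

```scala
import org.apache.spark.sql.SparkSession

object JsonEngineOptIn {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("comet-json-engine-sketch") // illustrative name
      .master("local[1]")
      // Opt in to the experimental JVM-backed JSON engine; "rust" is the default.
      .config("spark.comet.exec.json.engine", "java")
      .getOrCreate()

    // One of the three expressions covered by this PR.
    spark.sql("""SELECT get_json_object('{"a": {"b": 1}}', '$.a.b') AS b""").show()

    spark.stop()
  }
}
```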


