andygrove opened a new pull request, #4305: URL: https://github.com/apache/datafusion-comet/pull/4305
## Which issue does this PR close?

Closes #.

## Rationale for this change

The native Rust JSON expressions in Comet have known compatibility gaps with Spark and feature restrictions (e.g., `from_json` only supports PERMISSIVE mode with simple schemas; `to_json` does not handle maps/arrays at the top level; `get_json_object` differs from Spark on certain path expressions). Routing through Spark's own expression classes via the JVM UDF framework (introduced in #4232 and exercised for regexp in #4239) guarantees byte-exact compatibility, at the cost of a JNI round trip per batch. The new path is opt-in via a config; the existing native path remains the default.

## What changes are included in this PR?

- New config `spark.comet.exec.json.engine` (`rust` default, `java` experimental).
- Three new `CometUDF` implementations under `common/src/main/scala/org/apache/comet/udf/`: `GetJsonObjectUDF`, `FromJsonUDF`, `ToJsonUDF`.
- Each UDF builds a fresh Spark expression per `evaluate` call. Spark's JSON evaluators (`GetJsonObjectEvaluator`, `StructsToJsonEvaluator`, `JsonToStructsEvaluator`) hold mutable per-row state, and the JVM UDF framework shares one UDF instance across native worker threads, so an expression cached across threads races on its evaluator state.
- `from_json` and `to_json` use a serde-side `CometLambdaRegistry` to pass the configured Spark expression (schema, options, timezone) to the UDF. The serde rebinds the child to `BoundReference(0)` so the UDF can call `eval(row)` against a single-column wrapper row.
- Serde dispatch: three existing serde objects, `CometGetJsonObject` (`strings.scala`) and `CometStructsToJson` / `CometJsonToStructs` (`structs.scala`), branch on the configured engine.
- Compatibility doc `docs/source/user-guide/latest/compatibility/json.md` and an updated expressions table.

`json_array_length` and `json_object_keys` are intentionally out of scope.
Both are `RuntimeReplaceable` in Spark 4.x, and Catalyst's `ReplaceExpressions` rule rewrites them to `StaticInvoke` before Comet sees the plan, so registrations keyed on `classOf[LengthOfJsonArray]` / `classOf[JsonObjectKeys]` never match. Supporting them would require recognizing the rewritten `StaticInvoke` form in Comet's serde dispatch, which is left to a follow-up.

This PR was scaffolded with the project's brainstorming, writing-plans, and subagent-driven-development skills.

## How are these changes tested?

- Three unit suites under `spark/src/test/scala/org/apache/comet/udf/` covering valid input, null input, empty input vectors, malformed JSON, and (for `from_json`) PERMISSIVE-mode all-null struct output.
- A `CometJsonJvmSuite` integration suite extending `CometTestBase` that sets `spark.comet.exec.json.engine=java` and runs the three expressions through `checkSparkAnswerAndOperator`, asserting that the Comet operator runs and the results match Spark byte for byte.
- The existing `CometJsonExpressionSuite` runs unchanged with the default `engine=rust` to confirm no regression.
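As a usage sketch: the config key is the one added by this PR, but the table and column names below are made up for illustration.

```scala
// Opt in to the JVM engine; the native Rust path stays the default otherwise.
spark.conf.set("spark.comet.exec.json.engine", "java")

// Hypothetical query: get_json_object is now routed through Spark's own
// expression class via the JVM UDF framework, at the cost of a JNI round
// trip per batch, in exchange for byte-exact agreement with Spark.
spark.sql("SELECT get_json_object(payload, '$.user.id') FROM events").show()
```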
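The fresh-expression-per-`evaluate` pattern can be sketched roughly as below. `GetJsonObject`, `Literal`, and `UTF8String` are real Spark/Catalyst classes; the object name and the `evaluate` signature are simplified assumptions, not the actual `CometUDF` API.

```scala
import org.apache.spark.sql.catalyst.expressions.{GetJsonObject, Literal}
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

// Sketch of the thread-safety pattern: build a fresh Catalyst expression
// (and therefore a fresh evaluator with its own mutable per-row state) on
// every call, instead of caching one instance across native worker threads.
object GetJsonObjectUDFSketch {
  // Hypothetical per-value entry point; the real CometUDF works on batches.
  def evaluate(json: UTF8String, path: UTF8String): UTF8String = {
    // A new GetJsonObject means a new GetJsonObjectEvaluator inside it, so
    // concurrent calls from different threads never share evaluator state.
    val expr = GetJsonObject(Literal(json, StringType), Literal(path, StringType))
    expr.eval().asInstanceOf[UTF8String]
  }
}
```

Constructing a Catalyst expression per call is cheap relative to the JNI round trip, which is why this is preferred over per-thread caching here.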
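The `BoundReference(0)` rebinding for `from_json` can be pictured as follows. `JsonToStructs`, `BoundReference`, and `InternalRow` are real Catalyst classes; the schema, options, and the `evalOne` helper are illustrative assumptions.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, JsonToStructs}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Sketch: the serde builds the configured expression (schema, options,
// timezone) with its child rebound to ordinal 0, so the UDF side can
// evaluate it against a single-column wrapper row.
val schema = StructType(Seq(StructField("a", StringType)))
val bound = JsonToStructs(
  schema,
  options = Map.empty,
  child = BoundReference(0, StringType, nullable = true),
  timeZoneId = Some("UTC"))

// On the UDF side, wrap each incoming value in a one-column row and eval.
def evalOne(json: UTF8String): Any = bound.eval(InternalRow(json))
```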
