cetra3 commented on code in PR #21353: URL: https://github.com/apache/datafusion/pull/21353#discussion_r3480702111
########## datafusion/functions-json/src/json_get_str.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`JsonGetStr`] UDF implementation for extracting string values from JSON. + +use arrow::array::{Array, AsArray, StringArray, StringBuilder}; +use arrow::datatypes::DataType; +use datafusion_common::{Result, ScalarValue, exec_err, plan_err}; +use datafusion_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, +}; +use datafusion_macros::user_doc; +use std::sync::Arc; + +#[user_doc( + doc_section(label = "JSON Functions"), + description = r#"Extract a string value from a JSON string at the given path. + +The path is specified as zero or more keys (strings for object access) or +indices (integers for array access). With no path keys, the function returns +the JSON value itself if it is a string (jq `.` semantics). Otherwise, the +function navigates into the JSON document and returns the value at the path. + +Returns NULL if the input JSON is not a valid JSON string, the path does not +exist, the value at the path is not a string, or any argument (input JSON or +path key) is NULL. 
Invalid JSON is silently treated as NULL — no error is +returned."#, + syntax_example = "json_get_str(json_string [, key1, key2, ...])", + sql_example = r#"```sql +> select json_get_str('{"a": {"b": "hello"}}', 'a', 'b'); ++-----------------------------------------------------------+ +| json_get_str(Utf8("{"a": {"b": "hello"}}"),Utf8("a"),Utf8("b")) | ++-----------------------------------------------------------+ +| hello | ++-----------------------------------------------------------+ +```"#, + argument( + name = "json_string", + description = "A string containing valid JSON data." + ), + argument( + name = "keys", + description = "Zero or more path keys (string for object key, integer for array index)." + ) +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct JsonGetStr { + signature: Signature, +} + +impl Default for JsonGetStr { + fn default() -> Self { + Self::new() + } +} + +impl JsonGetStr { + pub fn new() -> Self { + Self { + signature: Signature::new(TypeSignature::UserDefined, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for JsonGetStr { + fn name(&self) -> &str { + "json_get_str" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + Ok(DataType::Utf8) + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> { + if arg_types.is_empty() { + return plan_err!( + "json_get_str requires at least 1 argument (json_string), got 0" + ); + } + // First arg must be a string type; remaining are path keys (string or integer) + let json_type = match &arg_types[0] { + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { + arg_types[0].clone() + } + DataType::Null => DataType::Utf8, + other => { + return plan_err!( + "json_get_str first argument must be a string type, got {other}" + ); + } + }; + let mut coerced = vec![json_type]; + for (i, dt) in arg_types[1..].iter().enumerate() { + let coerced_type = match dt { + DataType::Utf8 | 
DataType::LargeUtf8 | DataType::Utf8View => dt.clone(), + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => dt.clone(), + DataType::Null => DataType::Utf8, + other => { + return plan_err!( + "json_get_str path argument {} must be a string or integer type, got {other}", + i + 1 + ); + } + }; + coerced.push(coerced_type); + } + Ok(coerced) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { + let json_arg = &args.args[0]; + let path_args = &args.args[1..]; + + // Build path keys. If any path argument is NULL, the result is NULL — + // return early on the first NULL encountered without scanning the rest. + let mut path_keys: Vec<PathKey<'_>> = Vec::with_capacity(path_args.len()); + for arg in path_args { + let key = match arg { + ColumnarValue::Scalar( + ScalarValue::Utf8(Some(s)) + | ScalarValue::LargeUtf8(Some(s)) + | ScalarValue::Utf8View(Some(s)), + ) => PathKey::Key(s.as_str()), + ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => { + PathKey::Index(*i as usize) + } + ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => { + PathKey::Index(*i as usize) + } + ColumnarValue::Scalar(ScalarValue::UInt64(Some(i))) => { + PathKey::Index(*i as usize) + } + ColumnarValue::Scalar(s) if s.is_null() => { + return Ok(null_result(json_arg)); + } + _ => { + return exec_err!( + "json_get_str path arguments must be scalar strings or integers, got {:?}", + arg.data_type() + ); + } + }; + path_keys.push(key); + } + + match json_arg { + ColumnarValue::Array(array) => { + let len = array.len(); + let mut builder = StringBuilder::with_capacity(len, len * 32); + + match array.data_type() { + DataType::Utf8 => { + let arr = array.as_string::<i32>(); + for i in 0..len { + if arr.is_null(i) { + builder.append_null(); + } else { + match extract_str_at_path(arr.value(i), &path_keys) { + Some(s) => builder.append_value(s), + None => 
builder.append_null(), + } + } + } + } + DataType::LargeUtf8 => { + let arr = array.as_string::<i64>(); + for i in 0..len { + if arr.is_null(i) { + builder.append_null(); + } else { + match extract_str_at_path(arr.value(i), &path_keys) { + Some(s) => builder.append_value(s), + None => builder.append_null(), + } + } + } + } + DataType::Utf8View => { + let arr = array.as_string_view(); + for i in 0..len { + if arr.is_null(i) { + builder.append_null(); + } else { + match extract_str_at_path(arr.value(i), &path_keys) { + Some(s) => builder.append_value(s), + None => builder.append_null(), + } + } + } + } + other => { + return exec_err!( + "json_get_str first argument must be a string type, got {other:?}" + ); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + } + ColumnarValue::Scalar(scalar) => { + let json_str = match scalar { + ScalarValue::Utf8(Some(s)) + | ScalarValue::LargeUtf8(Some(s)) + | ScalarValue::Utf8View(Some(s)) => s, + _ => return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))), + }; + + let result = extract_str_at_path(json_str, &path_keys); + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(result))) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} + +/// Represents a path element for navigating JSON. Borrows from the input +/// arguments to avoid per-call clones of path strings. +#[derive(Debug, Clone, Copy)] +enum PathKey<'a> { + Key(&'a str), + Index(usize), +} + +/// Helper: build the NULL-typed result matching the input shape. +fn null_result(json_arg: &ColumnarValue) -> ColumnarValue { + match json_arg { + ColumnarValue::Array(arr) => { + ColumnarValue::Array(Arc::new(StringArray::new_null(arr.len()))) + } + ColumnarValue::Scalar(_) => ColumnarValue::Scalar(ScalarValue::Utf8(None)), + } +} + +/// Navigate a JSON string using the given path and extract a string value. 
+/// +/// Returns `None` (silently) if: +/// - The input is not valid JSON +/// - The path does not exist in the JSON document +/// - The value at the path is not a JSON string +/// +/// With an empty `path`, returns the JSON value itself if it is a string +/// (jq `.` semantics). +fn extract_str_at_path(json_str: &str, path: &[PathKey<'_>]) -> Option<String> { + let root: serde_json::Value = serde_json::from_str(json_str).ok()?; Review Comment: This is going to be pretty much unusable for us. Parsing into `serde_json::Value` will materialize the entire `json_str` as an owned tree here, so if you have a giant JSON string, it will produce a pretty large copy in memory. This is why datafusion-functions-json uses jiter: to do zero-copy reading of JSON structures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
