This is an automated email from the ASF dual-hosted git repository. goldmedal pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 18e54f2773 chore: migrate to `invoke_with_args` for datetime functions (#14876) 18e54f2773 is described below commit 18e54f277319d4a5f7449ff4130923d089665410 Author: Andy Yen <38731840+onlyjackfr...@users.noreply.github.com> AuthorDate: Wed Feb 26 09:04:45 2025 +0800 chore: migrate to `invoke_with_args` for datetime functions (#14876) * migrate to involk_with_args for datatime functions * Fix clippy --------- Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> --- datafusion/functions/src/datetime/current_date.rs | 5 +- datafusion/functions/src/datetime/current_time.rs | 5 +- datafusion/functions/src/datetime/date_bin.rs | 179 ++++++++++++--------- datafusion/functions/src/datetime/date_part.rs | 10 +- datafusion/functions/src/datetime/date_trunc.rs | 44 +++-- datafusion/functions/src/datetime/from_unixtime.rs | 42 +++-- datafusion/functions/src/datetime/make_date.rs | 125 +++++++------- datafusion/functions/src/datetime/now.rs | 5 +- datafusion/functions/src/datetime/to_char.rs | 93 ++++++----- datafusion/functions/src/datetime/to_date.rs | 89 ++++++---- datafusion/functions/src/datetime/to_local_time.rs | 17 +- datafusion/functions/src/datetime/to_timestamp.rs | 76 +++++---- datafusion/functions/src/datetime/to_unixtime.rs | 23 ++- 13 files changed, 399 insertions(+), 314 deletions(-) diff --git a/datafusion/functions/src/datetime/current_date.rs b/datafusion/functions/src/datetime/current_date.rs index 868cbe23d6..9998e7d375 100644 --- a/datafusion/functions/src/datetime/current_date.rs +++ b/datafusion/functions/src/datetime/current_date.rs @@ -81,10 +81,9 @@ impl ScalarUDFImpl for CurrentDateFunc { Ok(Date32) } - fn invoke_batch( + fn invoke_with_args( &self, - _args: &[ColumnarValue], - _number_rows: usize, + _args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { internal_err!( "invoke should not be called on a simplified current_date() function" diff --git a/datafusion/functions/src/datetime/current_time.rs b/datafusion/functions/src/datetime/current_time.rs index 142184508e..c416d0240b 100644 --- a/datafusion/functions/src/datetime/current_time.rs +++ b/datafusion/functions/src/datetime/current_time.rs @@ -78,10 +78,9 @@ impl ScalarUDFImpl for CurrentTimeFunc { Ok(Time64(Nanosecond)) } - fn invoke_batch( + fn invoke_with_args( &self, - _args: &[ColumnarValue], - _number_rows: usize, + _args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { internal_err!( "invoke should not be called on a simplified current_time() function" diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index a288693699..5ffae46dde 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -187,11 +187,11 @@ impl ScalarUDFImpl for DateBinFunc { } } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = &args.args; if args.len() == 2 { // Default to unix EPOCH let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( @@ -514,10 +514,9 @@ mod tests { use chrono::TimeDelta; #[test] - #[allow(deprecated)] // TODO migrate UDF invoke from invoke_batch fn test_date_bin() { - let res = DateBinFunc::new().invoke_batch( - &[ + let mut args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( IntervalDayTime { days: 0, @@ -527,14 +526,16 @@ mod tests { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert!(res.is_ok()); let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>()); let batch_len = timestamps.len(); - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( IntervalDayTime { days: 0, @@ -544,12 +545,14 @@ mod tests { ColumnarValue::Array(timestamps), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - batch_len, - ); + number_rows: batch_len, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert!(res.is_ok()); - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( IntervalDayTime { days: 0, @@ -558,13 +561,15 @@ mod tests { ))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert!(res.is_ok()); // stride supports month-day-nano - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some( IntervalMonthDayNano { months: 0, @@ -575,8 +580,10 @@ mod tests { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert!(res.is_ok()); // @@ -584,37 +591,42 @@ mod tests { // // invalid number of arguments - let res = DateBinFunc::new().invoke_batch( - &[ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( IntervalDayTime { days: 0, milliseconds: 1, }, )))], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Execution error: DATE_BIN expected two or three arguments" ); // stride: invalid type - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)" ); // stride: invalid value - let res = DateBinFunc::new().invoke_batch( - &[ + + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( IntervalDayTime { days: 0, @@ -624,60 +636,69 @@ mod tests { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + + let res = DateBinFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Execution error: DATE_BIN stride must be non-zero" ); // stride: overflow of day-time interval - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( IntervalDayTime::MAX, ))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Execution error: DATE_BIN stride argument is too large" ); // stride: overflow of month-day-nano interval - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Execution error: DATE_BIN stride argument is too large" ); // stride: month intervals - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals" ); // origin: invalid type - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( IntervalDayTime { days: 0, @@ -687,15 +708,17 @@ mod tests { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)" ); - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( IntervalDayTime { days: 0, @@ -705,8 +728,10 @@ mod tests { ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert!(res.is_ok()); // unsupported array type for stride @@ -720,14 +745,16 @@ mod tests { }) .collect::<IntervalDayTimeArray>(), ); - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Array(intervals), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays" @@ -736,8 +763,8 @@ mod tests { // unsupported array type for origin let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>()); let batch_len = timestamps.len(); - let res = DateBinFunc::new().invoke_batch( - &[ + args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( IntervalDayTime { days: 0, @@ -747,8 +774,10 @@ mod tests { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Array(timestamps), ], - batch_len, - ); + number_rows: batch_len, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let res = DateBinFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays" @@ -864,20 +893,22 @@ mod tests { .collect::<TimestampNanosecondArray>() .with_timezone_opt(tz_opt.clone()); let batch_len = input.len(); - #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch - let result = DateBinFunc::new() - .invoke_batch( - &[ - ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), - ColumnarValue::Array(Arc::new(input)), - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - Some(string_to_timestamp_nanos(origin).unwrap()), - tz_opt.clone(), - )), - ], - batch_len, - ) - .unwrap(); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), + ColumnarValue::Array(Arc::new(input)), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + Some(string_to_timestamp_nanos(origin).unwrap()), + tz_opt.clone(), + )), + ], + number_rows: batch_len, + return_type: &DataType::Timestamp( + TimeUnit::Nanosecond, + tz_opt.clone(), + ), + }; + let result = DateBinFunc::new().invoke_with_args(args).unwrap(); if let ColumnarValue::Array(result) = result { assert_eq!( result.data_type(), diff --git a/datafusion/functions/src/datetime/date_part.rs b/datafusion/functions/src/datetime/date_part.rs index 49b7a4ec46..bfd06b39d2 100644 --- a/datafusion/functions/src/datetime/date_part.rs +++ b/datafusion/functions/src/datetime/date_part.rs @@ -167,11 +167,11 @@ impl ScalarUDFImpl for DatePartFunc { ) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = args.args; let [part, array] = take_function_args(self.name(), args)?; let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part { @@ -187,11 +187,11 @@ impl ScalarUDFImpl for DatePartFunc { let is_scalar = matches!(array, ColumnarValue::Scalar(_)); let array = match array { - ColumnarValue::Array(array) => Arc::clone(array), + ColumnarValue::Array(array) => Arc::clone(&array), ColumnarValue::Scalar(scalar) => scalar.to_array()?, }; - let part_trim = part_normalization(part); + let part_trim = part_normalization(&part); // using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds") // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs index 7c10cdd002..ed3eb228bf 100644 --- a/datafusion/functions/src/datetime/date_trunc.rs +++ b/datafusion/functions/src/datetime/date_trunc.rs @@ -160,11 +160,11 @@ impl ScalarUDFImpl for DateTruncFunc { } } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = args.args; let (granularity, array) = (&args[0], &args[1]); let granularity = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = @@ -726,16 +726,15 @@ mod tests { .collect::<TimestampNanosecondArray>() .with_timezone_opt(tz_opt.clone()); let batch_len = input.len(); - #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch - let result = DateTruncFunc::new() - .invoke_batch( - &[ - ColumnarValue::Scalar(ScalarValue::from("day")), - ColumnarValue::Array(Arc::new(input)), - ], - batch_len, - ) - .unwrap(); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Scalar(ScalarValue::from("day")), + ColumnarValue::Array(Arc::new(input)), + ], + number_rows: batch_len, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()), + }; + let result = DateTruncFunc::new().invoke_with_args(args).unwrap(); if let ColumnarValue::Array(result) = result { assert_eq!( result.data_type(), @@ -889,16 +888,15 @@ mod tests { .collect::<TimestampNanosecondArray>() .with_timezone_opt(tz_opt.clone()); let batch_len = input.len(); - #[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch - let result = DateTruncFunc::new() - .invoke_batch( - &[ - ColumnarValue::Scalar(ScalarValue::from("hour")), - ColumnarValue::Array(Arc::new(input)), - ], - batch_len, - ) - .unwrap(); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Scalar(ScalarValue::from("hour")), + ColumnarValue::Array(Arc::new(input)), + ], + number_rows: batch_len, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()), + }; + let result = DateTruncFunc::new().invoke_with_args(args).unwrap(); if let ColumnarValue::Array(result) = result { assert_eq!( result.data_type(), diff --git a/datafusion/functions/src/datetime/from_unixtime.rs b/datafusion/functions/src/datetime/from_unixtime.rs index 534b7a4fa6..ed8181452d 100644 --- a/datafusion/functions/src/datetime/from_unixtime.rs +++ b/datafusion/functions/src/datetime/from_unixtime.rs @@ -117,11 +117,11 @@ impl ScalarUDFImpl for FromUnixtimeFunc { internal_err!("call return_type_from_args instead") } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = args.args; let len = args.len(); if len != 1 && len != 2 { return exec_err!( @@ -161,16 +161,21 @@ impl ScalarUDFImpl for FromUnixtimeFunc { #[cfg(test)] mod test { use crate::datetime::from_unixtime::FromUnixtimeFunc; + use arrow::datatypes::DataType; + use arrow::datatypes::TimeUnit::Second; use datafusion_common::ScalarValue; use datafusion_common::ScalarValue::Int64; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; + use std::sync::Arc; #[test] fn test_without_timezone() { - let args = [ColumnarValue::Scalar(Int64(Some(1729900800)))]; - - // TODO use invoke_with_args - let result = FromUnixtimeFunc::new().invoke_batch(&args, 1).unwrap(); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(Int64(Some(1729900800)))], + number_rows: 1, + return_type: &DataType::Timestamp(Second, None), + }; + let result = FromUnixtimeFunc::new().invoke_with_args(args).unwrap(); match result { ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(sec), None)) => { @@ -182,15 +187,20 @@ mod test { #[test] fn test_with_timezone() { - let args = [ - ColumnarValue::Scalar(Int64(Some(1729900800))), - ColumnarValue::Scalar(ScalarValue::Utf8(Some( - "America/New_York".to_string(), - ))), - ]; - - // TODO use invoke_with_args - let result = FromUnixtimeFunc::new().invoke_batch(&args, 2).unwrap(); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Scalar(Int64(Some(1729900800))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "America/New_York".to_string(), + ))), + ], + number_rows: 2, + return_type: &DataType::Timestamp( + Second, + Some(Arc::from("America/New_York")), + ), + }; + let result = FromUnixtimeFunc::new().invoke_with_args(args).unwrap(); match result { ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(sec), Some(tz))) => { diff --git a/datafusion/functions/src/datetime/make_date.rs b/datafusion/functions/src/datetime/make_date.rs index f081dfd11e..929fa601f1 100644 --- a/datafusion/functions/src/datetime/make_date.rs +++ b/datafusion/functions/src/datetime/make_date.rs @@ -106,13 +106,13 @@ impl ScalarUDFImpl for MakeDateFunc { Ok(Date32) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { // first, identify if any of the arguments is an Array. If yes, store its `len`, // as any scalar will need to be converted to an array of len `len`. + let args = args.args; let len = args .iter() .fold(Option::<usize>::None, |acc, arg| match arg { @@ -223,22 +223,24 @@ fn make_date_inner<F: FnMut(i32)>( mod tests { use crate::datetime::make_date::MakeDateFunc; use arrow::array::{Array, Date32Array, Int32Array, Int64Array, UInt32Array}; + use arrow::datatypes::DataType; use datafusion_common::ScalarValue; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; use std::sync::Arc; #[test] fn test_make_date() { - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Scalar(ScalarValue::Int32(Some(2024))), + ColumnarValue::Scalar(ScalarValue::Int64(Some(1))), + ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))), + ], + number_rows: 1, + return_type: &DataType::Date32, + }; let res = MakeDateFunc::new() - .invoke_batch( - &[ - ColumnarValue::Scalar(ScalarValue::Int32(Some(2024))), - ColumnarValue::Scalar(ScalarValue::Int64(Some(1))), - ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))), - ], - 1, - ) + .invoke_with_args(args) .expect("that make_date parsed values without error"); if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res { @@ -247,16 +249,17 @@ mod tests { panic!("Expected a scalar value") } - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Scalar(ScalarValue::Int64(Some(2024))), + ColumnarValue::Scalar(ScalarValue::UInt64(Some(1))), + ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))), + ], + number_rows: 1, + return_type: &DataType::Date32, + }; let res = MakeDateFunc::new() - .invoke_batch( - &[ - ColumnarValue::Scalar(ScalarValue::Int64(Some(2024))), - ColumnarValue::Scalar(ScalarValue::UInt64(Some(1))), - ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))), - ], - 1, - ) + .invoke_with_args(args) .expect("that make_date parsed values without error"); if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res { @@ -265,16 +268,17 @@ mod tests { panic!("Expected a scalar value") } - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024".to_string()))), + ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("1".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("14".to_string()))), + ], + number_rows: 1, + return_type: &DataType::Date32, + }; let res = MakeDateFunc::new() - .invoke_batch( - &[ - ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024".to_string()))), - ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("1".to_string()))), - ColumnarValue::Scalar(ScalarValue::Utf8(Some("14".to_string()))), - ], - 1, - ) + .invoke_with_args(args) .expect("that make_date parsed values without error"); if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res { @@ -287,16 +291,17 @@ mod tests { let months = Arc::new((1..5).map(Some).collect::<Int32Array>()); let days = Arc::new((11..15).map(Some).collect::<UInt32Array>()); let batch_len = years.len(); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(years), + ColumnarValue::Array(months), + ColumnarValue::Array(days), + ], + number_rows: batch_len, + return_type: &DataType::Date32, + }; let res = MakeDateFunc::new() - .invoke_batch( - &[ - ColumnarValue::Array(years), - ColumnarValue::Array(months), - ColumnarValue::Array(days), - ], - batch_len, - ) + .invoke_with_args(args) .expect("that make_date parsed values without error"); if let ColumnarValue::Array(array) = res { @@ -316,54 +321,60 @@ mod tests { // // invalid number of arguments - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let res = MakeDateFunc::new() - .invoke_batch(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))], 1); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))], + number_rows: 1, + return_type: &DataType::Date32, + }; + let res = MakeDateFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Execution error: make_date function requires 3 arguments, got 1" ); // invalid type - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let res = MakeDateFunc::new().invoke_batch( - &[ + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Date32, + }; + let res = MakeDateFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Arrow error: Cast error: Casting from Interval(YearMonth) to Int32 not supported" ); // overflow of month - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let res = MakeDateFunc::new().invoke_batch( - &[ + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))), ColumnarValue::Scalar(ScalarValue::UInt64(Some(u64::MAX))), ColumnarValue::Scalar(ScalarValue::Int32(Some(22))), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Date32, + }; + let res = MakeDateFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Arrow error: Cast error: Can't cast value 18446744073709551615 to type Int32" ); // overflow of day - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let res = MakeDateFunc::new().invoke_batch( - &[ + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))), ColumnarValue::Scalar(ScalarValue::Int32(Some(22))), ColumnarValue::Scalar(ScalarValue::UInt32(Some(u32::MAX))), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Date32, + }; + let res = MakeDateFunc::new().invoke_with_args(args); assert_eq!( res.err().unwrap().strip_backtrace(), "Arrow error: Cast error: Can't cast value 4294967295 to type Int32" diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index 76e8757376..b26dc52cee 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -88,10 +88,9 @@ impl ScalarUDFImpl for NowFunc { internal_err!("return_type_from_args should be called instead") } - fn invoke_batch( + fn invoke_with_args( &self, - _args: &[ColumnarValue], - _number_rows: usize, + _args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { internal_err!("invoke should not be called on a simplified now() function") } diff --git a/datafusion/functions/src/datetime/to_char.rs b/datafusion/functions/src/datetime/to_char.rs index b049ca01ac..a9630ea4f9 100644 --- a/datafusion/functions/src/datetime/to_char.rs +++ b/datafusion/functions/src/datetime/to_char.rs @@ -135,12 +135,12 @@ impl ScalarUDFImpl for ToCharFunc { Ok(Utf8) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { - let [date_time, format] = take_function_args(self.name(), args)?; + let args = args.args; + let [date_time, format] = take_function_args(self.name(), &args)?; match format { ColumnarValue::Scalar(ScalarValue::Utf8(None)) @@ -152,7 +152,7 @@ impl ScalarUDFImpl for ToCharFunc { // invoke to_char_scalar with the known string, without converting to array _to_char_scalar(date_time.clone(), Some(format)) } - ColumnarValue::Array(_) => _to_char_array(args), + ColumnarValue::Array(_) => _to_char_array(&args), _ => { exec_err!( "Format for `to_char` must be non-null Utf8, received {:?}", @@ -297,6 +297,7 @@ mod tests { TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }; + use arrow::datatypes::DataType; use chrono::{NaiveDateTime, Timelike}; use datafusion_common::ScalarValue; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; @@ -378,12 +379,13 @@ mod tests { ]; for (value, format, expected) in scalar_data { - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(value), ColumnarValue::Scalar(format)], + number_rows: 1, + return_type: &DataType::Utf8, + }; let result = ToCharFunc::new() - .invoke_batch( - &[ColumnarValue::Scalar(value), ColumnarValue::Scalar(format)], - 1, - ) + .invoke_with_args(args) .expect("that to_char parsed values without error"); if let ColumnarValue::Scalar(ScalarValue::Utf8(date)) = result { @@ -457,15 +459,16 @@ mod tests { for (value, format, expected) in scalar_array_data { let batch_len = format.len(); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Scalar(value), + ColumnarValue::Array(Arc::new(format) as ArrayRef), + ], + number_rows: batch_len, + return_type: &DataType::Utf8, + }; let result = ToCharFunc::new() - .invoke_batch( - &[ - ColumnarValue::Scalar(value), - ColumnarValue::Array(Arc::new(format) as ArrayRef), - ], - batch_len, - ) + .invoke_with_args(args) .expect("that to_char parsed values without error"); if let ColumnarValue::Scalar(ScalarValue::Utf8(date)) = result { @@ -587,15 +590,16 @@ mod tests { for (value, format, expected) in array_scalar_data { let batch_len = value.len(); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(value as ArrayRef), + ColumnarValue::Scalar(format), + ], + number_rows: batch_len, + return_type: &DataType::Utf8, + }; let result = ToCharFunc::new() - .invoke_batch( - &[ - ColumnarValue::Array(value as ArrayRef), - ColumnarValue::Scalar(format), - ], - batch_len, - ) + .invoke_with_args(args) .expect("that to_char parsed values without error"); if let ColumnarValue::Array(result) = result { @@ -608,15 +612,16 @@ mod tests { for (value, format, expected) in array_array_data { let batch_len = value.len(); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(value), + ColumnarValue::Array(Arc::new(format) as ArrayRef), + ], + number_rows: batch_len, + return_type: &DataType::Utf8, + }; let result = ToCharFunc::new() - .invoke_batch( - &[ - ColumnarValue::Array(value), - ColumnarValue::Array(Arc::new(format) as ArrayRef), - ], - batch_len, - ) + .invoke_with_args(args) .expect("that to_char parsed values without error"); if let ColumnarValue::Array(result) = result { @@ -632,23 +637,27 @@ mod tests { // // invalid number of arguments - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let result = ToCharFunc::new() - .invoke_batch(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))], 1); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))], + number_rows: 1, + return_type: &DataType::Utf8, + }; + let result = ToCharFunc::new().invoke_with_args(args); assert_eq!( result.err().unwrap().strip_backtrace(), "Execution error: to_char function requires 2 arguments, got 1" ); // invalid type - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let result = ToCharFunc::new().invoke_batch( - &[ + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(ScalarValue::Int32(Some(1))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Utf8, + }; + let result = ToCharFunc::new().invoke_with_args(args); assert_eq!( result.err().unwrap().strip_backtrace(), "Execution error: Format for `to_char` must be non-null Utf8, received Timestamp(Nanosecond, None)" diff --git a/datafusion/functions/src/datetime/to_date.rs b/datafusion/functions/src/datetime/to_date.rs index 6d873ab524..91740b2c31 100644 --- a/datafusion/functions/src/datetime/to_date.rs +++ b/datafusion/functions/src/datetime/to_date.rs @@ -130,25 +130,25 @@ impl ScalarUDFImpl for ToDateFunc { Ok(Date32) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = args.args; if args.is_empty() { return exec_err!("to_date function requires 1 or more arguments, got 0"); } // validate that any args after the first one are Utf8 if args.len() > 1 { - validate_data_types(args, "to_date")?; + validate_data_types(&args, "to_date")?; } match args[0].data_type() { Int32 | Int64 | Null | Float64 | Date32 | Date64 => { args[0].cast_to(&Date32, None) } - Utf8View | LargeUtf8 | Utf8 => self.to_date(args), + Utf8View | LargeUtf8 | Utf8 => self.to_date(&args), other => { exec_err!("Unsupported data type {:?} for function to_date", other) } @@ -163,6 +163,7 @@ impl ScalarUDFImpl for ToDateFunc { #[cfg(test)] mod tests { use arrow::array::{Array, Date32Array, GenericStringArray, StringViewArray}; + use arrow::datatypes::DataType; use arrow::{compute::kernels::cast_utils::Parser, datatypes::Date32Type}; use datafusion_common::ScalarValue; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; @@ -207,9 +208,12 @@ mod tests { } fn test_scalar(sv: ScalarValue, tc: &TestCase) { - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let to_date_result = - ToDateFunc::new().invoke_batch(&[ColumnarValue::Scalar(sv)], 1); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(sv)], + number_rows: 1, + return_type: &DataType::Date32, + }; + let to_date_result = ToDateFunc::new().invoke_with_args(args); match to_date_result { Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => { @@ -230,9 +234,12 @@ mod tests { { let date_array = A::from(vec![tc.date_str]); let batch_len = date_array.len(); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let to_date_result = ToDateFunc::new() - .invoke_batch(&[ColumnarValue::Array(Arc::new(date_array))], batch_len); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(date_array))], + number_rows: batch_len, + return_type: &DataType::Date32, + }; + let to_date_result = ToDateFunc::new().invoke_with_args(args); match to_date_result { Ok(ColumnarValue::Array(a)) => { @@ -321,14 +328,15 @@ mod tests { fn test_scalar(sv: ScalarValue, tc: &TestCase) { let format_scalar = ScalarValue::Utf8(Some(tc.format_str.to_string())); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let to_date_result = ToDateFunc::new().invoke_batch( - &[ + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(sv), ColumnarValue::Scalar(format_scalar), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Date32, + }; + let to_date_result = ToDateFunc::new().invoke_with_args(args); match to_date_result { Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => { @@ -350,14 +358,15 @@ mod tests { let format_array = A::from(vec![tc.format_str]); let batch_len = date_array.len(); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let to_date_result = ToDateFunc::new().invoke_batch( - &[ + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Array(Arc::new(date_array)), ColumnarValue::Array(Arc::new(format_array)), ], - batch_len, - ); + number_rows: batch_len, + return_type: &DataType::Date32, + }; + let to_date_result = ToDateFunc::new().invoke_with_args(args); match to_date_result { Ok(ColumnarValue::Array(a)) => { @@ -389,15 +398,16 @@ mod tests { let format1_scalar = ScalarValue::Utf8(Some("%Y-%m-%d".into())); let format2_scalar = ScalarValue::Utf8(Some("%Y/%m/%d".into())); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let to_date_result = ToDateFunc::new().invoke_batch( - &[ + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ ColumnarValue::Scalar(formatted_date_scalar), ColumnarValue::Scalar(format1_scalar), ColumnarValue::Scalar(format2_scalar), ], - 1, - ); + number_rows: 1, + return_type: &DataType::Date32, + }; + let to_date_result = ToDateFunc::new().invoke_with_args(args); match to_date_result { Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => { @@ -421,9 +431,12 @@ mod tests { for date_str in test_cases { let formatted_date_scalar = ScalarValue::Utf8(Some(date_str.into())); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let to_date_result = ToDateFunc::new() - .invoke_batch(&[ColumnarValue::Scalar(formatted_date_scalar)], 1); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(formatted_date_scalar)], + number_rows: 1, + return_type: &DataType::Date32, + }; + let to_date_result = ToDateFunc::new().invoke_with_args(args); match to_date_result { Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => { @@ -440,9 +453,12 @@ mod tests { let date_str = "20241231"; let date_scalar = ScalarValue::Utf8(Some(date_str.into())); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let to_date_result = - ToDateFunc::new().invoke_batch(&[ColumnarValue::Scalar(date_scalar)], 1); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(date_scalar)], + number_rows: 1, + return_type: &DataType::Date32, + }; + let to_date_result = ToDateFunc::new().invoke_with_args(args); match to_date_result { Ok(ColumnarValue::Scalar(ScalarValue::Date32(date_val))) => { @@ -462,9 +478,12 @@ mod tests { let date_str = "202412311"; let date_scalar = ScalarValue::Utf8(Some(date_str.into())); - #[allow(deprecated)] // TODO migrate UDF to invoke from invoke_batch - let to_date_result = - ToDateFunc::new().invoke_batch(&[ColumnarValue::Scalar(date_scalar)], 1); + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(date_scalar)], + number_rows: 1, + return_type: &DataType::Date32, + }; + let to_date_result = ToDateFunc::new().invoke_with_args(args); if let Ok(ColumnarValue::Scalar(ScalarValue::Date32(_))) = to_date_result { panic!( diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs index 0e235735e2..8dbef90cdc 100644 --- a/datafusion/functions/src/datetime/to_local_time.rs +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -366,12 +366,11 @@ impl ScalarUDFImpl for ToLocalTimeFunc { } } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { - let [time_value] = take_function_args(self.name(), args)?; + let [time_value] = take_function_args(self.name(), args.args)?; self.to_local_time(&[time_value.clone()]) } @@ -603,10 +602,12 @@ mod tests { .map(|s| Some(string_to_timestamp_nanos(s).unwrap())) .collect::<TimestampNanosecondArray>(); let batch_size = input.len(); - #[allow(deprecated)] // TODO: migrate to invoke_with_args - let result = ToLocalTimeFunc::new() - .invoke_batch(&[ColumnarValue::Array(Arc::new(input))], batch_size) - .unwrap(); + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(input))], + number_rows: batch_size, + return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None), + }; + let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap(); if let ColumnarValue::Array(result) = result { assert_eq!( result.data_type(), diff --git a/datafusion/functions/src/datetime/to_timestamp.rs b/datafusion/functions/src/datetime/to_timestamp.rs index 9d88fc00b9..f1c61fe2b9 100644 --- a/datafusion/functions/src/datetime/to_timestamp.rs +++ b/datafusion/functions/src/datetime/to_timestamp.rs @@ -299,11 +299,11 @@ impl ScalarUDFImpl for ToTimestampFunc { Ok(return_type_for(&arg_types[0], Nanosecond)) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = args.args; if args.is_empty() { return exec_err!( "to_timestamp function requires 1 or more arguments, got {}", @@ -313,7 +313,7 @@ impl ScalarUDFImpl for ToTimestampFunc { // validate that any args after the first one are Utf8 if args.len() > 1 { - validate_data_types(args, "to_timestamp")?; + validate_data_types(&args, "to_timestamp")?; } match args[0].data_type() { @@ -327,7 +327,7 @@ impl ScalarUDFImpl for ToTimestampFunc { args[0].cast_to(&Timestamp(Nanosecond, Some(tz)), None) } Utf8View | LargeUtf8 | Utf8 => { - to_timestamp_impl::<TimestampNanosecondType>(args, "to_timestamp") + to_timestamp_impl::<TimestampNanosecondType>(&args, "to_timestamp") } other => { exec_err!( @@ -359,11 +359,11 @@ impl ScalarUDFImpl for ToTimestampSecondsFunc { Ok(return_type_for(&arg_types[0], Second)) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = args.args; if args.is_empty() { return exec_err!( "to_timestamp_seconds function requires 1 or more arguments, got {}", @@ -373,7 +373,7 @@ impl ScalarUDFImpl for ToTimestampSecondsFunc { // validate that any args after the first one are Utf8 if args.len() > 1 { - validate_data_types(args, "to_timestamp")?; + validate_data_types(&args, "to_timestamp")?; } match args[0].data_type() { @@ -382,7 +382,7 @@ impl ScalarUDFImpl for ToTimestampSecondsFunc { } Timestamp(_, Some(tz)) => args[0].cast_to(&Timestamp(Second, Some(tz)), None), Utf8View | LargeUtf8 | Utf8 => { - to_timestamp_impl::<TimestampSecondType>(args, "to_timestamp_seconds") + to_timestamp_impl::<TimestampSecondType>(&args, "to_timestamp_seconds") } other => { exec_err!( @@ -414,11 +414,11 @@ impl ScalarUDFImpl for ToTimestampMillisFunc { Ok(return_type_for(&arg_types[0], Millisecond)) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = args.args; if args.is_empty() { return exec_err!( "to_timestamp_millis function requires 1 or more arguments, got {}", @@ -428,7 +428,7 @@ impl ScalarUDFImpl for ToTimestampMillisFunc { // validate that any args after the first one are Utf8 if args.len() > 1 { - validate_data_types(args, "to_timestamp")?; + validate_data_types(&args, "to_timestamp")?; } match args[0].data_type() { @@ -438,9 +438,10 @@ impl ScalarUDFImpl for ToTimestampMillisFunc { Timestamp(_, Some(tz)) => { args[0].cast_to(&Timestamp(Millisecond, Some(tz)), None) } - Utf8View | LargeUtf8 | Utf8 => { - to_timestamp_impl::<TimestampMillisecondType>(args, "to_timestamp_millis") - } + Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampMillisecondType>( + &args, + "to_timestamp_millis", + ), other => { exec_err!( "Unsupported data type {:?} for function to_timestamp_millis", @@ -471,11 +472,11 @@ impl ScalarUDFImpl for ToTimestampMicrosFunc { Ok(return_type_for(&arg_types[0], Microsecond)) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = args.args; if args.is_empty() { return exec_err!( "to_timestamp_micros function requires 1 or more arguments, got {}", @@ -485,7 +486,7 @@ impl ScalarUDFImpl for ToTimestampMicrosFunc { // validate that any args after the first one are Utf8 if args.len() > 1 { - validate_data_types(args, "to_timestamp")?; + validate_data_types(&args, "to_timestamp")?; } match args[0].data_type() { @@ -495,9 +496,10 @@ impl ScalarUDFImpl for ToTimestampMicrosFunc { Timestamp(_, Some(tz)) => { args[0].cast_to(&Timestamp(Microsecond, Some(tz)), None) } - Utf8View | LargeUtf8 | Utf8 => { - to_timestamp_impl::<TimestampMicrosecondType>(args, "to_timestamp_micros") - } + Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampMicrosecondType>( + &args, + "to_timestamp_micros", + ), other => { exec_err!( "Unsupported data type {:?} for function to_timestamp_micros", @@ -528,11 +530,11 @@ impl ScalarUDFImpl for ToTimestampNanosFunc { Ok(return_type_for(&arg_types[0], Nanosecond)) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - _number_rows: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { + let args = args.args; if args.is_empty() { return exec_err!( "to_timestamp_nanos function requires 1 or more arguments, got {}", @@ -542,7 +544,7 @@ impl ScalarUDFImpl for ToTimestampNanosFunc { // validate that any args after the first one are Utf8 if args.len() > 1 { - validate_data_types(args, "to_timestamp")?; + validate_data_types(&args, "to_timestamp")?; } match args[0].data_type() { @@ -553,7 +555,7 @@ impl ScalarUDFImpl for ToTimestampNanosFunc { args[0].cast_to(&Timestamp(Nanosecond, Some(tz)), None) } Utf8View | LargeUtf8 | Utf8 => { - to_timestamp_impl::<TimestampNanosecondType>(args, "to_timestamp_nanos") + to_timestamp_impl::<TimestampNanosecondType>(&args, "to_timestamp_nanos") } other => { exec_err!( @@ -988,9 +990,13 @@ mod tests { for array in arrays { let rt = udf.return_type(&[array.data_type()]).unwrap(); assert!(matches!(rt, Timestamp(_, Some(_)))); - #[allow(deprecated)] // TODO: migrate to invoke_with_args + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![array.clone()], + number_rows: 4, + return_type: &rt, + }; let res = udf - .invoke_batch(&[array.clone()], 1) + .invoke_with_args(args) .expect("that to_timestamp parsed values without error"); let array = match res { ColumnarValue::Array(res) => res, @@ -1031,9 +1037,13 @@ mod tests { for array in arrays { let rt = udf.return_type(&[array.data_type()]).unwrap(); assert!(matches!(rt, Timestamp(_, None))); - #[allow(deprecated)] // TODO: migrate to invoke_with_args + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![array.clone()], + number_rows: 5, + return_type: &rt, + }; let res = udf - .invoke_batch(&[array.clone()], 1) + .invoke_with_args(args) .expect("that to_timestamp parsed values without error"); let array = match res { ColumnarValue::Array(res) => res, diff --git a/datafusion/functions/src/datetime/to_unixtime.rs b/datafusion/functions/src/datetime/to_unixtime.rs index 265fa7b59c..653ec10851 100644 --- a/datafusion/functions/src/datetime/to_unixtime.rs +++ b/datafusion/functions/src/datetime/to_unixtime.rs @@ -90,33 +90,32 @@ impl ScalarUDFImpl for ToUnixtimeFunc { Ok(DataType::Int64) } - fn invoke_batch( + fn invoke_with_args( &self, - args: &[ColumnarValue], - batch_size: usize, + args: datafusion_expr::ScalarFunctionArgs, ) -> Result<ColumnarValue> { - if args.is_empty() { + let arg_args = &args.args; + if arg_args.is_empty() { return exec_err!("to_unixtime function requires 1 or more arguments, got 0"); } // validate that any args after the first one are Utf8 - if args.len() > 1 { - validate_data_types(args, "to_unixtime")?; + if arg_args.len() > 1 { + validate_data_types(arg_args, "to_unixtime")?; } - match args[0].data_type() { + match arg_args[0].data_type() { DataType::Int32 | DataType::Int64 | DataType::Null | DataType::Float64 => { - args[0].cast_to(&DataType::Int64, None) + arg_args[0].cast_to(&DataType::Int64, None) } - DataType::Date64 | DataType::Date32 => args[0] + DataType::Date64 | DataType::Date32 => arg_args[0] .cast_to(&DataType::Timestamp(TimeUnit::Second, None), None)? .cast_to(&DataType::Int64, None), - DataType::Timestamp(_, tz) => args[0] + DataType::Timestamp(_, tz) => arg_args[0] .cast_to(&DataType::Timestamp(TimeUnit::Second, tz), None)? .cast_to(&DataType::Int64, None), - #[allow(deprecated)] // TODO: migrate to invoke_with_args DataType::Utf8 => ToTimestampSecondsFunc::new() - .invoke_batch(args, batch_size)? + .invoke_with_args(args)? .cast_to(&DataType::Int64, None), other => { exec_err!("Unsupported data type {:?} for function to_unixtime", other) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org