kumarUjjawal commented on code in PR #19540: URL: https://github.com/apache/datafusion/pull/19540#discussion_r2653032202
########## datafusion/functions/src/datetime/to_time.rs: ########## @@ -0,0 +1,582 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::datetime::common::*; +use arrow::array::builder::PrimitiveBuilder; +use arrow::array::cast::AsArray; +use arrow::array::temporal_conversions::NANOSECONDS; +use arrow::array::types::Time64NanosecondType; +use arrow::array::{Array, PrimitiveArray, StringArrayType}; +use arrow::datatypes::DataType; +use arrow::datatypes::DataType::*; +use chrono::{NaiveTime, Timelike}; +use datafusion_common::{Result, ScalarValue, exec_err}; +use datafusion_expr::{ + ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion_macros::user_doc; +use std::any::Any; +use std::sync::Arc; + +/// Nanoseconds per second (1 billion) +const NANOS_PER_SECOND: i64 = NANOSECONDS; +/// Nanoseconds per minute +const NANOS_PER_MINUTE: i64 = 60 * NANOS_PER_SECOND; +/// Nanoseconds per hour +const NANOS_PER_HOUR: i64 = 60 * NANOS_PER_MINUTE; +/// Nanoseconds per day (used for extracting time from timestamp) +const NANOS_PER_DAY: i64 = 24 * NANOS_PER_HOUR; + +/// Default time formats to try when parsing without an explicit format +const DEFAULT_TIME_FORMATS: &[&str] = &[ + "%H:%M:%S%.f", // 12:30:45.123456789 + "%H:%M:%S", // 12:30:45 + "%H:%M", // 12:30 +]; + +#[user_doc( + doc_section(label = "Time and Date Functions"), + description = r"Converts a value to a time (`HH:MM:SS.nnnnnnnnn`). +Supports strings and timestamps as input. +Strings are parsed as `HH:MM:SS`, `HH:MM:SS.nnnnnnnnn`, or `HH:MM` if no [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html)s are provided. +Timestamps will have the time portion extracted. +Returns the corresponding time. + +Note: `to_time` returns Time64(Nanosecond), which represents the time of day in nanoseconds since midnight.", + syntax_example = "to_time('12:30:45', '%H:%M:%S')", + sql_example = r#"```sql +> select to_time('12:30:45'); ++---------------------------+ +| to_time(Utf8("12:30:45")) | ++---------------------------+ +| 12:30:45 | ++---------------------------+ +> select to_time('12-30-45', '%H-%M-%S'); ++--------------------------------------------+ +| to_time(Utf8("12-30-45"),Utf8("%H-%M-%S")) | ++--------------------------------------------+ +| 12:30:45 | ++--------------------------------------------+ +> select to_time('2024-01-15 14:30:45'::timestamp); ++--------------------------------------------------+ +| to_time(Utf8("2024-01-15 14:30:45")) | ++--------------------------------------------------+ +| 14:30:45 | ++--------------------------------------------------+ +``` + +Additional examples can be found [here](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/builtin_functions/date_time.rs) +"#, + standard_argument(name = "expression", prefix = "String or Timestamp"), + argument( + name = "format_n", + description = r"Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order + they appear with the first successful one being returned. If none of the formats successfully parse the expression + an error will be returned." + ) +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct ToTimeFunc { + signature: Signature, +} + +impl Default for ToTimeFunc { + fn default() -> Self { + Self::new() + } +} + +impl ToTimeFunc { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } + + fn to_time(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> { + let formats: Vec<&str> = if args.len() > 1 { + // Collect format strings from arguments + args[1..] + .iter() + .filter_map(|arg| { + if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) + | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) = arg + { + Some(s.as_str()) + } else { + None + } + }) + .collect() + } else { + DEFAULT_TIME_FORMATS.to_vec() + }; + + match &args[0] { + ColumnarValue::Scalar(ScalarValue::Utf8(s)) + | ColumnarValue::Scalar(ScalarValue::LargeUtf8(s)) + | ColumnarValue::Scalar(ScalarValue::Utf8View(s)) => { + let result = s + .as_ref() + .map(|s| parse_time_with_formats(s, &formats)) + .transpose()?; + Ok(ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(result))) + } + ColumnarValue::Array(array) => { + let result = match array.data_type() { + Utf8 => parse_time_array(&array.as_string::<i32>(), &formats)?, + LargeUtf8 => parse_time_array(&array.as_string::<i64>(), &formats)?, + Utf8View => parse_time_array(&array.as_string_view(), &formats)?, + other => return exec_err!("Unsupported type for to_time: {}", other), + }; + Ok(ColumnarValue::Array(Arc::new(result))) + } + other => exec_err!("Unsupported argument for to_time: {:?}", other), + } + } +} + +impl ScalarUDFImpl for ToTimeFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "to_time" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + Ok(Time64(arrow::datatypes::TimeUnit::Nanosecond)) + } + + fn invoke_with_args( + &self, + args: datafusion_expr::ScalarFunctionArgs, + ) -> Result<ColumnarValue> { + let args = args.args; + if args.is_empty() { + return exec_err!("to_time function requires 1 or more arguments, got 0"); + } + + // validate that any args after the first one are Utf8 + if args.len() > 1 { + validate_data_types(&args, "to_time")?; + } + + match args[0].data_type() { + Utf8View | LargeUtf8 | Utf8 => self.to_time(&args), + Null => Ok(ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(None))), + // Support timestamp input by extracting time portion + Timestamp(_, _) => { + let nanos = extract_time_from_timestamp(&args[0])?; + Ok(nanos) + } + other => { + exec_err!("Unsupported data type {} for function to_time", other) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} + +/// Parse time array using the provided formats +fn parse_time_array<'a, A: StringArrayType<'a>>( + array: &A, + formats: &[&str], +) -> Result<PrimitiveArray<Time64NanosecondType>> { + let mut builder: PrimitiveBuilder<Time64NanosecondType> = + PrimitiveArray::builder(array.len()); + + for i in 0..array.len() { + if array.is_null(i) { + builder.append_null(); + } else { + let s = array.value(i); + let nanos = parse_time_with_formats(s, formats)?; + builder.append_value(nanos); + } + } + + Ok(builder.finish()) +} + +/// Parse time string using provided formats +fn parse_time_with_formats(s: &str, formats: &[&str]) -> Result<i64> { + for format in formats { + if let Ok(time) = NaiveTime::parse_from_str(s, format) { + return Ok(time_to_nanos(time)); + } + } + exec_err!( + "Error parsing '{}' as time. Tried formats: {:?}", + s, + formats + ) +} + +/// Convert NaiveTime to nanoseconds since midnight +fn time_to_nanos(time: NaiveTime) -> i64 { + let hours = time.hour() as i64; + let minutes = time.minute() as i64; + let seconds = time.second() as i64; + let nanos = time.nanosecond() as i64; + + hours * NANOS_PER_HOUR + + minutes * NANOS_PER_MINUTE + + seconds * NANOS_PER_SECOND + + nanos +} + +/// Extract time portion from timestamp (nanoseconds since midnight) +fn extract_time_from_timestamp(arg: &ColumnarValue) -> Result<ColumnarValue> { + match arg { + ColumnarValue::Scalar(scalar) => { + let nanos = match scalar { + ScalarValue::TimestampNanosecond(Some(ts), _) => *ts % NANOS_PER_DAY, + ScalarValue::TimestampMicrosecond(Some(ts), _) => { + (*ts * 1_000) % NANOS_PER_DAY + } + ScalarValue::TimestampMillisecond(Some(ts), _) => { + (*ts * 1_000_000) % NANOS_PER_DAY + } + ScalarValue::TimestampSecond(Some(ts), _) => { + (*ts * NANOS_PER_SECOND) % NANOS_PER_DAY + } + ScalarValue::TimestampNanosecond(None, _) + | ScalarValue::TimestampMicrosecond(None, _) + | ScalarValue::TimestampMillisecond(None, _) + | ScalarValue::TimestampSecond(None, _) => { + return Ok(ColumnarValue::Scalar(ScalarValue::Time64Nanosecond( + None, + ))); + } + _ => return exec_err!("Unsupported timestamp type for to_time"), + }; + // Handle negative timestamps (before epoch) - normalize to positive time + let normalized_nanos = if nanos < 0 { + nanos + NANOS_PER_DAY + } else { + nanos + }; + Ok(ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some( + normalized_nanos, + )))) + } + ColumnarValue::Array(array) => { + let len = array.len(); + let mut builder: PrimitiveBuilder<Time64NanosecondType> = + PrimitiveArray::builder(len); + + match array.data_type() { + Timestamp(arrow::datatypes::TimeUnit::Nanosecond, _) => { + let ts_array = + array.as_primitive::<arrow::datatypes::TimestampNanosecondType>(); Review Comment: This is much better. Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
