advancedxy commented on code in PR #449:
URL: https://github.com/apache/datafusion-comet/pull/449#discussion_r1614516643
##########
core/src/execution/datafusion/expressions/scalar_funcs/hex.rs:
##########
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::{
+    array::{as_dictionary_array, as_string_array},
+    datatypes::Int32Type,
+};
+use arrow_array::StringArray;
+use arrow_schema::DataType;
+use datafusion::logical_expr::ColumnarValue;
+use datafusion_common::{
+    cast::{as_binary_array, as_fixed_size_binary_array, as_int64_array},
+    exec_err, DataFusionError, ScalarValue,
+};
+use std::fmt::Write;
+
+fn hex_int64(num: i64) -> String {
+    if num >= 0 {
+        format!("{:X}", num)
+    } else {
+        format!("{:016X}", num as u64)
+    }
+}
+
+fn hex_bytes(bytes: &[u8]) -> Vec<u8> {
+    let length = bytes.len();
+    let mut value = vec![0; length * 2];
+    let mut i = 0;
+    while i < length {
+        value[i * 2] = (bytes[i] & 0xF0) >> 4;
+        value[i * 2 + 1] = bytes[i] & 0x0F;
+        i += 1;
+    }
+    value
+}
+
+fn hex_string(s: &str) -> Vec<u8> {
+    hex_bytes(s.as_bytes())
+}
+
+fn hex_bytes_to_string(bytes: &[u8]) -> Result<String, std::fmt::Error> {
+    let mut hex_string = String::with_capacity(bytes.len() * 2);
+    for byte in bytes {
+        write!(&mut hex_string, "{:X}", byte)?;
+    }
+    Ok(hex_string)
+}
+
+pub(super) fn spark_hex(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
+    if args.len() != 1 {
+        return Err(DataFusionError::Internal(
+            "hex expects exactly one argument".to_string(),
+        ));
+    }
+
+    match &args[0] {
+        ColumnarValue::Array(array) => match array.data_type() {
+            DataType::Int64 => {
+                let array = as_int64_array(array)?;
+
+                let hexed: Vec<Option<String>> =
+                    array.iter().map(|v| v.map(hex_int64)).collect();
+
+                let string_array = StringArray::from(hexed);
+                Ok(ColumnarValue::Array(Arc::new(string_array)))
+            }
+            DataType::Utf8 | DataType::LargeUtf8 => {
+                let array = as_string_array(array);
+
+                let hexed: Vec<Option<String>> = array
+                    .iter()
+                    .map(|v| v.map(|v| hex_bytes_to_string(&hex_string(v))).transpose())
+                    .collect::<Result<_, _>>()?;
+
+                let string_array = StringArray::from(hexed);
+
+                Ok(ColumnarValue::Array(Arc::new(string_array)))
+            }
+            DataType::Binary => {
+                let array = as_binary_array(array)?;
+
+                let hexed: Vec<Option<String>> = array
+                    .iter()
+                    .map(|v| v.map(|v| hex_bytes_to_string(&hex_bytes(v))).transpose())
+                    .collect::<Result<_, _>>()?;
+
+                let string_array = StringArray::from(hexed);
+
+                Ok(ColumnarValue::Array(Arc::new(string_array)))
+            }
+            DataType::FixedSizeBinary(_) => {
+                let array = as_fixed_size_binary_array(array)?;
+
+                let hexed: Vec<Option<String>> = array
+                    .iter()
+                    .map(|v| v.map(|v| hex_bytes_to_string(&hex_bytes(v))).transpose())
+                    .collect::<Result<_, _>>()?;
+
+                let string_array = StringArray::from(hexed);
+
+                Ok(ColumnarValue::Array(Arc::new(string_array)))
+            }
+
            DataType::Dictionary(_, value_type) if matches!(**value_type, DataType::Int64) => {

Review Comment:
   > The trick we used to use is adding Cast(child) in QueryPlanSerde that unpacks dictionaries.

   Do you have any concrete examples of how this works, by any chance? I remember seeing some ~~unnecessary~~ cast operations in the query plan serde file; I didn't realize they were there to unpack dictionaries.

   > We should consolidate the unpacking logic, otherwise we will need to add it to every function. Or until that happens we can work around it with Cast

   Yes, maybe this logic should be added on the Rust planner side, which could unpack the dictionary automatically whenever it knows the expression cannot handle dictionary types (a rough sketch of that idea is appended at the end of this message).

##########
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -1038,6 +1038,20 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
+  test("hex") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "hex.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
+
+        withParquetTable(path.toString, "tbl") {
+          // _9 and _10 (uint8 and uint16) not supported
+          checkSparkAnswerAndOperator(
+            "SELECT hex(_1), hex(_2), hex(_3), hex(_4), hex(_5), hex(_6), hex(_7), hex(_8), hex(_11), hex(_12), hex(_13), hex(_14), hex(_15), hex(_16), hex(_17), hex(_18), hex(_19), hex(_20) FROM tbl")

Review Comment:
   I think we can keep the scalar functions, and it should be pretty straightforward to test scalar input. Namely, something like:
   ```sql
   select hex(10), hex('abc')
   ```
   The constant literal should be encoded as a ScalarValue (a sketch of a possible scalar arm is appended below).
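To illustrate the planner-side unpacking idea from the first comment: a minimal, hypothetical sketch (not part of this PR; `unpack_dictionary` is an illustrative name) that flattens a dictionary-encoded array with arrow's public `cast` kernel before handing it to an expression that cannot process dictionary types:

```rust
use std::sync::Arc;

use arrow::compute::cast;
use arrow_array::{Array, ArrayRef};
use arrow_schema::DataType;
use datafusion_common::DataFusionError;

// Hypothetical helper: if the input is dictionary-encoded, cast it to the
// underlying value type so downstream kernels only ever see plain arrays.
fn unpack_dictionary(array: &ArrayRef) -> Result<ArrayRef, DataFusionError> {
    match array.data_type() {
        // `cast` materializes the dictionary values into a flat array
        DataType::Dictionary(_, value_type) => Ok(cast(array.as_ref(), value_type)?),
        // Non-dictionary arrays pass through untouched
        _ => Ok(Arc::clone(array)),
    }
}
```

If something like this ran at the entry of `spark_hex` (or once, centrally, in the planner), the per-function `DataType::Dictionary(..)` match arms would become unnecessary, which is the consolidation being discussed.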
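And for the scalar-input point in the second comment, a minimal sketch of what a `ColumnarValue::Scalar` arm could look like, assuming the helpers from the diff above (`hex_int64`, `hex_string`, `hex_bytes_to_string`); `hex_scalar` is an illustrative name, not the PR's actual code:

```rust
use datafusion::logical_expr::ColumnarValue;
use datafusion_common::{DataFusionError, ScalarValue};

// Illustrative only: how `select hex(10), hex('abc')` could be served once the
// constant literal arrives as a ScalarValue instead of an Array.
fn hex_scalar(value: &ScalarValue) -> Result<ColumnarValue, DataFusionError> {
    let hexed = match value {
        // hex(10) -> "A": integer literals come in as ScalarValue::Int64
        ScalarValue::Int64(Some(v)) => Some(hex_int64(*v)),
        // hex('abc') -> "616263": string literals come in as ScalarValue::Utf8
        ScalarValue::Utf8(Some(s)) => Some(
            hex_bytes_to_string(&hex_string(s))
                .map_err(|e| DataFusionError::Internal(e.to_string()))?,
        ),
        // NULL literals stay NULL
        ScalarValue::Int64(None) | ScalarValue::Utf8(None) => None,
        other => {
            return Err(DataFusionError::Internal(format!(
                "hex: unsupported scalar type {:?}",
                other
            )))
        }
    };
    Ok(ColumnarValue::Scalar(ScalarValue::Utf8(hexed)))
}
```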