This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8550010bd5 Fix input handling for encoding functions & various
refactors (#18754)
8550010bd5 is described below
commit 8550010bd55ccae217b08240664c44fa8bda8261
Author: Jeffrey Vo <[email protected]>
AuthorDate: Fri Dec 19 18:04:31 2025 +0900
Fix input handling for encoding functions & various refactors (#18754)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
Part of #12725
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
Started refactoring encoding functions (`encode`, `decode`) to remove
user defined signature (per linked issue). However, discovered in the
process it was bugged in handling of certain inputs. For example on main
we get these errors:
```sql
DataFusion CLI v51.0.0
> select encode(arrow_cast(column1, 'LargeUtf8'), 'hex') from values ('a'),
('b');
Internal error: Function 'encode' returned value of type 'Utf8' while the
following type was promised at planning time and expected: 'LargeUtf8'.
This issue was likely caused by a bug in DataFusion's code. Please help us
to resolve this by filing a bug report in our issue tracker:
https://github.com/apache/datafusion/issues
> select encode(arrow_cast(column1, 'LargeBinary'), 'hex') from values
('a'), ('b');
Internal error: Function 'encode' returned value of type 'Utf8' while the
following type was promised at planning time and expected: 'LargeUtf8'.
This issue was likely caused by a bug in DataFusion's code. Please help us
to resolve this by filing a bug report in our issue tracker:
https://github.com/apache/datafusion/issues
> select encode(arrow_cast(column1, 'BinaryView'), 'hex') from values
('a'), ('b');
Error during planning: Execution error: Function 'encode' user-defined
coercion failed with "Error during planning: 1st argument should be Utf8 or
Binary or Null, got BinaryView" No function matches the given name and argument
types 'encode(BinaryView, Utf8)'. You might need to add explicit type casts.
Candidate functions:
encode(UserDefined)
```
- LargeUtf8/LargeBinary array inputs are broken
- BinaryView input not supported (but Utf8View input is supported)
So I went about fixing this input handling, as well as doing various
refactors to try to simplify the code.
(I also discovered #18746 in the process of this refactor).
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
Refactor signatures away from user-defined to the signature coercion
API; importantly, we now accept only binary inputs, letting string
inputs be coerced by type coercion. This simplifies the internal code of
encode/decode to only need to consider binary inputs, instead of
duplicating essentially identical code for string inputs (given that for
string inputs we just grabbed the underlying bytes anyway).
Also consolidated the inner functions used by encode/decode to try to
simplify/inline where possible.
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Added new SLTs.
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
No.
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---
datafusion/functions/src/encoding/inner.rs | 745 ++++++++++++------------
datafusion/sqllogictest/test_files/encoding.slt | 143 +++--
2 files changed, 485 insertions(+), 403 deletions(-)
diff --git a/datafusion/functions/src/encoding/inner.rs
b/datafusion/functions/src/encoding/inner.rs
index 7eed40dcc8..7b72c264e5 100644
--- a/datafusion/functions/src/encoding/inner.rs
+++ b/datafusion/functions/src/encoding/inner.rs
@@ -19,31 +19,30 @@
use arrow::{
array::{
- Array, ArrayRef, BinaryArray, GenericByteArray, OffsetSizeTrait,
StringArray,
+ Array, ArrayRef, AsArray, BinaryArrayType, FixedSizeBinaryArray,
+ GenericBinaryArray, GenericStringArray, OffsetSizeTrait,
},
- datatypes::{ByteArrayType, DataType},
+ datatypes::DataType,
};
use arrow_buffer::{Buffer, OffsetBufferBuilder};
use base64::{
Engine as _,
engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig},
};
-use datafusion_common::{DataFusionError, Result};
-use datafusion_common::{ScalarValue, exec_err, internal_datafusion_err};
use datafusion_common::{
- cast::{
- as_fixed_size_binary_array, as_generic_binary_array,
as_generic_string_array,
- },
+ DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err,
internal_err,
not_impl_err, plan_err,
+ types::{NativeType, logical_string},
utils::take_function_args,
};
-use datafusion_expr::{ColumnarValue, Documentation};
-use std::sync::Arc;
-use std::{fmt, str::FromStr};
-
-use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
+use datafusion_expr::{
+ Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl,
Signature,
+ TypeSignatureClass, Volatility,
+};
use datafusion_macros::user_doc;
use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
// Allow padding characters, but don't require them, and don't generate them.
const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new(
@@ -81,7 +80,17 @@ impl Default for EncodeFunc {
impl EncodeFunc {
pub fn new() -> Self {
Self {
- signature: Signature::user_defined(Volatility::Immutable),
+ signature: Signature::coercible(
+ vec![
+ Coercion::new_implicit(
+ TypeSignatureClass::Binary,
+ vec![TypeSignatureClass::Native(logical_string())],
+ NativeType::Binary,
+ ),
+
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+ ],
+ Volatility::Immutable,
+ ),
}
}
}
@@ -90,6 +99,7 @@ impl ScalarUDFImpl for EncodeFunc {
fn as_any(&self) -> &dyn Any {
self
}
+
fn name(&self) -> &str {
"encode"
}
@@ -99,52 +109,21 @@ impl ScalarUDFImpl for EncodeFunc {
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
- use DataType::*;
-
- Ok(match arg_types[0] {
- Utf8 => Utf8,
- LargeUtf8 => LargeUtf8,
- Utf8View => Utf8,
- Binary => Utf8,
- LargeBinary => LargeUtf8,
- FixedSizeBinary(_) => Utf8,
- Null => Null,
- _ => {
- return plan_err!(
- "The encode function can only accept Utf8 or Binary or
Null."
- );
- }
- })
- }
-
- fn invoke_with_args(
- &self,
- args: datafusion_expr::ScalarFunctionArgs,
- ) -> Result<ColumnarValue> {
- encode(&args.args)
- }
-
- fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
- let [expression, format] = take_function_args(self.name(), arg_types)?;
-
- if format != &DataType::Utf8 {
- return Err(DataFusionError::Plan("2nd argument should be
Utf8".into()));
+ match &arg_types[0] {
+ DataType::LargeBinary => Ok(DataType::LargeUtf8),
+ _ => Ok(DataType::Utf8),
}
+ }
+ fn invoke_with_args(&self, args: ScalarFunctionArgs) ->
Result<ColumnarValue> {
+ let [expression, encoding] = take_function_args("encode", &args.args)?;
+ let encoding = Encoding::try_from(encoding)?;
match expression {
- DataType::Utf8 | DataType::Utf8View | DataType::Null => {
- Ok(vec![DataType::Utf8; 2])
+ _ if expression.data_type().is_null() => {
+ Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
}
- DataType::LargeUtf8 => Ok(vec![DataType::LargeUtf8,
DataType::Utf8]),
- DataType::Binary => Ok(vec![DataType::Binary, DataType::Utf8]),
- DataType::LargeBinary => Ok(vec![DataType::LargeBinary,
DataType::Utf8]),
- DataType::FixedSizeBinary(sz) => {
- Ok(vec![DataType::FixedSizeBinary(*sz), DataType::Utf8])
- }
- _ => plan_err!(
- "1st argument should be Utf8 or Binary or Null, got {:?}",
- arg_types[0]
- ),
+ ColumnarValue::Array(array) => encode_array(array, encoding),
+ ColumnarValue::Scalar(scalar) => encode_scalar(scalar, encoding),
}
}
@@ -178,7 +157,17 @@ impl Default for DecodeFunc {
impl DecodeFunc {
pub fn new() -> Self {
Self {
- signature: Signature::user_defined(Volatility::Immutable),
+ signature: Signature::coercible(
+ vec![
+ Coercion::new_implicit(
+ TypeSignatureClass::Binary,
+ vec![TypeSignatureClass::Native(logical_string())],
+ NativeType::Binary,
+ ),
+
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
+ ],
+ Volatility::Immutable,
+ ),
}
}
}
@@ -187,6 +176,7 @@ impl ScalarUDFImpl for DecodeFunc {
fn as_any(&self) -> &dyn Any {
self
}
+
fn name(&self) -> &str {
"decode"
}
@@ -196,40 +186,21 @@ impl ScalarUDFImpl for DecodeFunc {
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
- Ok(arg_types[0].to_owned())
- }
-
- fn invoke_with_args(
- &self,
- args: datafusion_expr::ScalarFunctionArgs,
- ) -> Result<ColumnarValue> {
- decode(&args.args)
- }
-
- fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
- if arg_types.len() != 2 {
- return plan_err!(
- "{} expects to get 2 arguments, but got {}",
- self.name(),
- arg_types.len()
- );
- }
-
- if arg_types[1] != DataType::Utf8 {
- return plan_err!("2nd argument should be Utf8");
+ match &arg_types[0] {
+ DataType::LargeBinary => Ok(DataType::LargeBinary),
+ _ => Ok(DataType::Binary),
}
+ }
- match arg_types[0] {
- DataType::Utf8 | DataType::Utf8View | DataType::Binary |
DataType::Null => {
- Ok(vec![DataType::Binary, DataType::Utf8])
- }
- DataType::LargeUtf8 | DataType::LargeBinary => {
- Ok(vec![DataType::LargeBinary, DataType::Utf8])
+ fn invoke_with_args(&self, args: ScalarFunctionArgs) ->
Result<ColumnarValue> {
+ let [expression, encoding] = take_function_args("decode", &args.args)?;
+ let encoding = Encoding::try_from(encoding)?;
+ match expression {
+ _ if expression.data_type().is_null() => {
+ Ok(ColumnarValue::Scalar(ScalarValue::Binary(None)))
}
- _ => plan_err!(
- "1st argument should be Utf8 or Binary or Null, got {:?}",
- arg_types[0]
- ),
+ ColumnarValue::Array(array) => decode_array(array, encoding),
+ ColumnarValue::Scalar(scalar) => decode_scalar(scalar, encoding),
}
}
@@ -238,339 +209,385 @@ impl ScalarUDFImpl for DecodeFunc {
}
}
-#[derive(Debug, Copy, Clone)]
-enum Encoding {
- Base64,
- Hex,
-}
-
-fn encode_process(value: &ColumnarValue, encoding: Encoding) ->
Result<ColumnarValue> {
+fn encode_scalar(value: &ScalarValue, encoding: Encoding) ->
Result<ColumnarValue> {
match value {
- ColumnarValue::Array(a) => match a.data_type() {
- DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
- DataType::LargeUtf8 =>
encoding.encode_utf8_array::<i64>(a.as_ref()),
- DataType::Utf8View =>
encoding.encode_utf8_array::<i32>(a.as_ref()),
- DataType::Binary =>
encoding.encode_binary_array::<i32>(a.as_ref()),
- DataType::LargeBinary =>
encoding.encode_binary_array::<i64>(a.as_ref()),
- DataType::FixedSizeBinary(_) => {
- encoding.encode_fixed_size_binary_array(a.as_ref())
- }
- other => exec_err!(
- "Unsupported data type {other:?} for function
encode({encoding})"
- ),
- },
- ColumnarValue::Scalar(scalar) => {
- match scalar {
- ScalarValue::Utf8(a) => {
- Ok(encoding.encode_scalar(a.as_ref().map(|s: &String|
s.as_bytes())))
- }
- ScalarValue::LargeUtf8(a) => Ok(encoding
- .encode_large_scalar(a.as_ref().map(|s: &String|
s.as_bytes()))),
- ScalarValue::Utf8View(a) => {
- Ok(encoding.encode_scalar(a.as_ref().map(|s: &String|
s.as_bytes())))
- }
- ScalarValue::Binary(a) => Ok(
- encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>|
v.as_slice()))
- ),
- ScalarValue::LargeBinary(a) => Ok(encoding
- .encode_large_scalar(a.as_ref().map(|v: &Vec<u8>|
v.as_slice()))),
- ScalarValue::FixedSizeBinary(_, a) => Ok(
- encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>|
v.as_slice()))
- ),
- other => exec_err!(
- "Unsupported data type {other:?} for function
encode({encoding})"
- ),
- }
+ ScalarValue::Binary(maybe_bytes)
+ | ScalarValue::BinaryView(maybe_bytes)
+ | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
+ Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+ maybe_bytes
+ .as_ref()
+ .map(|bytes| encoding.encode_bytes(bytes)),
+ )))
}
+ ScalarValue::LargeBinary(maybe_bytes) => {
+ Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
+ maybe_bytes
+ .as_ref()
+ .map(|bytes| encoding.encode_bytes(bytes)),
+ )))
+ }
+ v => internal_err!("Unexpected value for encode: {v}"),
}
}
-fn decode_process(value: &ColumnarValue, encoding: Encoding) ->
Result<ColumnarValue> {
- match value {
- ColumnarValue::Array(a) => match a.data_type() {
- DataType::Utf8 => encoding.decode_utf8_array::<i32>(a.as_ref()),
- DataType::LargeUtf8 =>
encoding.decode_utf8_array::<i64>(a.as_ref()),
- DataType::Utf8View =>
encoding.decode_utf8_array::<i32>(a.as_ref()),
- DataType::Binary =>
encoding.decode_binary_array::<i32>(a.as_ref()),
- DataType::LargeBinary =>
encoding.decode_binary_array::<i64>(a.as_ref()),
- other => exec_err!(
- "Unsupported data type {other:?} for function
decode({encoding})"
- ),
- },
- ColumnarValue::Scalar(scalar) => {
- match scalar {
- ScalarValue::Utf8(a) => {
- encoding.decode_scalar(a.as_ref().map(|s: &String|
s.as_bytes()))
- }
- ScalarValue::LargeUtf8(a) => encoding
- .decode_large_scalar(a.as_ref().map(|s: &String|
s.as_bytes())),
- ScalarValue::Utf8View(a) => {
- encoding.decode_scalar(a.as_ref().map(|s: &String|
s.as_bytes()))
- }
- ScalarValue::Binary(a) => {
- encoding.decode_scalar(a.as_ref().map(|v: &Vec<u8>|
v.as_slice()))
- }
- ScalarValue::LargeBinary(a) => encoding
- .decode_large_scalar(a.as_ref().map(|v: &Vec<u8>|
v.as_slice())),
- other => exec_err!(
- "Unsupported data type {other:?} for function
decode({encoding})"
- ),
- }
+fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue>
{
+ let array = match array.data_type() {
+ DataType::Binary => encoding.encode_array::<_,
i32>(&array.as_binary::<i32>()),
+ DataType::BinaryView => encoding.encode_array::<_,
i32>(&array.as_binary_view()),
+ DataType::LargeBinary => {
+ encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
}
- }
+ DataType::FixedSizeBinary(_) => {
+ encoding.encode_fsb_array(array.as_fixed_size_binary())
+ }
+ dt => {
+ internal_err!("Unexpected data type for encode: {dt}")
+ }
+ };
+ array.map(ColumnarValue::Array)
}
-fn hex_encode(input: &[u8]) -> String {
- hex::encode(input)
+fn decode_scalar(value: &ScalarValue, encoding: Encoding) ->
Result<ColumnarValue> {
+ match value {
+ ScalarValue::Binary(maybe_bytes)
+ | ScalarValue::BinaryView(maybe_bytes)
+ | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
+ Ok(ColumnarValue::Scalar(ScalarValue::Binary(
+ maybe_bytes
+ .as_ref()
+ .map(|x| encoding.decode_bytes(x))
+ .transpose()?,
+ )))
+ }
+ ScalarValue::LargeBinary(maybe_bytes) => {
+ Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
+ maybe_bytes
+ .as_ref()
+ .map(|x| encoding.decode_bytes(x))
+ .transpose()?,
+ )))
+ }
+ v => internal_err!("Unexpected value for decode: {v}"),
+ }
}
-fn base64_encode(input: &[u8]) -> String {
- BASE64_ENGINE.encode(input)
+/// Estimate how many bytes are actually represented by the array; in case the
+/// the array slices it's internal buffer, this returns the byte size of that
slice
+/// but not the byte size of the entire buffer.
+///
+/// This is an estimation only as it can estimate higher if null slots are
non-zero
+/// sized.
+fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>)
-> usize {
+ let offsets = array.value_offsets();
+ // Unwraps are safe as should always have 1 element in offset buffer
+ let start = *offsets.first().unwrap();
+ let end = *offsets.last().unwrap();
+ let data_size = end - start;
+ data_size.as_usize()
}
-fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
- // only write input / 2 bytes to buf
- let out_len = input.len() / 2;
- let buf = &mut buf[..out_len];
- hex::decode_to_slice(input, buf)
- .map_err(|e| internal_datafusion_err!("Failed to decode from hex:
{e}"))?;
- Ok(out_len)
+fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue>
{
+ let array = match array.data_type() {
+ DataType::Binary => {
+ let array = array.as_binary::<i32>();
+ encoding.decode_array::<_, i32>(&array,
estimate_byte_data_size(array))
+ }
+ DataType::BinaryView => {
+ let array = array.as_binary_view();
+ // Don't know if there is a more strict upper bound we can infer
+ // for view arrays byte data size.
+ encoding.decode_array::<_, i32>(&array,
array.get_buffer_memory_size())
+ }
+ DataType::LargeBinary => {
+ let array = array.as_binary::<i64>();
+ encoding.decode_array::<_, i64>(&array,
estimate_byte_data_size(array))
+ }
+ DataType::FixedSizeBinary(size) => {
+ let array = array.as_fixed_size_binary();
+ // TODO: could we be more conservative by accounting for nulls?
+ let estimate = array.len().saturating_mul(*size as usize);
+ encoding.decode_fsb_array(array, estimate)
+ }
+ dt => {
+ internal_err!("Unexpected data type for decode: {dt}")
+ }
+ };
+ array.map(ColumnarValue::Array)
}
-fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
- BASE64_ENGINE
- .decode_slice(input, buf)
- .map_err(|e| internal_datafusion_err!("Failed to decode from base64:
{e}"))
+#[derive(Debug, Copy, Clone)]
+enum Encoding {
+ Base64,
+ Hex,
}
-macro_rules! encode_to_array {
- ($METHOD: ident, $INPUT:expr) => {{
- let utf8_array: StringArray = $INPUT
- .iter()
- .map(|x| x.map(|x| $METHOD(x.as_ref())))
- .collect();
- Arc::new(utf8_array)
- }};
+impl fmt::Display for Encoding {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{}", format!("{self:?}").to_lowercase())
+ }
}
-fn decode_to_array<F, T: ByteArrayType>(
- method: F,
- input: &GenericByteArray<T>,
- conservative_upper_bound_size: usize,
-) -> Result<ArrayRef>
-where
- F: Fn(&[u8], &mut [u8]) -> Result<usize>,
-{
- let mut values = vec![0; conservative_upper_bound_size];
- let mut offsets = OffsetBufferBuilder::new(input.len());
- let mut total_bytes_decoded = 0;
- for v in input {
- if let Some(v) = v {
- let cursor = &mut values[total_bytes_decoded..];
- let decoded = method(v.as_ref(), cursor)?;
- total_bytes_decoded += decoded;
- offsets.push_length(decoded);
- } else {
- offsets.push_length(0);
+impl TryFrom<&ColumnarValue> for Encoding {
+ type Error = DataFusionError;
+
+ fn try_from(encoding: &ColumnarValue) -> Result<Self> {
+ let encoding = match encoding {
+ ColumnarValue::Scalar(encoding) => match
encoding.try_as_str().flatten() {
+ Some(encoding) => encoding,
+ _ => return exec_err!("Encoding must be a non-null string"),
+ },
+ ColumnarValue::Array(_) => {
+ return not_impl_err!(
+ "Encoding must be a scalar; array specified encoding is
not yet supported"
+ );
+ }
+ };
+ match encoding {
+ "base64" => Ok(Self::Base64),
+ "hex" => Ok(Self::Hex),
+ _ => {
+ let options = [Self::Base64, Self::Hex]
+ .iter()
+ .map(|i| i.to_string())
+ .collect::<Vec<_>>()
+ .join(", ");
+ plan_err!(
+ "There is no built-in encoding named '{encoding}',
currently supported encodings are: {options}"
+ )
+ }
}
}
- // We reserved an upper bound size for the values buffer, but we only use
the actual size
- values.truncate(total_bytes_decoded);
- let binary_array = BinaryArray::try_new(
- offsets.finish(),
- Buffer::from_vec(values),
- input.nulls().cloned(),
- )?;
- Ok(Arc::new(binary_array))
}
impl Encoding {
- fn encode_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
- ColumnarValue::Scalar(match self {
- Self::Base64 => ScalarValue::Utf8(value.map(|v|
BASE64_ENGINE.encode(v))),
- Self::Hex => ScalarValue::Utf8(value.map(hex::encode)),
- })
+ fn encode_bytes(self, value: &[u8]) -> String {
+ match self {
+ Self::Base64 => BASE64_ENGINE.encode(value),
+ Self::Hex => hex::encode(value),
+ }
}
- fn encode_large_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
- ColumnarValue::Scalar(match self {
- Self::Base64 => {
- ScalarValue::LargeUtf8(value.map(|v| BASE64_ENGINE.encode(v)))
- }
- Self::Hex => ScalarValue::LargeUtf8(value.map(hex::encode)),
- })
+ fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
+ match self {
+ Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
+ exec_datafusion_err!("Failed to decode value using base64:
{e}")
+ }),
+ Self::Hex => hex::decode(value).map_err(|e| {
+ exec_datafusion_err!("Failed to decode value using hex: {e}")
+ }),
+ }
}
- fn encode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
+ // OutputOffset important to ensure Large types output Large arrays
+ fn encode_array<'a, InputBinaryArray, OutputOffset>(
+ self,
+ array: &InputBinaryArray,
+ ) -> Result<ArrayRef>
where
- T: OffsetSizeTrait,
+ InputBinaryArray: BinaryArrayType<'a>,
+ OutputOffset: OffsetSizeTrait,
{
- let input_value = as_generic_binary_array::<T>(value)?;
- let array: ArrayRef = match self {
- Self::Base64 => encode_to_array!(base64_encode, input_value),
- Self::Hex => encode_to_array!(hex_encode, input_value),
- };
- Ok(ColumnarValue::Array(array))
+ match self {
+ Self::Base64 => {
+ let array: GenericStringArray<OutputOffset> = array
+ .iter()
+ .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
+ .collect();
+ Ok(Arc::new(array))
+ }
+ Self::Hex => {
+ let array: GenericStringArray<OutputOffset> =
+ array.iter().map(|x| x.map(hex::encode)).collect();
+ Ok(Arc::new(array))
+ }
+ }
}
- fn encode_fixed_size_binary_array(self, value: &dyn Array) ->
Result<ColumnarValue> {
- let input_value = as_fixed_size_binary_array(value)?;
- let array: ArrayRef = match self {
- Self::Base64 => encode_to_array!(base64_encode, input_value),
- Self::Hex => encode_to_array!(hex_encode, input_value),
- };
- Ok(ColumnarValue::Array(array))
+ // TODO: refactor this away once
https://github.com/apache/arrow-rs/pull/8993 lands
+ fn encode_fsb_array(self, array: &FixedSizeBinaryArray) ->
Result<ArrayRef> {
+ match self {
+ Self::Base64 => {
+ let array: GenericStringArray<i32> = array
+ .iter()
+ .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
+ .collect();
+ Ok(Arc::new(array))
+ }
+ Self::Hex => {
+ let array: GenericStringArray<i32> =
+ array.iter().map(|x| x.map(hex::encode)).collect();
+ Ok(Arc::new(array))
+ }
+ }
}
- fn encode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
+ // OutputOffset important to ensure Large types output Large arrays
+ fn decode_array<'a, InputBinaryArray, OutputOffset>(
+ self,
+ value: &InputBinaryArray,
+ approx_data_size: usize,
+ ) -> Result<ArrayRef>
where
- T: OffsetSizeTrait,
+ InputBinaryArray: BinaryArrayType<'a>,
+ OutputOffset: OffsetSizeTrait,
{
- let input_value = as_generic_string_array::<T>(value)?;
- let array: ArrayRef = match self {
- Self::Base64 => encode_to_array!(base64_encode, input_value),
- Self::Hex => encode_to_array!(hex_encode, input_value),
- };
- Ok(ColumnarValue::Array(array))
- }
-
- fn decode_scalar(self, value: Option<&[u8]>) -> Result<ColumnarValue> {
- let value = match value {
- Some(value) => value,
- None => return
Ok(ColumnarValue::Scalar(ScalarValue::Binary(None))),
- };
+ fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+ // only write input / 2 bytes to buf
+ let out_len = input.len() / 2;
+ let buf = &mut buf[..out_len];
+ hex::decode_to_slice(input, buf)
+ .map_err(|e| exec_datafusion_err!("Failed to decode from hex:
{e}"))?;
+ Ok(out_len)
+ }
- let out = match self {
- Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
- internal_datafusion_err!("Failed to decode value using base64:
{e}")
- })?,
- Self::Hex => hex::decode(value).map_err(|e| {
- internal_datafusion_err!("Failed to decode value using hex:
{e}")
- })?,
- };
+ fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+ BASE64_ENGINE
+ .decode_slice(input, buf)
+ .map_err(|e| exec_datafusion_err!("Failed to decode from
base64: {e}"))
+ }
- Ok(ColumnarValue::Scalar(ScalarValue::Binary(Some(out))))
+ match self {
+ Self::Base64 => {
+ let upper_bound =
base64::decoded_len_estimate(approx_data_size);
+ delegated_decode::<_, _, OutputOffset>(base64_decode, value,
upper_bound)
+ }
+ Self::Hex => {
+ // Calculate the upper bound for decoded byte size
+ // For hex encoding, each pair of hex characters (2 bytes)
represents 1 byte when decoded
+ // So the upper bound is half the length of the input values.
+ let upper_bound = approx_data_size / 2;
+ delegated_decode::<_, _, OutputOffset>(hex_decode, value,
upper_bound)
+ }
+ }
}
- fn decode_large_scalar(self, value: Option<&[u8]>) ->
Result<ColumnarValue> {
- let value = match value {
- Some(value) => value,
- None => return
Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(None))),
- };
-
- let out = match self {
- Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
- internal_datafusion_err!("Failed to decode value using base64:
{e}")
- })?,
- Self::Hex => hex::decode(value).map_err(|e| {
- internal_datafusion_err!("Failed to decode value using hex:
{e}")
- })?,
- };
-
- Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(out))))
- }
+ // TODO: refactor this away once
https://github.com/apache/arrow-rs/pull/8993 lands
+ fn decode_fsb_array(
+ self,
+ value: &FixedSizeBinaryArray,
+ approx_data_size: usize,
+ ) -> Result<ArrayRef> {
+ fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+ // only write input / 2 bytes to buf
+ let out_len = input.len() / 2;
+ let buf = &mut buf[..out_len];
+ hex::decode_to_slice(input, buf)
+ .map_err(|e| exec_datafusion_err!("Failed to decode from hex:
{e}"))?;
+ Ok(out_len)
+ }
- fn decode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
- where
- T: OffsetSizeTrait,
- {
- let input_value = as_generic_binary_array::<T>(value)?;
- let array = self.decode_byte_array(input_value)?;
- Ok(ColumnarValue::Array(array))
- }
+ fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+ BASE64_ENGINE
+ .decode_slice(input, buf)
+ .map_err(|e| exec_datafusion_err!("Failed to decode from
base64: {e}"))
+ }
- fn decode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
- where
- T: OffsetSizeTrait,
- {
- let input_value = as_generic_string_array::<T>(value)?;
- let array = self.decode_byte_array(input_value)?;
- Ok(ColumnarValue::Array(array))
- }
+ fn delegated_decode<DecodeFunction>(
+ decode: DecodeFunction,
+ input: &FixedSizeBinaryArray,
+ conservative_upper_bound_size: usize,
+ ) -> Result<ArrayRef>
+ where
+ DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
+ {
+ let mut values = vec![0; conservative_upper_bound_size];
+ let mut offsets = OffsetBufferBuilder::new(input.len());
+ let mut total_bytes_decoded = 0;
+ for v in input.iter() {
+ if let Some(v) = v {
+ let cursor = &mut values[total_bytes_decoded..];
+ let decoded = decode(v, cursor)?;
+ total_bytes_decoded += decoded;
+ offsets.push_length(decoded);
+ } else {
+ offsets.push_length(0);
+ }
+ }
+ // We reserved an upper bound size for the values buffer, but we
only use the actual size
+ values.truncate(total_bytes_decoded);
+ let binary_array = GenericBinaryArray::<i32>::try_new(
+ offsets.finish(),
+ Buffer::from_vec(values),
+ input.nulls().cloned(),
+ )?;
+ Ok(Arc::new(binary_array))
+ }
- fn decode_byte_array<T: ByteArrayType>(
- &self,
- input_value: &GenericByteArray<T>,
- ) -> Result<ArrayRef> {
match self {
Self::Base64 => {
- let upper_bound =
- base64::decoded_len_estimate(input_value.values().len());
- decode_to_array(base64_decode, input_value, upper_bound)
+ let upper_bound =
base64::decoded_len_estimate(approx_data_size);
+ delegated_decode(base64_decode, value, upper_bound)
}
Self::Hex => {
// Calculate the upper bound for decoded byte size
// For hex encoding, each pair of hex characters (2 bytes)
represents 1 byte when decoded
// So the upper bound is half the length of the input values.
- let upper_bound = input_value.values().len() / 2;
- decode_to_array(hex_decode, input_value, upper_bound)
+ let upper_bound = approx_data_size / 2;
+ delegated_decode(hex_decode, value, upper_bound)
}
}
}
}
-impl fmt::Display for Encoding {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "{}", format!("{self:?}").to_lowercase())
+fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>(
+ decode: DecodeFunction,
+ input: &InputBinaryArray,
+ conservative_upper_bound_size: usize,
+) -> Result<ArrayRef>
+where
+ DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
+ InputBinaryArray: BinaryArrayType<'a>,
+ OutputOffset: OffsetSizeTrait,
+{
+ let mut values = vec![0; conservative_upper_bound_size];
+ let mut offsets = OffsetBufferBuilder::new(input.len());
+ let mut total_bytes_decoded = 0;
+ for v in input.iter() {
+ if let Some(v) = v {
+ let cursor = &mut values[total_bytes_decoded..];
+ let decoded = decode(v, cursor)?;
+ total_bytes_decoded += decoded;
+ offsets.push_length(decoded);
+ } else {
+ offsets.push_length(0);
+ }
}
+ // We reserved an upper bound size for the values buffer, but we only use
the actual size
+ values.truncate(total_bytes_decoded);
+ let binary_array = GenericBinaryArray::<OutputOffset>::try_new(
+ offsets.finish(),
+ Buffer::from_vec(values),
+ input.nulls().cloned(),
+ )?;
+ Ok(Arc::new(binary_array))
}
-impl FromStr for Encoding {
- type Err = DataFusionError;
- fn from_str(name: &str) -> Result<Encoding> {
- Ok(match name {
- "base64" => Self::Base64,
- "hex" => Self::Hex,
- _ => {
- let options = [Self::Base64, Self::Hex]
- .iter()
- .map(|i| i.to_string())
- .collect::<Vec<_>>()
- .join(", ");
- return plan_err!(
- "There is no built-in encoding named '{name}', currently
supported encodings are: {options}"
- );
- }
- })
+#[cfg(test)]
+mod tests {
+ use arrow::array::BinaryArray;
+ use arrow_buffer::OffsetBuffer;
+
+ use super::*;
+
+ #[test]
+ fn test_estimate_byte_data_size() {
+ // Offsets starting at 0, but don't count entire data buffer size
+ let array = BinaryArray::new(
+ OffsetBuffer::new(vec![0, 5, 10, 15].into()),
+ vec![0; 100].into(),
+ None,
+ );
+ let size = estimate_byte_data_size(&array);
+ assert_eq!(size, 15);
+
+ // Offsets starting at 0, but don't count entire data buffer size
+ let array = BinaryArray::new(
+ OffsetBuffer::new(vec![50, 51, 51, 60, 80, 81].into()),
+ vec![0; 100].into(),
+ Some(vec![true, false, false, true, true].into()),
+ );
+ let size = estimate_byte_data_size(&array);
+ assert_eq!(size, 31);
}
}
-
-/// Encodes the given data, accepts Binary, LargeBinary, Utf8, Utf8View or
LargeUtf8 and returns a [`ColumnarValue`].
-/// Second argument is the encoding to use.
-/// Standard encodings are base64 and hex.
-fn encode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- let [expression, format] = take_function_args("encode", args)?;
-
- let encoding = match format {
- ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
- Some(Some(method)) => method.parse::<Encoding>(),
- _ => not_impl_err!(
- "Second argument to encode must be non null constant string:
Encode using dynamically decided method is not yet supported. Got {scalar:?}"
- ),
- },
- ColumnarValue::Array(_) => not_impl_err!(
- "Second argument to encode must be a constant: Encode using
dynamically decided method is not yet supported"
- ),
- }?;
- encode_process(expression, encoding)
-}
-
-/// Decodes the given data, accepts Binary, LargeBinary, Utf8, Utf8View or
LargeUtf8 and returns a [`ColumnarValue`].
-/// Second argument is the encoding to use.
-/// Standard encodings are base64 and hex.
-fn decode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- let [expression, format] = take_function_args("decode", args)?;
-
- let encoding = match format {
- ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
- Some(Some(method)) => method.parse::<Encoding>(),
- _ => not_impl_err!(
- "Second argument to decode must be a non null constant string:
Decode using dynamically decided method is not yet supported. Got {scalar:?}"
- ),
- },
- ColumnarValue::Array(_) => not_impl_err!(
- "Second argument to decode must be a utf8 constant: Decode using
dynamically decided method is not yet supported"
- ),
- }?;
- decode_process(expression, encoding)
-}
diff --git a/datafusion/sqllogictest/test_files/encoding.slt
b/datafusion/sqllogictest/test_files/encoding.slt
index 57fb76b6c8..f715f8f46a 100644
--- a/datafusion/sqllogictest/test_files/encoding.slt
+++ b/datafusion/sqllogictest/test_files/encoding.slt
@@ -15,6 +15,32 @@
# specific language governing permissions and limitations
# under the License.
+query T
+SELECT encode(arrow_cast('tom', 'Utf8View'),'base64');
+----
+dG9t
+
+query T
+SELECT arrow_cast(decode(arrow_cast('dG9t', 'Utf8View'),'base64'), 'Utf8');
+----
+tom
+
+query T
+SELECT encode(arrow_cast('tom', 'BinaryView'),'base64');
+----
+dG9t
+
+query T
+SELECT arrow_cast(decode(arrow_cast('dG9t', 'BinaryView'),'base64'), 'Utf8');
+----
+tom
+
+# test for hex digest
+query T
+select encode(digest('hello', 'sha256'), 'hex');
+----
+2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
+
statement ok
CREATE TABLE test(
num INT,
@@ -29,23 +55,29 @@ CREATE TABLE test(
;
# errors
-query error 1st argument should be Utf8 or Binary or Null, got Int64
+query error DataFusion error: Error during planning: Internal error: Expect
TypeSignatureClass::Binary but received NativeType::Int64, DataType: Int64
select encode(12, 'hex');
-query error DataFusion error: Error during planning: There is no built\-in
encoding named 'non_encoding', currently supported encodings are: base64, hex
-select encode(bin_field, 'non_encoding') from test;
-
-query error 1st argument should be Utf8 or Binary or Null, got Int64
+query error DataFusion error: Error during planning: Internal error: Expect
TypeSignatureClass::Binary but received NativeType::Int64, DataType: Int64
select decode(12, 'hex');
query error DataFusion error: Error during planning: There is no built\-in
encoding named 'non_encoding', currently supported encodings are: base64, hex
-select decode(hex_field, 'non_encoding') from test;
+select encode('', 'non_encoding');
+
+query error DataFusion error: Error during planning: There is no built\-in
encoding named 'non_encoding', currently supported encodings are: base64, hex
+select decode('', 'non_encoding');
+
+query error DataFusion error: Execution error: Encoding must be a non-null
string
+select decode('', null) from test;
-query error
+query error DataFusion error: This feature is not implemented: Encoding must
be a scalar; array specified encoding is not yet supported
+select decode('', hex_field) from test;
+
+query error DataFusion error: Error during planning: Failed to coerce
arguments to satisfy a call to 'to_hex' function
select to_hex(hex_field) from test;
-query error
-select arrow_cast(decode(X'8f50d3f60eae370ddbf85c86219c55108a350165',
'base64'), 'Utf8');
+query error DataFusion error: Execution error: Failed to decode value using
base64
+select decode(X'8f50d3f60eae370ddbf85c86219c55108a350165', 'base64');
# Arrays tests
query T
@@ -56,13 +88,20 @@ SELECT encode(bin_field, 'hex') FROM test ORDER BY num;
NULL
8f50d3f60eae370ddbf85c86219c55108a350165
-query T
-SELECT arrow_cast(decode(base64_field, 'base64'), 'Utf8') FROM test ORDER BY
num;
+query TTTTTT
+SELECT
+ arrow_cast(decode(arrow_cast(base64_field, 'Utf8'), 'base64'), 'Utf8'),
+ arrow_cast(decode(arrow_cast(base64_field, 'LargeUtf8'), 'base64'), 'Utf8'),
+ arrow_cast(decode(arrow_cast(base64_field, 'Utf8View'), 'base64'), 'Utf8'),
+ arrow_cast(decode(arrow_cast(base64_field, 'Binary'), 'base64'), 'Utf8'),
+ arrow_cast(decode(arrow_cast(base64_field, 'LargeBinary'), 'base64'),
'Utf8'),
+ arrow_cast(decode(arrow_cast(base64_field, 'BinaryView'), 'base64'), 'Utf8')
+FROM test ORDER BY num;
----
-abc
-qweqw
-NULL
-8f50d3f60eae370ddbf85c86219c55108a350165
+abc abc abc abc abc abc
+qweqw qweqw qweqw qweqw qweqw qweqw
+NULL NULL NULL NULL NULL NULL
+8f50d3f60eae370ddbf85c86219c55108a350165
8f50d3f60eae370ddbf85c86219c55108a350165
8f50d3f60eae370ddbf85c86219c55108a350165
8f50d3f60eae370ddbf85c86219c55108a350165
8f50d3f60eae370ddbf85c86219c55108a350165
8f50d3f60eae370ddbf85c86219c55108a350165
query T
SELECT arrow_cast(decode(hex_field, 'hex'), 'Utf8') FROM test ORDER BY num;
@@ -90,51 +129,77 @@ select decode(encode(bin_field, 'base64'), 'base64') =
X'8f50d3f60eae370ddbf85c8
----
true
-# test for Utf8View support for encode
statement ok
-CREATE TABLE test_source AS VALUES
- ('Andrew', 'X'),
- ('Xiangpeng', 'Xiangpeng'),
- ('Raphael', 'R'),
- (NULL, 'R');
+drop table test
+# test for Utf8View support for encode
statement ok
CREATE TABLE test_utf8view AS
select
arrow_cast(column1, 'Utf8View') AS column1_utf8view,
arrow_cast(column2, 'Utf8View') AS column2_utf8view
-FROM test_source;
+FROM VALUES
+ ('Andrew', 'X'),
+ ('Xiangpeng', 'Xiangpeng'),
+ ('Raphael', 'R'),
+ (NULL, 'R');
-query TTTTTT
+query TTTT
SELECT
- column1_utf8view,
encode(column1_utf8view, 'base64') AS column1_base64,
encode(column1_utf8view, 'hex') AS column1_hex,
- column2_utf8view,
encode(column2_utf8view, 'base64') AS column2_base64,
encode(column2_utf8view, 'hex') AS column2_hex
FROM test_utf8view;
----
-Andrew QW5kcmV3 416e64726577 X WA 58
-Xiangpeng WGlhbmdwZW5n 5869616e6770656e67 Xiangpeng WGlhbmdwZW5n
5869616e6770656e67
-Raphael UmFwaGFlbA 5261706861656c R Ug 52
-NULL NULL NULL R Ug 52
+QW5kcmV3 416e64726577 WA 58
+WGlhbmdwZW5n 5869616e6770656e67 WGlhbmdwZW5n 5869616e6770656e67
+UmFwaGFlbA 5261706861656c Ug 52
+NULL NULL Ug 52
-# test for hex digest
-query T
-select encode(digest('hello', 'sha256'), 'hex');
+query TTTTTT
+SELECT
+ encode(arrow_cast(column1_utf8view, 'Utf8'), 'base64'),
+ encode(arrow_cast(column1_utf8view, 'LargeUtf8'), 'base64'),
+ encode(arrow_cast(column1_utf8view, 'Utf8View'), 'base64'),
+ encode(arrow_cast(column1_utf8view, 'Binary'), 'base64'),
+ encode(arrow_cast(column1_utf8view, 'LargeBinary'), 'base64'),
+ encode(arrow_cast(column1_utf8view, 'BinaryView'), 'base64')
+FROM test_utf8view;
----
-2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
+QW5kcmV3 QW5kcmV3 QW5kcmV3 QW5kcmV3 QW5kcmV3 QW5kcmV3
+WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n
+UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA
+NULL NULL NULL NULL NULL NULL
-# test for FixedSizeBinary support for encode
+statement ok
+drop table test_utf8view
+
+# FixedSizeBinary support
statement ok
CREATE TABLE test_fsb AS
SELECT arrow_cast(X'0123456789ABCDEF', 'FixedSizeBinary(8)') as fsb_col;
-query TT
+query ??
SELECT
- encode(fsb_col, 'base64') AS fsb_base64,
- encode(fsb_col, 'hex') AS fsb_hex
-FROM test_fsb;
+ decode(encode(arrow_cast(X'0123456789abcdef', 'FixedSizeBinary(8)'),
'base64'), 'base64'),
+ decode(encode(arrow_cast(X'0123456789abcdef', 'FixedSizeBinary(8)'), 'hex'),
'hex');
----
-ASNFZ4mrze8 0123456789abcdef
+0123456789abcdef 0123456789abcdef
+
+query ??
+SELECT
+ decode(encode(column1, 'base64'), 'base64'),
+ decode(encode(column1, 'hex'), 'hex')
+FROM values
+ (arrow_cast(X'0123456789abcdef', 'FixedSizeBinary(8)')),
+ (arrow_cast(X'ffffffffffffffff', 'FixedSizeBinary(8)'));
+----
+0123456789abcdef 0123456789abcdef
+ffffffffffffffff ffffffffffffffff
+
+query error DataFusion error: Execution error: Failed to decode value using
base64
+select decode('invalid', 'base64');
+
+query error DataFusion error: Execution error: Failed to decode value using hex
+select decode('invalid', 'hex');
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]