This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5a318cd20e Optimize base64/hex decoding by pre-allocating output buffers (~2x faster) (#12675)
5a318cd20e is described below
commit 5a318cd20e6c6359928cbbc6d84d01f074665ca9
Author: Simon Vandel Sillesen <[email protected]>
AuthorDate: Thu Oct 3 00:04:37 2024 +0200
Optimize base64/hex decoding by pre-allocating output buffers (~2x faster) (#12675)
* add bench
* replace macro with generic function
* remove duplicated code
* optimize base64/hex decode
---
datafusion/functions/Cargo.toml | 5 ++
datafusion/functions/benches/encoding.rs | 53 +++++++++++++++++
datafusion/functions/src/encoding/inner.rs | 94 ++++++++++++++++++++++--------
3 files changed, 127 insertions(+), 25 deletions(-)
diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index ff1b926a9b..a3d114221d 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -102,6 +102,11 @@ harness = false
name = "to_timestamp"
required-features = ["datetime_expressions"]
+[[bench]]
+harness = false
+name = "encoding"
+required-features = ["encoding_expressions"]
+
[[bench]]
harness = false
name = "regx"
diff --git a/datafusion/functions/benches/encoding.rs b/datafusion/functions/benches/encoding.rs
new file mode 100644
index 0000000000..d49235aac9
--- /dev/null
+++ b/datafusion/functions/benches/encoding.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use arrow::util::bench_util::create_string_array_with_len;
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion_expr::ColumnarValue;
+use datafusion_functions::encoding;
+use std::sync::Arc;
+
+fn criterion_benchmark(c: &mut Criterion) {
+ let decode = encoding::decode();
+ for size in [1024, 4096, 8192] {
+ let str_array = Arc::new(create_string_array_with_len::<i32>(size, 0.2, 32));
+ c.bench_function(&format!("base64_decode/{size}"), |b| {
+ let method = ColumnarValue::Scalar("base64".into());
+ let encoded = encoding::encode()
+ .invoke(&[ColumnarValue::Array(str_array.clone()), method.clone()])
+ .unwrap();
+
+ let args = vec![encoded, method];
+ b.iter(|| black_box(decode.invoke(&args).unwrap()))
+ });
+
+ c.bench_function(&format!("hex_decode/{size}"), |b| {
+ let method = ColumnarValue::Scalar("hex".into());
+ let encoded = encoding::encode()
+ .invoke(&[ColumnarValue::Array(str_array.clone()), method.clone()])
+ .unwrap();
+
+ let args = vec![encoded, method];
+ b.iter(|| black_box(decode.invoke(&args).unwrap()))
+ });
+ }
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions/src/encoding/inner.rs b/datafusion/functions/src/encoding/inner.rs
index 5b80c908cf..2a22e57261 100644
--- a/datafusion/functions/src/encoding/inner.rs
+++ b/datafusion/functions/src/encoding/inner.rs
@@ -18,9 +18,12 @@
//! Encoding expressions
use arrow::{
- array::{Array, ArrayRef, BinaryArray, OffsetSizeTrait, StringArray},
- datatypes::DataType,
+ array::{
+ Array, ArrayRef, BinaryArray, GenericByteArray, OffsetSizeTrait, StringArray,
+ },
+ datatypes::{ByteArrayType, DataType},
};
+use arrow_buffer::{Buffer, OffsetBufferBuilder};
use base64::{engine::general_purpose, Engine as _};
use datafusion_common::{
cast::{as_generic_binary_array, as_generic_string_array},
@@ -245,16 +248,22 @@ fn base64_encode(input: &[u8]) -> String {
general_purpose::STANDARD_NO_PAD.encode(input)
}
-fn hex_decode(input: &[u8]) -> Result<Vec<u8>> {
- hex::decode(input).map_err(|e| {
+fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+ // only write input / 2 bytes to buf
+ let out_len = input.len() / 2;
+ let buf = &mut buf[..out_len];
+ hex::decode_to_slice(input, buf).map_err(|e| {
DataFusionError::Internal(format!("Failed to decode from hex: {}", e))
- })
+ })?;
+ Ok(out_len)
}
-fn base64_decode(input: &[u8]) -> Result<Vec<u8>> {
- general_purpose::STANDARD_NO_PAD.decode(input).map_err(|e| {
- DataFusionError::Internal(format!("Failed to decode from base64: {}", e))
- })
+fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+ general_purpose::STANDARD_NO_PAD
+ .decode_slice(input, buf)
+ .map_err(|e| {
+ DataFusionError::Internal(format!("Failed to decode from base64: {}", e))
+ })
}
macro_rules! encode_to_array {
@@ -267,14 +276,35 @@ macro_rules! encode_to_array {
}};
}
-macro_rules! decode_to_array {
- ($METHOD: ident, $INPUT:expr) => {{
- let binary_array: BinaryArray = $INPUT
- .iter()
- .map(|x| x.map(|x| $METHOD(x.as_ref())).transpose())
- .collect::<Result<_>>()?;
- Arc::new(binary_array)
- }};
+fn decode_to_array<F, T: ByteArrayType>(
+ method: F,
+ input: &GenericByteArray<T>,
+ conservative_upper_bound_size: usize,
+) -> Result<ArrayRef>
+where
+ F: Fn(&[u8], &mut [u8]) -> Result<usize>,
+{
+ let mut values = vec![0; conservative_upper_bound_size];
+ let mut offsets = OffsetBufferBuilder::new(input.len());
+ let mut total_bytes_decoded = 0;
+ for v in input {
+ if let Some(v) = v {
+ let cursor = &mut values[total_bytes_decoded..];
+ let decoded = method(v.as_ref(), cursor)?;
+ total_bytes_decoded += decoded;
+ offsets.push_length(decoded);
+ } else {
+ offsets.push_length(0);
+ }
+ }
+ // We reserved an upper bound size for the values buffer, but we only use the actual size
+ values.truncate(total_bytes_decoded);
+ let binary_array = BinaryArray::try_new(
+ offsets.finish(),
+ Buffer::from_vec(values),
+ input.nulls().cloned(),
+ )?;
+ Ok(Arc::new(binary_array))
}
impl Encoding {
@@ -381,10 +411,7 @@ impl Encoding {
T: OffsetSizeTrait,
{
let input_value = as_generic_binary_array::<T>(value)?;
- let array: ArrayRef = match self {
- Self::Base64 => decode_to_array!(base64_decode, input_value),
- Self::Hex => decode_to_array!(hex_decode, input_value),
- };
+ let array = self.decode_byte_array(input_value)?;
Ok(ColumnarValue::Array(array))
}
@@ -393,12 +420,29 @@ impl Encoding {
T: OffsetSizeTrait,
{
let input_value = as_generic_string_array::<T>(value)?;
- let array: ArrayRef = match self {
- Self::Base64 => decode_to_array!(base64_decode, input_value),
- Self::Hex => decode_to_array!(hex_decode, input_value),
- };
+ let array = self.decode_byte_array(input_value)?;
Ok(ColumnarValue::Array(array))
}
+
+ fn decode_byte_array<T: ByteArrayType>(
+ &self,
+ input_value: &GenericByteArray<T>,
+ ) -> Result<ArrayRef> {
+ match self {
+ Self::Base64 => {
+ let upper_bound =
+ base64::decoded_len_estimate(input_value.values().len());
+ decode_to_array(base64_decode, input_value, upper_bound)
+ }
+ Self::Hex => {
+ // Calculate the upper bound for decoded byte size
+ // For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
+ // So the upper bound is half the length of the input values.
+ let upper_bound = input_value.values().len() / 2;
+ decode_to_array(hex_decode, input_value, upper_bound)
+ }
+ }
+ }
}
impl fmt::Display for Encoding {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]