This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a318cd20e Optimize base64/hex decoding by pre-allocating output buffers (~2x faster) (#12675)
5a318cd20e is described below

commit 5a318cd20e6c6359928cbbc6d84d01f074665ca9
Author: Simon Vandel Sillesen <[email protected]>
AuthorDate: Thu Oct 3 00:04:37 2024 +0200

    Optimize base64/hex decoding by pre-allocating output buffers (~2x faster) (#12675)
    
    * add bench
    
    * replace macro with generic function
    
    * remove duplicated code
    
    * optimize base64/hex decode
---
 datafusion/functions/Cargo.toml            |  5 ++
 datafusion/functions/benches/encoding.rs   | 53 +++++++++++++++++
 datafusion/functions/src/encoding/inner.rs | 94 ++++++++++++++++++++++--------
 3 files changed, 127 insertions(+), 25 deletions(-)

diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index ff1b926a9b..a3d114221d 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -102,6 +102,11 @@ harness = false
 name = "to_timestamp"
 required-features = ["datetime_expressions"]
 
+[[bench]]
+harness = false
+name = "encoding"
+required-features = ["encoding_expressions"]
+
 [[bench]]
 harness = false
 name = "regx"
diff --git a/datafusion/functions/benches/encoding.rs b/datafusion/functions/benches/encoding.rs
new file mode 100644
index 0000000000..d49235aac9
--- /dev/null
+++ b/datafusion/functions/benches/encoding.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use arrow::util::bench_util::create_string_array_with_len;
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion_expr::ColumnarValue;
+use datafusion_functions::encoding;
+use std::sync::Arc;
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let decode = encoding::decode();
+    for size in [1024, 4096, 8192] {
+        let str_array = Arc::new(create_string_array_with_len::<i32>(size, 0.2, 32));
+        c.bench_function(&format!("base64_decode/{size}"), |b| {
+            let method = ColumnarValue::Scalar("base64".into());
+            let encoded = encoding::encode()
+                .invoke(&[ColumnarValue::Array(str_array.clone()), method.clone()])
+                .unwrap();
+
+            let args = vec![encoded, method];
+            b.iter(|| black_box(decode.invoke(&args).unwrap()))
+        });
+
+        c.bench_function(&format!("hex_decode/{size}"), |b| {
+            let method = ColumnarValue::Scalar("hex".into());
+            let encoded = encoding::encode()
+                .invoke(&[ColumnarValue::Array(str_array.clone()), method.clone()])
+                .unwrap();
+
+            let args = vec![encoded, method];
+            b.iter(|| black_box(decode.invoke(&args).unwrap()))
+        });
+    }
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions/src/encoding/inner.rs b/datafusion/functions/src/encoding/inner.rs
index 5b80c908cf..2a22e57261 100644
--- a/datafusion/functions/src/encoding/inner.rs
+++ b/datafusion/functions/src/encoding/inner.rs
@@ -18,9 +18,12 @@
 //! Encoding expressions
 
 use arrow::{
-    array::{Array, ArrayRef, BinaryArray, OffsetSizeTrait, StringArray},
-    datatypes::DataType,
+    array::{
+        Array, ArrayRef, BinaryArray, GenericByteArray, OffsetSizeTrait, StringArray,
+    },
+    datatypes::{ByteArrayType, DataType},
 };
+use arrow_buffer::{Buffer, OffsetBufferBuilder};
 use base64::{engine::general_purpose, Engine as _};
 use datafusion_common::{
     cast::{as_generic_binary_array, as_generic_string_array},
@@ -245,16 +248,22 @@ fn base64_encode(input: &[u8]) -> String {
     general_purpose::STANDARD_NO_PAD.encode(input)
 }
 
-fn hex_decode(input: &[u8]) -> Result<Vec<u8>> {
-    hex::decode(input).map_err(|e| {
+fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+    // only write input / 2 bytes to buf
+    let out_len = input.len() / 2;
+    let buf = &mut buf[..out_len];
+    hex::decode_to_slice(input, buf).map_err(|e| {
         DataFusionError::Internal(format!("Failed to decode from hex: {}", e))
-    })
+    })?;
+    Ok(out_len)
 }
 
-fn base64_decode(input: &[u8]) -> Result<Vec<u8>> {
-    general_purpose::STANDARD_NO_PAD.decode(input).map_err(|e| {
-        DataFusionError::Internal(format!("Failed to decode from base64: {}", e))
-    })
+fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+    general_purpose::STANDARD_NO_PAD
+        .decode_slice(input, buf)
+        .map_err(|e| {
+            DataFusionError::Internal(format!("Failed to decode from base64: {}", e))
+        })
 }
 
 macro_rules! encode_to_array {
@@ -267,14 +276,35 @@ macro_rules! encode_to_array {
     }};
 }
 
-macro_rules! decode_to_array {
-    ($METHOD: ident, $INPUT:expr) => {{
-        let binary_array: BinaryArray = $INPUT
-            .iter()
-            .map(|x| x.map(|x| $METHOD(x.as_ref())).transpose())
-            .collect::<Result<_>>()?;
-        Arc::new(binary_array)
-    }};
+fn decode_to_array<F, T: ByteArrayType>(
+    method: F,
+    input: &GenericByteArray<T>,
+    conservative_upper_bound_size: usize,
+) -> Result<ArrayRef>
+where
+    F: Fn(&[u8], &mut [u8]) -> Result<usize>,
+{
+    let mut values = vec![0; conservative_upper_bound_size];
+    let mut offsets = OffsetBufferBuilder::new(input.len());
+    let mut total_bytes_decoded = 0;
+    for v in input {
+        if let Some(v) = v {
+            let cursor = &mut values[total_bytes_decoded..];
+            let decoded = method(v.as_ref(), cursor)?;
+            total_bytes_decoded += decoded;
+            offsets.push_length(decoded);
+        } else {
+            offsets.push_length(0);
+        }
+    }
+    // We reserved an upper bound size for the values buffer, but we only use the actual size
+    values.truncate(total_bytes_decoded);
+    let binary_array = BinaryArray::try_new(
+        offsets.finish(),
+        Buffer::from_vec(values),
+        input.nulls().cloned(),
+    )?;
+    Ok(Arc::new(binary_array))
 }
 
 impl Encoding {
@@ -381,10 +411,7 @@ impl Encoding {
         T: OffsetSizeTrait,
     {
         let input_value = as_generic_binary_array::<T>(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => decode_to_array!(base64_decode, input_value),
-            Self::Hex => decode_to_array!(hex_decode, input_value),
-        };
+        let array = self.decode_byte_array(input_value)?;
         Ok(ColumnarValue::Array(array))
     }
 
@@ -393,12 +420,29 @@ impl Encoding {
         T: OffsetSizeTrait,
     {
         let input_value = as_generic_string_array::<T>(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => decode_to_array!(base64_decode, input_value),
-            Self::Hex => decode_to_array!(hex_decode, input_value),
-        };
+        let array = self.decode_byte_array(input_value)?;
         Ok(ColumnarValue::Array(array))
     }
+
+    fn decode_byte_array<T: ByteArrayType>(
+        &self,
+        input_value: &GenericByteArray<T>,
+    ) -> Result<ArrayRef> {
+        match self {
+            Self::Base64 => {
+                let upper_bound =
+                    base64::decoded_len_estimate(input_value.values().len());
+                decode_to_array(base64_decode, input_value, upper_bound)
+            }
+            Self::Hex => {
+                // Calculate the upper bound for decoded byte size
+                // For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
+                // So the upper bound is half the length of the input values.
+                let upper_bound = input_value.values().len() / 2;
+                decode_to_array(hex_decode, input_value, upper_bound)
+            }
+        }
+    }
 }
 
 impl fmt::Display for Encoding {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to