mbutrovich commented on code in PR #4778:
URL: https://github.com/apache/datafusion-comet/pull/4778#discussion_r3506555504


##########
native/spark-expr/src/string_funcs/base64.rs:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{Array, GenericBinaryArray, OffsetSizeTrait, StringArray};
+use arrow::datatypes::DataType;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use datafusion::common::{exec_err, DataFusionError, ScalarValue};
+use datafusion::physical_plan::ColumnarValue;
+
+/// Spark `base64(bin)`: encodes a binary value as a padded base64 string.
+///
+/// The second argument is a boolean `chunk` flag mirroring Spark's
+/// `spark.sql.chunkBase64String.enabled`. When `chunk` is true (Spark's 
default, and the only
+/// behavior on Spark 3.4), the output matches 
`java.util.Base64.getMimeEncoder()`: lines of at most
+/// 76 characters joined by a CRLF (`\r\n`), with no trailing separator. When 
false, the output is a
+/// single unwrapped line, matching `java.util.Base64.getMimeEncoder(-1, [])`.
+pub fn spark_base64(args: &[ColumnarValue]) -> Result<ColumnarValue, 
DataFusionError> {
+    if args.len() != 2 {
+        return exec_err!("base64 expects exactly two arguments, got {}", 
args.len());
+    }
+    let chunk = match &args[1] {
+        ColumnarValue::Scalar(ScalarValue::Boolean(Some(chunk))) => *chunk,
+        other => return exec_err!("base64 expects a boolean chunk flag, got 
{other:?}"),
+    };
+    match &args[0] {
+        ColumnarValue::Array(array) => match array.data_type() {
+            DataType::Binary => Ok(ColumnarValue::Array(Arc::new(encode_array(
+                array
+                    .as_any()

Review Comment:
   `array.as_any().downcast_ref::<GenericBinaryArray<i32>>().unwrap()` can be 
`array.as_binary::<i32>()` via the `AsArray` trait, which is a bit cleaner.



##########
native/spark-expr/src/string_funcs/base64.rs:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{Array, GenericBinaryArray, OffsetSizeTrait, StringArray};
+use arrow::datatypes::DataType;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use datafusion::common::{exec_err, DataFusionError, ScalarValue};
+use datafusion::physical_plan::ColumnarValue;
+
+/// Spark `base64(bin)`: encodes a binary value as a padded base64 string.
+///
+/// The second argument is a boolean `chunk` flag mirroring Spark's
+/// `spark.sql.chunkBase64String.enabled`. When `chunk` is true (Spark's 
default, and the only
+/// behavior on Spark 3.4), the output matches 
`java.util.Base64.getMimeEncoder()`: lines of at most
+/// 76 characters joined by a CRLF (`\r\n`), with no trailing separator. When 
false, the output is a
+/// single unwrapped line, matching `java.util.Base64.getMimeEncoder(-1, [])`.
+pub fn spark_base64(args: &[ColumnarValue]) -> Result<ColumnarValue, 
DataFusionError> {
+    if args.len() != 2 {
+        return exec_err!("base64 expects exactly two arguments, got {}", 
args.len());
+    }
+    let chunk = match &args[1] {
+        ColumnarValue::Scalar(ScalarValue::Boolean(Some(chunk))) => *chunk,
+        other => return exec_err!("base64 expects a boolean chunk flag, got 
{other:?}"),
+    };
+    match &args[0] {
+        ColumnarValue::Array(array) => match array.data_type() {
+            DataType::Binary => Ok(ColumnarValue::Array(Arc::new(encode_array(
+                array
+                    .as_any()
+                    .downcast_ref::<GenericBinaryArray<i32>>()
+                    .unwrap(),
+                chunk,
+            )))),
+            DataType::LargeBinary => 
Ok(ColumnarValue::Array(Arc::new(encode_array(
+                array
+                    .as_any()
+                    .downcast_ref::<GenericBinaryArray<i64>>()
+                    .unwrap(),
+                chunk,
+            )))),

Review Comment:
   The array match handles `Binary` and `LargeBinary` and falls through to 
`exec_err!` for anything else. Since the child is always `cast(... AS binary)` 
this is normally a plain `Binary` array, so I expect this is fine. Do we ever 
anticipate a dictionary-encoded or `BinaryView` array reaching here from a 
scan? If so it would surface as a runtime error rather than a graceful 
fallback. Not a panic, since the match is on `data_type()` and the `unwrap()` 
downcasts at lines 47 and 54 are guarded by the arms above them.



##########
native/spark-expr/src/string_funcs/base64.rs:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{Array, GenericBinaryArray, OffsetSizeTrait, StringArray};
+use arrow::datatypes::DataType;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use datafusion::common::{exec_err, DataFusionError, ScalarValue};
+use datafusion::physical_plan::ColumnarValue;
+
+/// Spark `base64(bin)`: encodes a binary value as a padded base64 string.
+///
+/// The second argument is a boolean `chunk` flag mirroring Spark's
+/// `spark.sql.chunkBase64String.enabled`. When `chunk` is true (Spark's 
default, and the only
+/// behavior on Spark 3.4), the output matches 
`java.util.Base64.getMimeEncoder()`: lines of at most
+/// 76 characters joined by a CRLF (`\r\n`), with no trailing separator. When 
false, the output is a
+/// single unwrapped line, matching `java.util.Base64.getMimeEncoder(-1, [])`.
+pub fn spark_base64(args: &[ColumnarValue]) -> Result<ColumnarValue, 
DataFusionError> {
+    if args.len() != 2 {
+        return exec_err!("base64 expects exactly two arguments, got {}", 
args.len());
+    }
+    let chunk = match &args[1] {
+        ColumnarValue::Scalar(ScalarValue::Boolean(Some(chunk))) => *chunk,
+        other => return exec_err!("base64 expects a boolean chunk flag, got 
{other:?}"),
+    };
+    match &args[0] {
+        ColumnarValue::Array(array) => match array.data_type() {
+            DataType::Binary => Ok(ColumnarValue::Array(Arc::new(encode_array(
+                array
+                    .as_any()
+                    .downcast_ref::<GenericBinaryArray<i32>>()
+                    .unwrap(),
+                chunk,
+            )))),
+            DataType::LargeBinary => 
Ok(ColumnarValue::Array(Arc::new(encode_array(
+                array
+                    .as_any()

Review Comment:
   Same `array.as_any().downcast_ref::<GenericBinaryArray<i32>>().unwrap()` 
comment.



##########
spark/src/test/resources/sql-tests/expressions/string/base64.sql:
##########
@@ -0,0 +1,40 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+statement
+CREATE TABLE test_base64(s string) USING parquet

Review Comment:
   `spark.sql.chunkBase64String.enabled` defaults to `true` (added in 3.5.2), 
and Spark 3.4 always chunks with no config at all. So no Spark version reaches 
the `chunk = false` branch under default configs, which means the `chunkBase64` 
passthrough in `CometBase64StaticInvoke` only has Rust unit coverage, not a 
Spark-vs-Comet comparison.
   
   `CometSqlFileTestSuite` supports a per-file `-- Config:` header (see 
`decimal_div_ansi.sql`), so this is easy to cover with a sibling file, for 
example `base64_unchunked.sql`:
   
   ```
   -- Config: spark.sql.chunkBase64String.enabled=false
   ```
   
   with an input long enough to wrap. On 3.4 the config is ignored and both 
engines still chunk, so the file is harmless there and meaningfully exercises 
the unwrapped path on 3.5+.



##########
native/spark-expr/src/string_funcs/base64.rs:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{Array, GenericBinaryArray, OffsetSizeTrait, StringArray};
+use arrow::datatypes::DataType;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use datafusion::common::{exec_err, DataFusionError, ScalarValue};
+use datafusion::physical_plan::ColumnarValue;
+
+/// Spark `base64(bin)`: encodes a binary value as a padded base64 string.
+///
+/// The second argument is a boolean `chunk` flag mirroring Spark's
+/// `spark.sql.chunkBase64String.enabled`. When `chunk` is true (Spark's 
default, and the only
+/// behavior on Spark 3.4), the output matches 
`java.util.Base64.getMimeEncoder()`: lines of at most
+/// 76 characters joined by a CRLF (`\r\n`), with no trailing separator. When 
false, the output is a
+/// single unwrapped line, matching `java.util.Base64.getMimeEncoder(-1, [])`.
+pub fn spark_base64(args: &[ColumnarValue]) -> Result<ColumnarValue, 
DataFusionError> {
+    if args.len() != 2 {
+        return exec_err!("base64 expects exactly two arguments, got {}", 
args.len());
+    }
+    let chunk = match &args[1] {
+        ColumnarValue::Scalar(ScalarValue::Boolean(Some(chunk))) => *chunk,
+        other => return exec_err!("base64 expects a boolean chunk flag, got 
{other:?}"),
+    };
+    match &args[0] {
+        ColumnarValue::Array(array) => match array.data_type() {
+            DataType::Binary => Ok(ColumnarValue::Array(Arc::new(encode_array(
+                array
+                    .as_any()
+                    .downcast_ref::<GenericBinaryArray<i32>>()
+                    .unwrap(),
+                chunk,
+            )))),
+            DataType::LargeBinary => 
Ok(ColumnarValue::Array(Arc::new(encode_array(
+                array
+                    .as_any()
+                    .downcast_ref::<GenericBinaryArray<i64>>()
+                    .unwrap(),
+                chunk,
+            )))),
+            other => exec_err!("base64 expects a binary argument, got 
{other}"),
+        },
+        ColumnarValue::Scalar(ScalarValue::Binary(value))
+        | ColumnarValue::Scalar(ScalarValue::LargeBinary(value)) => {
+            let encoded = value.as_ref().map(|bytes| encode(bytes, chunk));
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(encoded)))
+        }
+        ColumnarValue::Scalar(other) => {
+            exec_err!("base64 expects a binary argument, got {other}")
+        }
+    }
+}
+
+fn encode_array<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>, chunk: 
bool) -> StringArray {
+    array
+        .iter()
+        .map(|value| value.map(|bytes| encode(bytes, chunk)))
+        .collect()
+}
+
+fn encode(bytes: &[u8], chunk: bool) -> String {
+    let encoded = BASE64_STANDARD.encode(bytes);
+    if chunk {
+        chunk_into_lines(&encoded)
+    } else {
+        encoded
+    }
+}
+
+/// Wrap a base64 string into lines of at most 76 characters joined by CRLF, 
with no trailing
+/// separator. Matches `java.util.Base64.getMimeEncoder()`. base64 output is 
pure ASCII, so byte
+/// offsets and character offsets coincide.
+fn chunk_into_lines(encoded: &str) -> String {
+    const LINE_LEN: usize = 76;
+    if encoded.len() <= LINE_LEN {
+        return encoded.to_string();

Review Comment:
   With `chunk = true` (Spark's default) and an input of 76 base64 chars or 
fewer, which is the common case, `encode` builds `encoded` at line 79, then 
`chunk_into_lines(&encoded)` at line 81 hits the short-string branch and does 
`return encoded.to_string()` at line 93. That allocates and copies a second 
`String` from the one we just built, then drops the original. Passing the 
`String` in by value avoids the extra allocation:
   
   ```rust
   fn encode(bytes: &[u8], chunk: bool) -> String {
       let encoded = BASE64_STANDARD.encode(bytes);
       if chunk { chunk_into_lines(encoded) } else { encoded }
   }
   
   fn chunk_into_lines(encoded: String) -> String {
       const LINE_LEN: usize = 76;
       if encoded.len() <= LINE_LEN {
           return encoded; // no copy
       }
       // build the wrapped output as today
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to