This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 341db1d43 feat: rpad support column for second arg instead of just literal (#2099)
341db1d43 is described below
commit 341db1d4325f07124e466154fa1cb9c5a75c7aa6
Author: B Vadlamani <[email protected]>
AuthorDate: Tue Sep 16 10:46:39 2025 -0700
feat: rpad support column for second arg instead of just literal (#2099)
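
    The second argument to rpad/read_side_padding was previously matched
    only as a scalar Int32, so the pad length had to be a literal. This
    change adds a [ColumnarValue::Array, ColumnarValue::Array] match arm so
    the length can come from a column, evaluated per row, and extracts the
    shared pad/truncate logic into the new add_padding_string helper.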
---
.../char_varchar_utils/read_side_padding.rs | 141 +++++++++++++++------
.../org/apache/comet/CometExpressionSuite.scala | 7 +
2 files changed, 111 insertions(+), 37 deletions(-)
diff --git a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
index 6e56d9d86..40735de7a 100644
--- a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
+++ b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
@@ -18,12 +18,11 @@
use arrow::array::builder::GenericStringBuilder;
use arrow::array::cast::as_dictionary_array;
use arrow::array::types::Int32Type;
-use arrow::array::{make_array, Array, DictionaryArray};
+use arrow::array::{make_array, Array, AsArray, DictionaryArray};
use arrow::array::{ArrayRef, OffsetSizeTrait};
use arrow::datatypes::DataType;
use datafusion::common::{cast::as_generic_string_array, DataFusionError, ScalarValue};
use datafusion::physical_plan::ColumnarValue;
-use std::fmt::Write;
use std::sync::Arc;
/// Similar to DataFusion `rpad`, but does not truncate when the string is already longer than length
@@ -43,17 +42,31 @@ fn spark_read_side_padding2(
    match args {
        [ColumnarValue::Array(array), ColumnarValue::Scalar(ScalarValue::Int32(Some(length)))] => {
match array.data_type() {
-            DataType::Utf8 => spark_read_side_padding_internal::<i32>(array, *length, truncate),
-            DataType::LargeUtf8 => {
-                spark_read_side_padding_internal::<i64>(array, *length, truncate)
-            }
+ DataType::Utf8 => spark_read_side_padding_internal::<i32>(
+ array,
+ truncate,
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+ ),
+ DataType::LargeUtf8 => spark_read_side_padding_internal::<i64>(
+ array,
+ truncate,
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+ ),
// Dictionary support required for SPARK-48498
DataType::Dictionary(_, value_type) => {
let dict = as_dictionary_array::<Int32Type>(array);
let col = if value_type.as_ref() == &DataType::Utf8 {
-                    spark_read_side_padding_internal::<i32>(dict.values(), *length, truncate)?
+ spark_read_side_padding_internal::<i32>(
+ dict.values(),
+ truncate,
+                        ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+ )?
} else {
-                    spark_read_side_padding_internal::<i64>(dict.values(), *length, truncate)?
+ spark_read_side_padding_internal::<i64>(
+ dict.values(),
+ truncate,
+                        ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+ )?
};
                // col consists of an array, so arg of to_array() is not used. Can be anything
let values = col.to_array(0)?;
@@ -65,6 +78,21 @@ fn spark_read_side_padding2(
))),
}
}
+        [ColumnarValue::Array(array), ColumnarValue::Array(array_int)] => match array.data_type() {
+ DataType::Utf8 => spark_read_side_padding_internal::<i32>(
+ array,
+ truncate,
+ ColumnarValue::Array(Arc::<dyn Array>::clone(array_int)),
+ ),
+ DataType::LargeUtf8 => spark_read_side_padding_internal::<i64>(
+ array,
+ truncate,
+ ColumnarValue::Array(Arc::<dyn Array>::clone(array_int)),
+ ),
+ other => Err(DataFusionError::Internal(format!(
+                "Unsupported data type {other:?} for function rpad/read_side_padding",
+ ))),
+ },
other => Err(DataFusionError::Internal(format!(
"Unsupported arguments {other:?} for function
rpad/read_side_padding",
))),
@@ -73,42 +101,81 @@ fn spark_read_side_padding2(
fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
array: &ArrayRef,
- length: i32,
truncate: bool,
+ pad_type: ColumnarValue,
) -> Result<ColumnarValue, DataFusionError> {
let string_array = as_generic_string_array::<T>(array)?;
- let length = 0.max(length) as usize;
- let space_string = " ".repeat(length);
+ match pad_type {
+ ColumnarValue::Array(array_int) => {
+ let int_pad_array = array_int.as_primitive::<Int32Type>();
- let mut builder =
-        GenericStringBuilder::<T>::with_capacity(string_array.len(), string_array.len() * length);
+ let mut builder = GenericStringBuilder::<T>::with_capacity(
+ string_array.len(),
+ string_array.len() * int_pad_array.len(),
+ );
- for string in string_array.iter() {
- match string {
- Some(string) => {
-                // It looks Spark's UTF8String is closer to chars rather than graphemes
- // https://stackoverflow.com/a/46290728
- let char_len = string.chars().count();
- if length <= char_len {
- if truncate {
- let idx = string
- .char_indices()
- .nth(length)
- .map(|(i, _)| i)
- .unwrap_or(string.len());
- builder.append_value(&string[..idx]);
- } else {
- builder.append_value(string);
- }
- } else {
-                    // write_str updates only the value buffer, not null nor offset buffer
- // This is convenient for concatenating str(s)
- builder.write_str(string)?;
- builder.append_value(&space_string[char_len..]);
+ for (string, length) in string_array.iter().zip(int_pad_array) {
+ match string {
+ Some(string) => builder.append_value(add_padding_string(
+ string.parse().unwrap(),
+ length.unwrap() as usize,
+ truncate,
+ )?),
+ _ => builder.append_null(),
+ }
+ }
+ Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+ }
+ ColumnarValue::Scalar(const_pad_length) => {
+ let length = 0.max(i32::try_from(const_pad_length)?) as usize;
+
+ let mut builder = GenericStringBuilder::<T>::with_capacity(
+ string_array.len(),
+ string_array.len() * length,
+ );
+
+ for string in string_array.iter() {
+ match string {
+ Some(string) => builder.append_value(add_padding_string(
+ string.parse().unwrap(),
+ length,
+ truncate,
+ )?),
+ _ => builder.append_null(),
}
}
- _ => builder.append_null(),
+ Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+ }
+ }
+}
+
+fn add_padding_string(
+ string: String,
+ length: usize,
+ truncate: bool,
+) -> Result<String, DataFusionError> {
+ // It looks Spark's UTF8String is closer to chars rather than graphemes
+ // https://stackoverflow.com/a/46290728
+ let space_string = " ".repeat(length);
+ let char_len = string.chars().count();
+ if length <= char_len {
+ if truncate {
+ let idx = string
+ .char_indices()
+ .nth(length)
+ .map(|(i, _)| i)
+ .unwrap_or(string.len());
+ match string[..idx].parse() {
+ Ok(string) => Ok(string),
+ Err(err) => Err(DataFusionError::Internal(format!(
+ "Failed adding padding string {} error {:}",
+ string, err
+ ))),
+ }
+ } else {
+ Ok(string)
}
+ } else {
+ Ok(string + &space_string[char_len..])
}
- Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
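
For context on the helper above: lengths are counted in Unicode scalar values
(Rust chars), not bytes and not grapheme clusters. A minimal standalone sketch
of those semantics, with an illustrative name (rpad_chars is not the committed
API):

    fn rpad_chars(s: &str, length: usize, truncate: bool) -> String {
        let char_len = s.chars().count();
        if length <= char_len {
            if truncate {
                // Byte offset of the `length`-th char; slicing at a char
                // boundary can never split a multi-byte character.
                let idx = s
                    .char_indices()
                    .nth(length)
                    .map(|(i, _)| i)
                    .unwrap_or(s.len());
                s[..idx].to_string()
            } else {
                s.to_string()
            }
        } else {
            // Right-pad with `length - char_len` spaces.
            format!("{s}{}", " ".repeat(length - char_len))
        }
    }

    // "తెలుగు" is six chars (consonants plus combining vowel signs), so a
    // pad length of 2 keeps the first two scalar values:
    assert_eq!(rpad_chars("తెలుగు", 2, true), "తె");
    assert_eq!(rpad_chars("ab", 5, false), "ab   ");
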
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 0e1d4fc24..6d7a5de7d 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -407,6 +407,13 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}
}
+ test("Verify rpad expr support for second arg instead of just literal") {
+ val data = Seq(("IfIWasARoadIWouldBeBent", 10), ("తెలుగు", 2))
+ withParquetTable(data, "t1") {
+ val res = sql("select rpad(_1,_2) , rpad(_1,2) from t1 order by _1")
+ checkSparkAnswerAndOperator(res)
+ }
+ }
test("dictionary arithmetic") {
// TODO: test ANSI mode
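
To illustrate what the new [ColumnarValue::Array, ColumnarValue::Array] arm
consumes, a minimal sketch using the arrow crate directly (the array contents
mirror the test data above; this is illustrative, not the committed code
path):

    use arrow::array::{Int32Array, StringArray};

    fn main() {
        // One pad length per row, as in the new rpad(_1, _2) test.
        let strings = StringArray::from(vec!["IfIWasARoadIWouldBeBent", "తెలుగు"]);
        let lengths = Int32Array::from(vec![10, 2]);

        // Both iterators yield Option values, mirroring
        // string_array.iter().zip(int_pad_array) in the Rust change.
        for (s, n) in strings.iter().zip(lengths.iter()) {
            println!("rpad({s:?}, {n:?})");
        }
    }
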
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]