neilconway commented on code in PR #22390:
URL: https://github.com/apache/datafusion/pull/22390#discussion_r3275421250
##########
datafusion/functions-nested/src/remove.rs:
##########
@@ -468,6 +531,98 @@ fn general_remove<OffsetSize: OffsetSizeTrait>(
)?))
}
+/// For each element of `list_array[i]`, removed up to `arr_n[i]` occurrences
+/// of `needle[0]` (scalar element broadcasted).
+///
+/// This is a specialized version of `general_remove` for scalar elements that
+/// uses bulk comparison for better performance.
+fn general_remove_with_scalar<OffsetSize: OffsetSizeTrait>(
+ list_array: &GenericListArray<OffsetSize>,
+ needle: &ArrayRef,
+ arr_n: &[i64],
+) -> Result<ArrayRef> {
+ let list_field = match list_array.data_type() {
+ DataType::List(field) | DataType::LargeList(field) => field,
+ _ => {
+ return exec_err!(
+ "Expected List or LargeList data type, got {:?}",
+ list_array.data_type()
+ );
+ }
+ };
+ let original_data = list_array.values().to_data();
Review Comment:
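This will be inefficient for sliced arrays: `values()` returns the entire
child array, not just the range covered by this list's offsets, so any work
sized by `original_data.len()` also covers elements outside the slice.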
##########
datafusion/functions-nested/src/remove.rs:
##########
@@ -468,6 +531,98 @@ fn general_remove<OffsetSize: OffsetSizeTrait>(
)?))
}
+/// For each element of `list_array[i]`, removed up to `arr_n[i]` occurrences
+/// of `needle[0]` (scalar element broadcasted).
+///
+/// This is a specialized version of `general_remove` for scalar elements that
+/// uses bulk comparison for better performance.
+fn general_remove_with_scalar<OffsetSize: OffsetSizeTrait>(
+ list_array: &GenericListArray<OffsetSize>,
+ needle: &ArrayRef,
+ arr_n: &[i64],
+) -> Result<ArrayRef> {
+ let list_field = match list_array.data_type() {
+ DataType::List(field) | DataType::LargeList(field) => field,
+ _ => {
+ return exec_err!(
+ "Expected List or LargeList data type, got {:?}",
+ list_array.data_type()
+ );
+ }
+ };
+ let original_data = list_array.values().to_data();
+ let mut offsets = Vec::<OffsetSize>::with_capacity(list_array.len() + 1);
+ offsets.push(OffsetSize::zero());
+
+ let mut mutable = MutableArrayData::with_capacities(
+ vec![&original_data],
+ false,
+ Capacities::Array(original_data.len()),
+ );
+ let nulls = list_array.nulls().cloned();
+ let keep_mask =
+ arrow_ord::cmp::distinct(list_array.values(),
&Scalar::new(Arc::clone(needle)))?;
+
+ for (row_index, offset_window) in
list_array.offsets().windows(2).enumerate() {
+ if nulls.as_ref().is_some_and(|nulls| nulls.is_null(row_index)) {
+ offsets.push(offsets[row_index]);
+ continue;
+ }
+
+ let start = offset_window[0].to_usize().unwrap();
+ let end = offset_window[1].to_usize().unwrap();
+
+ let n = arr_n[row_index];
+
+ if n <= 0 {
+ mutable.extend(0, start, end);
+ offsets.push(offsets[row_index] + OffsetSize::usize_as(end -
start));
+ continue;
+ }
+
+ let eq_array = keep_mask.slice(start, end - start);
+ let num_to_remove = eq_array.false_count();
+
+ if num_to_remove == 0 {
+ mutable.extend(0, start, end);
+ offsets.push(offsets[row_index] + OffsetSize::usize_as(end -
start));
+ continue;
+ }
+
+ let max_removals = n.min(num_to_remove as i64);
+ let mut removed = 0i64;
+ let mut copied = 0usize;
+ let mut pending_batch_to_retain: Option<usize> = None;
+ for (i, keep) in eq_array.iter().enumerate() {
+ if keep == Some(false) && removed < max_removals {
+ if let Some(bs) = pending_batch_to_retain {
+ mutable.extend(0, start + bs, start + i);
+ copied += i - bs;
+ pending_batch_to_retain = None;
+ }
+ removed += 1;
+ } else if pending_batch_to_retain.is_none() {
+ pending_batch_to_retain = Some(i);
+ }
+ }
Review Comment:
I wonder if it would be possible to iterate only over the "false" bits,
e.g., by negating the buffer and looking at `BooleanBuffer::set_indices`.
##########
datafusion/functions-nested/src/remove.rs:
##########
@@ -357,6 +381,45 @@ fn array_remove_internal(
}
}
+/// Dispatches scalar-needle array removal by list offset type.
+///
+/// `needle` must be a length-1 array containing the scalar element to remove.
+fn array_remove_dispatch_scalar(
+ array: &ArrayRef,
+ needle: &ArrayRef,
+ arr_n: &[i64],
+) -> Result<ArrayRef> {
+ match array.data_type() {
+ DataType::List(_) => {
+ let list_array = array.as_list::<i32>();
+ general_remove_with_scalar::<i32>(list_array, needle, arr_n)
+ }
+ DataType::LargeList(_) => {
+ let list_array = array.as_list::<i64>();
+ general_remove_with_scalar::<i64>(list_array, needle, arr_n)
+ }
+ array_type => exec_err!("array_remove does not support type '{array_type}'."),
Review Comment:
`array_remove_dispatch_scalar` is called by more than just `array_remove`, so
an error message that names only `array_remove` can be misleading; can we
improve the error message?
##########
datafusion/functions-nested/src/remove.rs:
##########
@@ -468,6 +531,98 @@ fn general_remove<OffsetSize: OffsetSizeTrait>(
)?))
}
+/// For each element of `list_array[i]`, removed up to `arr_n[i]` occurrences
+/// of `needle[0]` (scalar element broadcasted).
+///
+/// This is a specialized version of `general_remove` for scalar elements that
+/// uses bulk comparison for better performance.
+fn general_remove_with_scalar<OffsetSize: OffsetSizeTrait>(
+ list_array: &GenericListArray<OffsetSize>,
+ needle: &ArrayRef,
+ arr_n: &[i64],
+) -> Result<ArrayRef> {
+ let list_field = match list_array.data_type() {
+ DataType::List(field) | DataType::LargeList(field) => field,
+ _ => {
+ return exec_err!(
+ "Expected List or LargeList data type, got {:?}",
+ list_array.data_type()
+ );
+ }
+ };
+ let original_data = list_array.values().to_data();
+ let mut offsets = Vec::<OffsetSize>::with_capacity(list_array.len() + 1);
+ offsets.push(OffsetSize::zero());
+
+ let mut mutable = MutableArrayData::with_capacities(
+ vec![&original_data],
+ false,
+ Capacities::Array(original_data.len()),
+ );
+ let nulls = list_array.nulls().cloned();
+ let keep_mask =
+ arrow_ord::cmp::distinct(list_array.values(),
&Scalar::new(Arc::clone(needle)))?;
+
+ for (row_index, offset_window) in
list_array.offsets().windows(2).enumerate() {
+ if nulls.as_ref().is_some_and(|nulls| nulls.is_null(row_index)) {
+ offsets.push(offsets[row_index]);
+ continue;
+ }
+
+ let start = offset_window[0].to_usize().unwrap();
+ let end = offset_window[1].to_usize().unwrap();
+
+ let n = arr_n[row_index];
+
+ if n <= 0 {
+ mutable.extend(0, start, end);
+ offsets.push(offsets[row_index] + OffsetSize::usize_as(end -
start));
+ continue;
+ }
+
+ let eq_array = keep_mask.slice(start, end - start);
+ let num_to_remove = eq_array.false_count();
+
+ if num_to_remove == 0 {
+ mutable.extend(0, start, end);
+ offsets.push(offsets[row_index] + OffsetSize::usize_as(end -
start));
+ continue;
+ }
+
+ let max_removals = n.min(num_to_remove as i64);
+ let mut removed = 0i64;
+ let mut copied = 0usize;
+ let mut pending_batch_to_retain: Option<usize> = None;
+ for (i, keep) in eq_array.iter().enumerate() {
+ if keep == Some(false) && removed < max_removals {
Review Comment:
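Can we break from the loop once `removed` reaches `max_removals`? After that
point no further elements are removed, so the remainder of the row could be
copied with a single `extend` call.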
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]