Re: [PR] POC: Reduce `Arc` cloning on hashmap build side [datafusion]

2025-06-20 Thread via GitHub


jonathanc-n closed pull request #16380: POC: Reduce `Arc` cloning on hashmap 
build side
URL: https://github.com/apache/datafusion/pull/16380


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] POC: Reduce `Arc` cloning on hashmap build side [datafusion]

2025-06-12 Thread via GitHub


Dandandan commented on PR #16380:
URL: https://github.com/apache/datafusion/pull/16380#issuecomment-2969139729

   > I've noticed that it is possible for `interleave` to perform worse than 
`take` despite the `Arc` clones from `take`. This happens in both 
`equal_rows_arr` and `build_batch_from_indices`.
   
   Yes, that will be the tricky part. We gain some speed during probing by 
concatenating the left side (which itself can be slow) into a single batch.
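
   To make the trade-off above concrete, here is a minimal sketch in plain 
Rust. It does not use the Arrow kernels: `take_like` and `interleave_like` are 
hypothetical stand-ins for `take` on a concatenated batch and `interleave` 
over un-concatenated batches, and `i64` columns are an assumption made for 
illustration.

```rust
/// Gather from one contiguous column using flat row offsets.
/// Hypothetical stand-in for `take` on a concatenated build side.
fn take_like(values: &[i64], indices: &[usize]) -> Vec<i64> {
    indices.iter().map(|&i| values[i]).collect()
}

/// Gather from several chunks using `(chunk, row)` pairs.
/// Hypothetical stand-in for `interleave` over the original batches.
fn interleave_like(chunks: &[Vec<i64>], indices: &[(usize, usize)]) -> Vec<i64> {
    indices
        .iter()
        .map(|&(chunk, row)| chunks[chunk][row])
        .collect()
}

/// Concatenate all chunks once, as the build side would before probing.
fn concat_chunks(chunks: &[Vec<i64>]) -> Vec<i64> {
    chunks.concat()
}
```

   Both gathers return the same rows. The difference is where the cost lands: 
the flat version pays for `concat_chunks` once on the build side, while the 
paired version skips that copy but does a two-level lookup for every probed 
row.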





Re: [PR] POC: Reduce `Arc` cloning on hashmap build side [datafusion]

2025-06-12 Thread via GitHub


Dandandan commented on code in PR #16380:
URL: https://github.com/apache/datafusion/pull/16380#discussion_r2142058321


##
datafusion/physical-plan/src/joins/hash_join.rs:
##
@@ -95,9 +96,11 @@ struct JoinLeftData {
 /// The hash table with indices into `batch`
 hash_map: JoinHashMap,
 /// The input rows for the build side
-batch: RecordBatch,
-/// The build side on expressions values
-values: Vec,
+batch: Vec,

Review Comment:
   Yeah, this should be it. I think the tricky part will be getting it to 
match the current performance.






Re: [PR] POC: Reduce `Arc` cloning on hashmap build side [datafusion]

2025-06-12 Thread via GitHub


Dandandan commented on code in PR #16380:
URL: https://github.com/apache/datafusion/pull/16380#discussion_r2142059305


##
datafusion/physical-plan/src/joins/hash_join.rs:
##
@@ -991,52 +998,72 @@ async fn collect_left_input(
 
 let mut hashmap = JoinHashMap::with_capacity(num_rows);
 let mut hashes_buffer = Vec::new();
-let mut offset = 0;
 
 // Updating hashmap starting from the last batch
 let batches_iter = batches.iter().rev();
-for batch in batches_iter.clone() {
+let mut batch_indices: Vec<(usize, usize)> = Vec::new();
+let mut batch_row_total = usize::default();
+
+for (batch_id, batch) in batches_iter.enumerate().clone() {
+let batch_num_rows = batch.num_rows();
+
 hashes_buffer.clear();
 hashes_buffer.resize(batch.num_rows(), 0);
 update_hash(
 &on_left,
 batch,
 &mut hashmap,
-offset,
+batch_row_total, // this is our offset
 &random_state,
 &mut hashes_buffer,
 0,
 true,
 )?;
-offset += batch.num_rows();
+
+for row_id in 0..batch_num_rows {

Review Comment:
   this can use `extend` 
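
   A minimal sketch of that suggestion, assuming the loop body (not shown in 
the diff) pushes one `(batch_id, row_id)` pair per row into `batch_indices`. 
The helper name `push_batch_rows` is hypothetical.

```rust
/// Append one `(batch_id, row_id)` pair for every row of a batch.
/// Hypothetical helper: the loop body in the PR is not visible in the diff,
/// so the pair layout is an assumption.
fn push_batch_rows(
    batch_indices: &mut Vec<(usize, usize)>,
    batch_id: usize,
    batch_num_rows: usize,
) {
    // `extend` can reserve capacity up front from the iterator's size hint,
    // instead of checking capacity on every individual `push`.
    batch_indices.extend((0..batch_num_rows).map(|row_id| (batch_id, row_id)));
}
```

   Called once per batch inside the loop, this would replace the inner 
`for row_id in 0..batch_num_rows` loop.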






Re: [PR] POC: Reduce `Arc` cloning on hashmap build side [datafusion]

2025-06-11 Thread via GitHub


jonathanc-n commented on code in PR #16380:
URL: https://github.com/apache/datafusion/pull/16380#discussion_r2141169183


##
datafusion/physical-plan/src/joins/hash_join.rs:
##
@@ -1372,15 +1407,16 @@ pub fn equal_rows_arr(
 // The results are then folded (combined) using the and function to get a 
final equality result.
 equal = iter
 .map(|(left, right)| {
-let arr_left = take(left.as_ref(), indices_left, None)?;
+let left_refs: Vec<&dyn Array> = 
left.iter().map(Arc::as_ref).collect();
+let arr_left = interleave(&left_refs, indices_left)?;
 let arr_right = take(right.as_ref(), indices_right, None)?;
 eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), 
null_equals_null)
 })
 .try_fold(equal, |acc, equal2| and(&acc, &equal2?))?;
 
 let filter_builder = FilterBuilder::new(&equal).optimize().build();
 
-let left_filtered = filter_builder.filter(indices_left)?;
+let left_filtered = filter_builder.filter(indices_left)?; // annoying

Review Comment:
   ```
   let mut left_filtered = Vec::with_capacity(equal.len());
   for (i, &pair) in indices_left.iter().enumerate() {
       if equal.value(i) {
           left_filtered.push(pair);
       }
   }
   ```
   For now I did this, but it defeats the whole point of the mask.
  






Re: [PR] POC: Reduce `Arc` cloning on hashmap build side [datafusion]

2025-06-11 Thread via GitHub


jonathanc-n commented on PR #16380:
URL: https://github.com/apache/datafusion/pull/16380#issuecomment-2964423579

   I've noticed that it is possible for `interleave` to perform worse than 
`take` despite the `Arc` clones from `take`. This happens in both 
`equal_rows_arr` and `build_batch_from_indices`.








Re: [PR] POC: Reduce `Arc` cloning on hashmap build side [datafusion]

2025-06-11 Thread via GitHub


jonathanc-n commented on code in PR #16380:
URL: https://github.com/apache/datafusion/pull/16380#discussion_r2141135830


##
datafusion/physical-plan/src/joins/hash_join.rs:
##
@@ -95,9 +96,11 @@ struct JoinLeftData {
 /// The hash table with indices into `batch`
 hash_map: JoinHashMap,
 /// The input rows for the build side
-batch: RecordBatch,
-/// The build side on expressions values
-values: Vec,
+batch: Vec,

Review Comment:
   @Dandandan Does this look like what you were looking for?



##
datafusion/physical-plan/src/joins/hash_join.rs:
##
@@ -1372,15 +1407,16 @@ pub fn equal_rows_arr(
 // The results are then folded (combined) using the and function to get a 
final equality result.
 equal = iter
 .map(|(left, right)| {
-let arr_left = take(left.as_ref(), indices_left, None)?;
+let left_refs: Vec<&dyn Array> = 
left.iter().map(Arc::as_ref).collect();
+let arr_left = interleave(&left_refs, indices_left)?;
 let arr_right = take(right.as_ref(), indices_right, None)?;
 eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), 
null_equals_null)
 })
 .try_fold(equal, |acc, equal2| and(&acc, &equal2?))?;
 
 let filter_builder = FilterBuilder::new(&equal).optimize().build();
 
-let left_filtered = filter_builder.filter(indices_left)?;
+let left_filtered = filter_builder.filter(indices_left)?; // annoying

Review Comment:
   Currently this is a bit annoying. @Dandandan, do you know how we can filter 
these left indices (`Vec<(usize, usize)>`)?
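
   For reference, one way to express the filtering of `(usize, usize)` pairs 
in plain Rust. This is only a sketch: `filter_pairs` is a hypothetical helper, 
and `mask` is a `&[bool]` standing in for the Arrow boolean array `equal`. It 
is a row-by-row filter, so it does not recover whatever optimization 
`FilterBuilder` applies to the mask.

```rust
/// Keep the index pairs whose mask entry is `true`.
/// Hypothetical helper; `mask` stands in for the boolean array `equal`.
fn filter_pairs(pairs: &[(usize, usize)], mask: &[bool]) -> Vec<(usize, usize)> {
    assert_eq!(pairs.len(), mask.len(), "mask must cover every pair");
    pairs
        .iter()
        .zip(mask)
        // `then_some` yields `Some(pair)` for `true` entries and `None` otherwise.
        .filter_map(|(pair, keep)| keep.then_some(*pair))
        .collect()
}
```

   For example, filtering `[(0, 0), (0, 1), (1, 0)]` with the mask 
`[true, false, true]` keeps `(0, 0)` and `(1, 0)`.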



##
datafusion/physical-plan/src/joins/utils.rs:
##
@@ -850,6 +850,43 @@ pub(crate) fn apply_join_filter_to_indices(
 ))
 }
 
+
+pub(crate) fn apply_join_filter_to_hash_indices(

Review Comment:
   These are temporary new functions, just so I can test this out first by 
isolating it to hash joins.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]