Dandandan commented on code in PR #16208:
URL: https://github.com/apache/datafusion/pull/16208#discussion_r2115799584
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -689,6 +702,46 @@ fn filter_and_project(
})
}
+impl FilterExecStream {
+ /// Evaluates the predicate filter on the given batch and appends and rows
that match
+ /// to the in progress output batch builder.
+ fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+ self.predicate
+ .evaluate(&batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ .and_then(|filter| {
+ let Some(filter) = filter.as_boolean_opt() else {
+ return internal_err!(
+ "Cannot create filter_array from non-boolean
predicates"
+ );
+ };
+
+ let batch = match self.projection.as_ref() {
+ Some(projection) => {
+ let projected_columns = projection
+ .iter()
+ .map(|i| Arc::clone(batch.column(*i)))
+ .collect();
+ // Safety -- the input was a valid RecordBatch and
thus the projection is too
+ unsafe {
+ RecordBatch::new_unchecked(
+ Arc::clone(&self.schema),
+ projected_columns,
+ batch.num_rows(),
+ )
+ }
+ }
+ None => batch,
+ };
+ let output_batch_builder = self
+ .output_batch_builder
+ .as_mut()
+ .expect("output_batch_builder should be Some");
+ Ok(output_batch_builder.append_filtered(batch, filter)?)
Review Comment:
> space each batch will take up front and can just straight up allocate

Hm, yeah, with primitives and views this might be okay - we might have to test
other data sources though (with normal binary types, where it can't be
preallocated).
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -689,6 +702,46 @@ fn filter_and_project(
})
}
+impl FilterExecStream {
+ /// Evaluates the predicate filter on the given batch and appends and rows
that match
+ /// to the in progress output batch builder.
+ fn filter_batch(&mut self, batch: RecordBatch) -> Result<()> {
+ self.predicate
+ .evaluate(&batch)
+ .and_then(|v| v.into_array(batch.num_rows()))
+ .and_then(|filter| {
+ let Some(filter) = filter.as_boolean_opt() else {
+ return internal_err!(
+ "Cannot create filter_array from non-boolean
predicates"
+ );
+ };
+
+ let batch = match self.projection.as_ref() {
+ Some(projection) => {
+ let projected_columns = projection
+ .iter()
+ .map(|i| Arc::clone(batch.column(*i)))
+ .collect();
+ // Safety -- the input was a valid RecordBatch and
thus the projection is too
+ unsafe {
+ RecordBatch::new_unchecked(
+ Arc::clone(&self.schema),
+ projected_columns,
+ batch.num_rows(),
+ )
+ }
+ }
+ None => batch,
+ };
+ let output_batch_builder = self
+ .output_batch_builder
+ .as_mut()
+ .expect("output_batch_builder should be Some");
+ Ok(output_batch_builder.append_filtered(batch, filter)?)
Review Comment:
> space each batch will take up front and can just straight up allocate

Hm, yeah, with primitives and views this might be okay - we might have to test
other data sources though (with normal binary types, where it can't be
preallocated).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]