alamb commented on code in PR #18630:
URL: https://github.com/apache/datafusion/pull/18630#discussion_r2524630048
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -409,8 +423,11 @@ impl ExecutionPlan for FilterExec {
input: self.input.execute(partition, context)?,
metrics,
projection: self.projection.clone(),
- batch_coalescer: BatchCoalescer::new(self.schema(), self.batch_size)
- .with_biggest_coalesce_batch_size(Some(self.batch_size / 2)),
+ batch_coalescer: LimitedBatchCoalescer::new(
+ self.schema(),
Review Comment:
It is not something you did in this PR, but it is confusing that
```
.with_biggest_coalesce_batch_size(Some(self.batch_size / 2))
```
is called as part of the `LimitedBatchCoalescer` constructor rather than inline.
I see this is because `LimitedBatchCoalescer` is part of DataFusion.
I worry about people forgetting to set this at other call sites -- maybe we
should make a DataFusion wrapper for `BatchCoalescer` 🤔 that sets this
appropriately.
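To illustrate the wrapper idea, here is a minimal sketch. It does not touch arrow-rs at all: `CoalescerSettings` and `datafusion_coalescer` are hypothetical names, and the struct only records the two size settings from the diff instead of buffering batches. The point is that the `batch_size / 2` choice lives in one constructor, so a new call site cannot forget it.

```rust
/// Hypothetical stand-in for arrow's `BatchCoalescer`: it only records the
/// two size settings discussed in this review; no batches are buffered.
#[derive(Debug, Clone, PartialEq)]
pub struct CoalescerSettings {
    /// Target number of rows per output batch.
    pub target_batch_size: usize,
    /// Value passed to `with_biggest_coalesce_batch_size`; `None` means unset.
    pub biggest_coalesce_batch_size: Option<usize>,
}

impl CoalescerSettings {
    /// Same builder style as the diff: `new(...)` followed by `with_...`.
    pub fn new(target_batch_size: usize) -> Self {
        Self {
            target_batch_size,
            biggest_coalesce_batch_size: None,
        }
    }

    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
        self.biggest_coalesce_batch_size = limit;
        self
    }
}

/// Hypothetical DataFusion-side constructor: every call site gets the
/// `batch_size / 2` threshold without having to remember to set it.
pub fn datafusion_coalescer(target_batch_size: usize) -> CoalescerSettings {
    CoalescerSettings::new(target_batch_size)
        .with_biggest_coalesce_batch_size(Some(target_batch_size / 2))
}
```

A real version would presumably return arrow's `BatchCoalescer` built the same way the removed lines in the diff do it: `BatchCoalescer::new(schema, batch_size).with_biggest_coalesce_batch_size(Some(batch_size / 2))`.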
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -729,11 +777,11 @@ impl Stream for FilterExecStream {
}).and_then(|(array, batch)| {
match as_boolean_array(&array) {
Ok(filter_array) => {
-
self.metrics.selectivity.add_part(filter_array.true_count());
self.metrics.selectivity.add_total(batch.num_rows());
-
-
self.batch_coalescer.push_batch_with_filter(batch.clone(), filter_array)?;
- Ok(())
+ // TODO: support push_batch_with_filter in LimitedBatchCoalescer
Review Comment:
This is a good TODO -- it would be good to file it as a follow-on ticket / PR,
perhaps. I can do it if you like.
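For the TODO, here is a rough model of what a filtered push on a limited coalescer might need to do. This is a hypothetical sketch: rows are plain `i64` values and the filter is a `&[bool]` rather than an Arrow `RecordBatch` and `BooleanArray`, and I'm assuming (from the name) that the limit counts rows *after* filtering. `LimitedCoalescerSketch` and `PushStatus` are made-up names, not DataFusion's API.

```rust
/// Hypothetical result of a push: keep feeding input, or stop early.
#[derive(Debug, PartialEq)]
pub enum PushStatus {
    Continue,
    LimitReached,
}

/// Hypothetical model of a coalescer with an optional row limit (`fetch`).
pub struct LimitedCoalescerSketch {
    target_batch_size: usize,
    fetch: Option<usize>,
    total_rows: usize,
    buffer: Vec<i64>,
    completed: Vec<Vec<i64>>,
}

impl LimitedCoalescerSketch {
    pub fn new(target_batch_size: usize, fetch: Option<usize>) -> Self {
        Self {
            target_batch_size,
            fetch,
            total_rows: 0,
            buffer: Vec::new(),
            completed: Vec::new(),
        }
    }

    /// Keep only rows whose filter entry is `true`, stop once `fetch` rows
    /// have been accepted, and emit a batch every `target_batch_size` rows.
    pub fn push_batch_with_filter(&mut self, batch: &[i64], filter: &[bool]) -> PushStatus {
        assert_eq!(batch.len(), filter.len(), "filter must match batch length");
        for (&value, &keep) in batch.iter().zip(filter) {
            if !keep {
                continue;
            }
            if self.limit_reached() {
                break;
            }
            self.buffer.push(value);
            self.total_rows += 1;
            if self.buffer.len() == self.target_batch_size {
                self.completed.push(std::mem::take(&mut self.buffer));
            }
        }
        if self.limit_reached() {
            PushStatus::LimitReached
        } else {
            PushStatus::Continue
        }
    }

    fn limit_reached(&self) -> bool {
        self.fetch.map_or(false, |fetch| self.total_rows >= fetch)
    }

    /// Flush any partially filled batch and return all completed batches.
    pub fn finish(&mut self) -> Vec<Vec<i64>> {
        if !self.buffer.is_empty() {
            self.completed.push(std::mem::take(&mut self.buffer));
        }
        std::mem::take(&mut self.completed)
    }
}
```

Checking the limit per kept row is what keeps the output from overshooting `fetch`; a real implementation would more likely count the kept rows once and slice the filtered batch than loop row by row.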
##########
datafusion/physical-optimizer/src/coalesce_batches.rs:
##########
@@ -60,8 +60,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
// would be to build the coalescing logic directly into the operators
// See https://github.com/apache/datafusion/issues/139
Review Comment:
This comment could probably be removed now (the ticket is long since closed)
##########
datafusion/physical-optimizer/src/coalesce_batches.rs:
##########
@@ -60,8 +60,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
// would be to build the coalescing logic directly into the operators
// See https://github.com/apache/datafusion/issues/139
- let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
- || plan_any.downcast_ref::<HashJoinExec>().is_some()
+ let wrap_in_coalesce = plan_any.downcast_ref::<HashJoinExec>().is_some()
Review Comment:
It looks like we only have `HashJoinExec` and `RepartitionExec` to update
before we could remove `CoalesceBatches` entirely 🤔
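For reference, the decision in the optimizer rule boils down to `Any` downcasting. Below is a minimal self-contained sketch of the post-PR check, with hypothetical empty structs standing in for the real operators and a tiny `PlanNode` trait standing in for `ExecutionPlan`. Once `HashJoinExec` and `RepartitionExec` also coalesce internally, this check would have nothing left to match and the rule could go. The real rule may apply extra conditions to `RepartitionExec`; those are omitted here.

```rust
use std::any::Any;

/// Hypothetical minimal stand-in for `ExecutionPlan`: just enough to downcast.
pub trait PlanNode {
    fn as_any(&self) -> &dyn Any;
}

// Hypothetical empty structs standing in for the real operators.
pub struct FilterExec;
pub struct HashJoinExec;
pub struct RepartitionExec;

impl PlanNode for FilterExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl PlanNode for HashJoinExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl PlanNode for RepartitionExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// After this PR, FilterExec coalesces its own output, so only the
/// remaining operators get wrapped in CoalesceBatchesExec.
pub fn wrap_in_coalesce(plan: &dyn PlanNode) -> bool {
    let plan_any = plan.as_any();
    plan_any.downcast_ref::<HashJoinExec>().is_some()
        || plan_any.downcast_ref::<RepartitionExec>().is_some()
}
```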
--
This is an automated message from the Apache Git Service.