alamb commented on a change in pull request #9865:
URL: https://github.com/apache/arrow/pull/9865#discussion_r605945522
##########
File path: rust/datafusion/src/physical_optimizer/coalesce_batches.rs
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatches optimizer that groups batches together rows
+//! in bigger batches to avoid overhead with small batches
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::{
+    error::Result,
+    physical_plan::{
+        coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
+        hash_join::HashJoinExec, repartition::RepartitionExec,
+    },
+};
+use std::sync::Arc;
+
+/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches
+pub struct CoalesceBatches {}
+
+impl CoalesceBatches {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+impl PhysicalOptimizerRule for CoalesceBatches {
+    fn optimize(
+        &self,
+        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+        config: &crate::execution::context::ExecutionConfig,
+    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+        // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
+        // highly selective filters
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize(child.clone(), config))
+            .collect::<Result<Vec<_>>>()?;

Review comment:
   I realize you are just moving code around, so this comment is outside the scope of this PR. However, I wonder if it would be more performant to do the coalescing directly in the filter kernel code -- the way coalescing is written today requires copying the (filtered) output into a different (coalesced) array. I think @ritchie46 had some code as part of polars that allowed incrementally building up the output in several chunks, which may be relevant here.

   I think this code is good, but I wanted to plant a seed 🌱 for future optimizations.
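   To make the seed a little more concrete, here is a very rough sketch of the "build the output up in chunks" idea: the filter appends each (small) filtered chunk to an accumulator and only pays for a single copy once enough rows have been collected. The names below (`ChunkedFilterOutput`, `push`) are made up for illustration, and the exact signature of arrow's `concat` kernel varies between arrow versions.

```rust
use arrow::array::ArrayRef;
use arrow::compute::kernels::concat::concat;
use arrow::error::Result as ArrowResult;

/// Collects per-batch filter outputs without copying them into a contiguous
/// array until a target number of rows has accumulated.
struct ChunkedFilterOutput {
    target_rows: usize,
    chunks: Vec<ArrayRef>,
    rows: usize,
}

impl ChunkedFilterOutput {
    fn new(target_rows: usize) -> Self {
        Self { target_rows, chunks: Vec::new(), rows: 0 }
    }

    /// Append one filtered chunk (e.g. the filter kernel's output for one
    /// input batch). Returns a coalesced array once the threshold is reached,
    /// so the single copy happens here instead of in a separate operator.
    fn push(&mut self, chunk: ArrayRef) -> ArrowResult<Option<ArrayRef>> {
        self.rows += chunk.len();
        self.chunks.push(chunk);
        if self.rows < self.target_rows {
            return Ok(None);
        }
        // NOTE: older arrow versions take &[ArrayRef] here, newer ones
        // take &[&dyn Array]; adjust to the version in use.
        let coalesced = concat(&self.chunks)?;
        self.chunks.clear();
        self.rows = 0;
        Ok(Some(coalesced))
    }
}
```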
##########
File path: rust/datafusion/src/physical_optimizer/merge_exec.rs
##########
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatches optimizer that groups batches together rows

Review comment:
   this comment seems like it may need updating

##########
File path: rust/datafusion/src/physical_optimizer/repartition.rs
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Repartition optimizer that introduces repartition nodes to increase the level of parallism available
+use std::sync::Arc;
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan};
+use crate::physical_plan::{Distribution, Partitioning::*};
+use crate::{error::Result, execution::context::ExecutionConfig};
+
+/// Optimizer that introduces repartition to introduce more parallelism in the plan
+pub struct Repartition {}
+
+impl Repartition {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+fn optimize_concurrency(
+    concurrency: usize,
+    requires_single_partition: bool,
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    // Recurse into children bottom-up (added nodes should be as deep as possible)
+
+    let new_plan = if plan.children().is_empty() {
+        // leaf node - don't replace children

Review comment:
   it seems like `new_with_children` could handle the case of zero children as well, FWIW
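   If `with_new_children` does accept an empty child list for leaf nodes -- an assumption that would need checking per operator -- the recursion could indeed drop the special case. A hypothetical shape (`rewrite_children` is an illustrative stand-in for `optimize_concurrency` with the partitioning logic omitted):

```rust
use std::sync::Arc;

use crate::error::Result;
use crate::physical_plan::ExecutionPlan;

/// Rebuild every node from its (recursively rewritten) children.
fn rewrite_children(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    let children = plan
        .children()
        .iter()
        .map(|child| rewrite_children(child.clone()))
        .collect::<Result<Vec<_>>>()?;
    // For a leaf node `children` is empty; the suggestion is that
    // with_new_children could simply return the node unchanged in that case.
    plan.with_new_children(children)
}
```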
##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -103,66 +102,21 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plan from a logical plan
+    /// Optimize a physical plan
     fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
         ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let children = plan
-            .children()
-            .iter()
-            .map(|child| self.optimize_plan(child.clone(), ctx_state))
-            .collect::<Result<Vec<_>>>()?;
-
-        if children.is_empty() {
-            // leaf node, children cannot be replaced
-            Ok(plan.clone())
-        } else {
-            // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
-            // highly selective filters
-            let plan_any = plan.as_any();
-            //TODO we should do this in a more generic way either by wrapping all operators
-            // or having an API so that operators can declare when their inputs or outputs
-            // need to be wrapped in a coalesce batches operator.
-            // See https://issues.apache.org/jira/browse/ARROW-11068
-            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
-                || plan_any.downcast_ref::<HashJoinExec>().is_some()
-                || plan_any.downcast_ref::<RepartitionExec>().is_some();
-
-            //TODO we should also do this for HashAggregateExec but we need to update tests
-            // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
-            // || plan_any.downcast_ref::<HashAggregateExec>().is_some();
-
-            let plan = if wrap_in_coalesce {
-                //TODO we should add specific configuration settings for coalescing batches and
-                // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
-                // implemented. For now, we choose half the configured batch size to avoid copies
-                // when a small number of rows are removed from a batch
-                let target_batch_size = ctx_state.config.batch_size / 2;
-                Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
-            } else {
-                plan.clone()
-            };
-
-            let children = plan.children().clone();
-
-            match plan.required_child_distribution() {
-                Distribution::UnspecifiedDistribution => plan.with_new_children(children),
-                Distribution::SinglePartition => plan.with_new_children(
-                    children
-                        .iter()
-                        .map(|child| {
-                            if child.output_partitioning().partition_count() == 1 {
-                                child.clone()
-                            } else {
-                                Arc::new(MergeExec::new(child.clone()))
-                            }
-                        })
-                        .collect(),
-                ),
-            }
+        let optimizers = &ctx_state.config.physical_optimizers;

Review comment:
   👍
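   For readers following along: the hard-coded wrapping above is replaced by a loop over the rules configured on `ExecutionConfig`. Roughly like the sketch below -- the `physical_optimizers` field name comes from the diff, but the loop shape and trait-object bounds are an assumed illustration, not the PR's literal code.

```rust
use std::sync::Arc;

use crate::error::Result;
use crate::execution::context::ExecutionConfig;
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::ExecutionPlan;

/// Thread the plan through every configured physical optimizer rule, in order.
fn run_physical_optimizers(
    plan: Arc<dyn ExecutionPlan>,
    optimizers: &[Arc<dyn PhysicalOptimizerRule + Send + Sync>],
    config: &ExecutionConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
    optimizers
        .iter()
        .try_fold(plan, |plan, optimizer| optimizer.optimize(plan, config))
}
```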
##########
File path: rust/datafusion/src/physical_optimizer/coalesce_batches.rs
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatches optimizer that groups batches together rows
+//! in bigger batches to avoid overhead with small batches
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::{
+    error::Result,
+    physical_plan::{
+        coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
+        hash_join::HashJoinExec, repartition::RepartitionExec,
+    },
+};
+use std::sync::Arc;
+
+/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches
+pub struct CoalesceBatches {}
+
+impl CoalesceBatches {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+impl PhysicalOptimizerRule for CoalesceBatches {
+    fn optimize(
+        &self,
+        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+        config: &crate::execution::context::ExecutionConfig,
+    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+        // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
+        // highly selective filters
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize(child.clone(), config))
+            .collect::<Result<Vec<_>>>()?;
+
+        let plan_any = plan.as_any();
+        //TODO we should do this in a more generic way either by wrapping all operators

Review comment:
   What if we did the coalescing at runtime instead (i.e. had all operators buffer batches until their output exceeded whatever threshold we wanted)? That way we wouldn't have a plan-time decision that we could get wrong.
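   As a thought experiment, the runtime version could be a generic adapter that every operator's output is routed through, removing the plan-time placement decision entirely. A synchronous sketch of the buffering idea follows (real operators produce async `RecordBatch` streams, and `concat_record_batches` is a hand-rolled helper whose arrow calls may need adjusting per version):

```rust
use arrow::array::ArrayRef;
use arrow::compute::kernels::concat::concat;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

/// Wraps a source of possibly tiny batches and yields larger ones.
struct CoalescingIter<I> {
    input: I,
    target_rows: usize,
}

impl<I: Iterator<Item = ArrowResult<RecordBatch>>> Iterator for CoalescingIter<I> {
    type Item = ArrowResult<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut buffered = Vec::new();
        let mut rows = 0;
        // Pull small batches until the threshold is reached or the input ends,
        // then emit them as a single larger batch.
        while rows < self.target_rows {
            match self.input.next() {
                Some(Ok(batch)) => {
                    rows += batch.num_rows();
                    buffered.push(batch);
                }
                Some(Err(e)) => return Some(Err(e)),
                None => break,
            }
        }
        if buffered.is_empty() {
            None
        } else {
            Some(concat_record_batches(&buffered))
        }
    }
}

/// Concatenate batches column by column with arrow's concat kernel.
fn concat_record_batches(batches: &[RecordBatch]) -> ArrowResult<RecordBatch> {
    let schema = batches[0].schema();
    let columns = (0..schema.fields().len())
        .map(|i| {
            let arrays: Vec<ArrayRef> =
                batches.iter().map(|b| b.column(i).clone()).collect();
            concat(&arrays)
        })
        .collect::<ArrowResult<Vec<_>>>()?;
    RecordBatch::try_new(schema, columns)
}
```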
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]