alamb commented on a change in pull request #9865:
URL: https://github.com/apache/arrow/pull/9865#discussion_r605945522



##########
File path: rust/datafusion/src/physical_optimizer/coalesce_batches.rs
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatches optimizer that groups batches together rows
+//! in bigger batches to avoid overhead with small batches
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::{
+    error::Result,
+    physical_plan::{
+        coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
+        hash_join::HashJoinExec, repartition::RepartitionExec,
+    },
+};
+use std::sync::Arc;
+
+/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches
+pub struct CoalesceBatches {}
+
+impl CoalesceBatches {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+impl PhysicalOptimizerRule for CoalesceBatches {
+    fn optimize(
+        &self,
+        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+        config: &crate::execution::context::ExecutionConfig,
+    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+        // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
+        // highly selective filters
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize(child.clone(), config))
+            .collect::<Result<Vec<_>>>()?;

Review comment:
       I realize you are just moving code around, so this comment is outside the context of this PR....
   
   However, I wonder if it would be more performant to do the coalescing directly in the filter kernel code -- the way coalesce is written today requires copying the (filtered) output into a different (coalesced) array.
   
   I think @ritchie46 had some code that allowed incrementally building up output in several chunks as part of polars, which may be relevant.
   
   I think this code is good, but I wanted to plant a seed 🌱 for future optimizations.
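
   For illustration only (this is not code from the PR, and only a loose nod to the polars approach), the "build output incrementally" idea could look roughly like the sketch below: filtered values accumulate in one growing buffer that spans input batches instead of being copied from many tiny filtered arrays into a coalesced one. All names here are made up, the mask is assumed to have no nulls, and a plain `Vec<i64>` stands in for a real Arrow builder.

```rust
// Illustrative sketch only: append surviving values from each input chunk into
// a shared buffer, and only materialize an Arrow array once it is large enough.
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, Int64Array};

struct IncrementalFilterOutput {
    buffer: Vec<i64>,
    target_rows: usize,
    ready: Vec<ArrayRef>,
}

impl IncrementalFilterOutput {
    fn new(target_rows: usize) -> Self {
        Self {
            buffer: Vec::with_capacity(target_rows),
            target_rows,
            ready: vec![],
        }
    }

    /// Apply a filter mask (assumed null-free) to one input chunk, appending
    /// the surviving values directly into the shared buffer.
    fn filter_into(&mut self, values: &Int64Array, mask: &BooleanArray) {
        for i in 0..values.len() {
            if mask.value(i) {
                self.buffer.push(values.value(i));
            }
        }
        // Only materialize an Arrow array once enough rows have accumulated,
        // so a highly selective filter does not produce lots of tiny arrays.
        if self.buffer.len() >= self.target_rows {
            let array = Int64Array::from(std::mem::take(&mut self.buffer));
            self.ready.push(Arc::new(array) as ArrayRef);
        }
    }
}
```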

##########
File path: rust/datafusion/src/physical_optimizer/merge_exec.rs
##########
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatches optimizer that groups batches together rows

Review comment:
       this comment seems like it may need updating
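
   A possible rewording (an editorial suggestion only, assuming this module holds the rule that inserts `MergeExec` nodes where a single partition is required, as the planner code being removed below suggests) might be:

```rust
//! Optimizer rule that inserts `MergeExec` nodes wherever an operator requires
//! its input to consist of a single partition
```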

##########
File path: rust/datafusion/src/physical_optimizer/repartition.rs
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Repartition optimizer that introduces repartition nodes to increase the level of parallelism available
+use std::sync::Arc;
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan};
+use crate::physical_plan::{Distribution, Partitioning::*};
+use crate::{error::Result, execution::context::ExecutionConfig};
+
+/// Optimizer that introduces repartition to introduce more parallelism in the plan
+pub struct Repartition {}
+
+impl Repartition {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+fn optimize_concurrency(
+    concurrency: usize,
+    requires_single_partition: bool,
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    // Recurse into children bottom-up (added nodes should be as deep as possible)
+
+    let new_plan = if plan.children().is_empty() {
+        // leaf node - don't replace children

Review comment:
       it seems like `with_new_children` could handle the case of zero children as well, FWIW
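
   As a sketch of that suggestion (not the PR's code), the leaf special case could be dropped if `with_new_children` is assumed to accept an empty children vector for leaf nodes and return the leaf unchanged:

```rust
use std::sync::Arc;

use crate::error::Result;
use crate::physical_plan::ExecutionPlan;

// Sketch only: recurse unconditionally; for a leaf node `children` is simply
// empty, and rebuilding with zero children is assumed to hand the leaf back.
fn optimize_concurrency(
    concurrency: usize,
    requires_single_partition: bool,
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let children = plan
        .children()
        .iter()
        .map(|child| {
            optimize_concurrency(concurrency, requires_single_partition, child.clone())
        })
        .collect::<Result<Vec<_>>>()?;

    let plan = plan.with_new_children(children)?;

    // ... the actual repartitioning decision would follow here, unchanged ...
    Ok(plan)
}
```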

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -103,66 +102,21 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plan from a logical plan
+    /// Optimize a physical plan
     fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
         ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let children = plan
-            .children()
-            .iter()
-            .map(|child| self.optimize_plan(child.clone(), ctx_state))
-            .collect::<Result<Vec<_>>>()?;
-
-        if children.is_empty() {
-            // leaf node, children cannot be replaced
-            Ok(plan.clone())
-        } else {
-            // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
-            // highly selective filters
-            let plan_any = plan.as_any();
-            //TODO we should do this in a more generic way either by wrapping all operators
-            // or having an API so that operators can declare when their inputs or outputs
-            // need to be wrapped in a coalesce batches operator.
-            // See https://issues.apache.org/jira/browse/ARROW-11068
-            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
-                || plan_any.downcast_ref::<HashJoinExec>().is_some()
-                || plan_any.downcast_ref::<RepartitionExec>().is_some();
-
-            //TODO we should also do this for HashAggregateExec but we need to update tests
-            // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
-            // || plan_any.downcast_ref::<HashAggregateExec>().is_some();
-
-            let plan = if wrap_in_coalesce {
-                //TODO we should add specific configuration settings for coalescing batches and
-                // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
-                // implemented. For now, we choose half the configured batch size to avoid copies
-                // when a small number of rows are removed from a batch
-                let target_batch_size = ctx_state.config.batch_size / 2;
-                Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
-            } else {
-                plan.clone()
-            };
-
-            let children = plan.children().clone();
-
-            match plan.required_child_distribution() {
-                Distribution::UnspecifiedDistribution => plan.with_new_children(children),
-                Distribution::SinglePartition => plan.with_new_children(
-                    children
-                        .iter()
-                        .map(|child| {
-                            if child.output_partitioning().partition_count() == 1 {
-                                child.clone()
-                            } else {
-                                Arc::new(MergeExec::new(child.clone()))
-                            }
-                        })
-                        .collect(),
-                ),
-            }
+        let optimizers = &ctx_state.config.physical_optimizers;

Review comment:
       👍 

##########
File path: rust/datafusion/src/physical_optimizer/coalesce_batches.rs
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatches optimizer that groups batches together rows
+//! in bigger batches to avoid overhead with small batches
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::{
+    error::Result,
+    physical_plan::{
+        coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
+        hash_join::HashJoinExec, repartition::RepartitionExec,
+    },
+};
+use std::sync::Arc;
+
+/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches
+pub struct CoalesceBatches {}
+
+impl CoalesceBatches {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+impl PhysicalOptimizerRule for CoalesceBatches {
+    fn optimize(
+        &self,
+        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+        config: &crate::execution::context::ExecutionConfig,
+    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+        // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
+        // highly selective filters
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize(child.clone(), config))
+            .collect::<Result<Vec<_>>>()?;
+
+        let plan_any = plan.as_any();
+        //TODO we should do this in a more generic way either by wrapping all operators

Review comment:
       what if we did the coalescing at runtime instead (i.e. had all operators buffer batches until the output exceeded whatever threshold we wanted)? That way we wouldn't have a plan-time decision that we could get wrong
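
   Purely to illustrate that idea (not code from this PR), a runtime policy could look roughly like the sketch below: an operator holds small output batches back until enough rows have accumulated and only then hands the buffered batches on. The concatenation into a single batch is elided here since `CoalesceBatchesExec` already contains that logic; all names are illustrative.

```rust
use arrow::record_batch::RecordBatch;

/// Sketch of a runtime buffering policy that any operator could apply to its
/// output: hold small batches back until enough rows have accumulated.
struct OutputBuffer {
    pending: Vec<RecordBatch>,
    pending_rows: usize,
    target_rows: usize,
}

impl OutputBuffer {
    fn new(target_rows: usize) -> Self {
        Self {
            pending: Vec::new(),
            pending_rows: 0,
            target_rows,
        }
    }

    /// Buffer the batch; once the row threshold is reached, return everything
    /// accumulated so far (to be concatenated into one batch by the caller).
    fn push(&mut self, batch: RecordBatch) -> Option<Vec<RecordBatch>> {
        self.pending_rows += batch.num_rows();
        self.pending.push(batch);
        if self.pending_rows >= self.target_rows {
            self.pending_rows = 0;
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }

    /// Flush whatever is left once the operator's input is exhausted.
    fn finish(&mut self) -> Vec<RecordBatch> {
        self.pending_rows = 0;
        std::mem::take(&mut self.pending)
    }
}
```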




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

