This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new a690e9d6 perf: Remove one redundant CopyExec for SMJ (#962)
a690e9d6 is described below

commit a690e9d66c47ec431c73c65909154c1fc0f7c462
Author: Andy Grove <[email protected]>
AuthorDate: Fri Sep 27 06:09:29 2024 -0600

    perf: Remove one redundant CopyExec for SMJ (#962)
---
 native/core/src/execution/datafusion/planner.rs | 50 +++++++++++++------------
 native/core/src/execution/operators/copy.rs     |  2 +
 2 files changed, 29 insertions(+), 23 deletions(-)

diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index 9000db61..ac82d7d0 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -917,15 +917,15 @@ impl PhysicalPlanner {
 
                 let fetch = sort.fetch.map(|num| num as usize);
 
-                let copy_exec = if can_reuse_input_batch(&child) {
-                    Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy))
-                } else {
-                    Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone))
-                };
+                // SortExec caches batches so we need to make a copy of incoming batches. Also,
+                // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and
+                // it would be more efficient if we could avoid that.
+                // https://github.com/apache/datafusion-comet/issues/963
+                let child = Self::wrap_in_copy_exec(child);
 
                 Ok((
                     scans,
-                    Arc::new(SortExec::new(exprs?, copy_exec).with_fetch(fetch)),
+                    Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)),
                 ))
             }
             OpStruct::Scan(scan) => {
@@ -1069,9 +1069,17 @@ impl PhysicalPlanner {
                     join.join_type,
                     &join.condition,
                 )?;
+
+                // HashJoinExec may cache the input batch internally. We need
+                // to copy the input batch to avoid the data corruption from reusing the input
+                // batch. We also need to unpack dictionary arrays, because the join operators
+                // do not support them.
+                let left = Self::wrap_in_copy_exec(join_params.left);
+                let right = Self::wrap_in_copy_exec(join_params.right);
+
                 let hash_join = Arc::new(HashJoinExec::try_new(
-                    join_params.left,
-                    join_params.right,
+                    left,
+                    right,
                     join_params.join_on,
                     join_params.join_filter,
                     &join_params.join_type,
@@ -1135,6 +1143,7 @@ impl PhysicalPlanner {
         }
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn parse_join_parameters(
         &self,
         inputs: &mut Vec<Arc<GlobalRef>>,
@@ -1263,21 +1272,6 @@ impl PhysicalPlanner {
             None
         };
 
-        // DataFusion Join operators keep the input batch internally. We need
-        // to copy the input batch to avoid the data corruption from reusing the input
-        // batch.
-        let left = if can_reuse_input_batch(&left) {
-            Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy))
-        } else {
-            Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone))
-        };
-
-        let right = if can_reuse_input_batch(&right) {
-            Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy))
-        } else {
-            Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone))
-        };
-
         Ok((
             JoinParameters {
                 left,
@@ -1290,6 +1284,16 @@ impl PhysicalPlanner {
         ))
     }
 
+    /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays
+    /// and make a deep copy of other arrays if the plan re-uses batches.
+    fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        if can_reuse_input_batch(&plan) {
+            Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy))
+        } else {
+            Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone))
+        }
+    }
+
     /// Create a DataFusion physical aggregate expression from Spark physical aggregate expression
     fn create_agg_expr(
         &self,
diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
index d6c095a7..d8e75c67 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -50,7 +50,9 @@ pub struct CopyExec {
 
 #[derive(Debug, PartialEq, Clone)]
 pub enum CopyMode {
+    /// Perform a deep copy and also unpack dictionaries
     UnpackOrDeepCopy,
+    /// Perform a clone and also unpack dictionaries
     UnpackOrClone,
 }
 
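
As context for the planner.rs change above: the reason CopyExec must deep-copy
when can_reuse_input_batch returns true is that an upstream operator may refill
the same buffer for every batch it emits. A toy, dependency-free sketch of that
hazard follows (illustrative only; the "scan" and "sort" here are hypothetical
stand-ins, not Comet code):

    use std::cell::RefCell;
    use std::rc::Rc;

    fn main() {
        // A "scan" that refills one shared buffer for every batch it produces.
        let buffer = Rc::new(RefCell::new(vec![1, 2, 3]));

        // A downstream "sort" caches a reference instead of a deep copy...
        let cached = Rc::clone(&buffer);

        // ...so when the scan overwrites the buffer for its next batch,
        *buffer.borrow_mut() = vec![4, 5, 6];

        // ...the cached batch has silently changed: this prints [4, 5, 6].
        println!("{:?}", cached.borrow());

        // Taking a deep copy at hand-off (buffer.borrow().clone()) would have
        // preserved [1, 2, 3]; that is the role of CopyMode::UnpackOrDeepCopy.
    }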

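The CopyMode doc comments above both mention unpacking dictionaries. A minimal
sketch of what that means at the Arrow level, using the arrow crate's cast
kernel (illustrative only; this is not the code path CopyExec itself uses):

    use arrow::array::{Array, DictionaryArray, StringArray};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Int32Type};

    fn main() {
        // A dictionary-encoded string column: values ["a", "b"], keys [0, 1, 0].
        let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        // "Unpacking" materializes the keys through the dictionary into a plain
        // array, which operators that lack dictionary support (per the comments
        // in the diff, the join operators) can consume directly.
        let unpacked = cast(&dict, &DataType::Utf8).unwrap();
        let strings = unpacked.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.value(2), "a");
    }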

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
