This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new a690e9d6 perf: Remove one redundant CopyExec for SMJ (#962)
a690e9d6 is described below
commit a690e9d66c47ec431c73c65909154c1fc0f7c462
Author: Andy Grove <[email protected]>
AuthorDate: Fri Sep 27 06:09:29 2024 -0600
perf: Remove one redundant CopyExec for SMJ (#962)
---
native/core/src/execution/datafusion/planner.rs | 50 +++++++++++++------------
native/core/src/execution/operators/copy.rs | 2 +
2 files changed, 29 insertions(+), 23 deletions(-)
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index 9000db61..ac82d7d0 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -917,15 +917,15 @@ impl PhysicalPlanner {
let fetch = sort.fetch.map(|num| num as usize);
- let copy_exec = if can_reuse_input_batch(&child) {
- Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy))
- } else {
- Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone))
- };
+ // SortExec caches batches so we need to make a copy of incoming batches. Also,
+ // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and
+ // it would be more efficient if we could avoid that.
+ // https://github.com/apache/datafusion-comet/issues/963
+ let child = Self::wrap_in_copy_exec(child);
Ok((
scans,
- Arc::new(SortExec::new(exprs?, copy_exec).with_fetch(fetch)),
+ Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)),
))
}
OpStruct::Scan(scan) => {
@@ -1069,9 +1069,17 @@ impl PhysicalPlanner {
join.join_type,
&join.condition,
)?;
+
+ // HashJoinExec may cache the input batch internally. We need
+ // to copy the input batch to avoid the data corruption from reusing the input
+ // batch. We also need to unpack dictionary arrays, because the join operators
+ // do not support them.
+ let left = Self::wrap_in_copy_exec(join_params.left);
+ let right = Self::wrap_in_copy_exec(join_params.right);
+
let hash_join = Arc::new(HashJoinExec::try_new(
- join_params.left,
- join_params.right,
+ left,
+ right,
join_params.join_on,
join_params.join_filter,
&join_params.join_type,
@@ -1135,6 +1143,7 @@ impl PhysicalPlanner {
}
}
+ #[allow(clippy::too_many_arguments)]
fn parse_join_parameters(
&self,
inputs: &mut Vec<Arc<GlobalRef>>,
@@ -1263,21 +1272,6 @@ impl PhysicalPlanner {
None
};
- // DataFusion Join operators keep the input batch internally. We need
- // to copy the input batch to avoid the data corruption from reusing the input
- // batch.
- let left = if can_reuse_input_batch(&left) {
- Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy))
- } else {
- Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone))
- };
-
- let right = if can_reuse_input_batch(&right) {
- Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy))
- } else {
- Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone))
- };
-
Ok((
JoinParameters {
left,
@@ -1290,6 +1284,16 @@ impl PhysicalPlanner {
))
}
+ /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays
+ /// and make a deep copy of other arrays if the plan re-uses batches.
+ fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+ if can_reuse_input_batch(&plan) {
+ Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy))
+ } else {
+ Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone))
+ }
+ }
+
/// Create a DataFusion physical aggregate expression from Spark physical aggregate expression
fn create_agg_expr(
&self,
diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
index d6c095a7..d8e75c67 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -50,7 +50,9 @@ pub struct CopyExec {
#[derive(Debug, PartialEq, Clone)]
pub enum CopyMode {
+ /// Perform a deep copy and also unpack dictionaries
UnpackOrDeepCopy,
+ /// Perform a clone and also unpack dictionaries
UnpackOrClone,
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]