my-vegetable-has-exploded commented on code in PR #9236:
URL: https://github.com/apache/arrow-datafusion/pull/9236#discussion_r1517117933


##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -524,13 +529,100 @@ fn try_pushdown_through_union(
     Ok(Some(Arc::new(UnionExec::new(new_children))))
 }
 
+/// Tries to embed `projection` into `hash_join` as the join's own output projection.
+///
+/// Some projections can't be pushed down into the left or right input of a hash
+/// join, because the join filter or the `on` keys may need columns that are not
+/// used later. By embedding such a projection into the hash join, we reduce the
+/// cost of `build_batch_from_indices` in the hash join (which has to call
+/// `compute::take()` for every column) and avoid creating unnecessary output.
+///
+/// Returns:
+/// - `Ok(None)` if the projection references no columns, or if some projection
+///   expression cannot be rewritten against the new join schema (`update_expr`
+///   returned `None`);
+/// - `Ok(Some(new_hash_join))` if the rebuilt projection on top of the new join
+///   is removable according to `is_projection_removable`;
+/// - `Ok(Some(new_projection))` otherwise, i.e. a new `ProjectionExec` over the
+///   new hash join that preserves the original aliases/expressions.
+///
+/// # Errors
+/// Propagates errors from `HashJoinExec::with_projection`, `update_expr` and
+/// `ProjectionExec::try_new`.
+fn try_embed_to_hash_join(
+    projection: &ProjectionExec,
+    hash_join: &HashJoinExec,
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    // Collect all column indices from the given projection expressions.
+    let projection_index = collect_column_indices(projection.expr());
+
+    // Nothing to embed if the projection references no input columns.
+    if projection_index.is_empty() {
+        return Ok(None);
+    };
+
+    // Let the hash join itself emit only the referenced columns.
+    // NOTE(review): the zip below assumes the new join's output schema lists
+    // exactly the columns of `projection_index`, in the same order — confirm
+    // against `HashJoinExec::with_projection`.
+    let new_hash_join =
+        Arc::new(hash_join.with_projection(Some(projection_index.to_vec()))?);
+
+    // Build projection expressions for `update_expr`: zip `projection_index` with
+    // the fields of the new hash join's output schema. Each entry is a column named
+    // after the new output field but carrying its index in the *old* join schema,
+    // so `update_expr` can map old column references onto new positions.
+    let embed_project_exprs = projection_index
+        .iter()
+        .zip(new_hash_join.schema().fields())
+        .map(|(index, field)| {
+            (
+                Arc::new(Column::new(field.name(), *index)) as Arc<dyn 
PhysicalExpr>,
+                field.name().to_owned(),
+            )
+        })
+        .collect::<Vec<_>>();
+
+    let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
+
+    for (expr, alias) in projection.expr() {
+        // Update the column indices in each projection expression, since the
+        // input schema has changed. Give up (no rewrite) if any expression
+        // cannot be updated.
+        let Some(expr) = update_expr(expr, embed_project_exprs.as_slice(), 
false)? else {
+            return Ok(None);
+        };
+        new_projection_exprs.push((expr, alias.clone()));
+    }
+    // The old projection may contain aliases or expressions such as `a + 1` and
+    // `CAST('true' AS BOOLEAN)`, but the projection embedded in the hash join only
+    // contains columns, so we create a new projection on top of the join to
+    // preserve the original projection.
+    let new_projection = Arc::new(ProjectionExec::try_new(
+        new_projection_exprs,
+        new_hash_join.clone(),
+    )?);
+    // Drop the outer projection when it is removable; otherwise keep it on top
+    // of the new hash join.
+    if is_projection_removable(&new_projection) {
+        Ok(Some(new_hash_join))
+    } else {
+        Ok(Some(new_projection))
+    }
+}
+
+/// Collect all column indices from the given projection expressions.
+fn collect_column_indices(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> 
Vec<usize> {

Review Comment:
   good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to