kosiew commented on code in PR #20191:
URL: https://github.com/apache/datafusion/pull/20191#discussion_r2792238967


##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -1057,6 +1057,14 @@ pub(crate) fn build_batch_empty_build_side(
         // the remaining joins will return data for the right columns and null 
for the left ones
         JoinType::Right | JoinType::Full | JoinType::RightAnti | 
JoinType::RightMark => {
             let num_rows = probe_batch.num_rows();
+            if schema.fields().is_empty() {
+                let options = 
RecordBatchOptions::new().with_row_count(Some(num_rows));
+                return Ok(RecordBatch::try_new_with_options(
+                    Arc::new(schema.clone()),
+                    vec![],
+                    &options,
+                )?);

Review Comment:
   `build_batch_empty_build_side` also contains empty-schema 
`RecordBatch::try_new_with_options(... row_count ...)` logic similar to 
`build_batch_from_indices`.
   
   Extracting a shared utility like `new_empty_batch_with_row_count(schema, 
row_count)` in this module to keep row-count handling consistent and reduce 
future drift.



##########
datafusion/core/tests/physical_optimizer/projection_pushdown.rs:
##########
@@ -1723,3 +1723,47 @@ fn test_cooperative_exec_after_projection() -> 
Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn test_hash_join_empty_projection_embeds() -> Result<()> {
+    let left_csv = create_simple_csv_exec();
+    let right_csv = create_simple_csv_exec();
+
+    let join = Arc::new(HashJoinExec::try_new(
+        left_csv,
+        right_csv,
+        vec![(Arc::new(Column::new("a", 0)), Arc::new(Column::new("a", 0)))],
+        None,
+        &JoinType::Right,
+        None,
+        PartitionMode::CollectLeft,
+        NullEquality::NullEqualsNothing,
+        false,
+    )?);
+
+    // Empty projection: no columns needed from the join output
+    let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+        vec![] as Vec<ProjectionExpr>,
+        join,
+    )?);
+
+    let after_optimize =
+        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+    let after_optimize_string = displayable(after_optimize.as_ref())
+        .indent(true)
+        .to_string();
+    let actual = after_optimize_string.trim();
+
+    // The empty projection should be embedded into the HashJoinExec,
+    // resulting in projection=[] on the join and no ProjectionExec wrapper.
+    assert_snapshot!(
+        actual,
+        @r"
+    HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, a@0)], 
projection=[]
+      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false
+      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false
+    "

Review Comment:
   The new test asserts plan shape (`projection=[]` embedded into 
`HashJoinExec`). Since this change also touches runtime batch construction for 
empty schemas, consider adding one end-to-end assertion (for example, `SELECT 
count(*) ... RIGHT/FULL JOIN ...`) to validate output row counts when the join 
emits zero columns.
   
   This would guard against future regressions where explain-plan remains 
correct but runtime `RecordBatch::num_rows()` behavior drifts.



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -563,6 +563,15 @@ pub fn try_embed_projection<Exec: EmbeddedProjection + 
'static>(
     projection: &ProjectionExec,
     execution_plan: &Exec,
 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    // If the projection has no expressions at all (e.g., ProjectionExec: 
expr=[]),
+    // embed an empty projection into the execution plan so it outputs zero 
columns.
+    // This avoids allocating throwaway null arrays for build-side columns
+    // when no output columns are actually needed (e.g., count(1) over a right 
join).
+    if projection.expr().is_empty() {
+        let new_execution_plan = 
Arc::new(execution_plan.with_projection(Some(vec![]))?);
+        return Ok(Some(new_execution_plan));
+    }

Review Comment:
   I think amending API docs on `EmbeddedProjection::with_projection` (or 
nearby module docs) to explicitly state that `Some(vec![])` must preserve row 
count with zero output columns would be helpful to inform implementers of the 
behavioural change of this new branch.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to