milenkovicm commented on code in PR #1691:
URL: 
https://github.com/apache/datafusion-ballista/pull/1691#discussion_r3226646531


##########
ballista/scheduler/src/physical_optimizer/join_selection.rs:
##########
@@ -746,6 +755,108 @@ mod test {
 
     /// Create join filter for NLJoinExec with expression `big_col > small_col`
     /// where both columns are 0-indexed and come from left and right inputs 
respectively
+    // Regression for https://github.com/apache/datafusion-ballista/issues/1681
+    //
+    // `JoinSelection` must not swap a `HashJoinExec(CollectLeft)` join's
+    // inputs in a way that puts a multi-partition reader on the build side.
+    // Ballista's pipeline has no `EnforceDistribution` pass after this rule,
+    // so the swapped plan reaches the executor unchanged and trips
+    // `HashJoinExec::execute`'s assertion that `left_partitions == 1` in
+    // `CollectLeft` mode.
+    #[tokio::test]
+    async fn collect_left_swap_preserves_one_partition_build() {
+        use datafusion::{
+            common::NullEquality,
+            config::ConfigOptions,
+            physical_expr::expressions::Column,
+            physical_optimizer::PhysicalOptimizerRule,
+            physical_plan::{
+                ExecutionPlanProperties, 
coalesce_partitions::CoalescePartitionsExec,
+                joins::HashJoinExec, joins::PartitionMode,
+            },
+        };
+
+        use crate::physical_optimizer::join_selection::JoinSelection;
+
+        let schema = Schema::new(vec![Field::new("k", DataType::Int32, 
false)]);
+
+        // Build side: 1 logical partition (e.g. a Ballista broadcast
+        // ShuffleReaderExec) but BIG total_byte_size.
+        let big_inner = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(10_000),
+                total_byte_size: Precision::Exact(1_000_000),
+                column_statistics: vec![ColumnStatistics::new_unknown()],
+            },
+            schema.clone(),
+        ));
+        let left =
+            Arc::new(CoalescePartitionsExec::new(big_inner)) as Arc<dyn 
ExecutionPlan>;
+        assert_eq!(left.output_partitioning().partition_count(), 1);
+
+        // Probe side: 2 partitions (the raw StatisticsExec default), SMALL
+        // total_byte_size. Stand-in for a multi-partition hash shuffle reader.
+        let right = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(3_000),
+                total_byte_size: Precision::Exact(200_000),
+                column_statistics: vec![ColumnStatistics::new_unknown()],
+            },
+            schema.clone(),
+        )) as Arc<dyn ExecutionPlan>;
+        assert!(right.output_partitioning().partition_count() > 1);
+
+        let on = vec![(
+            Arc::new(Column::new_with_schema("k", &left.schema()).unwrap()) as 
_,
+            Arc::new(Column::new_with_schema("k", &right.schema()).unwrap()) 
as _,
+        )];
+
+        let join = Arc::new(
+            HashJoinExec::try_new(
+                Arc::clone(&left),
+                Arc::clone(&right),
+                on,
+                None,
+                &JoinType::Inner,
+                None,
+                PartitionMode::CollectLeft,
+                NullEquality::NullEqualsNothing,
+                false,
+            )
+            .unwrap(),
+        ) as Arc<dyn ExecutionPlan>;
+
+        let optimized = JoinSelection::new()
+            .optimize(join, &ConfigOptions::new())
+            .unwrap();
+
+        // `swap_inputs` for Inner wraps the join in a ProjectionExec to
+        // restore the output column order. Walk the tree to find the join.
+        fn find_hash_join(plan: &Arc<dyn ExecutionPlan>) -> 
Option<&HashJoinExec> {
+            if let Some(hj) = plan.as_any().downcast_ref::<HashJoinExec>() {
+                return Some(hj);
+            }
+            for child in plan.children() {
+                if let Some(hj) = find_hash_join(child) {
+                    return Some(hj);
+                }
+            }
+            None
+        }
+
+        let hj =
+            find_hash_join(&optimized).expect("HashJoinExec missing from 
optimized plan");
+
+        assert_eq!(*hj.partition_mode(), PartitionMode::CollectLeft);
+        assert_eq!(
+            hj.left().output_partitioning().partition_count(),
+            1,
+            "JoinSelection swapped a CollectLeft join's inputs and ended up \
+             with a multi-partition reader on the build side, which violates \
+             CollectLeft's invariant",
+        );

Review Comment:
   Minor: would it make sense to also assert that 
`hj.right().output_partitioning().partition_count() == 2`, just in case?



##########
ballista/scheduler/src/physical_optimizer/join_selection.rs:
##########
@@ -746,6 +755,108 @@ mod test {
 
     /// Create join filter for NLJoinExec with expression `big_col > small_col`
     /// where both columns are 0-indexed and come from left and right inputs 
respectively

Review Comment:
   Has this doc comment been hijacked from the next method down, `nl_join_filter`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to