This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git


The following commit(s) were added to refs/heads/main by this push:
     new dcea736  Address a TODO about simplify Ray stages collection (#80)
dcea736 is described below

commit dcea7360eca8b63eb4d87e37a08c25cdfdef593e
Author: Ming Chen <[email protected]>
AuthorDate: Tue Mar 11 16:34:37 2025 -0400

    Address a TODO about simplify Ray stages collection (#80)
---
 src/dataframe.rs | 53 +++++++++--------------------------------------------
 1 file changed, 9 insertions(+), 44 deletions(-)

diff --git a/src/dataframe.rs b/src/dataframe.rs
index 7189c6d..dc3a720 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -96,49 +96,10 @@ impl DFRayDataFrame {
     ) -> PyResult<Vec<PyDFRayStage>> {
         let mut stages = vec![];
 
-        // TODO: This can be done more efficiently, likely in one pass but I'm
-        // struggling to get the TreeNodeRecursion return values to make it do
-        // what I want. So, two steps for now
-
-        // Step 2: we walk down this stage and replace stages earlier in the 
tree with
-        // RayStageReaderExecs as we will need to consume their output instead 
of
-        // execute that part of the tree ourselves
-        let down = |plan: Arc<dyn ExecutionPlan>| {
-            trace!(
-                "examining plan down:\n{}",
-                display_plan_with_partition_counts(&plan)
-            );
-
-            if let Some(stage_exec) = 
plan.as_any().downcast_ref::<DFRayStageExec>() {
-                let input = plan.children();
-                assert!(input.len() == 1, "RayStageExec must have exactly one 
child");
-                let input = input[0];
-
-                trace!(
-                    "inserting a ray stage reader to consume: {} with 
partitioning {}",
-                    displayable(plan.as_ref()).one_line(),
-                    plan.output_partitioning().partition_count()
-                );
-
-                let replacement = Arc::new(DFRayStageReaderExec::try_new(
-                    plan.output_partitioning().clone(),
-                    input.schema(),
-                    stage_exec.stage_id,
-                )?) as Arc<dyn ExecutionPlan>;
-
-                Ok(Transformed {
-                    data: replacement,
-                    transformed: true,
-                    tnr: TreeNodeRecursion::Jump,
-                })
-            } else {
-                Ok(Transformed::no(plan))
-            }
-        };
-
         let mut partition_groups = vec![];
         let mut full_partitions = false;
-        // Step 1: we walk up the tree from the leaves to find the stages
+        // We walk up the tree from the leaves to find the stages, record ray 
stages, and replace
+        // each ray stage with a corresponding ray reader stage.
         let up = |plan: Arc<dyn ExecutionPlan>| {
             trace!(
                 "Examining plan up: {}",
@@ -151,11 +112,15 @@ impl DFRayDataFrame {
                 assert!(input.len() == 1, "RayStageExec must have exactly one 
child");
                 let input = input[0];
 
-                let fixed_plan = input.clone().transform_down(down)?.data;
+                let replacement = Arc::new(DFRayStageReaderExec::try_new(
+                    plan.output_partitioning().clone(),
+                    input.schema(),
+                    stage_exec.stage_id,
+                )?) as Arc<dyn ExecutionPlan>;
 
                 let stage = PyDFRayStage::new(
                     stage_exec.stage_id,
-                    fixed_plan,
+                    input.clone(),
                     partition_groups.clone(),
                     full_partitions,
                 );
@@ -163,7 +128,7 @@ impl DFRayDataFrame {
                 full_partitions = false;
 
                 stages.push(stage);
-                Ok(Transformed::no(plan))
+                Ok(Transformed::yes(replacement))
             } else if 
plan.as_any().downcast_ref::<RepartitionExec>().is_some() {
                 trace!("repartition exec");
                 let (calculated_partition_groups, replacement) = 
build_replacement(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to