LorrensP-2158466 commented on code in PR #18023:
URL: https://github.com/apache/datafusion/pull/18023#discussion_r2429197985


##########
datafusion/optimizer/src/reorder_join/query_graph.rs:
##########
@@ -0,0 +1,403 @@
+use std::sync::Arc;
+
+use datafusion_common::{
+    plan_err,
+    tree_node::{TreeNode, TreeNodeRecursion},
+    DataFusionError, Result,
+};
+use datafusion_expr::{utils::find_valid_equijoin_key_pair, Join, LogicalPlan};
+
+pub type NodeId = usize;
+
+pub struct Node {
+    pub plan: Arc<LogicalPlan>,
+    pub(crate) connections: Vec<EdgeId>,
+}
+
+impl Node {
+    pub(crate) fn connections(&self) -> &[EdgeId] {
+        &self.connections
+    }
+
+    pub(crate) fn connection_with<'graph>(
+        &self,
+        node_id: NodeId,
+        query_graph: &'graph QueryGraph,
+    ) -> Option<&'graph Edge> {
+        self.connections
+            .iter()
+            .filter_map(|edge_id| query_graph.get_edge(*edge_id))
+            .find(move |x| x.nodes.contains(&node_id))
+    }
+
+    pub(crate) fn neighbours(
+        &self,
+        node_id: NodeId,
+        query_graph: &QueryGraph,
+    ) -> Vec<NodeId> {
+        self.connections
+            .iter()
+            .filter_map(|edge_id| query_graph.get_edge(*edge_id))
+            .flat_map(|edge| edge.nodes)
+            .filter(|&id| id != node_id)
+            .collect()
+    }
+}
+
+pub type EdgeId = usize;
+
+pub struct Edge {
+    pub nodes: [NodeId; 2],
+    pub join: Join,
+}
+
+pub struct QueryGraph {
+    pub(crate) nodes: VecMap<Node>,
+    edges: VecMap<Edge>,
+}
+
+impl QueryGraph {
+    pub(crate) fn new() -> Self {
+        Self {
+            nodes: VecMap::new(),
+            edges: VecMap::new(),
+        }
+    }
+
+    pub(crate) fn add_node(&mut self, node_data: Arc<LogicalPlan>) -> NodeId {
+        self.nodes.insert(Node {
+            plan: node_data,
+            connections: Vec::new(),
+        })
+    }
+
+    pub(crate) fn add_node_with_edge(
+        &mut self,
+        other: NodeId,
+        node_data: Arc<LogicalPlan>,
+        edge_data: Join,
+    ) -> Option<NodeId> {
+        if self.nodes.contains_key(other) {
+            let new_id = self.nodes.insert(Node {
+                plan: node_data,
+                connections: Vec::new(),
+            });
+            self.add_edge(new_id, other, edge_data);
+            Some(new_id)
+        } else {
+            None
+        }
+    }
+
+    fn add_edge(&mut self, from: NodeId, to: NodeId, data: Join) -> Option<EdgeId> {
+        if self.nodes.contains_key(from) && self.nodes.contains_key(to) {
+            let edge_id = self.edges.insert(Edge {
+                nodes: [from, to],
+                join: data,
+            });
+            if let Some(from) = self.nodes.get_mut(from) {
+                from.connections.push(edge_id);
+            }
+            if let Some(to) = self.nodes.get_mut(to) {
+                to.connections.push(edge_id);
+            }
+            Some(edge_id)
+        } else {
+            None
+        }
+    }
+
+    pub(crate) fn remove_node(&mut self, node_id: NodeId) -> Option<Arc<LogicalPlan>> {
+        if let Some(node) = self.nodes.remove(node_id) {
+            // Remove all edges connected to this node
+            for edge_id in &node.connections {
+                if let Some(edge) = self.edges.remove(*edge_id) {
+                    // Remove the edge from the other node's connections
+                    for other_node_id in edge.nodes {
+                        if other_node_id != node_id {
+                            if let Some(other_node) = self.nodes.get_mut(other_node_id) {
+                                other_node.connections.retain(|id| id != edge_id);
+                            }
+                        }
+                    }
+                }
+            }
+            Some(node.plan)
+        } else {
+            None
+        }
+    }
+
+    fn remove_edge(&mut self, edge_id: EdgeId) -> Option<Join> {
+        if let Some(edge) = self.edges.remove(edge_id) {
+            // Remove the edge from both nodes' connections
+            for node_id in edge.nodes {
+                if let Some(node) = self.nodes.get_mut(node_id) {
+                    node.connections.retain(|id| *id != edge_id);
+                }
+            }
+            Some(edge.join)
+        } else {
+            None
+        }
+    }
+
+    pub(crate) fn nodes(&self) -> impl Iterator<Item = (NodeId, &Node)> {
+        self.nodes.iter()
+    }
+
+    pub(crate) fn get_node(&self, key: NodeId) -> Option<&Node> {
+        self.nodes.get(key)
+    }
+
+    pub(crate) fn get_edge(&self, key: EdgeId) -> Option<&Edge> {
+        self.edges.get(key)
+    }
+}
+
+impl TryFrom<LogicalPlan> for QueryGraph {
+    type Error = DataFusionError;
+
+    fn try_from(value: LogicalPlan) -> Result<Self, Self::Error> {
+        let mut query_graph = QueryGraph::new();
+        flatten_joins_recursive(value, &mut query_graph)?;
+        Ok(query_graph)
+    }
+}
+
+fn flatten_joins_recursive(
+    plan: LogicalPlan,
+    query_graph: &mut QueryGraph,
+) -> Result<()> {
+    match plan {
+        LogicalPlan::Join(join) => {
+            flatten_joins_recursive(
+                Arc::unwrap_or_clone(Arc::clone(&join.left)),
+                query_graph,
+            )?;
+            flatten_joins_recursive(
+                Arc::unwrap_or_clone(Arc::clone(&join.right)),
+                query_graph,
+            )?;
+
+            // Process each equijoin predicate to find which nodes it connects
+            for (left_key, right_key) in &join.on {
+                // Try to find which two nodes this predicate connects
+                let mut found_edge = false;
+
+                // Collect all node IDs and their schemas for iteration
+                let nodes: Vec<(NodeId, Arc<LogicalPlan>)> = query_graph
+                    .nodes()
+                    .map(|(id, node)| (id, Arc::clone(&node.plan)))
+                    .collect();
+
+                // Try all pairs of nodes to find which ones this predicate connects
+                for i in 0..nodes.len() {
+                    if found_edge {
+                        break;
+                    }
+                    for j in (i + 1)..nodes.len() {
+                        let (node_id_a, plan_a) = &nodes[i];
+                        let (node_id_b, plan_b) = &nodes[j];
+
+                        let schema_a = plan_a.schema();
+                        let schema_b = plan_b.schema();
+
+                        // Check if this predicate connects these two nodes
+                        if (find_valid_equijoin_key_pair(
+                            left_key,
+                            right_key,
+                            schema_a.as_ref(),
+                            schema_b.as_ref(),
+                        )?)
+                        .is_some()
+                        {
+                            // Found a valid connection between node_a and node_b
+                            // Add an edge if one doesn't exist yet
+                            if let Some(node_a) = query_graph.get_node(*node_id_a) {
+                                if node_a
+                                    .connection_with(*node_id_b, query_graph)
+                                    .is_none()
+                                {
+                                    // No edge exists yet, create one with this join
+                                    query_graph.add_edge(
+                                        *node_id_a,
+                                        *node_id_b,
+                                        join.clone(),
+                                    );
+                                }
+                            }
+                            found_edge = true;
+                            break;
+                        }
+                    }
+                }
+
+                if !found_edge {
+                    return plan_err!(
+                        "Could not find nodes for join predicate: {} = {}",
+                        left_key,
+                        right_key
+                    );
+                }
+            }
+
+            Ok(())
+        }
+        x => {
+            if contains_join(&x) {
+                plan_err!(
+                    "Join reordering requires joins to be consecutive in the plan tree. \
+                     Found a non-join node that contains nested joins: {}",
+                    x.display()
+                )
+            } else {
+                query_graph.add_node(Arc::new(x));
+                Ok(())
+            }
+        }
+    }
+}
+
+/// Checks if a LogicalPlan contains any join nodes
+///
+/// Uses a TreeNode visitor to traverse the plan tree and detect the presence
+/// of any `LogicalPlan::Join` nodes.
+///
+/// # Arguments
+///
+/// * `plan` - The logical plan to check
+///
+/// # Returns
+///
+/// `true` if the plan contains at least one join node, `false` otherwise
+pub(crate) fn contains_join(plan: &LogicalPlan) -> bool {
+    let mut has_join = false;
+
+    // Use TreeNode's apply method to traverse the plan
+    let _ = plan.apply(|node| {
+        if matches!(node, LogicalPlan::Join(_)) {
+            has_join = true;
+            // Stop traversal once we find a join
+            Ok(TreeNodeRecursion::Stop)
+        } else {
+            // Continue traversal
+            Ok(TreeNodeRecursion::Continue)
+        }
+    });
+
+    has_join
+}
+
+/// A simple Vec-based map that uses Option<T> for sparse storage
+/// Keys are never reused once removed
+pub(crate) struct VecMap<V>(Vec<Option<V>>);

Review Comment:
   > And since we are not deleting nodes, I thought the vecmap suffices.
   
   Aha okay, then I have nothing to add here :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to