wirybeaver commented on code in PR #20763:
URL: https://github.com/apache/datafusion/pull/20763#discussion_r3214252442


##########
datafusion/proto/src/logical_plan/from_proto.rs:
##########
@@ -243,10 +245,137 @@ impl From<protobuf::dml_node::Type> for WriteOp {
             protobuf::dml_node::Type::InsertReplace => 
WriteOp::Insert(InsertOp::Replace),
             protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
             protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
+            // MERGE_INTO carries a payload (`MergeIntoOpNode`) that this
+            // tag-only conversion cannot read; callers must use
+            // [`parse_write_op`] for `DmlNode`s with a MergeInto payload.
+            protobuf::dml_node::Type::MergeInto => unreachable!(
+                "WriteOp::MergeInto requires the MergeIntoOpNode payload; use 
parse_write_op",
+            ),
+        }
+    }
+}
+
+impl From<protobuf::merge_into_clause_node::Kind> for MergeIntoClauseKind {
+    fn from(k: protobuf::merge_into_clause_node::Kind) -> Self {
+        match k {
+            protobuf::merge_into_clause_node::Kind::Matched => {
+                MergeIntoClauseKind::Matched
+            }
+            protobuf::merge_into_clause_node::Kind::NotMatched => {
+                MergeIntoClauseKind::NotMatched
+            }
+            protobuf::merge_into_clause_node::Kind::NotMatchedByTarget => {
+                MergeIntoClauseKind::NotMatchedByTarget
+            }
+            protobuf::merge_into_clause_node::Kind::NotMatchedBySource => {
+                MergeIntoClauseKind::NotMatchedBySource
+            }
         }
     }
 }
 
+/// Reconstruct a [`WriteOp`] from a [`protobuf::DmlNode`], reading the
+/// `merge_into` payload when the type tag is `MergeInto`.
+pub fn parse_write_op(
+    node: &protobuf::DmlNode,
+    ctx: &TaskContext,
+    codec: &dyn LogicalExtensionCodec,
+) -> Result<WriteOp, Error> {
+    let typ = node.dml_type();
+    if matches!(typ, protobuf::dml_node::Type::MergeInto) {
+        let merge_into = node.merge_into.as_ref().ok_or_else(|| {
+            Error::General(
+                "DmlNode with MERGE_INTO type is missing the merge_into 
payload"
+                    .to_string(),
+            )
+        })?;
+        return Ok(WriteOp::MergeInto(parse_merge_into_op(
+            merge_into, ctx, codec,
+        )?));
+    }
+    Ok(typ.into())
+}
+
+fn parse_merge_into_op(
+    op: &protobuf::MergeIntoOpNode,
+    ctx: &TaskContext,
+    codec: &dyn LogicalExtensionCodec,
+) -> Result<MergeIntoOp, Error> {
+    let on = op.on.as_ref().ok_or_else(|| {
+        Error::General("MergeIntoOpNode is missing required `on` 
expression".to_string())
+    })?;
+    let on = parse_expr(on, ctx, codec)?;
+    let clauses = op
+        .clauses
+        .iter()
+        .map(|c| parse_merge_into_clause(c, ctx, codec))
+        .collect::<Result<Vec<_>, Error>>()?;
+    Ok(MergeIntoOp { on, clauses })
+}
+
+fn parse_merge_into_clause(
+    clause: &protobuf::MergeIntoClauseNode,
+    ctx: &TaskContext,
+    codec: &dyn LogicalExtensionCodec,
+) -> Result<MergeIntoClause, Error> {
+    let kind = protobuf::merge_into_clause_node::Kind::try_from(clause.kind)

Review Comment:
   Added four new unit tests in `roundtrip_logical_plan.rs`, one per 
defensive check: missing `on` expression 
(`parse_merge_into_op_missing_on_errors`), unknown clause kind tag 
(`parse_merge_into_clause_unknown_kind_errors`), missing clause `action` 
(`parse_merge_into_clause_missing_action_errors`), and missing `action` oneof 
on `MergeIntoActionNode` (`parse_merge_into_action_missing_oneof_errors`). 
The existing `parse_write_op_merge_into_without_payload_errors` covers a 
`MERGE_INTO` `DmlNode` that has no `merge_into` payload. Together, these tests 
exercise every error branch in `parse_write_op` and the `parse_merge_into_*` 
helpers.



##########
datafusion/expr/src/logical_plan/dml.rs:
##########
@@ -291,10 +294,94 @@ impl Display for InsertOp {
     }
 }
 
+/// Describes a MERGE INTO operation's parameters.
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub struct MergeIntoOp {
+    /// The join condition from `ON <expr>`.
+    pub on: Expr,
+    /// The WHEN clauses, in the order they appeared in the SQL.
+    pub clauses: Vec<MergeIntoClause>,
+}
+
+/// A single WHEN clause within a MERGE INTO statement.
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub struct MergeIntoClause {
+    /// Whether this fires on matched or unmatched rows.
+    pub kind: MergeIntoClauseKind,
+    /// Optional additional predicate (`AND <expr>`).
+    pub predicate: Option<Expr>,
+    /// The action to take.
+    pub action: MergeIntoAction,
+}
+
+/// Which rows a MERGE WHEN clause applies to.
+///
+/// Mirrors `sqlparser::ast::MergeClauseKind` so that the SQL spelling is
+/// preserved through the logical plan.
+///
+/// **Note on `NotMatched` vs `NotMatchedByTarget`:** these two variants are

Review Comment:
   Added two helpers on `MergeIntoClauseKind`:
   
   - `is_not_matched_by_target(&self) -> bool` — a predicate that returns `true` 
for both `NotMatched` and `NotMatchedByTarget`. Use it when you only need a 
boolean check.
   - `canonical(self) -> Self` — collapses `NotMatched` into 
`NotMatchedByTarget`, so consumers can match on the canonical three-variant form 
(`Matched`, `NotMatchedByTarget`, `NotMatchedBySource`) without re-deriving the 
equivalence in every match. Use it when dispatching in a planner, optimizer, or 
table provider.
   
   Unit tests for both are in `datafusion_expr::logical_plan::dml::tests`. The 
type-level doc on `MergeIntoClauseKind` now points at these helpers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to