avantgardnerio commented on code in PR #4902:
URL: https://github.com/apache/arrow-datafusion/pull/4902#discussion_r1070327132


##########
datafusion/sql/src/statement.rs:
##########
@@ -505,6 +533,192 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }))
     }
 
+    fn delete_to_plan(
+        &self,
+        table_factor: TableFactor,
+        predicate_expr: Option<Expr>,
+    ) -> Result<LogicalPlan> {
+        let table_name = match &table_factor {
+            TableFactor::Table { name, .. } => name.clone(),
+            _ => Err(DataFusionError::Plan(
+                "Unsupported table type for delete!".to_string(),
+            ))?,
+        };
+
+        // Do a table lookup to verify the table exists
+        let table_ref = object_name_to_table_reference(table_name.clone())?;
+        let provider = self
+            .schema_provider
+            .get_table_provider((&table_ref).into())?;
+        let schema = (*provider.schema()).clone();
+        let schema = DFSchema::try_from(schema)?;
+        let scan =
+            LogicalPlanBuilder::scan(table_name.to_string(), provider, 
None)?.build()?;
+        let mut planner_context = PlannerContext::new();
+
+        let source = match predicate_expr {
+            None => scan,
+            Some(predicate_expr) => {
+                let filter_expr =
+                    self.sql_to_expr(predicate_expr, &schema, &mut 
planner_context)?;
+                let schema = Arc::new(schema.clone());
+                let mut using_columns = HashSet::new();
+                expr_to_columns(&filter_expr, &mut using_columns)?;
+                let filter_expr = normalize_col_with_schemas(
+                    filter_expr,
+                    &[&schema],
+                    &[using_columns],
+                )?;
+                LogicalPlan::Filter(Filter::try_new(filter_expr, 
Arc::new(scan))?)
+            }
+        };
+
+        let plan = LogicalPlan::Write(WriteRel {
+            table_name: table_ref,
+            table_schema: schema.into(),
+            op: WriteOp::Delete,
+            input: Arc::new(source),
+        });
+        Ok(plan)
+    }
+
+    fn update_to_plan(
+        &self,
+        table: TableWithJoins,
+        assignments: Vec<Assignment>,
+        from: Option<TableWithJoins>,
+        predicate_expr: Option<Expr>,
+    ) -> Result<LogicalPlan> {
+        let table_name = match &table.relation {
+            TableFactor::Table { name, .. } => name.clone(),
+            _ => Err(DataFusionError::Plan(
+                "Unsupported table type for update!".to_string(),
+            ))?,
+        };
+
+        // Do a table lookup to verify the table exists
+        let table_ref = object_name_to_table_reference(table_name)?;
+        let provider = self
+            .schema_provider
+            .get_table_provider((&table_ref).into())?;
+
+        // Build schema
+        let mut planner_context = PlannerContext::new();
+        let table_schema = provider.schema();
+        let mut fields = vec![];
+        let mut values = vec![];
+        for assign in assignments.iter() {
+            let col_name: &Ident = assign
+                .id
+                .iter()
+                .last()
+                .ok_or(DataFusionError::Plan("Empty column id".to_string()))?;
+            let col_name = col_name.value.as_str();
+            values.push((col_name.to_string(), assign.value.clone()));
+            let (_, field) =
+                table_schema
+                    .column_with_name(col_name)
+                    .ok_or(DataFusionError::Plan(format!(
+                        "Column not found: {col_name}"
+                    )))?;
+            let field = DFField::from(field.clone());
+            fields.push(field);
+        }
+
+        // Build scan
+        let from = from.unwrap_or(table);
+        let scan = self.plan_from_tables(vec![from], &mut planner_context)?;
+
+        // Filter
+        let source = match predicate_expr {
+            None => scan,
+            Some(predicate_expr) => {
+                let plan_schema = scan.schema();
+                let filter_expr =
+                    self.sql_to_expr(predicate_expr, plan_schema, &mut 
planner_context)?;
+                let schema = Arc::new(plan_schema.clone());
+                let mut using_columns = HashSet::new();
+                expr_to_columns(&filter_expr, &mut using_columns)?;
+                for use_col in using_columns.iter() {
+                    let col_name = use_col.name.clone();
+                    let rel = use_col.relation.as_ref().cloned();
+                    let field = plan_schema
+                        .field_with_name(rel.as_deref(), 
use_col.name.as_str())?
+                        .clone();
+                    fields.push(field.clone());
+                    values.push((
+                        col_name.clone(),
+                        
ast::Expr::Identifier(ast::Ident::from(col_name.as_str())),
+                    ));
+                }
+                let filter_expr = normalize_col_with_schemas(
+                    filter_expr,
+                    &[&schema],
+                    &[using_columns],
+                )?;
+                LogicalPlan::Filter(Filter::try_new(filter_expr, 
Arc::new(scan))?)
+            }
+        };
+
+        // Projection
+        let proj_schema = DFSchema::new_with_metadata(fields, HashMap::new())?;
+        let mut exprs = vec![];
+        for (col_name, expr) in values.into_iter() {
+            let expr = self.sql_to_expr(expr, &proj_schema, &mut 
planner_context)?;
+            let expr = expr.alias(col_name);
+            exprs.push(expr);
+        }
+        let source = project(source, exprs)?;
+
+        let plan = LogicalPlan::Write(WriteRel {
+            table_name: table_ref,
+            table_schema: proj_schema.into(),

Review Comment:
   > I don't understand the reason for the schema manipulation
   
   I think we need to preserve which fields are being updated, and what the 
filter values are in the schema for `WriteRel.input`. I think you're correct 
however in that `WriteRel.table_schema` should just be the table schema. I'll 
fix that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to