adriangb commented on code in PR #19142:
URL: https://github.com/apache/datafusion/pull/19142#discussion_r2629371746
##########
datafusion/catalog/src/memory/table.rs:
##########
@@ -295,4 +304,320 @@ impl TableProvider for MemTable {
fn get_column_default(&self, column: &str) -> Option<&Expr> {
self.column_defaults.get(column)
}
+
+ async fn delete_from(
+ &self,
+ state: &dyn Session,
+ filters: Vec<Expr>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Early exit if table has no partitions
+ if self.batches.is_empty() {
+ return Ok(Arc::new(DmlResultExec::new(0)));
+ }
+
+ *self.sort_order.lock() = vec![];
+
+ let mut total_deleted: u64 = 0;
+ let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
+
+ for partition_data in &self.batches {
+ let mut partition = partition_data.write().await;
+ let mut new_batches = Vec::with_capacity(partition.len());
+
+ for batch in partition.iter() {
+ if batch.num_rows() == 0 {
+ continue;
+ }
+
+ let filter_mask = if filters.is_empty() {
+ BooleanArray::from(vec![true; batch.num_rows()])
+ } else {
+ let mut combined_mask: Option<BooleanArray> = None;
+
+ for filter_expr in &filters {
+ let physical_expr = create_physical_expr(
+ filter_expr,
+ &df_schema,
+ state.execution_props(),
+ )?;
+
+ let result = physical_expr.evaluate(batch)?;
+ let array = result.into_array(batch.num_rows())?;
+ let bool_array = array
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .ok_or_else(|| {
+ datafusion_common::DataFusionError::Internal(
+ "Filter did not evaluate to
boolean".to_string(),
+ )
+ })?
+ .clone();
+
+ combined_mask = Some(match combined_mask {
+ Some(existing) => and(&existing, &bool_array)?,
+ None => bool_array,
+ });
+ }
+
+ combined_mask.unwrap_or_else(|| {
+ BooleanArray::from(vec![true; batch.num_rows()])
+ })
+ };
+
+ let delete_count =
+ filter_mask.iter().filter(|v| v == &Some(true)).count();
+ total_deleted += delete_count as u64;
+
+ // Keep rows where predicate is false or NULL (SQL
three-valued logic)
+ let keep_mask: BooleanArray =
+ filter_mask.iter().map(|v| Some(v !=
Some(true))).collect();
+ let filtered_batch = filter_record_batch(batch, &keep_mask)?;
+
+ if filtered_batch.num_rows() > 0 {
+ new_batches.push(filtered_batch);
+ }
+ }
+
+ *partition = new_batches;
+ }
+
+ Ok(Arc::new(DmlResultExec::new(total_deleted)))
+ }
+
+ async fn update(
+ &self,
+ state: &dyn Session,
+ assignments: Vec<(String, Expr)>,
+ filters: Vec<Expr>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Early exit if table has no partitions
+ if self.batches.is_empty() {
+ return Ok(Arc::new(DmlResultExec::new(0)));
+ }
+
+ // Validate column names upfront with clear error messages
+ let available_columns: Vec<&str> = self
+ .schema
+ .fields()
+ .iter()
+ .map(|f| f.name().as_str())
+ .collect();
+ for (column_name, _) in &assignments {
+ if self.schema.field_with_name(column_name).is_err() {
+ return plan_err!(
+ "UPDATE failed: column '{}' does not exist. Available
columns: {}",
+ column_name,
+ available_columns.join(", ")
+ );
+ }
+ }
+
+ let assignment_map: HashMap<&str, &Expr> = assignments
+ .iter()
+ .map(|(name, expr)| (name.as_str(), expr))
+ .collect();
+
+ *self.sort_order.lock() = vec![];
+
+ let mut total_updated: u64 = 0;
+ let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
+
+ for partition_data in &self.batches {
+ let mut partition = partition_data.write().await;
+ let mut new_batches = Vec::with_capacity(partition.len());
+
+ for batch in partition.iter() {
+ if batch.num_rows() == 0 {
+ continue;
+ }
+
+ let filter_mask = if filters.is_empty() {
+ BooleanArray::from(vec![true; batch.num_rows()])
+ } else {
+ let mut combined_mask: Option<BooleanArray> = None;
+
+ for filter_expr in &filters {
+ let physical_expr = create_physical_expr(
+ filter_expr,
+ &df_schema,
+ state.execution_props(),
+ )?;
+
+ let result = physical_expr.evaluate(batch)?;
+ let array = result.into_array(batch.num_rows())?;
+ let bool_array = array
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .ok_or_else(|| {
+ datafusion_common::DataFusionError::Internal(
+ "Filter did not evaluate to
boolean".to_string(),
+ )
+ })?
+ .clone();
+
+ combined_mask = Some(match combined_mask {
+ Some(existing) => and(&existing, &bool_array)?,
+ None => bool_array,
+ });
+ }
+
+ combined_mask.unwrap_or_else(|| {
+ BooleanArray::from(vec![true; batch.num_rows()])
+ })
+ };
Review Comment:
Can this be factored into a shared helper, e.g. `evaluate_filters_to_mask`?
The same filter-to-mask logic is duplicated in `delete_from` and `update`.
##########
datafusion/catalog/src/memory/table.rs:
##########
@@ -295,4 +304,320 @@ impl TableProvider for MemTable {
fn get_column_default(&self, column: &str) -> Option<&Expr> {
self.column_defaults.get(column)
}
+
+ async fn delete_from(
+ &self,
+ state: &dyn Session,
+ filters: Vec<Expr>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Early exit if table has no partitions
+ if self.batches.is_empty() {
+ return Ok(Arc::new(DmlResultExec::new(0)));
+ }
+
+ *self.sort_order.lock() = vec![];
+
+ let mut total_deleted: u64 = 0;
+ let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
+
+ for partition_data in &self.batches {
+ let mut partition = partition_data.write().await;
+ let mut new_batches = Vec::with_capacity(partition.len());
+
+ for batch in partition.iter() {
+ if batch.num_rows() == 0 {
+ continue;
+ }
+
+ let filter_mask = if filters.is_empty() {
+ BooleanArray::from(vec![true; batch.num_rows()])
Review Comment:
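Can't we just exit early here? When `filters` is empty every row matches, so
building an all-true mask for each batch looks unnecessary.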
##########
datafusion/catalog/src/memory/table.rs:
##########
@@ -295,4 +304,320 @@ impl TableProvider for MemTable {
fn get_column_default(&self, column: &str) -> Option<&Expr> {
self.column_defaults.get(column)
}
+
+ async fn delete_from(
+ &self,
+ state: &dyn Session,
+ filters: Vec<Expr>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Early exit if table has no partitions
+ if self.batches.is_empty() {
+ return Ok(Arc::new(DmlResultExec::new(0)));
+ }
+
+ *self.sort_order.lock() = vec![];
+
+ let mut total_deleted: u64 = 0;
+ let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
+
+ for partition_data in &self.batches {
+ let mut partition = partition_data.write().await;
+ let mut new_batches = Vec::with_capacity(partition.len());
+
+ for batch in partition.iter() {
+ if batch.num_rows() == 0 {
+ continue;
+ }
+
+ let filter_mask = if filters.is_empty() {
+ BooleanArray::from(vec![true; batch.num_rows()])
+ } else {
+ let mut combined_mask: Option<BooleanArray> = None;
+
+ for filter_expr in &filters {
+ let physical_expr = create_physical_expr(
+ filter_expr,
+ &df_schema,
+ state.execution_props(),
+ )?;
+
+ let result = physical_expr.evaluate(batch)?;
+ let array = result.into_array(batch.num_rows())?;
+ let bool_array = array
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .ok_or_else(|| {
+ datafusion_common::DataFusionError::Internal(
+ "Filter did not evaluate to
boolean".to_string(),
+ )
+ })?
+ .clone();
+
+ combined_mask = Some(match combined_mask {
+ Some(existing) => and(&existing, &bool_array)?,
+ None => bool_array,
+ });
+ }
+
+ combined_mask.unwrap_or_else(|| {
+ BooleanArray::from(vec![true; batch.num_rows()])
+ })
+ };
+
+ let delete_count =
+ filter_mask.iter().filter(|v| v == &Some(true)).count();
+ total_deleted += delete_count as u64;
+
+ // Keep rows where predicate is false or NULL (SQL
three-valued logic)
+ let keep_mask: BooleanArray =
+ filter_mask.iter().map(|v| Some(v !=
Some(true))).collect();
+ let filtered_batch = filter_record_batch(batch, &keep_mask)?;
+
+ if filtered_batch.num_rows() > 0 {
+ new_batches.push(filtered_batch);
+ }
+ }
+
+ *partition = new_batches;
+ }
+
+ Ok(Arc::new(DmlResultExec::new(total_deleted)))
+ }
+
+ async fn update(
+ &self,
+ state: &dyn Session,
+ assignments: Vec<(String, Expr)>,
+ filters: Vec<Expr>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Early exit if table has no partitions
+ if self.batches.is_empty() {
+ return Ok(Arc::new(DmlResultExec::new(0)));
+ }
+
+ // Validate column names upfront with clear error messages
+ let available_columns: Vec<&str> = self
+ .schema
+ .fields()
+ .iter()
+ .map(|f| f.name().as_str())
+ .collect();
+ for (column_name, _) in &assignments {
+ if self.schema.field_with_name(column_name).is_err() {
+ return plan_err!(
+ "UPDATE failed: column '{}' does not exist. Available
columns: {}",
+ column_name,
+ available_columns.join(", ")
+ );
+ }
+ }
+
+ let assignment_map: HashMap<&str, &Expr> = assignments
+ .iter()
+ .map(|(name, expr)| (name.as_str(), expr))
+ .collect();
+
+ *self.sort_order.lock() = vec![];
+
+ let mut total_updated: u64 = 0;
+ let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
+
+ for partition_data in &self.batches {
+ let mut partition = partition_data.write().await;
+ let mut new_batches = Vec::with_capacity(partition.len());
+
+ for batch in partition.iter() {
+ if batch.num_rows() == 0 {
+ continue;
+ }
+
+ let filter_mask = if filters.is_empty() {
+ BooleanArray::from(vec![true; batch.num_rows()])
+ } else {
+ let mut combined_mask: Option<BooleanArray> = None;
+
+ for filter_expr in &filters {
+ let physical_expr = create_physical_expr(
+ filter_expr,
+ &df_schema,
+ state.execution_props(),
+ )?;
+
+ let result = physical_expr.evaluate(batch)?;
+ let array = result.into_array(batch.num_rows())?;
+ let bool_array = array
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .ok_or_else(|| {
+ datafusion_common::DataFusionError::Internal(
+ "Filter did not evaluate to
boolean".to_string(),
+ )
+ })?
+ .clone();
+
+ combined_mask = Some(match combined_mask {
+ Some(existing) => and(&existing, &bool_array)?,
+ None => bool_array,
+ });
+ }
+
+ combined_mask.unwrap_or_else(|| {
+ BooleanArray::from(vec![true; batch.num_rows()])
+ })
+ };
+
+ let update_count =
+ filter_mask.iter().filter(|v| v == &Some(true)).count();
+ total_updated += update_count as u64;
+
+ if update_count == 0 {
+ new_batches.push(batch.clone());
+ continue;
+ }
+
+ // Normalize mask: only true (not NULL) triggers update
+ let update_mask: BooleanArray =
+ filter_mask.iter().map(|v| Some(v ==
Some(true))).collect();
+
+ let mut new_columns: Vec<ArrayRef> =
+ Vec::with_capacity(batch.num_columns());
+
+ for field in self.schema.fields() {
+ let column_name = field.name();
+ let original_column =
+ batch.column_by_name(column_name).ok_or_else(|| {
+
datafusion_common::DataFusionError::Internal(format!(
+ "Column '{column_name}' not found in batch"
+ ))
+ })?;
+
+ let new_column = if let Some(value_expr) =
+ assignment_map.get(column_name.as_str())
+ {
+ let physical_expr = create_physical_expr(
+ value_expr,
+ &df_schema,
+ state.execution_props(),
+ )?;
Review Comment:
Can we create these physical expressions once, upfront, instead of for every
batch and column? E.g. map `assignment_map` into a
`HashMap<String, Arc<dyn PhysicalExpr>>` before the loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]