wirybeaver commented on code in PR #2203:
URL: https://github.com/apache/iceberg-rust/pull/2203#discussion_r3090691304


##########
crates/iceberg/src/transaction/row_delta.rs:
##########
@@ -0,0 +1,491 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, ManifestStatus, 
Operation};
+use crate::table::Table;
+use crate::transaction::snapshot::{
+    DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
+};
+use crate::transaction::{ActionCommit, TransactionAction};
+
+/// RowDeltaAction handles both data file additions and deletions in a single 
snapshot.
+/// This is the core transaction type for MERGE, UPDATE, DELETE operations.
+///
+/// Corresponds to `org.apache.iceberg.RowDelta` in the Java implementation.
+///
+/// # Copy-on-Write (COW) Strategy
+///
+/// For row-level modifications:
+/// 1. Read target data files that contain rows to be modified
+/// 2. Apply modifications (UPDATE/DELETE logic)
+/// 3. Write modified rows to new data files via `add_data_files()`
+/// 4. Mark original files as deleted via `remove_data_files()`
+///
+/// For inserts (NOT MATCHED in MERGE):
+/// 1. Write new rows to data files
+/// 2. Add files via `add_data_files()`
+///
+/// # Future: Merge-on-Read (MOR) Strategy
+///
+/// The `add_delete_files()` method is reserved for future MOR support, which 
uses
+/// delete files instead of rewriting data files.
+pub struct RowDeltaAction {
+    /// New data files to add (for inserts or rewritten files in COW mode)
+    added_data_files: Vec<DataFile>,
+    /// Data files to mark as deleted (for COW mode when rewriting files)
+    removed_data_files: Vec<DataFile>,
+    /// Delete files to add (reserved for future MOR mode support)
+    added_delete_files: Vec<DataFile>,
+    /// Optional commit UUID for manifest file naming
+    commit_uuid: Option<Uuid>,
+    /// Additional properties to add to snapshot summary
+    snapshot_properties: HashMap<String, String>,
+    /// Optional starting snapshot ID for conflict detection
+    starting_snapshot_id: Option<i64>,
+}
+
+impl RowDeltaAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            added_data_files: vec![],
+            removed_data_files: vec![],
+            added_delete_files: vec![],
+            commit_uuid: None,
+            snapshot_properties: HashMap::default(),
+            starting_snapshot_id: None,
+        }
+    }
+
+    /// Add new data files to the snapshot.
+    ///
+    /// Used for:
+    /// - New rows from INSERT operations
+    /// - Rewritten data files in COW mode (after applying UPDATE/DELETE)
+    ///
+    /// Corresponds to `addRows(DataFile)` in Java implementation.
+    pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = 
DataFile>) -> Self {
+        self.added_data_files.extend(data_files);
+        self
+    }
+
+    /// Mark data files as deleted in the snapshot.
+    ///
+    /// Used in COW mode to mark original files as deleted when they've been 
rewritten
+    /// with modifications.
+    ///
+    /// Corresponds to `removeRows(DataFile)` in Java implementation.
+    pub fn remove_data_files(mut self, data_files: impl IntoIterator<Item = 
DataFile>) -> Self {
+        self.removed_data_files.extend(data_files);
+        self
+    }
+
+    /// Add delete files to the snapshot (reserved for future MOR mode).
+    ///
+    /// Corresponds to `addDeletes(DeleteFile)` in Java implementation.
+    ///
+    /// # Note
+    ///
+    /// This is not yet implemented and is reserved for future Merge-on-Read 
(MOR)
+    /// optimization where delete files are used instead of rewriting data 
files.
+    pub fn add_delete_files(mut self, delete_files: impl IntoIterator<Item = 
DataFile>) -> Self {
+        self.added_delete_files.extend(delete_files);
+        self
+    }
+
+    /// Set commit UUID for the snapshot.
+    pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
+        self.commit_uuid = Some(commit_uuid);
+        self
+    }
+
+    /// Set snapshot summary properties.
+    pub fn set_snapshot_properties(mut self, snapshot_properties: 
HashMap<String, String>) -> Self {
+        self.snapshot_properties = snapshot_properties;
+        self
+    }
+
+    /// Validate that the operation is applied on top of a specific snapshot.
+    ///
+    /// This can be used for conflict detection in concurrent modification 
scenarios.
+    ///
+    /// Corresponds to `validateFromSnapshot(long snapshotId)` in Java 
implementation.
+    pub fn validate_from_snapshot(mut self, snapshot_id: i64) -> Self {
+        self.starting_snapshot_id = Some(snapshot_id);
+        self
+    }
+}
+
+#[async_trait]
+impl TransactionAction for RowDeltaAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        // Validate starting snapshot if specified
+        if let Some(expected_snapshot_id) = self.starting_snapshot_id
+            && table.metadata().current_snapshot_id() != 
Some(expected_snapshot_id)
+        {
+            return Err(crate::Error::new(
+                crate::ErrorKind::DataInvalid,
+                format!(
+                    "Cannot commit RowDelta based on stale snapshot. Expected: 
{}, Current: {:?}",
+                    expected_snapshot_id,
+                    table.metadata().current_snapshot_id()
+                ),
+            ));
+        }
+
+        let snapshot_producer = SnapshotProducer::new(
+            table,
+            self.commit_uuid.unwrap_or_else(Uuid::now_v7),
+            None, // key_metadata - not used for row delta
+            self.snapshot_properties.clone(),
+            self.added_data_files.clone(),
+        );
+
+        // Validate added files (same validation as FastAppend)
+        snapshot_producer.validate_added_data_files()?;
+
+        // Create RowDeltaOperation with removed files
+        let operation = RowDeltaOperation {
+            removed_data_files: self.removed_data_files.clone(),
+            added_delete_files: self.added_delete_files.clone(),
+        };
+
+        snapshot_producer
+            .commit(operation, DefaultManifestProcess)
+            .await
+    }
+}
+
+/// Implements the snapshot production logic for RowDelta operations.
+///
+/// This determines:
+/// - Which operation type is recorded (Append/Delete/Overwrite)
+/// - Which manifest entries should be marked as deleted
+/// - Which existing manifests should be carried forward
+struct RowDeltaOperation {
+    removed_data_files: Vec<DataFile>,
+    added_delete_files: Vec<DataFile>,
+}
+
+impl SnapshotProduceOperation for RowDeltaOperation {
+    /// Determine operation type based on what's being added/removed.
+    ///
+    /// Logic matches Java implementation in BaseRowDelta:
+    /// - Only adds data files (no deletes, no removes) → Append
+    /// - Only adds delete files → Delete

Review Comment:
   Yes, `Operation::Delete` is reserved for the Merge-on-Read path and is not 
yet returned. In Java, `BaseRowDelta.operation()` returns `DataOperations.DELETE` 
when the commit adds delete files but no data files. To replicate that 
correctly here, `RowDeltaOperation` would need to know whether 
`added_data_files` is empty — information that currently lives only in 
`SnapshotProducer`. Since `add_delete_files` is intentionally stubbed out (MoR 
requires separate delete-manifest writing, sequence-number propagation, and 
reader-side merging), this is deferred. I updated the `operation()` doc comment 
to remove the misleading "Only adds delete files → Delete" bullet and to 
explicitly note that `Operation::Delete` will be wired up alongside full MoR 
support.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to