This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b6dc4cfb feat: Support for V3 Metadata (#1682)
b6dc4cfb is described below

commit b6dc4cfb8bd68c777a14e73afcbd4cf8a1a093e1
Author: Christian <[email protected]>
AuthorDate: Tue Nov 4 17:35:46 2025 +0100

    feat: Support for V3 Metadata (#1682)
    
    ## Which issue does this PR close?
    Towards V3 Support!
    
    ## What changes are included in this PR?
    Introduce V3 FormatVersion across Iceberg Metadata.
    
    ## Are these changes tested?
    Yes. Java has [a few more
    
tests](https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java#L36)
    regarding other table operations (different deletes, remove file), but
    we don't have those yet in our Transaction interface.
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
    Co-authored-by: Fokko Driesprong <[email protected]>
    Co-authored-by: Xuanwo <[email protected]>
---
 crates/iceberg/src/catalog/memory/catalog.rs       |   4 +-
 crates/iceberg/src/catalog/mod.rs                  | 219 ++++++-
 .../src/expr/visitors/manifest_evaluator.rs        |   1 +
 crates/iceberg/src/spec/encrypted_key.rs           |  16 +-
 crates/iceberg/src/spec/manifest/data_file.rs      |   6 +-
 crates/iceberg/src/spec/manifest/entry.rs          |  38 ++
 crates/iceberg/src/spec/manifest/mod.rs            |   3 +-
 crates/iceberg/src/spec/manifest/writer.rs         |  76 ++-
 crates/iceberg/src/spec/manifest_list.rs           | 671 ++++++++++++++++++++-
 crates/iceberg/src/spec/snapshot.rs                | 138 ++++-
 crates/iceberg/src/spec/snapshot_summary.rs        |   2 +
 crates/iceberg/src/spec/table_metadata.rs          | 541 ++++++++++++++++-
 crates/iceberg/src/spec/table_metadata_builder.rs  | 453 +++++++++++++-
 crates/iceberg/src/transaction/mod.rs              | 122 +++-
 crates/iceberg/src/transaction/snapshot.rs         |  36 +-
 .../src/writer/file_writer/location_generator.rs   |   2 +
 .../TableMetadataV3ValidMinimal.json               |  74 +++
 17 files changed, 2276 insertions(+), 126 deletions(-)

diff --git a/crates/iceberg/src/catalog/memory/catalog.rs 
b/crates/iceberg/src/catalog/memory/catalog.rs
index fdb495f6..cfa3dc6b 100644
--- a/crates/iceberg/src/catalog/memory/catalog.rs
+++ b/crates/iceberg/src/catalog/memory/catalog.rs
@@ -377,7 +377,7 @@ impl Catalog for MemoryCatalog {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use std::collections::HashSet;
     use std::hash::Hash;
     use std::iter::FromIterator;
@@ -396,7 +396,7 @@ mod tests {
         temp_dir.path().to_str().unwrap().to_string()
     }
 
-    async fn new_memory_catalog() -> impl Catalog {
+    pub(crate) async fn new_memory_catalog() -> impl Catalog {
         let warehouse_location = temp_path();
         MemoryCatalogBuilder::default()
             .load(
diff --git a/crates/iceberg/src/catalog/mod.rs 
b/crates/iceberg/src/catalog/mod.rs
index ec4b77fe..27d5edae 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -28,7 +28,7 @@ use std::ops::Deref;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use _serde::deserialize_snapshot;
+use _serde::{deserialize_snapshot, serialize_snapshot};
 use async_trait::async_trait;
 pub use memory::MemoryCatalog;
 pub use metadata_location::*;
@@ -39,9 +39,9 @@ use typed_builder::TypedBuilder;
 use uuid::Uuid;
 
 use crate::spec::{
-    FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, 
SnapshotReference,
-    SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, 
UnboundPartitionSpec,
-    ViewFormatVersion, ViewRepresentations, ViewVersion,
+    EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, 
Snapshot,
+    SnapshotReference, SortOrder, StatisticsFile, TableMetadata, 
TableMetadataBuilder,
+    UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
 };
 use crate::table::Table;
 use crate::{Error, ErrorKind, Result};
@@ -291,6 +291,9 @@ pub struct TableCreation {
         props.into_iter().collect()
     }))]
     pub properties: HashMap<String, String>,
+    /// Format version of the table. Defaults to V2.
+    #[builder(default = FormatVersion::V2)]
+    pub format_version: FormatVersion,
 }
 
 /// TableCommit represents the commit of a table in the catalog.
@@ -479,7 +482,10 @@ pub enum TableUpdate {
     #[serde(rename_all = "kebab-case")]
     AddSnapshot {
         /// Snapshot to add.
-        #[serde(deserialize_with = "deserialize_snapshot")]
+        #[serde(
+            deserialize_with = "deserialize_snapshot",
+            serialize_with = "serialize_snapshot"
+        )]
         snapshot: Snapshot,
     },
     /// Set table's snapshot ref.
@@ -554,6 +560,18 @@ pub enum TableUpdate {
         /// Schema IDs to remove.
         schema_ids: Vec<i32>,
     },
+    /// Add an encryption key
+    #[serde(rename_all = "kebab-case")]
+    AddEncryptionKey {
+        /// The encryption key to add.
+        encryption_key: EncryptedKey,
+    },
+    /// Remove an encryption key
+    #[serde(rename_all = "kebab-case")]
+    RemoveEncryptionKey {
+        /// The id of the encryption key to remove.
+        key_id: String,
+    },
 }
 
 impl TableUpdate {
@@ -598,6 +616,12 @@ impl TableUpdate {
                 Ok(builder.remove_partition_statistics(snapshot_id))
             }
             TableUpdate::RemoveSchemas { schema_ids } => 
builder.remove_schemas(&schema_ids),
+            TableUpdate::AddEncryptionKey { encryption_key } => {
+                Ok(builder.add_encryption_key(encryption_key))
+            }
+            TableUpdate::RemoveEncryptionKey { key_id } => {
+                Ok(builder.remove_encryption_key(&key_id))
+            }
         }
     }
 }
@@ -742,7 +766,7 @@ impl TableRequirement {
 }
 
 pub(super) mod _serde {
-    use serde::{Deserialize as _, Deserializer};
+    use serde::{Deserialize as _, Deserializer, Serialize as _};
 
     use super::*;
     use crate::spec::{SchemaId, Summary};
@@ -755,7 +779,18 @@ pub(super) mod _serde {
         Ok(buf.into())
     }
 
-    #[derive(Debug, Deserialize, PartialEq, Eq)]
+    pub(super) fn serialize_snapshot<S>(
+        snapshot: &Snapshot,
+        serializer: S,
+    ) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let buf: CatalogSnapshot = snapshot.clone().into();
+        buf.serialize(serializer)
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     #[serde(rename_all = "kebab-case")]
     /// Defines the structure of a v2 snapshot for the catalog.
     /// Main difference to SnapshotV2 is that sequence-number is optional
@@ -771,6 +806,12 @@ pub(super) mod _serde {
         summary: Summary,
         #[serde(skip_serializing_if = "Option::is_none")]
         schema_id: Option<SchemaId>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        first_row_id: Option<u64>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        added_rows: Option<u64>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        key_id: Option<String>,
     }
 
     impl From<CatalogSnapshot> for Snapshot {
@@ -783,6 +824,9 @@ pub(super) mod _serde {
                 manifest_list,
                 schema_id,
                 summary,
+                first_row_id,
+                added_rows,
+                key_id,
             } = snapshot;
             let builder = Snapshot::builder()
                 .with_snapshot_id(snapshot_id)
@@ -790,11 +834,49 @@ pub(super) mod _serde {
                 .with_sequence_number(sequence_number)
                 .with_timestamp_ms(timestamp_ms)
                 .with_manifest_list(manifest_list)
-                .with_summary(summary);
-            if let Some(schema_id) = schema_id {
-                builder.with_schema_id(schema_id).build()
-            } else {
-                builder.build()
+                .with_summary(summary)
+                .with_encryption_key_id(key_id);
+            let row_range = first_row_id.zip(added_rows);
+            match (schema_id, row_range) {
+                (None, None) => builder.build(),
+                (Some(schema_id), None) => 
builder.with_schema_id(schema_id).build(),
+                (None, Some((first_row_id, last_row_id))) => {
+                    builder.with_row_range(first_row_id, last_row_id).build()
+                }
+                (Some(schema_id), Some((first_row_id, last_row_id))) => builder
+                    .with_schema_id(schema_id)
+                    .with_row_range(first_row_id, last_row_id)
+                    .build(),
+            }
+        }
+    }
+
+    impl From<Snapshot> for CatalogSnapshot {
+        fn from(snapshot: Snapshot) -> Self {
+            let first_row_id = snapshot.first_row_id();
+            let added_rows = snapshot.added_rows_count();
+            let Snapshot {
+                snapshot_id,
+                parent_snapshot_id,
+                sequence_number,
+                timestamp_ms,
+                manifest_list,
+                summary,
+                schema_id,
+                row_range: _,
+                encryption_key_id: key_id,
+            } = snapshot;
+            CatalogSnapshot {
+                snapshot_id,
+                parent_snapshot_id,
+                sequence_number,
+                timestamp_ms,
+                manifest_list,
+                summary,
+                schema_id,
+                first_row_id,
+                added_rows,
+                key_id,
             }
         }
     }
@@ -938,6 +1020,7 @@ mod tests {
     use std::fs::File;
     use std::io::BufReader;
 
+    use base64::Engine as _;
     use serde::Serialize;
     use serde::de::DeserializeOwned;
     use uuid::uuid;
@@ -945,7 +1028,7 @@ mod tests {
     use super::ViewUpdate;
     use crate::io::FileIOBuilder;
     use crate::spec::{
-        BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, 
Operation,
+        BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, 
NullOrder, Operation,
         PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, 
SnapshotReference,
         SnapshotRetention, SortDirection, SortField, SortOrder, 
SqlViewRepresentation,
         StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, 
Transform, Type,
@@ -1075,20 +1158,18 @@ mod tests {
         assert!(requirement.check(Some(&metadata)).is_ok());
 
         // Add snapshot
-        let record = r#"
-        {
-            "snapshot-id": 3051729675574597004,
-            "sequence-number": 10,
-            "timestamp-ms": 9992191116217,
-            "summary": {
-                "operation": "append"
-            },
-            "manifest-list": "s3://b/wh/.../s1.avro",
-            "schema-id": 0
-        }
-        "#;
+        let snapshot = Snapshot::builder()
+            .with_snapshot_id(3051729675574597004)
+            .with_sequence_number(10)
+            .with_timestamp_ms(9992191116217)
+            .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
+            .with_schema_id(0)
+            .with_summary(Summary {
+                operation: Operation::Append,
+                additional_properties: HashMap::new(),
+            })
+            .build();
 
-        let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
         let builder = metadata.into_builder(None);
         let builder = TableUpdate::AddSnapshot {
             snapshot: snapshot.clone(),
@@ -1666,6 +1747,50 @@ mod tests {
         assert_eq!(actual, update, "Parsed value is not equal to expected");
     }
 
+    #[test]
+    fn test_add_snapshot_v3() {
+        let json = serde_json::json!(
+        {
+            "action": "add-snapshot",
+            "snapshot": {
+                "snapshot-id": 3055729675574597000i64,
+                "parent-snapshot-id": 3051729675574597000i64,
+                "timestamp-ms": 1555100955770i64,
+                "first-row-id":0,
+                "added-rows":2,
+                "key-id":"key123",
+                "summary": {
+                    "operation": "append"
+                },
+                "manifest-list": "s3://a/b/2.avro"
+            }
+        });
+
+        let update = TableUpdate::AddSnapshot {
+            snapshot: Snapshot::builder()
+                .with_snapshot_id(3055729675574597000)
+                .with_parent_snapshot_id(Some(3051729675574597000))
+                .with_timestamp_ms(1555100955770)
+                .with_sequence_number(0)
+                .with_manifest_list("s3://a/b/2.avro")
+                .with_row_range(0, 2)
+                .with_encryption_key_id(Some("key123".to_string()))
+                .with_summary(Summary {
+                    operation: Operation::Append,
+                    additional_properties: HashMap::default(),
+                })
+                .build(),
+        };
+
+        let actual: TableUpdate = serde_json::from_value(json).expect("Failed 
to parse from json");
+        assert_eq!(actual, update, "Parsed value is not equal to expected");
+        let restored: TableUpdate = serde_json::from_str(
+            &serde_json::to_string(&actual).expect("Failed to serialize to 
json"),
+        )
+        .expect("Failed to parse from serialized json");
+        assert_eq!(restored, update);
+    }
+
     #[test]
     fn test_remove_snapshots() {
         let json = r#"
@@ -2169,6 +2294,48 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_add_encryption_key() {
+        let key_bytes = "key".as_bytes();
+        let encoded_key = 
base64::engine::general_purpose::STANDARD.encode(key_bytes);
+        test_serde_json(
+            format!(
+                r#"
+                {{
+                    "action": "add-encryption-key",
+                    "encryption-key": {{
+                        "key-id": "a",
+                        "encrypted-key-metadata": "{encoded_key}",
+                        "encrypted-by-id": "b"
+                    }}
+                }}        
+            "#
+            ),
+            TableUpdate::AddEncryptionKey {
+                encryption_key: EncryptedKey::builder()
+                    .key_id("a")
+                    .encrypted_key_metadata(key_bytes.to_vec())
+                    .encrypted_by_id("b")
+                    .build(),
+            },
+        );
+    }
+
+    #[test]
+    fn test_remove_encryption_key() {
+        test_serde_json(
+            r#"
+                {
+                    "action": "remove-encryption-key",
+                    "key-id": "a"
+                }        
+            "#,
+            TableUpdate::RemoveEncryptionKey {
+                key_id: "a".to_string(),
+            },
+        );
+    }
+
     #[test]
     fn test_table_commit() {
         let table = {
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs 
b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index 903c9ea4..abbd136c 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -703,6 +703,7 @@ mod test {
             deleted_rows_count: None,
             partitions: Some(partitions),
             key_metadata: None,
+            first_row_id: None,
         }
     }
 
diff --git a/crates/iceberg/src/spec/encrypted_key.rs 
b/crates/iceberg/src/spec/encrypted_key.rs
index db19a023..6908ade1 100644
--- a/crates/iceberg/src/spec/encrypted_key.rs
+++ b/crates/iceberg/src/spec/encrypted_key.rs
@@ -26,16 +26,16 @@ use serde::{Deserialize, Serialize};
 pub struct EncryptedKey {
     /// Unique identifier for the key
     #[builder(setter(into))]
-    key_id: String,
+    pub(crate) key_id: String,
     /// Encrypted key metadata as binary data
     #[builder(setter(into))]
-    encrypted_key_metadata: Vec<u8>,
+    pub(crate) encrypted_key_metadata: Vec<u8>,
     /// Identifier of the entity that encrypted this key
-    #[builder(setter(into))]
-    encrypted_by_id: String,
+    #[builder(default, setter(into, strip_option))]
+    pub(crate) encrypted_by_id: Option<String>,
     /// Additional properties associated with the key
     #[builder(default)]
-    properties: HashMap<String, String>,
+    pub(crate) properties: HashMap<String, String>,
 }
 
 impl EncryptedKey {
@@ -50,8 +50,8 @@ impl EncryptedKey {
     }
 
     /// Returns the ID of the entity that encrypted this key
-    pub fn encrypted_by_id(&self) -> &str {
-        &self.encrypted_by_id
+    pub fn encrypted_by_id(&self) -> Option<&str> {
+        self.encrypted_by_id.as_deref()
     }
 
     /// Returns the properties map
@@ -72,7 +72,7 @@ pub(super) mod _serde {
     pub(super) struct EncryptedKeySerde {
         pub key_id: String,
         pub encrypted_key_metadata: String, // Base64 encoded
-        pub encrypted_by_id: String,
+        pub encrypted_by_id: Option<String>,
         #[serde(default, skip_serializing_if = "HashMap::is_empty")]
         pub properties: HashMap<String, String>,
     }
diff --git a/crates/iceberg/src/spec/manifest/data_file.rs 
b/crates/iceberg/src/spec/manifest/data_file.rs
index 6c63f622..a9c041f5 100644
--- a/crates/iceberg/src/spec/manifest/data_file.rs
+++ b/crates/iceberg/src/spec/manifest/data_file.rs
@@ -24,7 +24,9 @@ use serde_derive::{Deserialize, Serialize};
 use serde_with::{DeserializeFromStr, SerializeDisplay};
 
 use super::_serde::DataFileSerde;
-use super::{Datum, FormatVersion, Schema, data_file_schema_v1, 
data_file_schema_v2};
+use super::{
+    Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2, 
data_file_schema_v3,
+};
 use crate::error::Result;
 use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
 use crate::{Error, ErrorKind};
@@ -295,6 +297,7 @@ pub fn write_data_files_to_avro<W: Write>(
     let avro_schema = match version {
         FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
         FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
+        FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
     };
     let mut writer = AvroWriter::new(&avro_schema, writer);
 
@@ -322,6 +325,7 @@ pub fn read_data_files_from_avro<R: Read>(
     let avro_schema = match version {
         FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
         FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
+        FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
     };
 
     let reader = AvroReader::with_schema(&avro_schema, reader)?;
diff --git a/crates/iceberg/src/spec/manifest/entry.rs 
b/crates/iceberg/src/spec/manifest/entry.rs
index d11d8acf..e8fe0f22 100644
--- a/crates/iceberg/src/spec/manifest/entry.rs
+++ b/crates/iceberg/src/spec/manifest/entry.rs
@@ -509,6 +509,42 @@ static CONTENT_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
     })
 };
 
+fn data_file_fields_v3(partition_type: &StructType) -> Vec<NestedFieldRef> {
+    vec![
+        CONTENT.clone(),
+        FILE_PATH.clone(),
+        FILE_FORMAT.clone(),
+        Arc::new(NestedField::required(
+            102,
+            "partition",
+            Type::Struct(partition_type.clone()),
+        )),
+        RECORD_COUNT.clone(),
+        FILE_SIZE_IN_BYTES.clone(),
+        COLUMN_SIZES.clone(),
+        VALUE_COUNTS.clone(),
+        NULL_VALUE_COUNTS.clone(),
+        NAN_VALUE_COUNTS.clone(),
+        LOWER_BOUNDS.clone(),
+        UPPER_BOUNDS.clone(),
+        KEY_METADATA.clone(),
+        SPLIT_OFFSETS.clone(),
+        EQUALITY_IDS.clone(),
+        SORT_ORDER_ID.clone(),
+        FIRST_ROW_ID.clone(),
+        REFERENCE_DATA_FILE.clone(),
+        CONTENT_OFFSET.clone(),
+        CONTENT_SIZE_IN_BYTES.clone(),
+    ]
+}
+
+pub(super) fn data_file_schema_v3(partition_type: &StructType) -> 
Result<AvroSchema> {
+    let schema = Schema::builder()
+        .with_fields(data_file_fields_v3(partition_type))
+        .build()?;
+    schema_to_avro_schema("data_file", &schema)
+}
+
 fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
     vec![
         CONTENT.clone(),
@@ -533,6 +569,8 @@ fn data_file_fields_v2(partition_type: &StructType) -> 
Vec<NestedFieldRef> {
         SORT_ORDER_ID.clone(),
         FIRST_ROW_ID.clone(),
         REFERENCE_DATA_FILE.clone(),
+        // Why are the following two fields here in the existing v2 schema?
+        // In the spec, they are not even listed as optional for v2.
         CONTENT_OFFSET.clone(),
         CONTENT_SIZE_IN_BYTES.clone(),
     ]
diff --git a/crates/iceberg/src/spec/manifest/mod.rs 
b/crates/iceberg/src/spec/manifest/mod.rs
index a1a5612c..51219bfd 100644
--- a/crates/iceberg/src/spec/manifest/mod.rs
+++ b/crates/iceberg/src/spec/manifest/mod.rs
@@ -70,7 +70,8 @@ impl Manifest {
                     })
                     .collect::<Result<Vec<_>>>()?
             }
-            FormatVersion::V2 => {
+            // Manifest Schema & Manifest Entry did not change between V2 and 
V3
+            FormatVersion::V2 | FormatVersion::V3 => {
                 let schema = manifest_schema_v2(&partition_type)?;
                 let reader = AvroReader::with_schema(&schema, bs)?;
                 reader
diff --git a/crates/iceberg/src/spec/manifest/writer.rs 
b/crates/iceberg/src/spec/manifest/writer.rs
index 673f8b5d..ebb0590b 100644
--- a/crates/iceberg/src/spec/manifest/writer.rs
+++ b/crates/iceberg/src/spec/manifest/writer.rs
@@ -72,7 +72,13 @@ impl ManifestWriterBuilder {
             .format_version(FormatVersion::V1)
             .content(ManifestContentType::Data)
             .build();
-        ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, 
metadata)
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            None,
+        )
     }
 
     /// Build a [`ManifestWriter`] for format version 2, data content.
@@ -84,7 +90,13 @@ impl ManifestWriterBuilder {
             .format_version(FormatVersion::V2)
             .content(ManifestContentType::Data)
             .build();
-        ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, 
metadata)
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            None,
+        )
     }
 
     /// Build a [`ManifestWriter`] for format version 2, deletes content.
@@ -96,7 +108,51 @@ impl ManifestWriterBuilder {
             .format_version(FormatVersion::V2)
             .content(ManifestContentType::Deletes)
             .build();
-        ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, 
metadata)
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            None,
+        )
+    }
+
+    /// Build a [`ManifestWriter`] for format version 3, data content.
+    pub fn build_v3_data(self) -> ManifestWriter {
+        let metadata = ManifestMetadata::builder()
+            .schema_id(self.schema.schema_id())
+            .schema(self.schema)
+            .partition_spec(self.partition_spec)
+            .format_version(FormatVersion::V3)
+            .content(ManifestContentType::Data)
+            .build();
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            // First row id is assigned by the [`ManifestListWriter`] when the 
manifest
+            // is added to the list.
+            None,
+        )
+    }
+
+    /// Build a [`ManifestWriter`] for format version 3, deletes content.
+    pub fn build_v3_deletes(self) -> ManifestWriter {
+        let metadata = ManifestMetadata::builder()
+            .schema_id(self.schema.schema_id())
+            .schema(self.schema)
+            .partition_spec(self.partition_spec)
+            .format_version(FormatVersion::V3)
+            .content(ManifestContentType::Deletes)
+            .build();
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            None,
+        )
     }
 }
 
@@ -112,6 +168,7 @@ pub struct ManifestWriter {
     existing_rows: u64,
     deleted_files: u32,
     deleted_rows: u64,
+    first_row_id: Option<u64>,
 
     min_seq_num: Option<i64>,
 
@@ -129,6 +186,7 @@ impl ManifestWriter {
         snapshot_id: Option<i64>,
         key_metadata: Option<Vec<u8>>,
         metadata: ManifestMetadata,
+        first_row_id: Option<u64>,
     ) -> Self {
         Self {
             output,
@@ -139,6 +197,7 @@ impl ManifestWriter {
             existing_rows: 0,
             deleted_files: 0,
             deleted_rows: 0,
+            first_row_id,
             min_seq_num: None,
             key_metadata,
             manifest_entries: Vec::new(),
@@ -348,7 +407,8 @@ impl ManifestWriter {
         let table_schema = &self.metadata.schema;
         let avro_schema = match self.metadata.format_version {
             FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
-            FormatVersion::V2 => manifest_schema_v2(&partition_type)?,
+            // Manifest schema did not change between V2 and V3
+            FormatVersion::V2 | FormatVersion::V3 => 
manifest_schema_v2(&partition_type)?,
         };
         let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
         avro_writer.add_user_metadata(
@@ -388,8 +448,11 @@ impl ManifestWriter {
             let value = match self.metadata.format_version {
                 FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, 
&partition_type)?)?
                     .resolve(&avro_schema)?,
-                FormatVersion::V2 => to_value(ManifestEntryV2::try_from(entry, 
&partition_type)?)?
-                    .resolve(&avro_schema)?,
+                // Manifest entry format did not change between V2 and V3
+                FormatVersion::V2 | FormatVersion::V3 => {
+                    to_value(ManifestEntryV2::try_from(entry, 
&partition_type)?)?
+                        .resolve(&avro_schema)?
+                }
             };
 
             avro_writer.append(value)?;
@@ -417,6 +480,7 @@ impl ManifestWriter {
             deleted_rows_count: Some(self.deleted_rows),
             partitions: Some(partition_summary),
             key_metadata: self.key_metadata,
+            first_row_id: self.first_row_id,
         })
     }
 }
diff --git a/crates/iceberg/src/spec/manifest_list.rs 
b/crates/iceberg/src/spec/manifest_list.rs
index 16409ffe..5e97e546 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -31,6 +31,8 @@ use self::_serde::{ManifestFileV1, ManifestFileV2};
 use super::{FormatVersion, Manifest};
 use crate::error::Result;
 use crate::io::{FileIO, OutputFile};
+use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3;
+use crate::spec::manifest_list::_serde::ManifestFileV3;
 use crate::{Error, ErrorKind};
 
 /// Placeholder for sequence number. The field with this value must be 
replaced with the actual sequence number before it write.
@@ -69,6 +71,11 @@ impl ManifestList {
                 let values = 
Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
                 from_value::<_serde::ManifestListV2>(&values)?.try_into()
             }
+            FormatVersion::V3 => {
+                let reader = Reader::new(bs)?;
+                let values = 
Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
+                from_value::<_serde::ManifestListV3>(&values)?.try_into()
+            }
         }
     }
 
@@ -90,6 +97,7 @@ pub struct ManifestListWriter {
     avro_writer: Writer<'static, Vec<u8>>,
     sequence_number: i64,
     snapshot_id: i64,
+    next_row_id: Option<u64>,
 }
 
 impl std::fmt::Debug for ManifestListWriter {
@@ -103,6 +111,11 @@ impl std::fmt::Debug for ManifestListWriter {
 }
 
 impl ManifestListWriter {
+    /// Get the next row ID that will be assigned to the next data manifest 
added.
+    pub fn next_row_id(&self) -> Option<u64> {
+        self.next_row_id
+    }
+
     /// Construct a v1 [`ManifestListWriter`] that writes to a provided 
[`OutputFile`].
     pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: 
Option<i64>) -> Self {
         let mut metadata = HashMap::from_iter([
@@ -115,7 +128,14 @@ impl ManifestListWriter {
                 parent_snapshot_id.to_string(),
             );
         }
-        Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
+        Self::new(
+            FormatVersion::V1,
+            output_file,
+            metadata,
+            0,
+            snapshot_id,
+            None,
+        )
     }
 
     /// Construct a v2 [`ManifestListWriter`] that writes to a provided 
[`OutputFile`].
@@ -142,6 +162,42 @@ impl ManifestListWriter {
             metadata,
             sequence_number,
             snapshot_id,
+            None,
+        )
+    }
+
+    /// Construct a v3 [`ManifestListWriter`] that writes to a provided 
[`OutputFile`].
+    pub fn v3(
+        output_file: OutputFile,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        sequence_number: i64,
+        first_row_id: Option<u64>, // Always None for delete manifests
+    ) -> Self {
+        let mut metadata = HashMap::from_iter([
+            ("snapshot-id".to_string(), snapshot_id.to_string()),
+            ("sequence-number".to_string(), sequence_number.to_string()),
+            ("format-version".to_string(), "3".to_string()),
+        ]);
+        metadata.insert(
+            "parent-snapshot-id".to_string(),
+            parent_snapshot_id
+                .map(|v| v.to_string())
+                .unwrap_or("null".to_string()),
+        );
+        metadata.insert(
+            "first-row-id".to_string(),
+            first_row_id
+                .map(|v| v.to_string())
+                .unwrap_or("null".to_string()),
+        );
+        Self::new(
+            FormatVersion::V3,
+            output_file,
+            metadata,
+            sequence_number,
+            snapshot_id,
+            first_row_id,
         )
     }
 
@@ -151,10 +207,12 @@ impl ManifestListWriter {
         metadata: HashMap<String, String>,
         sequence_number: i64,
         snapshot_id: i64,
+        first_row_id: Option<u64>,
     ) -> Self {
         let avro_schema = match format_version {
             FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
             FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
+            FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3,
         };
         let mut avro_writer = Writer::new(avro_schema, Vec::new());
         for (key, value) in metadata {
@@ -168,46 +226,35 @@ impl ManifestListWriter {
             avro_writer,
             sequence_number,
             snapshot_id,
+            next_row_id: first_row_id,
         }
     }
 
     /// Append manifests to be written.
+    ///
+    /// If V3 Manifests are added and the `first_row_id` of any data manifest 
is unassigned,
+    /// it will be assigned based on the `next_row_id` of the writer, and the 
`next_row_id` of the writer will be updated accordingly.
+    /// If `first_row_id` is already assigned, it is kept as-is; this requires 
the writer itself to have a `next_row_id` assigned, otherwise an error is 
returned.
     pub fn add_manifests(&mut self, manifests: impl Iterator<Item = 
ManifestFile>) -> Result<()> {
         match self.format_version {
             FormatVersion::V1 => {
                 for manifest in manifests {
-                    let manifes: ManifestFileV1 = manifest.try_into()?;
-                    self.avro_writer.append_ser(manifes)?;
+                    let manifests: ManifestFileV1 = manifest.try_into()?;
+                    self.avro_writer.append_ser(manifests)?;
                 }
             }
-            FormatVersion::V2 => {
+            FormatVersion::V2 | FormatVersion::V3 => {
                 for mut manifest in manifests {
-                    if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
-                        if manifest.added_snapshot_id != self.snapshot_id {
-                            return Err(Error::new(
-                                ErrorKind::DataInvalid,
-                                format!(
-                                    "Found unassigned sequence number for a 
manifest from snapshot {}.",
-                                    manifest.added_snapshot_id
-                                ),
-                            ));
-                        }
-                        manifest.sequence_number = self.sequence_number;
+                    self.assign_sequence_numbers(&mut manifest)?;
+
+                    if self.format_version == FormatVersion::V2 {
+                        let manifest_entry: ManifestFileV2 = 
manifest.try_into()?;
+                        self.avro_writer.append_ser(manifest_entry)?;
+                    } else if self.format_version == FormatVersion::V3 {
+                        self.assign_first_row_id(&mut manifest)?;
+                        let manifest_entry: ManifestFileV3 = 
manifest.try_into()?;
+                        self.avro_writer.append_ser(manifest_entry)?;
                     }
-                    if manifest.min_sequence_number == 
UNASSIGNED_SEQUENCE_NUMBER {
-                        if manifest.added_snapshot_id != self.snapshot_id {
-                            return Err(Error::new(
-                                ErrorKind::DataInvalid,
-                                format!(
-                                    "Found unassigned sequence number for a 
manifest from snapshot {}.",
-                                    manifest.added_snapshot_id
-                                ),
-                            ));
-                        }
-                        manifest.min_sequence_number = self.sequence_number;
-                    }
-                    let manifest_entry: ManifestFileV2 = manifest.try_into()?;
-                    self.avro_writer.append_ser(manifest_entry)?;
                 }
             }
         }
@@ -222,6 +269,112 @@ impl ManifestListWriter {
         writer.close().await?;
         Ok(())
     }
+
+    /// Assign the writer's sequence number to the manifest's `sequence_number` 
and `min_sequence_number` if they are unassigned.
+    fn assign_sequence_numbers(&self, manifest: &mut ManifestFile) -> 
Result<()> {
Result<()> {
+        if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
+            if manifest.added_snapshot_id != self.snapshot_id {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Found unassigned sequence number for a manifest from 
snapshot {}.",
+                        manifest.added_snapshot_id
+                    ),
+                ));
+            }
+            manifest.sequence_number = self.sequence_number;
+        }
+
+        if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
+            if manifest.added_snapshot_id != self.snapshot_id {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Found unassigned sequence number for a manifest from 
snapshot {}.",
+                        manifest.added_snapshot_id
+                    ),
+                ));
+            }
+            manifest.min_sequence_number = self.sequence_number;
+        }
+
+        Ok(())
+    }
+
+    /// Assign a `first_row_id` to a data manifest if unassigned, advancing the 
writer's `next_row_id` past the manifest's existing and added row counts.
+    fn assign_first_row_id(&mut self, manifest: &mut ManifestFile) -> 
Result<()> {
+        match manifest.content {
+            ManifestContentType::Data => {
+                match (self.next_row_id, manifest.first_row_id) {
+                    (Some(_), Some(_)) => {
+                        // Case: Manifest with already assigned first row ID.
+                        // No need to increase next_row_id, as this manifest 
is already assigned.
+                    }
+                    (None, Some(manifest_first_row_id)) => {
+                        // Case: Assigned first row ID for data manifest, but 
the writer does not have a next-row-id assigned.
+                        return Err(Error::new(
+                            ErrorKind::Unexpected,
+                            format!(
+                                "Found invalid first-row-id assignment for 
Manifest {}. Writer does not have a next-row-id assigned, but the manifest has 
first-row-id assigned to {}.",
+                                manifest.manifest_path, manifest_first_row_id,
+                            ),
+                        ));
+                    }
+                    (Some(writer_next_row_id), None) => {
+                        // Case: Unassigned first row ID for data manifest. 
This is either a new
+                        // manifest, or a manifest from a pre-v3 snapshot. We 
need to assign one.
+                        let (existing_rows_count, added_rows_count) =
+                            require_row_counts_in_manifest(manifest)?;
+                        manifest.first_row_id = Some(writer_next_row_id);
+
+                        self.next_row_id = writer_next_row_id
+                        .checked_add(existing_rows_count)
+                        .and_then(|sum| sum.checked_add(added_rows_count))
+                        .ok_or_else(|| {
+                            Error::new(
+                                ErrorKind::DataInvalid,
+                                format!(
+                                    "Row ID overflow when computing next row 
ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: 
{existing_rows_count}, Added Rows Count: {added_rows_count}",
+                                    manifest.manifest_path
+                                ),
+                            )
+                        }).map(Some)?;
+                    }
+                    (None, None) => {
+                        // Case: Table without row lineage. No action needed.
+                    }
+                }
+            }
+            ManifestContentType::Deletes => {
+                // Deletes never have a first-row-id assigned.
+                manifest.first_row_id = None;
+            }
+        };
+
+        Ok(())
+    }
+}
+
+fn require_row_counts_in_manifest(manifest: &ManifestFile) -> Result<(u64, 
u64)> {
+    let existing_rows_count = manifest.existing_rows_count.ok_or_else(|| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Cannot include a Manifest without existing-rows-count to a 
table with row lineage enabled. Manifest path: {}",
+                manifest.manifest_path,
+            ),
+        )
+    })?;
+    let added_rows_count = manifest.added_rows_count.ok_or_else(|| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Cannot include a Manifest without added-rows-count to a table 
with row lineage enabled. Manifest path: {}",
+                manifest.manifest_path,
+            ),
+        )
+    })?;
+    Ok((existing_rows_count, added_rows_count))
 }
 
 /// This is a helper module that defines the schema field of the manifest list 
entry.
@@ -453,6 +606,15 @@ mod _const_schema {
             ))
         })
     };
+    static FIRST_ROW_ID: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                520,
+                "first_row_id",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
 
     static V1_SCHEMA: Lazy<Schema> = {
         Lazy::new(|| {
@@ -497,11 +659,38 @@ mod _const_schema {
         })
     };
 
+    static V3_SCHEMA: Lazy<Schema> = {
+        Lazy::new(|| {
+            let fields = vec![
+                MANIFEST_PATH.clone(),
+                MANIFEST_LENGTH.clone(),
+                PARTITION_SPEC_ID.clone(),
+                CONTENT.clone(),
+                SEQUENCE_NUMBER.clone(),
+                MIN_SEQUENCE_NUMBER.clone(),
+                ADDED_SNAPSHOT_ID.clone(),
+                ADDED_FILES_COUNT_V2.clone(),
+                EXISTING_FILES_COUNT_V2.clone(),
+                DELETED_FILES_COUNT_V2.clone(),
+                ADDED_ROWS_COUNT_V2.clone(),
+                EXISTING_ROWS_COUNT_V2.clone(),
+                DELETED_ROWS_COUNT_V2.clone(),
+                PARTITIONS.clone(),
+                KEY_METADATA.clone(),
+                FIRST_ROW_ID.clone(),
+            ];
+            Schema::builder().with_fields(fields).build().unwrap()
+        })
+    };
+
     pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V1: Lazy<AvroSchema> =
         Lazy::new(|| schema_to_avro_schema("manifest_file", 
&V1_SCHEMA).unwrap());
 
     pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V2: Lazy<AvroSchema> =
         Lazy::new(|| schema_to_avro_schema("manifest_file", 
&V2_SCHEMA).unwrap());
+
+    pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V3: Lazy<AvroSchema> =
+        Lazy::new(|| schema_to_avro_schema("manifest_file", 
&V3_SCHEMA).unwrap());
 }
 
 /// Entry in a manifest list.
@@ -580,6 +769,10 @@ pub struct ManifestFile {
     ///
     /// Implementation-specific key metadata for encryption
     pub key_metadata: Option<Vec<u8>>,
+    /// field 520
+    ///
+    /// The starting _row_id to assign to rows added by ADDED data files
+    pub first_row_id: Option<u64>,
 }
 
 impl ManifestFile {
@@ -703,6 +896,12 @@ pub(super) mod _serde {
     use crate::error::Result;
     use crate::spec::FieldSummary;
 
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(transparent)]
+    pub(crate) struct ManifestListV3 {
+        entries: Vec<ManifestFileV3>,
+    }
+
     #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     #[serde(transparent)]
     pub(crate) struct ManifestListV2 {
@@ -715,6 +914,33 @@ pub(super) mod _serde {
         entries: Vec<ManifestFileV1>,
     }
 
+    impl ManifestListV3 {
+        /// Converts the [ManifestListV3] into a [ManifestList].
+        pub fn try_into(self) -> Result<super::ManifestList> {
+            Ok(super::ManifestList {
+                entries: self
+                    .entries
+                    .into_iter()
+                    .map(|v| v.try_into())
+                    .collect::<Result<Vec<_>>>()?,
+            })
+        }
+    }
+
+    impl TryFrom<super::ManifestList> for ManifestListV3 {
+        type Error = Error;
+
+        fn try_from(value: super::ManifestList) -> std::result::Result<Self, 
Self::Error> {
+            Ok(Self {
+                entries: value
+                    .entries
+                    .into_iter()
+                    .map(|v| v.try_into())
+                    .collect::<std::result::Result<Vec<_>, _>>()?,
+            })
+        }
+    }
+
     impl ManifestListV2 {
         /// Converts the [ManifestListV2] into a [ManifestList].
         pub fn try_into(self) -> Result<super::ManifestList> {
@@ -813,6 +1039,58 @@ pub(super) mod _serde {
         pub key_metadata: Option<ByteBuf>,
     }
 
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    pub(super) struct ManifestFileV3 {
+        pub manifest_path: String,
+        pub manifest_length: i64,
+        pub partition_spec_id: i32,
+        #[serde(default = "v2_default_content_for_v1")]
+        pub content: i32,
+        #[serde(default = "v2_default_sequence_number_for_v1")]
+        pub sequence_number: i64,
+        #[serde(default = "v2_default_min_sequence_number_for_v1")]
+        pub min_sequence_number: i64,
+        pub added_snapshot_id: i64,
+        #[serde(alias = "added_data_files_count", alias = "added_files_count")]
+        pub added_files_count: i32,
+        #[serde(alias = "existing_data_files_count", alias = 
"existing_files_count")]
+        pub existing_files_count: i32,
+        #[serde(alias = "deleted_data_files_count", alias = 
"deleted_files_count")]
+        pub deleted_files_count: i32,
+        pub added_rows_count: i64,
+        pub existing_rows_count: i64,
+        pub deleted_rows_count: i64,
+        pub partitions: Option<Vec<FieldSummary>>,
+        pub key_metadata: Option<ByteBuf>,
+        pub first_row_id: Option<u64>,
+    }
+
+    impl ManifestFileV3 {
+        /// Converts the [ManifestFileV3] into a [ManifestFile].
+        pub fn try_into(self) -> Result<ManifestFile> {
+            let manifest_file = ManifestFile {
+                manifest_path: self.manifest_path,
+                manifest_length: self.manifest_length,
+                partition_spec_id: self.partition_spec_id,
+                content: self.content.try_into()?,
+                sequence_number: self.sequence_number,
+                min_sequence_number: self.min_sequence_number,
+                added_snapshot_id: self.added_snapshot_id,
+                added_files_count: Some(self.added_files_count.try_into()?),
+                existing_files_count: 
Some(self.existing_files_count.try_into()?),
+                deleted_files_count: 
Some(self.deleted_files_count.try_into()?),
+                added_rows_count: Some(self.added_rows_count.try_into()?),
+                existing_rows_count: 
Some(self.existing_rows_count.try_into()?),
+                deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
+                partitions: self.partitions,
+                key_metadata: self.key_metadata.map(|b| b.into_vec()),
+                first_row_id: self.first_row_id,
+            };
+
+            Ok(manifest_file)
+        }
+    }
+
     impl ManifestFileV2 {
         /// Converts the [ManifestFileV2] into a [ManifestFile].
         pub fn try_into(self) -> Result<ManifestFile> {
@@ -832,6 +1110,7 @@ pub(super) mod _serde {
                 deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
                 partitions: self.partitions,
                 key_metadata: self.key_metadata.map(|b| b.into_vec()),
+                first_row_id: None,
             })
         }
     }
@@ -881,6 +1160,7 @@ pub(super) mod _serde {
                 content: super::ManifestContentType::Data,
                 sequence_number: 0,
                 min_sequence_number: 0,
+                first_row_id: None,
             })
         }
     }
@@ -892,6 +1172,80 @@ pub(super) mod _serde {
         }
     }
 
+    impl TryFrom<ManifestFile> for ManifestFileV3 {
+        type Error = Error;
+
+        fn try_from(value: ManifestFile) -> std::result::Result<Self, 
Self::Error> {
+            let key_metadata = 
convert_to_serde_key_metadata(value.key_metadata);
+            Ok(Self {
+                manifest_path: value.manifest_path,
+                manifest_length: value.manifest_length,
+                partition_spec_id: value.partition_spec_id,
+                content: value.content as i32,
+                sequence_number: value.sequence_number,
+                min_sequence_number: value.min_sequence_number,
+                added_snapshot_id: value.added_snapshot_id,
+                added_files_count: value
+                    .added_files_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "added_data_files_count in ManifestFileV3 is 
required",
+                        )
+                    })?
+                    .try_into()?,
+                existing_files_count: value
+                    .existing_files_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "existing_data_files_count in ManifestFileV3 is 
required",
+                        )
+                    })?
+                    .try_into()?,
+                deleted_files_count: value
+                    .deleted_files_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "deleted_data_files_count in ManifestFileV3 is 
required",
+                        )
+                    })?
+                    .try_into()?,
+                added_rows_count: value
+                    .added_rows_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "added_rows_count in ManifestFileV3 is required",
+                        )
+                    })?
+                    .try_into()?,
+                existing_rows_count: value
+                    .existing_rows_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "existing_rows_count in ManifestFileV3 is 
required",
+                        )
+                    })?
+                    .try_into()?,
+                deleted_rows_count: value
+                    .deleted_rows_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "deleted_rows_count in ManifestFileV3 is required",
+                        )
+                    })?
+                    .try_into()?,
+                partitions: value.partitions,
+                key_metadata,
+                first_row_id: value.first_row_id,
+            })
+        }
+    }
+
     impl TryFrom<ManifestFile> for ManifestFileV2 {
         type Error = Error;
 
@@ -1012,7 +1366,7 @@ mod test {
 
     use super::_serde::ManifestListV2;
     use crate::io::FileIOBuilder;
-    use crate::spec::manifest_list::_serde::ManifestListV1;
+    use crate::spec::manifest_list::_serde::{ManifestListV1, ManifestListV3};
     use crate::spec::{
         Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, 
ManifestListWriter,
         UNASSIGNED_SEQUENCE_NUMBER,
@@ -1038,6 +1392,7 @@ mod test {
                     deleted_rows_count: Some(0),
                     partitions: Some(vec![]),
                     key_metadata: None,
+                    first_row_id: None,
                 }
             ]
         };
@@ -1089,6 +1444,7 @@ mod test {
                         vec![FieldSummary { contains_null: false, 
contains_nan: Some(false), lower_bound: 
Some(Datum::long(1).to_bytes().unwrap()), upper_bound: 
Some(Datum::long(1).to_bytes().unwrap())}]
                     ),
                     key_metadata: None,
+                    first_row_id: None,
                 },
                 ManifestFile {
                     manifest_path: 
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
@@ -1108,6 +1464,7 @@ mod test {
                         vec![FieldSummary { contains_null: false, 
contains_nan: Some(false), lower_bound: 
Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: 
Some(Datum::float(2.1).to_bytes().unwrap())}]
                     ),
                     key_metadata: None,
+                    first_row_id: None,
                 }
             ]
         };
@@ -1138,6 +1495,80 @@ mod test {
         assert_eq!(manifest_list, parsed_manifest_list);
     }
 
+    #[tokio::test]
+    async fn test_parse_manifest_list_v3() {
+        let manifest_list = ManifestList {
+            entries: vec![
+                ManifestFile {
+                    manifest_path: 
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
+                    manifest_length: 6926,
+                    partition_spec_id: 1,
+                    content: ManifestContentType::Data,
+                    sequence_number: 1,
+                    min_sequence_number: 1,
+                    added_snapshot_id: 377075049360453639,
+                    added_files_count: Some(1),
+                    existing_files_count: Some(0),
+                    deleted_files_count: Some(0),
+                    added_rows_count: Some(3),
+                    existing_rows_count: Some(0),
+                    deleted_rows_count: Some(0),
+                    partitions: Some(
+                        vec![FieldSummary { contains_null: false, 
contains_nan: Some(false), lower_bound: 
Some(Datum::long(1).to_bytes().unwrap()), upper_bound: 
Some(Datum::long(1).to_bytes().unwrap())}]
+                    ),
+                    key_metadata: None,
+                    first_row_id: Some(10),
+                },
+                ManifestFile {
+                    manifest_path: 
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
+                    manifest_length: 6926,
+                    partition_spec_id: 2,
+                    content: ManifestContentType::Data,
+                    sequence_number: 1,
+                    min_sequence_number: 1,
+                    added_snapshot_id: 377075049360453639,
+                    added_files_count: Some(1),
+                    existing_files_count: Some(0),
+                    deleted_files_count: Some(0),
+                    added_rows_count: Some(3),
+                    existing_rows_count: Some(0),
+                    deleted_rows_count: Some(0),
+                    partitions: Some(
+                        vec![FieldSummary { contains_null: false, 
contains_nan: Some(false), lower_bound: 
Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: 
Some(Datum::float(2.1).to_bytes().unwrap())}]
+                    ),
+                    key_metadata: None,
+                    first_row_id: Some(13),
+                }
+            ]
+        };
+
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+
+        let tmp_dir = TempDir::new().unwrap();
+        let file_name = "simple_manifest_list_v3.avro";
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), 
file_name);
+
+        let mut writer = ManifestListWriter::v3(
+            file_io.new_output(full_path.clone()).unwrap(),
+            377075049360453639,
+            Some(377075049360453639),
+            1,
+            Some(10),
+        );
+
+        writer
+            .add_manifests(manifest_list.entries.clone().into_iter())
+            .unwrap();
+        writer.close().await.unwrap();
+
+        let bs = fs::read(full_path).expect("read_file must succeed");
+
+        let parsed_manifest_list =
+            ManifestList::parse_with_version(&bs, 
crate::spec::FormatVersion::V3).unwrap();
+
+        assert_eq!(manifest_list, parsed_manifest_list);
+    }
+
     #[test]
     fn test_serialize_manifest_list_v1() {
         let manifest_list:ManifestListV1 = ManifestList {
@@ -1157,6 +1588,7 @@ mod test {
                 deleted_rows_count: Some(0),
                 partitions: None,
                 key_metadata: None,
+                first_row_id: None,
             }]
         }.try_into().unwrap();
         let result = serde_json::to_string(&manifest_list).unwrap();
@@ -1187,6 +1619,7 @@ mod test {
                     vec![FieldSummary { contains_null: false, contains_nan: 
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), 
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
                 ),
                 key_metadata: None,
+                first_row_id: None,
             }]
         }.try_into().unwrap();
         let result = serde_json::to_string(&manifest_list).unwrap();
@@ -1196,6 +1629,37 @@ mod test {
         );
     }
 
+    #[test]
+    fn test_serialize_manifest_list_v3() {
+        let manifest_list: ManifestListV3 = ManifestList {
+            entries: vec![ManifestFile {
+                manifest_path: 
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
+                manifest_length: 6926,
+                partition_spec_id: 1,
+                content: ManifestContentType::Data,
+                sequence_number: 1,
+                min_sequence_number: 1,
+                added_snapshot_id: 377075049360453639,
+                added_files_count: Some(1),
+                existing_files_count: Some(0),
+                deleted_files_count: Some(0),
+                added_rows_count: Some(3),
+                existing_rows_count: Some(0),
+                deleted_rows_count: Some(0),
+                partitions: Some(
+                    vec![FieldSummary { contains_null: false, contains_nan: 
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), 
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+                ),
+                key_metadata: None,
+                first_row_id: Some(10),
+            }]
+        }.try_into().unwrap();
+        let result = serde_json::to_string(&manifest_list).unwrap();
+        assert_eq!(
+            result,
+            
r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":1,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_files_count":1,"existing_files_count":0,"deleted_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"uppe
 [...]
+        );
+    }
+
     #[tokio::test]
     async fn test_manifest_list_writer_v1() {
         let expected_manifest_list = ManifestList {
@@ -1217,6 +1681,7 @@ mod test {
                     vec![FieldSummary { contains_null: false, contains_nan: 
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), 
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}],
                 ),
                 key_metadata: None,
+                first_row_id: None,
             }]
         };
 
@@ -1263,6 +1728,7 @@ mod test {
                     vec![FieldSummary { contains_null: false, contains_nan: 
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), 
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
                 ),
                 key_metadata: None,
+                first_row_id: None,
             }]
         };
 
@@ -1287,6 +1753,56 @@ mod test {
         temp_dir.close().unwrap();
     }
 
+    #[tokio::test]
+    async fn test_manifest_list_writer_v3() {
+        let snapshot_id = 377075049360453639;
+        let seq_num = 1;
+        let mut expected_manifest_list = ManifestList {
+            entries: vec![ManifestFile {
+                manifest_path: 
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
+                manifest_length: 6926,
+                partition_spec_id: 1,
+                content: ManifestContentType::Data,
+                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                added_snapshot_id: snapshot_id,
+                added_files_count: Some(1),
+                existing_files_count: Some(0),
+                deleted_files_count: Some(0),
+                added_rows_count: Some(3),
+                existing_rows_count: Some(0),
+                deleted_rows_count: Some(0),
+                partitions: Some(
+                    vec![FieldSummary { contains_null: false, contains_nan: 
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), 
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+                ),
+                key_metadata: None,
+                first_row_id: Some(10),
+            }]
+        };
+
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("manifest_list_v2.avro");
+        let io = FileIOBuilder::new_fs_io().build().unwrap();
+        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+
+        let mut writer =
+            ManifestListWriter::v3(output_file, snapshot_id, Some(0), seq_num, 
Some(10));
+        writer
+            .add_manifests(expected_manifest_list.entries.clone().into_iter())
+            .unwrap();
+        writer.close().await.unwrap();
+
+        let bs = fs::read(path).unwrap();
+        let manifest_list =
+            ManifestList::parse_with_version(&bs, 
crate::spec::FormatVersion::V3).unwrap();
+        expected_manifest_list.entries[0].sequence_number = seq_num;
+        expected_manifest_list.entries[0].min_sequence_number = seq_num;
+        expected_manifest_list.entries[0].first_row_id = Some(10);
+        assert_eq!(manifest_list, expected_manifest_list);
+
+        temp_dir.close().unwrap();
+    }
+
     #[tokio::test]
     async fn test_manifest_list_writer_v1_as_v2() {
         let expected_manifest_list = ManifestList {
@@ -1308,6 +1824,7 @@ mod test {
                     vec![FieldSummary { contains_null: false, contains_nan: 
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), 
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
                 ),
                 key_metadata: None,
+                first_row_id: None,
             }]
         };
 
@@ -1331,6 +1848,100 @@ mod test {
         temp_dir.close().unwrap();
     }
 
+    #[tokio::test]
+    async fn test_manifest_list_writer_v1_as_v3() {
+        let expected_manifest_list = ManifestList {
+            entries: vec![ManifestFile {
+                manifest_path: 
"/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
+                manifest_length: 5806,
+                partition_spec_id: 1,
+                content: ManifestContentType::Data,
+                sequence_number: 0,
+                min_sequence_number: 0,
+                added_snapshot_id: 1646658105718557341,
+                added_files_count: Some(3),
+                existing_files_count: Some(0),
+                deleted_files_count: Some(0),
+                added_rows_count: Some(3),
+                existing_rows_count: Some(0),
+                deleted_rows_count: Some(0),
+                partitions: Some(
+                    vec![FieldSummary { contains_null: false, contains_nan: 
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), 
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+                ),
+                key_metadata: None,
+                first_row_id: None,
+            }]
+        };
+
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("manifest_list_v1.avro");
+        let io = FileIOBuilder::new_fs_io().build().unwrap();
+        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+
+        let mut writer = ManifestListWriter::v1(output_file, 
1646658105718557341, Some(0));
+        writer
+            .add_manifests(expected_manifest_list.entries.clone().into_iter())
+            .unwrap();
+        writer.close().await.unwrap();
+
+        let bs = fs::read(path).unwrap();
+
+        let manifest_list =
+            ManifestList::parse_with_version(&bs, 
crate::spec::FormatVersion::V3).unwrap();
+        assert_eq!(manifest_list, expected_manifest_list);
+
+        temp_dir.close().unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_manifest_list_writer_v2_as_v3() {
+        let snapshot_id = 377075049360453639;
+        let seq_num = 1;
+        let mut expected_manifest_list = ManifestList {
+            entries: vec![ManifestFile {
+                manifest_path: 
"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
+                manifest_length: 6926,
+                partition_spec_id: 1,
+                content: ManifestContentType::Data,
+                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                added_snapshot_id: snapshot_id,
+                added_files_count: Some(1),
+                existing_files_count: Some(0),
+                deleted_files_count: Some(0),
+                added_rows_count: Some(3),
+                existing_rows_count: Some(0),
+                deleted_rows_count: Some(0),
+                partitions: Some(
+                    vec![FieldSummary { contains_null: false, contains_nan: 
Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), 
upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+                ),
+                key_metadata: None,
+                first_row_id: None,
+            }]
+        };
+
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("manifest_list_v2.avro");
+        let io = FileIOBuilder::new_fs_io().build().unwrap();
+        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+
+        let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 
Some(0), seq_num);
+        writer
+            .add_manifests(expected_manifest_list.entries.clone().into_iter())
+            .unwrap();
+        writer.close().await.unwrap();
+
+        let bs = fs::read(path).unwrap();
+
+        let manifest_list =
+            ManifestList::parse_with_version(&bs, 
crate::spec::FormatVersion::V3).unwrap();
+        expected_manifest_list.entries[0].sequence_number = seq_num;
+        expected_manifest_list.entries[0].min_sequence_number = seq_num;
+        assert_eq!(manifest_list, expected_manifest_list);
+
+        temp_dir.close().unwrap();
+    }
+
     #[tokio::test]
     async fn test_manifest_list_v2_deserializer_aliases() {
         // reading avro manifest file generated by iceberg 1.4.0
diff --git a/crates/iceberg/src/spec/snapshot.rs 
b/crates/iceberg/src/spec/snapshot.rs
index 809bf099..5371cf68 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -21,7 +21,6 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use _serde::SnapshotV2;
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use typed_builder::TypedBuilder;
@@ -82,33 +81,52 @@ impl Default for Operation {
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
-#[serde(from = "SnapshotV2", into = "SnapshotV2")]
+#[derive(Debug, PartialEq, Eq, Clone)]
+/// Row range of a snapshot, containing first_row_id and added_rows.
+pub struct SnapshotRowRange {
+    /// The first _row_id assigned to the first row in the first data file in 
the first manifest.
+    pub first_row_id: u64,
+    /// The upper bound of the number of rows with assigned row IDs
+    pub added_rows: u64,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
 #[builder(field_defaults(setter(prefix = "with_")))]
 /// A snapshot represents the state of a table at some time and is used to 
access the complete set of data files in the table.
 pub struct Snapshot {
     /// A unique long ID
-    snapshot_id: i64,
+    pub(crate) snapshot_id: i64,
     /// The snapshot ID of the snapshot’s parent.
     /// Omitted for any snapshot with no parent
     #[builder(default = None)]
-    parent_snapshot_id: Option<i64>,
+    pub(crate) parent_snapshot_id: Option<i64>,
     /// A monotonically increasing long that tracks the order of
     /// changes to a table.
-    sequence_number: i64,
+    pub(crate) sequence_number: i64,
     /// A timestamp when the snapshot was created, used for garbage
     /// collection and table inspection
-    timestamp_ms: i64,
+    pub(crate) timestamp_ms: i64,
     /// The location of a manifest list for this snapshot that
     /// tracks manifest files with additional metadata.
     /// Currently we only support manifest list file, and manifest files are 
not supported.
     #[builder(setter(into))]
-    manifest_list: String,
+    pub(crate) manifest_list: String,
     /// A string map that summarizes the snapshot changes, including operation.
-    summary: Summary,
+    pub(crate) summary: Summary,
     /// ID of the table’s current schema when the snapshot was created.
     #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
-    schema_id: Option<SchemaId>,
+    pub(crate) schema_id: Option<SchemaId>,
+    /// Encryption Key ID
+    #[builder(default)]
+    pub(crate) encryption_key_id: Option<String>,
+    /// Row range of this snapshot, required when the table version supports 
row lineage.
+    /// Specify as a tuple of (first_row_id, added_rows_count)
+    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, 
added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
+    // This is specified as a struct instead of two separate fields to ensure 
that both fields are either set or not set.
+    // The Java implementation uses two separate fields, then sets 
`added_row_counts` to Null if `first_row_id` is set to Null.
+    // It throws an error if `added_row_counts` is set but `first_row_id` is 
not set, or if either of the two is negative.
+    // We handle all cases infallibly using the Rust type system.
+    pub(crate) row_range: Option<SnapshotRowRange>,
 }
 
 impl Snapshot {
@@ -205,6 +223,37 @@ impl Snapshot {
             snapshot_id: self.snapshot_id,
         }
     }
+
+    /// The row-id of the first newly added row in this snapshot. All rows 
added in this snapshot will
+    /// have a row-id assigned to them greater than this value. All rows with 
a row-id less than this
+    /// value were created in a snapshot that was added to the table (but not 
necessarily committed to
+    /// this branch) in the past.
+    ///
+    /// This field is optional but is required when the table version supports 
row lineage.
+    pub fn first_row_id(&self) -> Option<u64> {
+        self.row_range.as_ref().map(|r| r.first_row_id)
+    }
+
+    /// The total number of newly added rows in this snapshot. It should be 
the summation of
+    /// `ManifestFile::added_rows_count` for every manifest added in this 
snapshot.
+    ///
+    /// This field is optional but is required when the table version supports 
row lineage.
+    pub fn added_rows_count(&self) -> Option<u64> {
+        self.row_range.as_ref().map(|r| r.added_rows)
+    }
+
+    /// Returns the row range of this snapshot, if available.
+    /// This is a tuple containing (first_row_id, added_rows_count).
+    pub fn row_range(&self) -> Option<(u64, u64)> {
+        self.row_range
+            .as_ref()
+            .map(|r| (r.first_row_id, r.added_rows))
+    }
+
+    /// Get encryption key id, if available.
+    pub fn encryption_key_id(&self) -> Option<&str> {
+        self.encryption_key_id.as_deref()
+    }
 }
 
 pub(super) mod _serde {
@@ -219,6 +268,26 @@ pub(super) mod _serde {
     use super::{Operation, Snapshot, Summary};
     use crate::Error;
     use crate::spec::SchemaId;
+    use crate::spec::snapshot::SnapshotRowRange;
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v3 snapshot for 
serialization/deserialization
+    pub(crate) struct SnapshotV3 {
+        pub snapshot_id: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub parent_snapshot_id: Option<i64>,
+        pub sequence_number: i64,
+        pub timestamp_ms: i64,
+        pub manifest_list: String,
+        pub summary: Summary,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schema_id: Option<SchemaId>,
+        pub first_row_id: u64,
+        pub added_rows: u64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub key_id: Option<String>,
+    }
 
     #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     #[serde(rename_all = "kebab-case")]
@@ -253,6 +322,51 @@ pub(super) mod _serde {
         pub schema_id: Option<SchemaId>,
     }
 
+    impl From<SnapshotV3> for Snapshot {
+        fn from(s: SnapshotV3) -> Self {
+            Snapshot {
+                snapshot_id: s.snapshot_id,
+                parent_snapshot_id: s.parent_snapshot_id,
+                sequence_number: s.sequence_number,
+                timestamp_ms: s.timestamp_ms,
+                manifest_list: s.manifest_list,
+                summary: s.summary,
+                schema_id: s.schema_id,
+                encryption_key_id: s.key_id,
+                row_range: Some(SnapshotRowRange {
+                    first_row_id: s.first_row_id,
+                    added_rows: s.added_rows,
+                }),
+            }
+        }
+    }
+
+    impl TryFrom<Snapshot> for SnapshotV3 {
+        type Error = Error;
+
+        fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
+            let row_range = s.row_range.ok_or_else(|| {
+                Error::new(
+                    crate::ErrorKind::DataInvalid,
+                    "v3 Snapshots must have first-row-id and rows-added fields 
set.".to_string(),
+                )
+            })?;
+
+            Ok(SnapshotV3 {
+                snapshot_id: s.snapshot_id,
+                parent_snapshot_id: s.parent_snapshot_id,
+                sequence_number: s.sequence_number,
+                timestamp_ms: s.timestamp_ms,
+                manifest_list: s.manifest_list,
+                summary: s.summary,
+                schema_id: s.schema_id,
+                first_row_id: row_range.first_row_id,
+                added_rows: row_range.added_rows,
+                key_id: s.encryption_key_id,
+            })
+        }
+    }
+
     impl From<SnapshotV2> for Snapshot {
         fn from(v2: SnapshotV2) -> Self {
             Snapshot {
@@ -263,6 +377,8 @@ pub(super) mod _serde {
                 manifest_list: v2.manifest_list,
                 summary: v2.summary,
                 schema_id: v2.schema_id,
+                encryption_key_id: None,
+                row_range: None,
             }
         }
     }
@@ -300,6 +416,8 @@ pub(super) mod _serde {
                     additional_properties: HashMap::new(),
                 }),
                 schema_id: v1.schema_id,
+                encryption_key_id: None,
+                row_range: None,
             })
         }
     }
diff --git a/crates/iceberg/src/spec/snapshot_summary.rs 
b/crates/iceberg/src/spec/snapshot_summary.rs
index a9dd5699..4cd3715e 100644
--- a/crates/iceberg/src/spec/snapshot_summary.rs
+++ b/crates/iceberg/src/spec/snapshot_summary.rs
@@ -850,6 +850,7 @@ mod tests {
             deleted_rows_count: Some(50),
             partitions: Some(Vec::new()),
             key_metadata: None,
+            first_row_id: None,
         };
 
         collector
@@ -974,6 +975,7 @@ mod tests {
             deleted_rows_count: Some(0),
             partitions: Some(Vec::new()),
             key_metadata: None,
+            first_row_id: None,
         });
 
         summary_four.add_file(
diff --git a/crates/iceberg/src/spec/table_metadata.rs 
b/crates/iceberg/src/spec/table_metadata.rs
index a98dc4f4..437b0df5 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -38,6 +38,7 @@ use super::{
 };
 use crate::error::{Result, timestamp_ms_to_utc};
 use crate::io::FileIO;
+use crate::spec::EncryptedKey;
 use crate::{Error, ErrorKind};
 
 static MAIN_BRANCH: &str = "main";
@@ -46,6 +47,10 @@ pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
 pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
 pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
 
+/// Initial row id for row lineage for new v3 tables and older tables 
upgrading to v3.
+pub const INITIAL_ROW_ID: u64 = 0;
+/// Minimum format version that supports row lineage (v3).
+pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
 /// Reference to [`TableMetadata`].
 pub type TableMetadataRef = Arc<TableMetadata>;
 
@@ -123,8 +128,10 @@ pub struct TableMetadata {
     pub(crate) statistics: HashMap<i64, StatisticsFile>,
     /// Mapping of snapshot ids to partition statistics files.
     pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
-    /// Encryption Keys
-    pub(crate) encryption_keys: HashMap<String, String>,
+    /// Encryption Keys - map of key id to the actual key
+    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
+    /// Next row id to be assigned for Row Lineage (v3)
+    pub(crate) next_row_id: u64,
 }
 
 impl TableMetadata {
@@ -398,16 +405,22 @@ impl TableMetadata {
 
     /// Iterate over all encryption keys
     #[inline]
-    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = 
(&String, &String)> {
-        self.encryption_keys.iter()
+    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = 
&EncryptedKey> {
+        self.encryption_keys.values()
     }
 
     /// Get the encryption key for a given key id
     #[inline]
-    pub fn encryption_key(&self, key_id: &str) -> Option<&String> {
+    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
         self.encryption_keys.get(key_id)
     }
 
+    /// Get the next row id to be assigned
+    #[inline]
+    pub fn next_row_id(&self) -> u64 {
+        self.next_row_id
+    }
+
     /// Read table metadata from the given location.
     pub async fn read_from(
         file_io: &FileIO,
@@ -673,16 +686,18 @@ pub(super) mod _serde {
         TableMetadata,
     };
     use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
-    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2};
+    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
     use crate::spec::{
-        PartitionField, PartitionSpec, PartitionSpecRef, 
PartitionStatisticsFile, Schema,
-        SchemaRef, Snapshot, SnapshotReference, SnapshotRetention, SortOrder, 
StatisticsFile,
+        EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, 
PartitionSpecRef,
+        PartitionStatisticsFile, Schema, SchemaRef, Snapshot, 
SnapshotReference, SnapshotRetention,
+        SortOrder, StatisticsFile,
     };
     use crate::{Error, ErrorKind};
 
     #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     #[serde(untagged)]
     pub(super) enum TableMetadataEnum {
+        V3(TableMetadataV3),
         V2(TableMetadataV2),
         V1(TableMetadataV1),
     }
@@ -690,8 +705,21 @@ pub(super) mod _serde {
     #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     #[serde(rename_all = "kebab-case")]
     /// Defines the structure of a v2 table metadata for 
serialization/deserialization
-    pub(super) struct TableMetadataV2 {
-        pub format_version: VersionNumber<2>,
+    pub(super) struct TableMetadataV3 {
+        pub format_version: VersionNumber<3>,
+        #[serde(flatten)]
+        pub shared: TableMetadataV2V3Shared,
+        pub next_row_id: u64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub encryption_keys: Option<Vec<EncryptedKey>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshots: Option<Vec<SnapshotV3>>,
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of the table metadata fields shared between v2 
and v3 for serialization/deserialization
+    pub(super) struct TableMetadataV2V3Shared {
         pub table_uuid: Uuid,
         pub location: String,
         pub last_sequence_number: i64,
@@ -707,8 +735,6 @@ pub(super) mod _serde {
         #[serde(skip_serializing_if = "Option::is_none")]
         pub current_snapshot_id: Option<i64>,
         #[serde(skip_serializing_if = "Option::is_none")]
-        pub snapshots: Option<Vec<SnapshotV2>>,
-        #[serde(skip_serializing_if = "Option::is_none")]
         pub snapshot_log: Option<Vec<SnapshotLog>>,
         #[serde(skip_serializing_if = "Option::is_none")]
         pub metadata_log: Option<Vec<MetadataLog>>,
@@ -722,6 +748,17 @@ pub(super) mod _serde {
         pub partition_statistics: Vec<PartitionStatisticsFile>,
     }
 
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v2 table metadata for 
serialization/deserialization
+    pub(super) struct TableMetadataV2 {
+        pub format_version: VersionNumber<2>,
+        #[serde(flatten)]
+        pub shared: TableMetadataV2V3Shared,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshots: Option<Vec<SnapshotV2>>,
+    }
+
     #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     #[serde(rename_all = "kebab-case")]
     /// Defines the structure of a v1 table metadata for 
serialization/deserialization
@@ -802,6 +839,7 @@ pub(super) mod _serde {
         type Error = Error;
         fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
             match value {
+                TableMetadataEnum::V3(value) => value.try_into(),
                 TableMetadataEnum::V2(value) => value.try_into(),
                 TableMetadataEnum::V1(value) => value.try_into(),
             }
@@ -812,15 +850,136 @@ pub(super) mod _serde {
         type Error = Error;
         fn try_from(value: TableMetadata) -> Result<Self, Error> {
             Ok(match value.format_version {
+                FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
                 FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
                 FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
             })
         }
     }
 
+    impl TryFrom<TableMetadataV3> for TableMetadata {
+        type Error = Error;
+        fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
+            let TableMetadataV3 {
+                format_version: _,
+                shared: value,
+                next_row_id,
+                encryption_keys,
+                snapshots,
+            } = value;
+            let current_snapshot_id = if let &Some(-1) = 
&value.current_snapshot_id {
+                None
+            } else {
+                value.current_snapshot_id
+            };
+            let schemas = HashMap::from_iter(
+                value
+                    .schemas
+                    .into_iter()
+                    .map(|schema| Ok((schema.schema_id, 
Arc::new(schema.try_into()?))))
+                    .collect::<Result<Vec<_>, Error>>()?,
+            );
+
+            let current_schema: &SchemaRef =
+                schemas.get(&value.current_schema_id).ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "No schema exists with the current schema id {}.",
+                            value.current_schema_id
+                        ),
+                    )
+                })?;
+            let partition_specs = HashMap::from_iter(
+                value
+                    .partition_specs
+                    .into_iter()
+                    .map(|x| (x.spec_id(), Arc::new(x))),
+            );
+            let default_spec_id = value.default_spec_id;
+            let default_spec: PartitionSpecRef = partition_specs
+                .get(&value.default_spec_id)
+                .map(|spec| (**spec).clone())
+                .or_else(|| {
+                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
+                        .then(PartitionSpec::unpartition_spec)
+                })
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Default partition spec {default_spec_id} not 
found"),
+                    )
+                })?
+                .into();
+            let default_partition_type = 
default_spec.partition_type(current_schema)?;
+
+            let mut metadata = TableMetadata {
+                format_version: FormatVersion::V3,
+                table_uuid: value.table_uuid,
+                location: value.location,
+                last_sequence_number: value.last_sequence_number,
+                last_updated_ms: value.last_updated_ms,
+                last_column_id: value.last_column_id,
+                current_schema_id: value.current_schema_id,
+                schemas,
+                partition_specs,
+                default_partition_type,
+                default_spec,
+                last_partition_id: value.last_partition_id,
+                properties: value.properties.unwrap_or_default(),
+                current_snapshot_id,
+                snapshots: snapshots
+                    .map(|snapshots| {
+                        HashMap::from_iter(
+                            snapshots
+                                .into_iter()
+                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
+                        )
+                    })
+                    .unwrap_or_default(),
+                snapshot_log: value.snapshot_log.unwrap_or_default(),
+                metadata_log: value.metadata_log.unwrap_or_default(),
+                sort_orders: HashMap::from_iter(
+                    value
+                        .sort_orders
+                        .into_iter()
+                        .map(|x| (x.order_id, Arc::new(x))),
+                ),
+                default_sort_order_id: value.default_sort_order_id,
+                refs: value.refs.unwrap_or_else(|| {
+                    if let Some(snapshot_id) = current_snapshot_id {
+                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), 
SnapshotReference {
+                            snapshot_id,
+                            retention: SnapshotRetention::Branch {
+                                min_snapshots_to_keep: None,
+                                max_snapshot_age_ms: None,
+                                max_ref_age_ms: None,
+                            },
+                        })])
+                    } else {
+                        HashMap::new()
+                    }
+                }),
+                statistics: index_statistics(value.statistics),
+                partition_statistics: 
index_partition_statistics(value.partition_statistics),
+                encryption_keys: encryption_keys
+                    .map(|keys| {
+                        HashMap::from_iter(keys.into_iter().map(|key| 
(key.key_id.clone(), key)))
+                    })
+                    .unwrap_or_default(),
+                next_row_id,
+            };
+
+            metadata.borrow_mut().try_normalize()?;
+            Ok(metadata)
+        }
+    }
+
     impl TryFrom<TableMetadataV2> for TableMetadata {
         type Error = Error;
         fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
+            let snapshots = value.snapshots;
+            let value = value.shared;
             let current_snapshot_id = if let &Some(-1) = 
&value.current_snapshot_id {
                 None
             } else {
@@ -882,8 +1041,7 @@ pub(super) mod _serde {
                 last_partition_id: value.last_partition_id,
                 properties: value.properties.unwrap_or_default(),
                 current_snapshot_id,
-                snapshots: value
-                    .snapshots
+                snapshots: snapshots
                     .map(|snapshots| {
                         HashMap::from_iter(
                             snapshots
@@ -918,6 +1076,7 @@ pub(super) mod _serde {
                 statistics: index_statistics(value.statistics),
                 partition_statistics: 
index_partition_statistics(value.partition_statistics),
                 encryption_keys: HashMap::new(),
+                next_row_id: INITIAL_ROW_ID,
             };
 
             metadata.borrow_mut().try_normalize()?;
@@ -1072,6 +1231,7 @@ pub(super) mod _serde {
                 statistics: index_statistics(value.statistics),
                 partition_statistics: 
index_partition_statistics(value.partition_statistics),
                 encryption_keys: HashMap::new(),
+                next_row_id: INITIAL_ROW_ID, // v1 has no row lineage
             };
 
             metadata.borrow_mut().try_normalize()?;
@@ -1079,10 +1239,63 @@ pub(super) mod _serde {
         }
     }
 
+    impl TryFrom<TableMetadata> for TableMetadataV3 {
+        type Error = Error;
+
+        fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
+            let next_row_id = v.next_row_id;
+            let encryption_keys = std::mem::take(&mut v.encryption_keys);
+            let snapshots = std::mem::take(&mut v.snapshots);
+            let shared = v.into();
+
+            Ok(TableMetadataV3 {
+                format_version: VersionNumber::<3>,
+                shared,
+                next_row_id,
+                encryption_keys: if encryption_keys.is_empty() {
+                    None
+                } else {
+                    Some(encryption_keys.into_values().collect())
+                },
+                snapshots: if snapshots.is_empty() {
+                    None
+                } else {
+                    Some(
+                        snapshots
+                            .into_values()
+                            .map(|s| 
SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
+                            .collect::<Result<_, _>>()?,
+                    )
+                },
+            })
+        }
+    }
+
     impl From<TableMetadata> for TableMetadataV2 {
-        fn from(v: TableMetadata) -> Self {
+        fn from(mut v: TableMetadata) -> Self {
+            let snapshots = std::mem::take(&mut v.snapshots);
+            let shared = v.into();
+
             TableMetadataV2 {
                 format_version: VersionNumber::<2>,
+                shared,
+                snapshots: if snapshots.is_empty() {
+                    None
+                } else {
+                    Some(
+                        snapshots
+                            .into_values()
+                            .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
+                            .collect(),
+                    )
+                },
+            }
+        }
+    }
+
+    impl From<TableMetadata> for TableMetadataV2V3Shared {
+        fn from(v: TableMetadata) -> Self {
+            TableMetadataV2V3Shared {
                 table_uuid: v.table_uuid,
                 location: v.location,
                 last_sequence_number: v.last_sequence_number,
@@ -1111,20 +1324,6 @@ pub(super) mod _serde {
                     Some(v.properties)
                 },
                 current_snapshot_id: v.current_snapshot_id,
-                snapshots: if v.snapshots.is_empty() {
-                    None
-                } else {
-                    Some(
-                        v.snapshots
-                            .into_values()
-                            .map(|x| {
-                                Arc::try_unwrap(x)
-                                    .unwrap_or_else(|snapshot| 
snapshot.as_ref().clone())
-                                    .into()
-                            })
-                            .collect(),
-                    )
-                },
                 snapshot_log: if v.snapshot_log.is_empty() {
                     None
                 } else {
@@ -1254,6 +1453,8 @@ pub enum FormatVersion {
     V1 = 1u8,
     /// Iceberg spec version 2
     V2 = 2u8,
+    /// Iceberg spec version 3
+    V3 = 3u8,
 }
 
 impl PartialOrd for FormatVersion {
@@ -1273,6 +1474,7 @@ impl Display for FormatVersion {
         match self {
             FormatVersion::V1 => write!(f, "v1"),
             FormatVersion::V2 => write!(f, "v2"),
+            FormatVersion::V3 => write!(f, "v3"),
         }
     }
 }
@@ -1317,6 +1519,7 @@ mod tests {
     use std::sync::Arc;
 
     use anyhow::Result;
+    use base64::Engine as _;
     use pretty_assertions::assert_eq;
     use tempfile::TempDir;
     use uuid::Uuid;
@@ -1326,9 +1529,10 @@ mod tests {
     use crate::io::FileIOBuilder;
     use crate::spec::table_metadata::TableMetadata;
     use crate::spec::{
-        BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, 
PartitionStatisticsFile,
-        PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, 
SortDirection,
-        SortField, SortOrder, StatisticsFile, Summary, Transform, Type, 
UnboundPartitionField,
+        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, 
NullOrder, Operation,
+        PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, 
PrimitiveType, Schema, Snapshot,
+        SnapshotReference, SnapshotRetention, SortDirection, SortField, 
SortOrder, StatisticsFile,
+        Summary, Transform, Type, UnboundPartitionField,
     };
 
     fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
@@ -1474,6 +1678,183 @@ mod tests {
             statistics: HashMap::new(),
             partition_statistics: HashMap::new(),
             encryption_keys: HashMap::new(),
+            next_row_id: INITIAL_ROW_ID,
+        };
+
+        let expected_json_value = serde_json::to_value(&expected).unwrap();
+        check_table_metadata_serde(data, expected);
+
+        let json_value = 
serde_json::from_str::<serde_json::Value>(data).unwrap();
+        assert_eq!(json_value, expected_json_value);
+    }
+
+    #[test]
+    fn test_table_data_v3() {
+        let data = r#"
+            {
+                "format-version" : 3,
+                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
+                "location": "s3://b/wh/data.db/table",
+                "last-sequence-number" : 1,
+                "last-updated-ms": 1515100955770,
+                "last-column-id": 1,
+                "next-row-id": 5,
+                "schemas": [
+                    {
+                        "schema-id" : 1,
+                        "type" : "struct",
+                        "fields" :[
+                            {
+                                "id": 4,
+                                "name": "ts",
+                                "required": true,
+                                "type": "timestamp"
+                            }
+                        ]
+                    }
+                ],
+                "current-schema-id" : 1,
+                "partition-specs": [
+                    {
+                        "spec-id": 0,
+                        "fields": [
+                            {
+                                "source-id": 4,
+                                "field-id": 1000,
+                                "name": "ts_day",
+                                "transform": "day"
+                            }
+                        ]
+                    }
+                ],
+                "default-spec-id": 0,
+                "last-partition-id": 1000,
+                "properties": {
+                    "commit.retry.num-retries": "1"
+                },
+                "metadata-log": [
+                    {
+                        "metadata-file": "s3://bucket/.../v1.json",
+                        "timestamp-ms": 1515100
+                    }
+                ],
+                "refs": {},
+                "snapshots" : [ {
+                    "snapshot-id" : 1,
+                    "timestamp-ms" : 1662532818843,
+                    "sequence-number" : 0,
+                    "first-row-id" : 0,
+                    "added-rows" : 4,
+                    "key-id" : "key1",
+                    "summary" : {
+                        "operation" : "append"
+                    },
+                    "manifest-list" : 
"/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
+                    "schema-id" : 0
+                    }
+                ],
+                "encryption-keys": [
+                    {
+                        "key-id": "key1",
+                        "encrypted-by-id": "KMS",
+                        "encrypted-key-metadata": 
"c29tZS1lbmNyeXB0aW9uLWtleQ==",
+                        "properties": {
+                            "p1": "v1"
+                        }
+                    }
+                ],
+                "sort-orders": [
+                    {
+                    "order-id": 0,
+                    "fields": []
+                    }
+                ],
+                "default-sort-order-id": 0
+            }
+        "#;
+
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![Arc::new(NestedField::required(
+                4,
+                "ts",
+                Type::Primitive(PrimitiveType::Timestamp),
+            ))])
+            .build()
+            .unwrap();
+
+        let partition_spec = PartitionSpec::builder(schema.clone())
+            .with_spec_id(0)
+            .add_unbound_field(UnboundPartitionField {
+                name: "ts_day".to_string(),
+                transform: Transform::Day,
+                source_id: 4,
+                field_id: Some(1000),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let snapshot = Snapshot::builder()
+            .with_snapshot_id(1)
+            .with_timestamp_ms(1662532818843)
+            .with_sequence_number(0)
+            .with_row_range(0, 4)
+            .with_encryption_key_id(Some("key1".to_string()))
+            .with_summary(Summary {
+                operation: Operation::Append,
+                additional_properties: HashMap::new(),
+            })
+            
.with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
+            .with_schema_id(0)
+            .build();
+
+        let encryption_key = EncryptedKey::builder()
+            .key_id("key1".to_string())
+            .encrypted_by_id("KMS".to_string())
+            .encrypted_key_metadata(
+                base64::prelude::BASE64_STANDARD
+                    .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
+                    .unwrap(),
+            )
+            .properties(HashMap::from_iter(vec![(
+                "p1".to_string(),
+                "v1".to_string(),
+            )]))
+            .build();
+
+        let default_partition_type = 
partition_spec.partition_type(&schema).unwrap();
+        let expected = TableMetadata {
+            format_version: FormatVersion::V3,
+            table_uuid: 
Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
+            location: "s3://b/wh/data.db/table".to_string(),
+            last_updated_ms: 1515100955770,
+            last_column_id: 1,
+            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
+            current_schema_id: 1,
+            partition_specs: HashMap::from_iter(vec![(0, 
partition_spec.clone().into())]),
+            default_partition_type,
+            default_spec: partition_spec.into(),
+            last_partition_id: 1000,
+            default_sort_order_id: 0,
+            sort_orders: HashMap::from_iter(vec![(0, 
SortOrder::unsorted_order().into())]),
+            snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
+            current_snapshot_id: None,
+            last_sequence_number: 1,
+            properties: HashMap::from_iter(vec![(
+                "commit.retry.num-retries".to_string(),
+                "1".to_string(),
+            )]),
+            snapshot_log: Vec::new(),
+            metadata_log: vec![MetadataLog {
+                metadata_file: "s3://bucket/.../v1.json".to_string(),
+                timestamp_ms: 1515100,
+            }],
+            refs: HashMap::new(),
+            statistics: HashMap::new(),
+            partition_statistics: HashMap::new(),
+            encryption_keys: HashMap::from_iter(vec![("key1".to_string(), 
encryption_key)]),
+            next_row_id: 5,
         };
 
         let expected_json_value = serde_json::to_value(&expected).unwrap();
@@ -1650,6 +2031,7 @@ mod tests {
             statistics: HashMap::new(),
             partition_statistics: HashMap::new(),
             encryption_keys: HashMap::new(),
+            next_row_id: INITIAL_ROW_ID,
         };
 
         check_table_metadata_serde(data, expected);
@@ -1748,6 +2130,7 @@ mod tests {
             statistics: HashMap::new(),
             partition_statistics: HashMap::new(),
             encryption_keys: HashMap::new(),
+            next_row_id: INITIAL_ROW_ID,
         };
 
         let expected_json_value = serde_json::to_value(&expected).unwrap();
@@ -2282,6 +2665,7 @@ mod tests {
                 },
             })]),
             encryption_keys: HashMap::new(),
+            next_row_id: INITIAL_ROW_ID,
         };
 
         check_table_metadata_serde(data, expected);
@@ -2417,6 +2801,7 @@ mod tests {
                 },
             })]),
             encryption_keys: HashMap::new(),
+            next_row_id: INITIAL_ROW_ID,
         };
 
         check_table_metadata_serde(data, expected);
@@ -2445,6 +2830,95 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_table_metadata_v3_valid_minimal() {
+        let metadata_str =
+            
fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();
+
+        let table_metadata = 
serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
+        assert_eq!(table_metadata.format_version, FormatVersion::V3);
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                Arc::new(
+                    NestedField::required(1, "x", 
Type::Primitive(PrimitiveType::Long))
+                        
.with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
+                        
.with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
+                ),
+                Arc::new(
+                    NestedField::required(2, "y", 
Type::Primitive(PrimitiveType::Long))
+                        .with_doc("comment"),
+                ),
+                Arc::new(NestedField::required(
+                    3,
+                    "z",
+                    Type::Primitive(PrimitiveType::Long),
+                )),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = PartitionSpec::builder(schema.clone())
+            .with_spec_id(0)
+            .add_unbound_field(UnboundPartitionField {
+                name: "x".to_string(),
+                transform: Transform::Identity,
+                source_id: 1,
+                field_id: Some(1000),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let sort_order = SortOrder::builder()
+            .with_order_id(3)
+            .with_sort_field(SortField {
+                source_id: 2,
+                transform: Transform::Identity,
+                direction: SortDirection::Ascending,
+                null_order: NullOrder::First,
+            })
+            .with_sort_field(SortField {
+                source_id: 3,
+                transform: Transform::Bucket(4),
+                direction: SortDirection::Descending,
+                null_order: NullOrder::Last,
+            })
+            .build_unbound()
+            .unwrap();
+
+        let default_partition_type = 
partition_spec.partition_type(&schema).unwrap();
+        let expected = TableMetadata {
+            format_version: FormatVersion::V3,
+            table_uuid: 
Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
+            location: "s3://bucket/test/location".to_string(),
+            last_updated_ms: 1602638573590,
+            last_column_id: 3,
+            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
+            current_schema_id: 0,
+            partition_specs: HashMap::from_iter(vec![(0, 
partition_spec.clone().into())]),
+            default_spec: Arc::new(partition_spec),
+            default_partition_type,
+            last_partition_id: 1000,
+            default_sort_order_id: 3,
+            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
+            snapshots: HashMap::default(),
+            current_snapshot_id: None,
+            last_sequence_number: 34,
+            properties: HashMap::new(),
+            snapshot_log: Vec::new(),
+            metadata_log: Vec::new(),
+            refs: HashMap::new(),
+            statistics: HashMap::new(),
+            partition_statistics: HashMap::new(),
+            encryption_keys: HashMap::new(),
+            next_row_id: 0, // V3 specific field from the JSON
+        };
+
+        check_table_metadata_serde(&metadata_str, expected);
+    }
+
     #[test]
     fn test_table_metadata_v2_file_valid() {
         let metadata =
@@ -2579,6 +3053,7 @@ mod tests {
             statistics: HashMap::new(),
             partition_statistics: HashMap::new(),
             encryption_keys: HashMap::new(),
+            next_row_id: INITIAL_ROW_ID,
         };
 
         check_table_metadata_serde(&metadata, expected);
@@ -2664,6 +3139,7 @@ mod tests {
             statistics: HashMap::new(),
             partition_statistics: HashMap::new(),
             encryption_keys: HashMap::new(),
+            next_row_id: INITIAL_ROW_ID,
         };
 
         check_table_metadata_serde(&metadata, expected);
@@ -2733,6 +3209,7 @@ mod tests {
             statistics: HashMap::new(),
             partition_statistics: HashMap::new(),
             encryption_keys: HashMap::new(),
+            next_row_id: INITIAL_ROW_ID,
         };
 
         check_table_metadata_serde(&metadata, expected);
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs 
b/crates/iceberg/src/spec/table_metadata_builder.rs
index 25af8c30..6b8ce1e6 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -28,6 +28,7 @@ use super::{
     UnboundPartitionSpec,
 };
 use crate::error::{Error, ErrorKind, Result};
+use crate::spec::{EncryptedKey, INITIAL_ROW_ID, 
MIN_FORMAT_VERSION_ROW_LINEAGE};
 use crate::{TableCreation, TableUpdate};
 
 const FIRST_FIELD_ID: u32 = 1;
@@ -120,6 +121,7 @@ impl TableMetadataBuilder {
                 statistics: HashMap::new(),
                 partition_statistics: HashMap::new(),
                 encryption_keys: HashMap::new(),
+                next_row_id: INITIAL_ROW_ID,
             },
             last_updated_ms: None,
             changes: vec![],
@@ -170,6 +172,7 @@ impl TableMetadataBuilder {
             partition_spec,
             sort_order,
             properties,
+            format_version,
         } = table_creation;
 
         let location = location.ok_or_else(|| {
@@ -188,7 +191,7 @@ impl TableMetadataBuilder {
             partition_spec,
             sort_order.unwrap_or(SortOrder::unsorted_order()),
             location,
-            FormatVersion::V2,
+            format_version,
             properties,
         )
     }
@@ -228,6 +231,11 @@ impl TableMetadataBuilder {
                     self.changes
                         .push(TableUpdate::UpgradeFormatVersion { 
format_version });
                 }
+                FormatVersion::V3 => {
+                    self.metadata.format_version = format_version;
+                    self.changes
+                        .push(TableUpdate::UpgradeFormatVersion { 
format_version });
+                }
             }
         }
 
@@ -329,6 +337,9 @@ impl TableMetadataBuilder {
     /// # Errors
     /// - Snapshot id already exists.
     /// - For format version > 1: the sequence number of the snapshot is lower 
than the highest sequence number specified so far.
+    /// - For format version >= 3: the first-row-id of the snapshot is lower 
than the next-row-id of the table.
+    /// - For format version >= 3: added-rows is null or first-row-id is null.
+    /// - For format version >= 3: next-row-id would overflow when adding 
added-rows.
     pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
         if self
             .metadata
@@ -385,6 +396,43 @@ impl TableMetadataBuilder {
             ));
         }
 
+        let mut added_rows = None;
+        if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
+            if let Some((first_row_id, added_rows_count)) = 
snapshot.row_range() {
+                if first_row_id < self.metadata.next_row_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Cannot add a snapshot, first-row-id is behind 
table next-row-id: {first_row_id} < {}",
+                            self.metadata.next_row_id
+                        ),
+                    ));
+                }
+
+                added_rows = Some(added_rows_count);
+            } else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot add a snapshot: first-row-id is null. 
first-row-id must be set for format version >= 
{MIN_FORMAT_VERSION_ROW_LINEAGE}",
+                    ),
+                ));
+            }
+        }
+
+        if let Some(added_rows) = added_rows {
+            self.metadata.next_row_id = self
+                .metadata
+                .next_row_id
+                .checked_add(added_rows)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Cannot add snapshot: next-row-id overflowed when 
adding added-rows",
+                    )
+                })?;
+        }
+
         // Mutation happens in next line - must be infallible from here
         self.changes.push(TableUpdate::AddSnapshot {
             snapshot: snapshot.clone(),
@@ -1014,6 +1062,31 @@ impl TableMetadataBuilder {
             .set_default_sort_order(Self::LAST_ADDED as i64)
     }
 
+    /// Add an encryption key to the table metadata.
+    pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
+        let key_id = key.key_id().to_string();
+        if self.metadata.encryption_keys.contains_key(&key_id) {
+            // already exists
+            return self;
+        }
+
+        self.metadata.encryption_keys.insert(key_id, key.clone());
+        self.changes.push(TableUpdate::AddEncryptionKey {
+            encryption_key: key,
+        });
+        self
+    }
+
+    /// Remove an encryption key from the table metadata.
+    pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
+        if self.metadata.encryption_keys.remove(key_id).is_some() {
+            self.changes.push(TableUpdate::RemoveEncryptionKey {
+                key_id: key_id.to_string(),
+            });
+        }
+        self
+    }
+
     /// Build the table metadata.
     pub fn build(mut self) -> Result<TableMetadataBuildResult> {
         self.metadata.last_updated_ms = self
@@ -2988,4 +3061,382 @@ mod tests {
 
         assert!(result.is_ok());
     }
+
+    #[test]
+    fn test_row_lineage_addition() {
+        let new_rows = 30;
+        let base = builder_without_changes(FormatVersion::V3)
+            .build()
+            .unwrap()
+            .metadata;
+        let add_rows = Snapshot::builder()
+            .with_snapshot_id(0)
+            .with_timestamp_ms(base.last_updated_ms + 1)
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_manifest_list("foo")
+            .with_parent_snapshot_id(None)
+            .with_summary(Summary {
+                operation: Operation::Append,
+                additional_properties: HashMap::new(),
+            })
+            .with_row_range(base.next_row_id(), new_rows)
+            .build();
+
+        let first_addition = base
+            .into_builder(None)
+            .add_snapshot(add_rows.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        assert_eq!(first_addition.next_row_id(), new_rows);
+
+        let add_more_rows = Snapshot::builder()
+            .with_snapshot_id(1)
+            .with_timestamp_ms(first_addition.last_updated_ms + 1)
+            .with_sequence_number(1)
+            .with_schema_id(0)
+            .with_manifest_list("foo")
+            .with_parent_snapshot_id(Some(0))
+            .with_summary(Summary {
+                operation: Operation::Append,
+                additional_properties: HashMap::new(),
+            })
+            .with_row_range(first_addition.next_row_id(), new_rows)
+            .build();
+
+        let second_addition = first_addition
+            .into_builder(None)
+            .add_snapshot(add_more_rows)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+        assert_eq!(second_addition.next_row_id(), new_rows * 2);
+    }
+
+    #[test]
+    fn test_row_lineage_invalid_snapshot() {
+        let new_rows = 30;
+        let base = builder_without_changes(FormatVersion::V3)
+            .build()
+            .unwrap()
+            .metadata;
+
+        // add rows to check TableMetadata validation; Snapshot rejects 
negative next-row-id
+        let add_rows = Snapshot::builder()
+            .with_snapshot_id(0)
+            .with_timestamp_ms(base.last_updated_ms + 1)
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_manifest_list("foo")
+            .with_parent_snapshot_id(None)
+            .with_summary(Summary {
+                operation: Operation::Append,
+                additional_properties: HashMap::new(),
+            })
+            .with_row_range(base.next_row_id(), new_rows)
+            .build();
+
+        let added = base
+            .into_builder(None)
+            .add_snapshot(add_rows)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        let invalid_new_rows = Snapshot::builder()
+            .with_snapshot_id(1)
+            .with_timestamp_ms(added.last_updated_ms + 1)
+            .with_sequence_number(1)
+            .with_schema_id(0)
+            .with_manifest_list("foo")
+            .with_parent_snapshot_id(Some(0))
+            .with_summary(Summary {
+                operation: Operation::Append,
+                additional_properties: HashMap::new(),
+            })
+            // first_row_id is behind table next_row_id
+            .with_row_range(added.next_row_id() - 1, 10)
+            .build();
+
+        let err = added
+            .into_builder(None)
+            .add_snapshot(invalid_new_rows)
+            .unwrap_err();
+        assert!(
+            err.to_string().contains(
+                "Cannot add a snapshot, first-row-id is behind table 
next-row-id: 29 < 30"
+            )
+        );
+    }
+
+    #[test]
+    fn test_row_lineage_append_branch() {
+        // Appends to a branch should still advance the table's next-row-id even if the
+        // branch is not main; these changes should also affect subsequent commits to main
+
+        let branch = "some_branch";
+
+        // Start with V3 metadata to support row lineage
+        let base = builder_without_changes(FormatVersion::V3)
+            .build()
+            .unwrap()
+            .metadata;
+
+        // Initial next_row_id should be 0
+        assert_eq!(base.next_row_id(), 0);
+
+        // Write to Branch - append 30 rows
+        let branch_snapshot_1 = Snapshot::builder()
+            .with_snapshot_id(1)
+            .with_timestamp_ms(base.last_updated_ms + 1)
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_manifest_list("foo")
+            .with_parent_snapshot_id(None)
+            .with_summary(Summary {
+                operation: Operation::Append,
+                additional_properties: HashMap::new(),
+            })
+            .with_row_range(base.next_row_id(), 30)
+            .build();
+
+        let table_after_branch_1 = base
+            .into_builder(None)
+            .set_branch_snapshot(branch_snapshot_1.clone(), branch)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        // Current snapshot should be null (no main branch snapshot yet)
+        assert!(table_after_branch_1.current_snapshot().is_none());
+
+        // Branch snapshot should have first_row_id = 0
+        let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
+        let branch_snap_1 = table_after_branch_1
+            .snapshots
+            .get(&branch_ref.snapshot_id)
+            .unwrap();
+        assert_eq!(branch_snap_1.first_row_id(), Some(0));
+
+        // Next row id should be 30
+        assert_eq!(table_after_branch_1.next_row_id(), 30);
+
+        // Write to Main - append 28 rows
+        let main_snapshot = Snapshot::builder()
+            .with_snapshot_id(2)
+            .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
+            .with_sequence_number(1)
+            .with_schema_id(0)
+            .with_manifest_list("bar")
+            .with_parent_snapshot_id(None)
+            .with_summary(Summary {
+                operation: Operation::Append,
+                additional_properties: HashMap::new(),
+            })
+            .with_row_range(table_after_branch_1.next_row_id(), 28)
+            .build();
+
+        let table_after_main = table_after_branch_1
+            .into_builder(None)
+            .add_snapshot(main_snapshot.clone())
+            .unwrap()
+            .set_ref(MAIN_BRANCH, SnapshotReference {
+                snapshot_id: main_snapshot.snapshot_id(),
+                retention: SnapshotRetention::Branch {
+                    min_snapshots_to_keep: None,
+                    max_snapshot_age_ms: None,
+                    max_ref_age_ms: None,
+                },
+            })
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        // Main snapshot should have first_row_id = 30
+        let current_snapshot = table_after_main.current_snapshot().unwrap();
+        assert_eq!(current_snapshot.first_row_id(), Some(30));
+
+        // Next row id should be 58 (30 + 28)
+        assert_eq!(table_after_main.next_row_id(), 58);
+
+        // Write again to branch - append 21 rows
+        let branch_snapshot_2 = Snapshot::builder()
+            .with_snapshot_id(3)
+            .with_timestamp_ms(table_after_main.last_updated_ms + 1)
+            .with_sequence_number(2)
+            .with_schema_id(0)
+            .with_manifest_list("baz")
+            .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
+            .with_summary(Summary {
+                operation: Operation::Append,
+                additional_properties: HashMap::new(),
+            })
+            .with_row_range(table_after_main.next_row_id(), 21)
+            .build();
+
+        let table_after_branch_2 = table_after_main
+            .into_builder(None)
+            .set_branch_snapshot(branch_snapshot_2.clone(), branch)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        // Branch snapshot should have first_row_id = 58 (30 + 28)
+        let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
+        let branch_snap_2 = table_after_branch_2
+            .snapshots
+            .get(&branch_ref_2.snapshot_id)
+            .unwrap();
+        assert_eq!(branch_snap_2.first_row_id(), Some(58));
+
+        // Next row id should be 79 (30 + 28 + 21)
+        assert_eq!(table_after_branch_2.next_row_id(), 79);
+    }
+
+    #[test]
+    fn test_encryption_keys() {
+        let builder = builder_without_changes(FormatVersion::V2);
+
+        // Create test encryption keys
+        let encryption_key_1 = EncryptedKey::builder()
+            .key_id("key-1")
+            .encrypted_key_metadata(vec![1, 2, 3, 4])
+            .encrypted_by_id("encryption-service-1")
+            .properties(HashMap::from_iter(vec![(
+                "algorithm".to_string(),
+                "AES-256".to_string(),
+            )]))
+            .build();
+
+        let encryption_key_2 = EncryptedKey::builder()
+            .key_id("key-2")
+            .encrypted_key_metadata(vec![5, 6, 7, 8])
+            .encrypted_by_id("encryption-service-2")
+            .properties(HashMap::new())
+            .build();
+
+        // Add first encryption key
+        let build_result = builder
+            .add_encryption_key(encryption_key_1.clone())
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.changes.len(), 1);
+        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
+        assert_eq!(
+            build_result.metadata.encryption_key("key-1"),
+            Some(&encryption_key_1)
+        );
+        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
+            encryption_key: encryption_key_1.clone()
+        });
+
+        // Add second encryption key
+        let build_result = build_result
+            .metadata
+            .into_builder(Some(
+                
"s3://bucket/test/location/metadata/metadata1.json".to_string(),
+            ))
+            .add_encryption_key(encryption_key_2.clone())
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.changes.len(), 1);
+        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
+        assert_eq!(
+            build_result.metadata.encryption_key("key-1"),
+            Some(&encryption_key_1)
+        );
+        assert_eq!(
+            build_result.metadata.encryption_key("key-2"),
+            Some(&encryption_key_2)
+        );
+        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
+            encryption_key: encryption_key_2.clone()
+        });
+
+        // Try to add duplicate key - should not create a change
+        let build_result = build_result
+            .metadata
+            .into_builder(Some(
+                
"s3://bucket/test/location/metadata/metadata2.json".to_string(),
+            ))
+            .add_encryption_key(encryption_key_1.clone())
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.changes.len(), 0);
+        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
+
+        // Remove first encryption key
+        let build_result = build_result
+            .metadata
+            .into_builder(Some(
+                
"s3://bucket/test/location/metadata/metadata3.json".to_string(),
+            ))
+            .remove_encryption_key("key-1")
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.changes.len(), 1);
+        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
+        assert_eq!(build_result.metadata.encryption_key("key-1"), None);
+        assert_eq!(
+            build_result.metadata.encryption_key("key-2"),
+            Some(&encryption_key_2)
+        );
+        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
+            key_id: "key-1".to_string()
+        });
+
+        // Try to remove non-existent key - should not create a change
+        let build_result = build_result
+            .metadata
+            .into_builder(Some(
+                
"s3://bucket/test/location/metadata/metadata4.json".to_string(),
+            ))
+            .remove_encryption_key("non-existent-key")
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.changes.len(), 0);
+        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
+
+        // Test encryption_keys_iter()
+        let keys = build_result
+            .metadata
+            .encryption_keys_iter()
+            .collect::<Vec<_>>();
+        assert_eq!(keys.len(), 1);
+        assert_eq!(keys[0], &encryption_key_2);
+
+        // Remove last encryption key
+        let build_result = build_result
+            .metadata
+            .into_builder(Some(
+                
"s3://bucket/test/location/metadata/metadata5.json".to_string(),
+            ))
+            .remove_encryption_key("key-2")
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.changes.len(), 1);
+        assert_eq!(build_result.metadata.encryption_keys.len(), 0);
+        assert_eq!(build_result.metadata.encryption_key("key-2"), None);
+        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
+            key_id: "key-2".to_string()
+        });
+
+        // Verify empty encryption_keys_iter()
+        let keys = build_result.metadata.encryption_keys_iter();
+        assert_eq!(keys.len(), 0);
+    }
 }
diff --git a/crates/iceberg/src/transaction/mod.rs 
b/crates/iceberg/src/transaction/mod.rs
index 26bd6522..4116264a 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -243,7 +243,7 @@ mod tests {
     use crate::spec::TableMetadata;
     use crate::table::Table;
     use crate::transaction::{ApplyTransactionAction, Transaction};
-    use crate::{Error, ErrorKind, TableIdent};
+    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};
 
     pub fn make_v1_table() -> Table {
         let file = File::open(format!(
@@ -302,8 +302,41 @@ mod tests {
             .unwrap()
     }
 
+    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
+        let table_ident =
+            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
+                .unwrap();
+
+        catalog
+            .create_namespace(table_ident.namespace(), HashMap::new())
+            .await
+            .unwrap();
+
+        let file = File::open(format!(
+            "{}/testdata/table_metadata/{}",
+            env!("CARGO_MANIFEST_DIR"),
+            "TableMetadataV3ValidMinimal.json"
+        ))
+        .unwrap();
+        let reader = BufReader::new(file);
+        let base_metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
+
+        let table_creation = TableCreation::builder()
+            .schema((**base_metadata.current_schema()).clone())
+            .partition_spec((**base_metadata.default_partition_spec()).clone())
+            .sort_order((**base_metadata.default_sort_order()).clone())
+            .name(table_ident.name().to_string())
+            .format_version(crate::spec::FormatVersion::V3)
+            .build();
+
+        catalog
+            .create_table(table_ident.namespace(), table_creation)
+            .await
+            .unwrap()
+    }
+
     /// Helper function to create a test table with retry properties
-    fn setup_test_table(num_retries: &str) -> Table {
+    pub(super) fn setup_test_table(num_retries: &str) -> Table {
         let table = make_v2_table();
 
         // Set retry properties
@@ -469,3 +502,88 @@ mod tests {
         }
     }
 }
+
+#[cfg(test)]
+mod test_row_lineage {
+    use crate::memory::tests::new_memory_catalog;
+    use crate::spec::{
+        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
+    };
+    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
+    use crate::transaction::{ApplyTransactionAction, Transaction};
+
+    #[tokio::test]
+    async fn test_fast_append_with_row_lineage() {
+        // Helper function to create a data file with specified number of rows
+        fn file_with_rows(record_count: u64) -> DataFile {
+            DataFileBuilder::default()
+                .content(DataContentType::Data)
+                .file_path(format!("test/{}.parquet", record_count))
+                .file_format(DataFileFormat::Parquet)
+                .file_size_in_bytes(100)
+                .record_count(record_count)
+                .partition(Struct::from_iter([Some(Literal::long(0))]))
+                .partition_spec_id(0)
+                .build()
+                .unwrap()
+        }
+        let catalog = new_memory_catalog().await;
+
+        let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+        // Check initial state - next_row_id should be 0
+        assert_eq!(table.metadata().next_row_id(), 0);
+
+        // First fast append with 30 rows
+        let tx = Transaction::new(&table);
+        let data_file_30 = file_with_rows(30);
+        let action = tx.fast_append().add_data_files(vec![data_file_30]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        // Check snapshot and table state after first append
+        let snapshot = table.metadata().current_snapshot().unwrap();
+        assert_eq!(snapshot.first_row_id(), Some(0));
+        assert_eq!(table.metadata().next_row_id(), 30);
+
+        // Check written manifest for first_row_id
+        let manifest_list = table
+            .metadata()
+            .current_snapshot()
+            .unwrap()
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .unwrap();
+
+        assert_eq!(manifest_list.entries().len(), 1);
+        let manifest_file = &manifest_list.entries()[0];
+        assert_eq!(manifest_file.first_row_id, Some(0));
+
+        // Second fast append with 17 and 11 rows
+        let tx = Transaction::new(&table);
+        let data_file_17 = file_with_rows(17);
+        let data_file_11 = file_with_rows(11);
+        let action = tx
+            .fast_append()
+            .add_data_files(vec![data_file_17, data_file_11]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        // Check snapshot and table state after second append
+        let snapshot = table.metadata().current_snapshot().unwrap();
+        assert_eq!(snapshot.first_row_id(), Some(30));
+        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);
+
+        // Check written manifest for first_row_id
+        let manifest_list = table
+            .metadata()
+            .current_snapshot()
+            .unwrap()
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .unwrap();
+        assert_eq!(manifest_list.entries().len(), 2);
+        let manifest_file = &manifest_list.entries()[1];
+        assert_eq!(manifest_file.first_row_id, Some(30));
+    }
+}
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 93dd819d..4f85962f 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -206,13 +206,16 @@ impl<'a> SnapshotProducer<'a> {
                 .as_ref()
                 .clone(),
         );
-        if self.table.metadata().format_version() == FormatVersion::V1 {
-            Ok(builder.build_v1())
-        } else {
-            match content {
+        match self.table.metadata().format_version() {
+            FormatVersion::V1 => Ok(builder.build_v1()),
+            FormatVersion::V2 => match content {
                 ManifestContentType::Data => Ok(builder.build_v2_data()),
                 ManifestContentType::Deletes => Ok(builder.build_v2_deletes()),
-            }
+            },
+            FormatVersion::V3 => match content {
+                ManifestContentType::Data => Ok(builder.build_v3_data()),
+                ManifestContentType::Deletes => Ok(builder.build_v3_deletes()),
+            },
         }
     }
 
@@ -382,6 +385,7 @@ impl<'a> SnapshotProducer<'a> {
     ) -> Result<ActionCommit> {
         let manifest_list_path = self.generate_manifest_list_file_path(0);
         let next_seq_num = self.table.metadata().next_sequence_number();
+        let first_row_id = self.table.metadata().next_row_id();
+        let mut manifest_list_writer = match self.table.metadata().format_version() {
             FormatVersion::V1 => ManifestListWriter::v1(
                 self.table
@@ -398,6 +402,15 @@ impl<'a> SnapshotProducer<'a> {
                 self.table.metadata().current_snapshot_id(),
                 next_seq_num,
             ),
+            FormatVersion::V3 => ManifestListWriter::v3(
+                self.table
+                    .file_io()
+                    .new_output(manifest_list_path.clone())?,
+                self.snapshot_id,
+                self.table.metadata().current_snapshot_id(),
+                next_seq_num,
+                Some(first_row_id),
+            ),
         };
 
         // Calling self.summary() before self.manifest_file() is important because self.added_data_files
@@ -412,6 +425,7 @@ impl<'a> SnapshotProducer<'a> {
             .await?;
 
         manifest_list_writer.add_manifests(new_manifests.into_iter())?;
+        let writer_next_row_id = manifest_list_writer.next_row_id();
         manifest_list_writer.close().await?;
 
         let commit_ts = chrono::Utc::now().timestamp_millis();
@@ -422,8 +436,16 @@ impl<'a> SnapshotProducer<'a> {
             .with_sequence_number(next_seq_num)
             .with_summary(summary)
             .with_schema_id(self.table.metadata().current_schema_id())
-            .with_timestamp_ms(commit_ts)
-            .build();
+            .with_timestamp_ms(commit_ts);
+
+        let new_snapshot = if let Some(writer_next_row_id) = writer_next_row_id {
+            let assigned_rows = writer_next_row_id - self.table.metadata().next_row_id();
+            new_snapshot
+                .with_row_range(first_row_id, assigned_rows)
+                .build()
+        } else {
+            new_snapshot.build()
+        };
 
         let updates = vec![
             TableUpdate::AddSnapshot {
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs
index 4cfc2784..a5cfc282 100644
--- a/crates/iceberg/src/writer/file_writer/location_generator.rs
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -183,6 +183,7 @@ pub(crate) mod test {
             statistics: HashMap::new(),
             partition_statistics: HashMap::new(),
             encryption_keys: HashMap::new(),
+            next_row_id: 0,
         };
 
         let file_name_generator = super::DefaultFileNameGenerator::new(
@@ -297,6 +298,7 @@ pub(crate) mod test {
             statistics: HashMap::new(),
             partition_statistics: HashMap::new(),
             encryption_keys: HashMap::new(),
+            next_row_id: 0,
         };
 
         // Test with DefaultLocationGenerator
diff --git a/crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json b/crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json
new file mode 100644
index 00000000..bf85114c
--- /dev/null
+++ b/crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json
@@ -0,0 +1,74 @@
+{
+  "format-version": 3,
+  "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+  "location": "s3://bucket/test/location",
+  "last-sequence-number": 34,
+  "last-updated-ms": 1602638573590,
+  "last-column-id": 3,
+  "current-schema-id": 0,
+  "next-row-id": 0,
+  "schemas": [
+    {
+      "type": "struct",
+      "schema-id": 0,
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long",
+          "initial-default": 1,
+          "write-default": 1
+        },
+        {
+          "id": 2,
+          "name": "y",
+          "required": true,
+          "type": "long",
+          "doc": "comment"
+        },
+        {
+          "id": 3,
+          "name": "z",
+          "required": true,
+          "type": "long"
+        }
+      ]
+    }
+  ],
+  "default-spec-id": 0,
+  "partition-specs": [
+    {
+      "spec-id": 0,
+      "fields": [
+        {
+          "name": "x",
+          "transform": "identity",
+          "source-id": 1,
+          "field-id": 1000
+        }
+      ]
+    }
+  ],
+  "last-partition-id": 1000,
+  "default-sort-order-id": 3,
+  "sort-orders": [
+    {
+      "order-id": 3,
+      "fields": [
+        {
+          "transform": "identity",
+          "source-id": 2,
+          "direction": "asc",
+          "null-order": "nulls-first"
+        },
+        {
+          "transform": "bucket[4]",
+          "source-id": 3,
+          "direction": "desc",
+          "null-order": "nulls-last"
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file

Reply via email to