This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch dv-support
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git

commit 9546a72269df10aa78c2f44aec674d54dd79bd72
Author: luoyuxia <[email protected]>
AuthorDate: Thu Mar 19 17:07:30 2026 +0800

    [dv] deletion vector support
---
 Cargo.toml                                         |   2 +-
 bindings/python/Cargo.toml                         |   6 +-
 crates/fluss/Cargo.toml                            |   2 +-
 crates/fluss/src/client/admin.rs                   |  33 +++-
 crates/fluss/src/client/table/scanner.rs           |  21 ++-
 crates/fluss/src/proto/fluss_api.proto             |  40 ++++-
 crates/fluss/src/record/arrow.rs                   | 191 +++++++++++++++++++--
 crates/fluss/src/rpc/api_key.rs                    |   4 +
 .../fluss/src/rpc/message/get_dv_for_union_read.rs |  58 +++++++
 crates/fluss/src/rpc/message/get_lake_snapshot.rs  |  82 +++++++++
 crates/fluss/src/rpc/message/mod.rs                |   4 +
 11 files changed, 410 insertions(+), 33 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 5848977..26cce30 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,7 +35,7 @@ members = ["crates/fluss", "crates/examples", 
"bindings/python", "bindings/cpp"]
 fluss = { package = "fluss-rs", version = "0.2.0", path = "crates/fluss", 
features = ["storage-all"] }
 tokio = { version = "1.44.2", features = ["full"] }
 clap = { version = "4.5.37", features = ["derive"] }
-arrow = { version = "57.0.0", features = ["ipc_compression"] }
+arrow = { version = "57.1.0", features = ["ipc_compression"] }
 
 bigdecimal = "0.4"
 serde = { version = "1.0", features = ["derive"] }
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 30ac046..d1ac803 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -31,9 +31,9 @@ pyo3 = { version = "0.26.0", features = ["extension-module", 
"generate-import-li
 fluss = { workspace = true, features = ["storage-all"] }
 tokio = { workspace = true }
 arrow = { workspace = true }
-arrow-pyarrow = "57.0.0"
-arrow-schema = "57.0.0"
-arrow-array = "57.0.0"
+arrow-pyarrow = "57.1.0"
+arrow-schema = "57.1.0"
+arrow-array = "57.1.0"
 pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
 jiff = { workspace = true }
 bigdecimal = "0.4"
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index db1348a..7a1f604 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -42,7 +42,7 @@ integration_tests = []
 
 [dependencies]
 arrow = { workspace = true }
-arrow-schema = "57.0.0"
+arrow-schema = "57.1.0"
 bitvec = "1"
 byteorder = "1.5"
 futures = "0.3"
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 7a79e5e..ae4739e 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -24,7 +24,7 @@ use crate::metadata::{
 use crate::rpc::message::{
     CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, 
DatabaseExistsRequest,
     DropDatabaseRequest, DropPartitionRequest, DropTableRequest, 
GetDatabaseInfoRequest,
-    GetLatestLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, 
ListPartitionInfosRequest,
+    GetLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, 
ListPartitionInfosRequest,
     ListTablesRequest, TableExistsRequest,
 };
 use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
@@ -279,10 +279,37 @@ impl FlussAdmin {
     pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> 
Result<LakeSnapshot> {
         let response = self
             .admin_gateway
-            .request(GetLatestLakeSnapshotRequest::new(table_path))
+            .request(GetLakeSnapshotRequest::latest(table_path))
             .await?;
 
-        // Convert proto response to LakeSnapshot
+        Self::to_lake_snapshot(response)
+    }
+
+    /// Get the latest readable lake snapshot for a table.
+    pub async fn get_readable_lake_snapshot(&self, table_path: &TablePath) -> 
Result<LakeSnapshot> {
+        let response = self
+            .admin_gateway
+            .request(GetLakeSnapshotRequest::readable(table_path))
+            .await?;
+
+        Self::to_lake_snapshot(response)
+    }
+
+    /// Get a specific lake snapshot by snapshot id.
+    pub async fn get_lake_snapshot(
+        &self,
+        table_path: &TablePath,
+        snapshot_id: i64,
+    ) -> Result<LakeSnapshot> {
+        let response = self
+            .admin_gateway
+            .request(GetLakeSnapshotRequest::by_id(table_path, snapshot_id))
+            .await?;
+
+        Self::to_lake_snapshot(response)
+    }
+
+    fn to_lake_snapshot(response: crate::proto::GetLakeSnapshotResponse) -> 
Result<LakeSnapshot> {
         let mut table_buckets_offset = HashMap::new();
         for bucket_snapshot in response.bucket_snapshots {
             let table_bucket = TableBucket::new_with_partition(
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index e837ba7..a666d3a 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -1676,12 +1676,6 @@ impl BucketScanStatus {
 }
 
 fn validate_scan_support(table_path: &TablePath, table_info: &TableInfo) -> 
Result<()> {
-    if table_info.schema.primary_key().is_some() {
-        return Err(UnsupportedOperation {
-            message: format!("Table {table_path} is not a Log Table and 
doesn't support scan."),
-        });
-    }
-
     let log_format = table_info.table_config.get_log_format()?;
     if LogFormat::ARROW != log_format {
         return Err(UnsupportedOperation {
@@ -1960,18 +1954,23 @@ mod tests {
 
     #[test]
     fn test_validate_scan_support() {
-        // Primary key table
+        // Primary key table with ARROW format — should succeed
         let (table_info, table_path) = create_test_table_info(true, 
Some("ARROW"));
         let result = validate_scan_support(&table_path, &table_info);
+        assert!(result.is_ok());
 
+        // Primary key table with INDEXED format — should fail
+        let (table_info, table_path) = create_test_table_info(true, 
Some("INDEXED"));
+        let result = validate_scan_support(&table_path, &table_info);
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert!(matches!(err, UnsupportedOperation { .. }));
-        assert!(err.to_string().contains(
-            format!("Table {table_path} is not a Log Table and doesn't support 
scan.").as_str()
-        ));
+        assert!(
+            err.to_string()
+                .contains("Scan is only supported for ARROW format")
+        );
 
-        // Indexed format
+        // Indexed format (log table)
         let (table_info, table_path) = create_test_table_info(false, 
Some("INDEXED"));
         let result = validate_scan_support(&table_path, &table_info);
 
diff --git a/crates/fluss/src/proto/fluss_api.proto 
b/crates/fluss/src/proto/fluss_api.proto
index 1c7ee7e..52a5d80 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -327,6 +327,18 @@ message GetLatestLakeSnapshotResponse {
   repeated PbLakeSnapshotForBucket bucket_snapshots = 3;
 }
 
+message GetLakeSnapshotRequest {
+  required PbTablePath table_path = 1;
+  optional int64 snapshot_id = 2;
+  optional bool readable = 3;
+}
+
+message GetLakeSnapshotResponse {
+  required int64 table_id = 1;
+  required int64 snapshot_id = 2;
+  repeated PbLakeSnapshotForBucket bucket_snapshots = 3;
+}
+
 message PbLakeSnapshotForBucket {
   optional int64 partition_id = 1;
   required int32 bucket_id = 2;
@@ -417,4 +429,30 @@ message AuthenticateRequest {
 
 message AuthenticateResponse {
   optional bytes challenge = 1;
-}
\ No newline at end of file
+}
+
+message PbLakeDvEntry {
+  required string file_path = 1;
+  required bytes del_bitmap = 2;
+}
+
+message GetDvForUnionReadRequest {
+  required int64 table_id = 1;
+  optional int64 partition_id = 2;
+  required int32 bucket_id = 3;
+  required int64 requested_snapshot_id = 4;
+  repeated string data_files = 5;
+}
+
+message GetDvForUnionReadResponse {
+  repeated PbLakeDvEntry lake_dv = 1;
+  repeated PbLogDvEntry log_dv = 2;
+  required int64 log_end_offset = 3;
+  optional int64 current_readable_snapshot = 4;
+  optional bool is_stale = 5;
+}
+
+message PbLogDvEntry {
+  required int64 base_offset = 1;
+  required bytes del_bits = 2;
+}
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index ea27836..5b92887 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -890,21 +890,54 @@ impl LogRecordBatch {
         LittleEndian::read_i32(&self.data[offset..offset + 
RECORDS_COUNT_LENGTH])
     }
 
+    pub fn is_append_only(&self) -> bool {
+        self.attributes() & 1 == 1
+    }
+
     pub fn records(&self, read_context: &ReadContext) -> 
Result<LogRecordIterator> {
         if self.record_count() == 0 {
             return Ok(LogRecordIterator::empty());
         }
 
-        let data = &self.data[RECORDS_OFFSET..];
+        let record_count = self.record_count() as usize;
+        let (arrow_data, change_types) = if self.is_append_only() {
+            let data = &self.data[RECORDS_OFFSET..];
+            let ct = vec![ChangeType::AppendOnly; record_count];
+            (data, ct)
+        } else {
+            let ct_start = RECORDS_OFFSET;
+            let ct_end = ct_start + record_count;
+            if self.data.len() < ct_end {
+                return Err(Error::UnexpectedError {
+                    message: format!(
+                        "Corrupt log record batch: data length {} too short 
for {} change type bytes",
+                        self.data.len(),
+                        record_count
+                    ),
+                    source: None,
+                });
+            }
+            let ct_bytes = &self.data[ct_start..ct_end];
+            let ct: std::result::Result<Vec<ChangeType>, String> = ct_bytes
+                .iter()
+                .map(|&b| ChangeType::from_byte_value(b))
+                .collect();
+            let ct = ct.map_err(|e| Error::UnexpectedError {
+                message: format!("Invalid change type in record batch: {e}"),
+                source: None,
+            })?;
+            let data = &self.data[ct_end..];
+            (data, ct)
+        };
 
-        let record_batch = read_context.record_batch(data)?;
+        let record_batch = read_context.record_batch(arrow_data)?;
         let arrow_reader = ArrowReader::new(Arc::new(record_batch));
         let log_record_iterator = 
LogRecordIterator::Arrow(ArrowLogRecordIterator {
             reader: arrow_reader,
             base_offset: self.base_log_offset(),
             timestamp: self.commit_timestamp(),
             row_id: 0,
-            change_type: ChangeType::AppendOnly,
+            change_types,
         });
 
         Ok(log_record_iterator)
@@ -915,9 +948,38 @@ impl LogRecordBatch {
             return Ok(LogRecordIterator::empty());
         }
 
-        let data = &self.data[RECORDS_OFFSET..];
+        let record_count = self.record_count() as usize;
+        let (arrow_data, change_types) = if self.is_append_only() {
+            let data = &self.data[RECORDS_OFFSET..];
+            let ct = vec![ChangeType::AppendOnly; record_count];
+            (data, ct)
+        } else {
+            let ct_start = RECORDS_OFFSET;
+            let ct_end = ct_start + record_count;
+            if self.data.len() < ct_end {
+                return Err(Error::UnexpectedError {
+                    message: format!(
+                        "Corrupt log record batch: data length {} too short 
for {} change type bytes",
+                        self.data.len(),
+                        record_count
+                    ),
+                    source: None,
+                });
+            }
+            let ct_bytes = &self.data[ct_start..ct_end];
+            let ct: std::result::Result<Vec<ChangeType>, String> = ct_bytes
+                .iter()
+                .map(|&b| ChangeType::from_byte_value(b))
+                .collect();
+            let ct = ct.map_err(|e| Error::UnexpectedError {
+                message: format!("Invalid change type in record batch: {e}"),
+                source: None,
+            })?;
+            let data = &self.data[ct_end..];
+            (data, ct)
+        };
 
-        let record_batch = read_context.record_batch_for_remote_log(data)?;
+        let record_batch = 
read_context.record_batch_for_remote_log(arrow_data)?;
         let log_record_iterator = match record_batch {
             None => LogRecordIterator::empty(),
             Some(record_batch) => {
@@ -927,7 +989,7 @@ impl LogRecordBatch {
                     base_offset: self.base_log_offset(),
                     timestamp: self.commit_timestamp(),
                     row_id: 0,
-                    change_type: ChangeType::AppendOnly,
+                    change_types,
                 })
             }
         };
@@ -943,14 +1005,20 @@ impl LogRecordBatch {
             return 
Ok(RecordBatch::new_empty(read_context.target_schema.clone()));
         }
 
+        let arrow_start = if self.is_append_only() {
+            RECORDS_OFFSET
+        } else {
+            RECORDS_OFFSET + self.record_count() as usize
+        };
+
         let data = self
             .data
-            .get(RECORDS_OFFSET..)
+            .get(arrow_start..)
             .ok_or_else(|| Error::UnexpectedError {
                 message: format!(
-                    "Corrupt log record batch: data length {} is less than 
RECORDS_OFFSET {}",
+                    "Corrupt log record batch: data length {} is less than 
arrow data offset {}",
                     self.data.len(),
-                    RECORDS_OFFSET
+                    arrow_start
                 ),
                 source: None,
             })?;
@@ -1407,18 +1475,23 @@ pub struct ArrowLogRecordIterator {
     base_offset: i64,
     timestamp: i64,
     row_id: usize,
-    change_type: ChangeType,
+    change_types: Vec<ChangeType>,
 }
 
 #[allow(dead_code)]
 impl ArrowLogRecordIterator {
-    fn new(reader: ArrowReader, base_offset: i64, timestamp: i64, change_type: 
ChangeType) -> Self {
+    fn new(
+        reader: ArrowReader,
+        base_offset: i64,
+        timestamp: i64,
+        change_types: Vec<ChangeType>,
+    ) -> Self {
         Self {
             reader,
             base_offset,
             timestamp,
             row_id: 0,
-            change_type,
+            change_types,
         }
     }
 }
@@ -1431,12 +1504,17 @@ impl Iterator for ArrowLogRecordIterator {
             return None;
         }
 
+        let change_type = self
+            .change_types
+            .get(self.row_id)
+            .copied()
+            .unwrap_or(ChangeType::AppendOnly);
         let columnar_row = self.reader.read(self.row_id);
         let scan_record = ScanRecord::new(
             columnar_row,
             self.base_offset + self.row_id as i64,
             self.timestamp,
-            self.change_type,
+            change_type,
         );
         self.row_id += 1;
         Some(scan_record)
@@ -2012,4 +2090,91 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_change_type_byte_parsing_in_non_append_only_batch() -> Result<()> {
+        use crate::compression::{
+            ArrowCompressionInfo, ArrowCompressionType, 
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+        };
+        use crate::metadata::{PhysicalTablePath, TablePath};
+        use crate::row::GenericRow;
+
+        let row_type = RowType::new(vec![
+            DataField::new("id", DataTypes::int(), None),
+            DataField::new("name", DataTypes::string(), None),
+        ]);
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+        let physical_table_path = 
Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
+
+        let mut builder = MemoryLogRecordsArrowBuilder::new(
+            1,
+            &row_type,
+            false,
+            ArrowCompressionInfo {
+                compression_type: ArrowCompressionType::None,
+                compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+            },
+        )?;
+
+        for (id, name) in [(1i32, "Alice"), (2, "Bob"), (3, "Charlie")] {
+            let mut row = GenericRow::new(2);
+            row.set_field(0, id);
+            row.set_field(1, name);
+            let record = WriteRecord::for_append(
+                Arc::clone(&table_info),
+                physical_table_path.clone(),
+                1,
+                &row,
+            );
+            builder.append(&record)?;
+        }
+
+        let append_only_bytes = builder.build()?;
+        let arrow_ipc_data = &append_only_bytes[RECORDS_OFFSET..];
+
+        let record_count: usize = 3;
+        let change_type_bytes: Vec<u8> = vec![
+            ChangeType::Insert.to_byte_value(),
+            ChangeType::UpdateBefore.to_byte_value(),
+            ChangeType::UpdateAfter.to_byte_value(),
+        ];
+
+        let total_len = RECORDS_OFFSET + change_type_bytes.len() + 
arrow_ipc_data.len();
+        let mut batch_bytes = vec![0u8; total_len];
+        
batch_bytes[..RECORDS_OFFSET].copy_from_slice(&append_only_bytes[..RECORDS_OFFSET]);
+        batch_bytes[ATTRIBUTES_OFFSET] = 0;
+        batch_bytes[RECORDS_OFFSET..RECORDS_OFFSET + record_count]
+            .copy_from_slice(&change_type_bytes);
+        batch_bytes[RECORDS_OFFSET + 
record_count..].copy_from_slice(arrow_ipc_data);
+
+        LittleEndian::write_i32(
+            &mut batch_bytes[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH],
+            (total_len - BASE_OFFSET_LENGTH - LENGTH_LENGTH) as i32,
+        );
+        let crc = crc32c(&batch_bytes[SCHEMA_ID_OFFSET..]);
+        LittleEndian::write_u32(&mut batch_bytes[CRC_OFFSET..CRC_OFFSET + 
CRC_LENGTH], crc);
+
+        let batch = LogRecordBatch::new(Bytes::from(batch_bytes));
+        assert!(!batch.is_append_only());
+        assert_eq!(batch.record_count(), 3);
+
+        let arrow_schema = to_arrow_schema(&row_type)?;
+        let read_context = ReadContext::new(arrow_schema, false);
+        let records: Vec<ScanRecord> = batch.records(&read_context)?.collect();
+
+        assert_eq!(records.len(), 3);
+        assert_eq!(*records[0].change_type(), ChangeType::Insert);
+        assert_eq!(*records[1].change_type(), ChangeType::UpdateBefore);
+        assert_eq!(*records[2].change_type(), ChangeType::UpdateAfter);
+        assert_eq!(records[0].row().get_int(0)?, 1);
+        assert_eq!(records[0].row().get_string(1)?, "Alice");
+        assert_eq!(records[1].row().get_int(0)?, 2);
+        assert_eq!(records[1].row().get_string(1)?, "Bob");
+        assert_eq!(records[2].row().get_int(0)?, 3);
+        assert_eq!(records[2].row().get_string(1)?, "Charlie");
+
+        Ok(())
+    }
+
 }
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 4231fb0..344c1c3 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -38,6 +38,7 @@ pub enum ApiKey {
     GetFileSystemSecurityToken,
     GetDatabaseInfo,
     GetLatestLakeSnapshot,
+    GetDvForUnionRead,
     CreatePartition,
     DropPartition,
     Authenticate,
@@ -65,6 +66,7 @@ impl From<i16> for ApiKey {
             1021 => ApiKey::ListOffsets,
             1025 => ApiKey::GetFileSystemSecurityToken,
             1032 => ApiKey::GetLatestLakeSnapshot,
+            1064 => ApiKey::GetDvForUnionRead,
             1035 => ApiKey::GetDatabaseInfo,
             1036 => ApiKey::CreatePartition,
             1037 => ApiKey::DropPartition,
@@ -95,6 +97,7 @@ impl From<ApiKey> for i16 {
             ApiKey::ListOffsets => 1021,
             ApiKey::GetFileSystemSecurityToken => 1025,
             ApiKey::GetLatestLakeSnapshot => 1032,
+            ApiKey::GetDvForUnionRead => 1064,
             ApiKey::GetDatabaseInfo => 1035,
             ApiKey::CreatePartition => 1036,
             ApiKey::DropPartition => 1037,
@@ -129,6 +132,7 @@ mod tests {
             (1021, ApiKey::ListOffsets),
             (1025, ApiKey::GetFileSystemSecurityToken),
             (1032, ApiKey::GetLatestLakeSnapshot),
+            (1064, ApiKey::GetDvForUnionRead),
             (1035, ApiKey::GetDatabaseInfo),
             (1036, ApiKey::CreatePartition),
             (1037, ApiKey::DropPartition),
diff --git a/crates/fluss/src/rpc/message/get_dv_for_union_read.rs 
b/crates/fluss/src/rpc/message/get_dv_for_union_read.rs
new file mode 100644
index 0000000..a8bb2ae
--- /dev/null
+++ b/crates/fluss/src/rpc/message/get_dv_for_union_read.rs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::TableBucket;
+use crate::proto;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct GetDvForUnionReadRequest {
+    pub inner_request: proto::GetDvForUnionReadRequest,
+}
+
+impl GetDvForUnionReadRequest {
+    pub fn new(
+        table_bucket: &TableBucket,
+        requested_snapshot_id: i64,
+        data_files: Vec<String>,
+    ) -> Self {
+        Self {
+            inner_request: proto::GetDvForUnionReadRequest {
+                table_id: table_bucket.table_id(),
+                partition_id: table_bucket.partition_id(),
+                bucket_id: table_bucket.bucket_id(),
+                requested_snapshot_id,
+                data_files,
+            },
+        }
+    }
+}
+
+impl RequestBody for GetDvForUnionReadRequest {
+    type ResponseBody = proto::GetDvForUnionReadResponse;
+    const API_KEY: ApiKey = ApiKey::GetDvForUnionRead;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(GetDvForUnionReadRequest);
+impl_read_version_type!(proto::GetDvForUnionReadResponse);
diff --git a/crates/fluss/src/rpc/message/get_lake_snapshot.rs 
b/crates/fluss/src/rpc/message/get_lake_snapshot.rs
new file mode 100644
index 0000000..203e982
--- /dev/null
+++ b/crates/fluss/src/rpc/message/get_lake_snapshot.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::TablePath;
+use crate::proto;
+use crate::proto::PbTablePath;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct GetLakeSnapshotRequest {
+    pub inner_request: proto::GetLakeSnapshotRequest,
+}
+
+impl GetLakeSnapshotRequest {
+    pub fn latest(table_path: &TablePath) -> Self {
+        Self {
+            inner_request: proto::GetLakeSnapshotRequest {
+                table_path: PbTablePath {
+                    database_name: table_path.database().to_string(),
+                    table_name: table_path.table().to_string(),
+                },
+                snapshot_id: None,
+                readable: None,
+            },
+        }
+    }
+
+    pub fn readable(table_path: &TablePath) -> Self {
+        Self {
+            inner_request: proto::GetLakeSnapshotRequest {
+                table_path: PbTablePath {
+                    database_name: table_path.database().to_string(),
+                    table_name: table_path.table().to_string(),
+                },
+                snapshot_id: None,
+                readable: Some(true),
+            },
+        }
+    }
+
+    pub fn by_id(table_path: &TablePath, snapshot_id: i64) -> Self {
+        Self {
+            inner_request: proto::GetLakeSnapshotRequest {
+                table_path: PbTablePath {
+                    database_name: table_path.database().to_string(),
+                    table_name: table_path.table().to_string(),
+                },
+                snapshot_id: Some(snapshot_id),
+                readable: None,
+            },
+        }
+    }
+}
+
+impl RequestBody for GetLakeSnapshotRequest {
+    type ResponseBody = proto::GetLakeSnapshotResponse;
+    const API_KEY: ApiKey = ApiKey::GetLatestLakeSnapshot;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(GetLakeSnapshotRequest);
+impl_read_version_type!(proto::GetLakeSnapshotResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs 
b/crates/fluss/src/rpc/message/mod.rs
index 9ad4545..f563e89 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -30,6 +30,8 @@ mod drop_partition;
 mod drop_table;
 mod fetch;
 mod get_database_info;
+mod get_dv_for_union_read;
+mod get_lake_snapshot;
 mod get_latest_lake_snapshot;
 mod get_security_token;
 mod get_table;
@@ -55,6 +57,8 @@ pub use drop_partition::*;
 pub use drop_table::*;
 pub use fetch::*;
 pub use get_database_info::*;
+pub use get_dv_for_union_read::*;
+pub use get_lake_snapshot::*;
 pub use get_latest_lake_snapshot::*;
 pub use get_security_token::*;
 pub use get_table::*;

Reply via email to