This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch dv-support in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
commit 9546a72269df10aa78c2f44aec674d54dd79bd72 Author: luoyuxia <[email protected]> AuthorDate: Thu Mar 19 17:07:30 2026 +0800 [dv] deletion vector support --- Cargo.toml | 2 +- bindings/python/Cargo.toml | 6 +- crates/fluss/Cargo.toml | 2 +- crates/fluss/src/client/admin.rs | 33 +++- crates/fluss/src/client/table/scanner.rs | 21 ++- crates/fluss/src/proto/fluss_api.proto | 40 ++++- crates/fluss/src/record/arrow.rs | 191 +++++++++++++++++++-- crates/fluss/src/rpc/api_key.rs | 4 + .../fluss/src/rpc/message/get_dv_for_union_read.rs | 58 +++++++ crates/fluss/src/rpc/message/get_lake_snapshot.rs | 82 +++++++++ crates/fluss/src/rpc/message/mod.rs | 4 + 11 files changed, 410 insertions(+), 33 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5848977..26cce30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp"] fluss = { package = "fluss-rs", version = "0.2.0", path = "crates/fluss", features = ["storage-all"] } tokio = { version = "1.44.2", features = ["full"] } clap = { version = "4.5.37", features = ["derive"] } -arrow = { version = "57.0.0", features = ["ipc_compression"] } +arrow = { version = "57.1.0", features = ["ipc_compression"] } bigdecimal = "0.4" serde = { version = "1.0", features = ["derive"] } diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 30ac046..d1ac803 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -31,9 +31,9 @@ pyo3 = { version = "0.26.0", features = ["extension-module", "generate-import-li fluss = { workspace = true, features = ["storage-all"] } tokio = { workspace = true } arrow = { workspace = true } -arrow-pyarrow = "57.0.0" -arrow-schema = "57.0.0" -arrow-array = "57.0.0" +arrow-pyarrow = "57.1.0" +arrow-schema = "57.1.0" +arrow-array = "57.1.0" pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] } jiff = { workspace = true } bigdecimal = "0.4" diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index db1348a..7a1f604 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -42,7 +42,7 @@ integration_tests = [] [dependencies] arrow = { workspace = true } -arrow-schema = "57.0.0" +arrow-schema = "57.1.0" bitvec = "1" byteorder = "1.5" futures = "0.3" diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 7a79e5e..ae4739e 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -24,7 +24,7 @@ use crate::metadata::{ use crate::rpc::message::{ CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest, DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest, - GetLatestLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, ListPartitionInfosRequest, + GetLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest, ListPartitionInfosRequest, ListTablesRequest, TableExistsRequest, }; use crate::rpc::message::{ListOffsetsRequest, OffsetSpec}; @@ -279,10 +279,37 @@ impl FlussAdmin { pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot> { let response = self .admin_gateway - .request(GetLatestLakeSnapshotRequest::new(table_path)) + .request(GetLakeSnapshotRequest::latest(table_path)) .await?; - // Convert proto response to LakeSnapshot + Self::to_lake_snapshot(response) + } + + /// Get the latest readable lake snapshot for a table. + pub async fn get_readable_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot> { + let response = self + .admin_gateway + .request(GetLakeSnapshotRequest::readable(table_path)) + .await?; + + Self::to_lake_snapshot(response) + } + + /// Get a specific lake snapshot by snapshot id. + pub async fn get_lake_snapshot( + &self, + table_path: &TablePath, + snapshot_id: i64, + ) -> Result<LakeSnapshot> { + let response = self + .admin_gateway + .request(GetLakeSnapshotRequest::by_id(table_path, snapshot_id)) + .await?; + + Self::to_lake_snapshot(response) + } + + fn to_lake_snapshot(response: crate::proto::GetLakeSnapshotResponse) -> Result<LakeSnapshot> { let mut table_buckets_offset = HashMap::new(); for bucket_snapshot in response.bucket_snapshots { let table_bucket = TableBucket::new_with_partition( diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index e837ba7..a666d3a 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -1676,12 +1676,6 @@ impl BucketScanStatus { } fn validate_scan_support(table_path: &TablePath, table_info: &TableInfo) -> Result<()> { - if table_info.schema.primary_key().is_some() { - return Err(UnsupportedOperation { - message: format!("Table {table_path} is not a Log Table and doesn't support scan."), - }); - } - let log_format = table_info.table_config.get_log_format()?; if LogFormat::ARROW != log_format { return Err(UnsupportedOperation { @@ -1960,18 +1954,23 @@ mod tests { #[test] fn test_validate_scan_support() { - // Primary key table + // Primary key table with ARROW format — should succeed let (table_info, table_path) = create_test_table_info(true, Some("ARROW")); let result = validate_scan_support(&table_path, &table_info); + assert!(result.is_ok()); + // Primary key table with INDEXED format — should fail + let (table_info, table_path) = create_test_table_info(true, Some("INDEXED")); + let result = validate_scan_support(&table_path, &table_info); assert!(result.is_err()); let err = result.unwrap_err(); assert!(matches!(err, UnsupportedOperation { .. })); - assert!(err.to_string().contains( - format!("Table {table_path} is not a Log Table and doesn't support scan.").as_str() - )); + assert!( + err.to_string() + .contains("Scan is only supported for ARROW format") + ); - // Indexed format + // Indexed format (log table) let (table_info, table_path) = create_test_table_info(false, Some("INDEXED")); let result = validate_scan_support(&table_path, &table_info); diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto index 1c7ee7e..52a5d80 100644 --- a/crates/fluss/src/proto/fluss_api.proto +++ b/crates/fluss/src/proto/fluss_api.proto @@ -327,6 +327,18 @@ message GetLatestLakeSnapshotResponse { repeated PbLakeSnapshotForBucket bucket_snapshots = 3; } +message GetLakeSnapshotRequest { + required PbTablePath table_path = 1; + optional int64 snapshot_id = 2; + optional bool readable = 3; +} + +message GetLakeSnapshotResponse { + required int64 table_id = 1; + required int64 snapshotId = 2; + repeated PbLakeSnapshotForBucket bucket_snapshots = 3; +} + message PbLakeSnapshotForBucket { optional int64 partition_id = 1; required int32 bucket_id = 2; @@ -417,4 +429,30 @@ message AuthenticateRequest { message AuthenticateResponse { optional bytes challenge = 1; -} \ No newline at end of file +} + +message PbLakeDvEntry { + required string file_path = 1; + required bytes del_bitmap = 2; +} + +message GetDvForUnionReadRequest { + required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; + required int64 requested_snapshot_id = 4; + repeated string data_files = 5; +} + +message GetDvForUnionReadResponse { + repeated PbLakeDvEntry lake_dv = 1; + repeated PbLogDvEntry log_dv = 2; + required int64 log_end_offset = 3; + optional int64 current_readable_snapshot = 4; + optional bool is_stale = 5; +} + +message PbLogDvEntry { + required int64 base_offset = 1; + required bytes del_bits = 2; +} diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index ea27836..5b92887 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -890,21 +890,54 @@ impl LogRecordBatch { LittleEndian::read_i32(&self.data[offset..offset + RECORDS_COUNT_LENGTH]) } + pub fn is_append_only(&self) -> bool { + self.attributes() & 1 == 1 + } + pub fn records(&self, read_context: &ReadContext) -> Result<LogRecordIterator> { if self.record_count() == 0 { return Ok(LogRecordIterator::empty()); } - let data = &self.data[RECORDS_OFFSET..]; + let record_count = self.record_count() as usize; + let (arrow_data, change_types) = if self.is_append_only() { + let data = &self.data[RECORDS_OFFSET..]; + let ct = vec![ChangeType::AppendOnly; record_count]; + (data, ct) + } else { + let ct_start = RECORDS_OFFSET; + let ct_end = ct_start + record_count; + if self.data.len() < ct_end { + return Err(Error::UnexpectedError { + message: format!( + "Corrupt log record batch: data length {} too short for {} change type bytes", + self.data.len(), + record_count + ), + source: None, + }); + } + let ct_bytes = &self.data[ct_start..ct_end]; + let ct: std::result::Result<Vec<ChangeType>, String> = ct_bytes + .iter() + .map(|&b| ChangeType::from_byte_value(b)) + .collect(); + let ct = ct.map_err(|e| Error::UnexpectedError { + message: format!("Invalid change type in record batch: {e}"), + source: None, + })?; + let data = &self.data[ct_end..]; + (data, ct) + }; - let record_batch = read_context.record_batch(data)?; + let record_batch = read_context.record_batch(arrow_data)?; let arrow_reader = ArrowReader::new(Arc::new(record_batch)); let log_record_iterator = LogRecordIterator::Arrow(ArrowLogRecordIterator { reader: arrow_reader, base_offset: self.base_log_offset(), timestamp: self.commit_timestamp(), row_id: 0, - change_type: ChangeType::AppendOnly, + change_types, }); Ok(log_record_iterator) @@ -915,9 +948,38 @@ impl LogRecordBatch { return Ok(LogRecordIterator::empty()); } - let data = &self.data[RECORDS_OFFSET..]; + let record_count = self.record_count() as usize; + let (arrow_data, change_types) = if self.is_append_only() { + let data = &self.data[RECORDS_OFFSET..]; + let ct = vec![ChangeType::AppendOnly; record_count]; + (data, ct) + } else { + let ct_start = RECORDS_OFFSET; + let ct_end = ct_start + record_count; + if self.data.len() < ct_end { + return Err(Error::UnexpectedError { + message: format!( + "Corrupt log record batch: data length {} too short for {} change type bytes", + self.data.len(), + record_count + ), + source: None, + }); + } + let ct_bytes = &self.data[ct_start..ct_end]; + let ct: std::result::Result<Vec<ChangeType>, String> = ct_bytes + .iter() + .map(|&b| ChangeType::from_byte_value(b)) + .collect(); + let ct = ct.map_err(|e| Error::UnexpectedError { + message: format!("Invalid change type in record batch: {e}"), + source: None, + })?; + let data = &self.data[ct_end..]; + (data, ct) + }; - let record_batch = read_context.record_batch_for_remote_log(data)?; + let record_batch = read_context.record_batch_for_remote_log(arrow_data)?; let log_record_iterator = match record_batch { None => LogRecordIterator::empty(), Some(record_batch) => { @@ -927,7 +989,7 @@ impl LogRecordBatch { base_offset: self.base_log_offset(), timestamp: self.commit_timestamp(), row_id: 0, - change_type: ChangeType::AppendOnly, + change_types, }) } }; @@ -943,14 +1005,20 @@ impl LogRecordBatch { return Ok(RecordBatch::new_empty(read_context.target_schema.clone())); } + let arrow_start = if self.is_append_only() { + RECORDS_OFFSET + } else { + RECORDS_OFFSET + self.record_count() as usize + }; + let data = self .data - .get(RECORDS_OFFSET..) + .get(arrow_start..) .ok_or_else(|| Error::UnexpectedError { message: format!( - "Corrupt log record batch: data length {} is less than RECORDS_OFFSET {}", + "Corrupt log record batch: data length {} is less than arrow data offset {}", self.data.len(), - RECORDS_OFFSET + arrow_start ), source: None, })?; @@ -1407,18 +1475,23 @@ pub struct ArrowLogRecordIterator { base_offset: i64, timestamp: i64, row_id: usize, - change_type: ChangeType, + change_types: Vec<ChangeType>, } #[allow(dead_code)] impl ArrowLogRecordIterator { - fn new(reader: ArrowReader, base_offset: i64, timestamp: i64, change_type: ChangeType) -> Self { + fn new( + reader: ArrowReader, + base_offset: i64, + timestamp: i64, + change_types: Vec<ChangeType>, + ) -> Self { Self { reader, base_offset, timestamp, row_id: 0, - change_type, + change_types, } } } @@ -1431,12 +1504,17 @@ impl Iterator for ArrowLogRecordIterator { return None; } + let change_type = self + .change_types + .get(self.row_id) + .copied() + .unwrap_or(ChangeType::AppendOnly); let columnar_row = self.reader.read(self.row_id); let scan_record = ScanRecord::new( columnar_row, self.base_offset + self.row_id as i64, self.timestamp, - self.change_type, + change_type, ); self.row_id += 1; Some(scan_record) @@ -2012,4 +2090,91 @@ mod tests { Ok(()) } + + #[test] + fn test_change_type_byte_parsing_in_non_append_only_batch() -> Result<()> { + use crate::compression::{ + ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }; + use crate::metadata::{PhysicalTablePath, TablePath}; + use crate::row::GenericRow; + + let row_type = RowType::new(vec![ + DataField::new("id", DataTypes::int(), None), + DataField::new("name", DataTypes::string(), None), + ]); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); + + let mut builder = MemoryLogRecordsArrowBuilder::new( + 1, + &row_type, + false, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + )?; + + for (id, name) in [(1i32, "Alice"), (2, "Bob"), (3, "Charlie")] { + let mut row = GenericRow::new(2); + row.set_field(0, id); + row.set_field(1, name); + let record = WriteRecord::for_append( + Arc::clone(&table_info), + physical_table_path.clone(), + 1, + &row, + ); + builder.append(&record)?; + } + + let append_only_bytes = builder.build()?; + let arrow_ipc_data = &append_only_bytes[RECORDS_OFFSET..]; + + let record_count: usize = 3; + let change_type_bytes: Vec<u8> = vec![ + ChangeType::Insert.to_byte_value(), + ChangeType::UpdateBefore.to_byte_value(), + ChangeType::UpdateAfter.to_byte_value(), + ]; + + let total_len = RECORDS_OFFSET + change_type_bytes.len() + arrow_ipc_data.len(); + let mut batch_bytes = vec![0u8; total_len]; + batch_bytes[..RECORDS_OFFSET].copy_from_slice(&append_only_bytes[..RECORDS_OFFSET]); + batch_bytes[ATTRIBUTES_OFFSET] = 0; + batch_bytes[RECORDS_OFFSET..RECORDS_OFFSET + record_count] + .copy_from_slice(&change_type_bytes); + batch_bytes[RECORDS_OFFSET + record_count..].copy_from_slice(arrow_ipc_data); + + LittleEndian::write_i32( + &mut batch_bytes[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH], + (total_len - BASE_OFFSET_LENGTH - LENGTH_LENGTH) as i32, + ); + let crc = crc32c(&batch_bytes[SCHEMA_ID_OFFSET..]); + LittleEndian::write_u32(&mut batch_bytes[CRC_OFFSET..CRC_OFFSET + CRC_LENGTH], crc); + + let batch = LogRecordBatch::new(Bytes::from(batch_bytes)); + assert!(!batch.is_append_only()); + assert_eq!(batch.record_count(), 3); + + let arrow_schema = to_arrow_schema(&row_type)?; + let read_context = ReadContext::new(arrow_schema, false); + let records: Vec<ScanRecord> = batch.records(&read_context)?.collect(); + + assert_eq!(records.len(), 3); + assert_eq!(*records[0].change_type(), ChangeType::Insert); + assert_eq!(*records[1].change_type(), ChangeType::UpdateBefore); + assert_eq!(*records[2].change_type(), ChangeType::UpdateAfter); + assert_eq!(records[0].row().get_int(0)?, 1); + assert_eq!(records[0].row().get_string(1)?, "Alice"); + assert_eq!(records[1].row().get_int(0)?, 2); + assert_eq!(records[1].row().get_string(1)?, "Bob"); + assert_eq!(records[2].row().get_int(0)?, 3); + assert_eq!(records[2].row().get_string(1)?, "Charlie"); + + Ok(()) + } + } diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs index 4231fb0..344c1c3 100644 --- a/crates/fluss/src/rpc/api_key.rs +++ b/crates/fluss/src/rpc/api_key.rs @@ -38,6 +38,7 @@ pub enum ApiKey { GetFileSystemSecurityToken, GetDatabaseInfo, GetLatestLakeSnapshot, + GetDvForUnionRead, CreatePartition, DropPartition, Authenticate, @@ -65,6 +66,7 @@ impl From<i16> for ApiKey { 1021 => ApiKey::ListOffsets, 1025 => ApiKey::GetFileSystemSecurityToken, 1032 => ApiKey::GetLatestLakeSnapshot, + 1064 => ApiKey::GetDvForUnionRead, 1035 => ApiKey::GetDatabaseInfo, 1036 => ApiKey::CreatePartition, 1037 => ApiKey::DropPartition, @@ -95,6 +97,7 @@ impl From<ApiKey> for i16 { ApiKey::ListOffsets => 1021, ApiKey::GetFileSystemSecurityToken => 1025, ApiKey::GetLatestLakeSnapshot => 1032, + ApiKey::GetDvForUnionRead => 1064, ApiKey::GetDatabaseInfo => 1035, ApiKey::CreatePartition => 1036, ApiKey::DropPartition => 1037, @@ -129,6 +132,7 @@ mod tests { (1021, ApiKey::ListOffsets), (1025, ApiKey::GetFileSystemSecurityToken), (1032, ApiKey::GetLatestLakeSnapshot), + (1064, ApiKey::GetDvForUnionRead), (1035, ApiKey::GetDatabaseInfo), (1036, ApiKey::CreatePartition), (1037, ApiKey::DropPartition), diff --git a/crates/fluss/src/rpc/message/get_dv_for_union_read.rs b/crates/fluss/src/rpc/message/get_dv_for_union_read.rs new file mode 100644 index 0000000..a8bb2ae --- /dev/null +++ b/crates/fluss/src/rpc/message/get_dv_for_union_read.rs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TableBucket; +use crate::proto; +use crate::rpc::api_key::ApiKey; +use crate::rpc::api_version::ApiVersion; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType}; +use crate::{impl_read_version_type, impl_write_version_type}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetDvForUnionReadRequest { + pub inner_request: proto::GetDvForUnionReadRequest, +} + +impl GetDvForUnionReadRequest { + pub fn new( + table_bucket: &TableBucket, + requested_snapshot_id: i64, + data_files: Vec<String>, + ) -> Self { + Self { + inner_request: proto::GetDvForUnionReadRequest { + table_id: table_bucket.table_id(), + partition_id: table_bucket.partition_id(), + bucket_id: table_bucket.bucket_id(), + requested_snapshot_id, + data_files, + }, + } + } +} + +impl RequestBody for GetDvForUnionReadRequest { + type ResponseBody = proto::GetDvForUnionReadResponse; + const API_KEY: ApiKey = ApiKey::GetDvForUnionRead; + const REQUEST_VERSION: ApiVersion = ApiVersion(0); +} + +impl_write_version_type!(GetDvForUnionReadRequest); +impl_read_version_type!(proto::GetDvForUnionReadResponse); diff --git a/crates/fluss/src/rpc/message/get_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_lake_snapshot.rs new file mode 100644 index 0000000..203e982 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_lake_snapshot.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TablePath; +use crate::proto; +use crate::proto::PbTablePath; +use crate::rpc::api_key::ApiKey; +use crate::rpc::api_version::ApiVersion; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType}; +use crate::{impl_read_version_type, impl_write_version_type}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetLakeSnapshotRequest { + pub inner_request: proto::GetLakeSnapshotRequest, +} + +impl GetLakeSnapshotRequest { + pub fn latest(table_path: &TablePath) -> Self { + Self { + inner_request: proto::GetLakeSnapshotRequest { + table_path: PbTablePath { + database_name: table_path.database().to_string(), + table_name: table_path.table().to_string(), + }, + snapshot_id: None, + readable: None, + }, + } + } + + pub fn readable(table_path: &TablePath) -> Self { + Self { + inner_request: proto::GetLakeSnapshotRequest { + table_path: PbTablePath { + database_name: table_path.database().to_string(), + table_name: table_path.table().to_string(), + }, + snapshot_id: None, + readable: Some(true), + }, + } + } + + pub fn by_id(table_path: &TablePath, snapshot_id: i64) -> Self { + Self { + inner_request: proto::GetLakeSnapshotRequest { + table_path: PbTablePath { + database_name: table_path.database().to_string(), + table_name: table_path.table().to_string(), + }, + snapshot_id: Some(snapshot_id), + readable: None, + }, + } + } +} + +impl RequestBody for GetLakeSnapshotRequest { + type ResponseBody = proto::GetLakeSnapshotResponse; + const API_KEY: ApiKey = ApiKey::GetLatestLakeSnapshot; + const REQUEST_VERSION: ApiVersion = ApiVersion(0); +} + +impl_write_version_type!(GetLakeSnapshotRequest); +impl_read_version_type!(proto::GetLakeSnapshotResponse); diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs index 9ad4545..f563e89 100644 --- a/crates/fluss/src/rpc/message/mod.rs +++ b/crates/fluss/src/rpc/message/mod.rs @@ -30,6 +30,8 @@ mod drop_partition; mod drop_table; mod fetch; mod get_database_info; +mod get_dv_for_union_read; +mod get_lake_snapshot; mod get_latest_lake_snapshot; mod get_security_token; mod get_table; @@ -55,6 +57,8 @@ pub use drop_partition::*; pub use drop_table::*; pub use fetch::*; pub use get_database_info::*; +pub use get_dv_for_union_read::*; +pub use get_lake_snapshot::*; pub use get_latest_lake_snapshot::*; pub use get_security_token::*; pub use get_table::*;
