This is an automated email from the ASF dual-hosted git repository.
leekei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1fc28c0 feat: [TASK-315] Proto + RPC for LimitScan (#472)
1fc28c0 is described below
commit 1fc28c0a9e4fffeaa75c015d967f8619cc6c649f
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Apr 1 23:21:06 2026 +0100
feat: [TASK-315] Proto + RPC for LimitScan (#472)
---
crates/fluss/src/proto/fluss_api.proto | 17 +++++++++
crates/fluss/src/rpc/api_key.rs | 4 +++
crates/fluss/src/rpc/message/limit_scan.rs | 58 ++++++++++++++++++++++++++++++
crates/fluss/src/rpc/message/mod.rs | 2 ++
4 files changed, 81 insertions(+)
diff --git a/crates/fluss/src/proto/fluss_api.proto
b/crates/fluss/src/proto/fluss_api.proto
index a733dd7..8f081c2 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -419,6 +419,23 @@ message AuthenticateResponse {
optional bytes challenge = 1;
}
+// limit scan request and response
+message LimitScanRequest {
+ required int64 table_id = 2;
+ optional int64 partition_id = 3;
+ required int32 bucket_id = 4;
+ required int32 limit = 5;
+}
+
+message LimitScanResponse{
+ optional int32 error_code = 1;
+ optional string error_message = 2;
+ // flag to indicate the table type
+ optional bool is_log_table = 3;
+ // LogRecordBatch if is_log_table is true, otherwise KvRecordBatch
+ optional bytes records = 4;
+}
+
// init writer request and response
message InitWriterRequest {
repeated PbTablePath table_path = 1;
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index d1b3ea6..5e4fddf 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -38,6 +38,7 @@ pub enum ApiKey {
GetFileSystemSecurityToken, // 1025
InitWriter, // 1026
GetLatestLakeSnapshot, // 1032
+ LimitScan, // 1033
GetDatabaseInfo, // 1035
CreatePartition, // 1036
DropPartition, // 1037
@@ -67,6 +68,7 @@ impl From<i16> for ApiKey {
1025 => ApiKey::GetFileSystemSecurityToken,
1026 => ApiKey::InitWriter,
1032 => ApiKey::GetLatestLakeSnapshot,
+ 1033 => ApiKey::LimitScan,
1035 => ApiKey::GetDatabaseInfo,
1036 => ApiKey::CreatePartition,
1037 => ApiKey::DropPartition,
@@ -99,6 +101,7 @@ impl From<ApiKey> for i16 {
ApiKey::GetFileSystemSecurityToken => 1025,
ApiKey::InitWriter => 1026,
ApiKey::GetLatestLakeSnapshot => 1032,
+ ApiKey::LimitScan => 1033,
ApiKey::GetDatabaseInfo => 1035,
ApiKey::CreatePartition => 1036,
ApiKey::DropPartition => 1037,
@@ -134,6 +137,7 @@ mod tests {
(1025, ApiKey::GetFileSystemSecurityToken),
(1026, ApiKey::InitWriter),
(1032, ApiKey::GetLatestLakeSnapshot),
+ (1033, ApiKey::LimitScan),
(1035, ApiKey::GetDatabaseInfo),
(1036, ApiKey::CreatePartition),
(1037, ApiKey::DropPartition),
diff --git a/crates/fluss/src/rpc/message/limit_scan.rs
b/crates/fluss/src/rpc/message/limit_scan.rs
new file mode 100644
index 0000000..d83a2e8
--- /dev/null
+++ b/crates/fluss/src/rpc/message/limit_scan.rs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::LimitScanResponse;
+use crate::rpc::frame::ReadError;
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::WriteError;
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use prost::Message;
+
+use bytes::{Buf, BufMut};
+
+pub struct LimitScanRequest {
+ pub inner_request: proto::LimitScanRequest,
+}
+
+impl LimitScanRequest {
+ pub fn new(table_id: i64, partition_id: Option<i64>, bucket_id: i32,
limit: i32) -> Self {
+ let request = proto::LimitScanRequest {
+ table_id,
+ partition_id,
+ bucket_id,
+ limit,
+ };
+
+ Self {
+ inner_request: request,
+ }
+ }
+}
+
+impl RequestBody for LimitScanRequest {
+ type ResponseBody = LimitScanResponse;
+
+ const API_KEY: ApiKey = ApiKey::LimitScan;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(LimitScanRequest);
+impl_read_version_type!(LimitScanResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs
b/crates/fluss/src/rpc/message/mod.rs
index 89a8ba1..1080802 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -35,6 +35,7 @@ mod get_security_token;
mod get_table;
mod header;
mod init_writer;
+mod limit_scan;
mod list_databases;
mod list_offsets;
mod list_partition_infos;
@@ -61,6 +62,7 @@ pub use get_security_token::*;
pub use get_table::*;
pub use header::*;
pub use init_writer::*;
+pub use limit_scan::*;
pub use list_databases::*;
pub use list_offsets::*;
pub use list_partition_infos::*;