This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 186ded1 [feat] Add more functions to the Rust client Admin (#5)
186ded1 is described below
commit 186ded18baf6cd84d72bac2bc6cd7c870c44f318
Author: naivedogger <[email protected]>
AuthorDate: Thu Sep 11 16:25:12 2025 +0800
[feat] Add more functions to the Rust client Admin (#5)
---
.github/{dependabot.yml => dependabot.yml} | 0
crates/fluss/src/client/admin.rs | 133 +++++++++++-
crates/fluss/src/client/mod.rs | 6 +-
crates/fluss/src/client/table/mod.rs | 5 +-
crates/fluss/src/client/write/accumulator.rs | 6 +-
crates/fluss/src/cluster/cluster.rs | 1 +
crates/fluss/src/metadata/database.rs | 234 +++++++++++++++++++++
crates/fluss/src/metadata/datatype.rs | 12 ++
crates/fluss/src/metadata/mod.rs | 4 +-
crates/fluss/src/metadata/table.rs | 65 ++++++
crates/fluss/src/proto/fluss_api.proto | 83 ++++++++
crates/fluss/src/record/arrow.rs | 7 +-
crates/fluss/src/row/datum.rs | 103 +++++----
crates/fluss/src/row/mod.rs | 3 +-
crates/fluss/src/rpc/api_key.rs | 39 +++-
crates/fluss/src/rpc/message/create_database.rs | 67 ++++++
crates/fluss/src/rpc/message/database_exists.rs | 49 +++++
crates/fluss/src/rpc/message/drop_database.rs | 51 +++++
crates/fluss/src/rpc/message/drop_table.rs | 56 +++++
crates/fluss/src/rpc/message/get_database_info.rs | 49 +++++
.../src/rpc/message/get_latest_lake_snapshot.rs | 55 +++++
crates/fluss/src/rpc/message/list_databases.rs | 47 +++++
crates/fluss/src/rpc/message/list_tables.rs | 53 +++++
crates/fluss/src/rpc/message/mod.rs | 18 ++
crates/fluss/src/rpc/message/table_exists.rs | 55 +++++
crates/fluss/src/rpc/server_connection.rs | 1 -
26 files changed, 1144 insertions(+), 58 deletions(-)
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
similarity index 100%
rename from .github/dependabot.yml
rename to .github/dependabot.yml
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 8688a2d..2584034 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -16,18 +16,28 @@
// under the License.
use crate::client::metadata::Metadata;
-use crate::metadata::{JsonSerde, TableDescriptor, TableInfo, TablePath};
-use crate::rpc::message::{CreateTableRequest, GetTableRequest};
+use crate::metadata::{
+ DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, TableBucket,
TableDescriptor,
+ TableInfo, TablePath,
+};
+use crate::rpc::message::{
+ CreateDatabaseRequest, CreateTableRequest, DatabaseExistsRequest,
DropDatabaseRequest,
+ DropTableRequest, GetDatabaseInfoRequest, GetLatestLakeSnapshotRequest,
GetTableRequest,
+ ListDatabasesRequest, ListTablesRequest, TableExistsRequest,
+};
use crate::rpc::{RpcClient, ServerConnection};
+
+use std::collections::HashMap;
use std::sync::Arc;
use crate::error::Result;
use crate::proto::GetTableInfoResponse;
-#[allow(dead_code)]
pub struct FlussAdmin {
admin_gateway: ServerConnection,
+ #[allow(dead_code)]
metadata: Arc<Metadata>,
+ #[allow(dead_code)]
rpc_client: Arc<RpcClient>,
}
@@ -49,6 +59,23 @@ impl FlussAdmin {
})
}
+ pub async fn create_database(
+ &self,
+ database_name: &str,
+ ignore_if_exists: bool,
+ database_descriptor: Option<&DatabaseDescriptor>,
+ ) -> Result<()> {
+ let _response = self
+ .admin_gateway
+ .request(CreateDatabaseRequest::new(
+ database_name,
+ ignore_if_exists,
+ database_descriptor,
+ )?)
+ .await?;
+ Ok(())
+ }
+
pub async fn create_table(
&self,
table_path: &TablePath,
@@ -66,6 +93,14 @@ impl FlussAdmin {
Ok(())
}
+ pub async fn drop_table(&self, table_path: &TablePath, ignore_if_exists:
bool) -> Result<()> {
+ let _response = self
+ .admin_gateway
+ .request(DropTableRequest::new(table_path, ignore_if_exists))
+ .await?;
+ Ok(())
+ }
+
pub async fn get_table(&self, table_path: &TablePath) -> Result<TableInfo>
{
let response = self
.admin_gateway
@@ -90,4 +125,96 @@ impl FlussAdmin {
modified_time,
))
}
+
+ /// List all tables in the given database
+ pub async fn list_tables(&self, database_name: &str) ->
Result<Vec<String>> {
+ let response = self
+ .admin_gateway
+ .request(ListTablesRequest::new(database_name))
+ .await?;
+ Ok(response.table_name)
+ }
+
+ /// Check if a table exists
+ pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
+ let response = self
+ .admin_gateway
+ .request(TableExistsRequest::new(table_path))
+ .await?;
+ Ok(response.exists)
+ }
+
+ /// Drop a database
+ pub async fn drop_database(
+ &self,
+ database_name: &str,
+ ignore_if_not_exists: bool,
+ cascade: bool,
+ ) -> Result<()> {
+ let _response = self
+ .admin_gateway
+ .request(DropDatabaseRequest::new(
+ database_name,
+ ignore_if_not_exists,
+ cascade,
+ ))
+ .await?;
+ Ok(())
+ }
+
+ /// List all databases
+ pub async fn list_databases(&self) -> Result<Vec<String>> {
+ let response = self
+ .admin_gateway
+ .request(ListDatabasesRequest::new())
+ .await?;
+ Ok(response.database_name)
+ }
+
+ /// Check if a database exists
+ pub async fn database_exists(&self, database_name: &str) -> Result<bool> {
+ let response = self
+ .admin_gateway
+ .request(DatabaseExistsRequest::new(database_name))
+ .await?;
+ Ok(response.exists)
+ }
+
+ /// Get database information
+ pub async fn get_database_info(&self, database_name: &str) ->
Result<DatabaseInfo> {
+ let request = GetDatabaseInfoRequest::new(database_name);
+ let response = self.admin_gateway.request(request).await?;
+
+ // Convert proto response to DatabaseInfo
+ let database_descriptor =
DatabaseDescriptor::from_json_bytes(&response.database_json)?;
+
+ Ok(DatabaseInfo::new(
+ database_name.to_string(),
+ database_descriptor,
+ response.created_time,
+ response.modified_time,
+ ))
+ }
+
+ /// Get the latest lake snapshot for a table
+ pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) ->
Result<LakeSnapshot> {
+ let response = self
+ .admin_gateway
+ .request(GetLatestLakeSnapshotRequest::new(table_path))
+ .await?;
+
+ // Convert proto response to LakeSnapshot
+ let mut table_buckets_offset = HashMap::new();
+ for bucket_snapshot in response.bucket_snapshots {
+ let table_bucket = TableBucket::new(response.table_id,
bucket_snapshot.bucket_id);
+ if let Some(log_offset) = bucket_snapshot.log_offset {
+ table_buckets_offset.insert(table_bucket, log_offset);
+ }
+ }
+
+ Ok(LakeSnapshot::new(
+ response.snapshot_id,
+ table_buckets_offset,
+ ))
+ }
}
diff --git a/crates/fluss/src/client/mod.rs b/crates/fluss/src/client/mod.rs
index 5b6908e..a971439 100644
--- a/crates/fluss/src/client/mod.rs
+++ b/crates/fluss/src/client/mod.rs
@@ -17,10 +17,12 @@
mod admin;
mod connection;
+mod metadata;
mod table;
mod write;
+pub use admin::*;
pub use connection::*;
-mod metadata;
-
+pub use metadata::*;
+pub use table::*;
pub use write::*;
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 503a1ed..4d6f8f0 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -17,8 +17,6 @@
use crate::client::connection::FlussConnection;
use crate::client::metadata::Metadata;
-use crate::client::table::append::TableAppend;
-use crate::client::table::scanner::TableScan;
use crate::metadata::{TableInfo, TablePath};
use std::sync::Arc;
@@ -29,6 +27,9 @@ mod append;
mod scanner;
mod writer;
+pub use append::TableAppend;
+pub use scanner::TableScan;
+
#[allow(dead_code)]
pub struct FlussTable<'a> {
conn: &'a FlussConnection,
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index 0b77894..32622c7 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -93,13 +93,15 @@ impl RecordAccumulator {
}
let table_path = &record.table_path;
-
+ let table_info = cluster.get_table(table_path);
let row_type = &cluster.get_table(table_path).row_type;
+ let schema_id = table_info.schema_id;
+
let mut batch = ArrowLog(ArrowLogWriteBatch::new(
self.batch_id.fetch_add(1, Ordering::Relaxed),
table_path.as_ref().clone(),
- 0,
+ schema_id,
row_type,
bucket_id,
current_time_ms(),
diff --git a/crates/fluss/src/cluster/cluster.rs
b/crates/fluss/src/cluster/cluster.rs
index 1f8341d..a6f20a8 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -126,6 +126,7 @@ impl Cluster {
table_metadata.modified_time,
);
table_info_by_path.insert(table_path.clone(), table_info);
+ table_id_by_path.insert(table_path.clone(), table_id);
// now, get bucket matadata
let mut found_unavailable_bucket = false;
diff --git a/crates/fluss/src/metadata/database.rs
b/crates/fluss/src/metadata/database.rs
new file mode 100644
index 0000000..2649421
--- /dev/null
+++ b/crates/fluss/src/metadata/database.rs
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::JsonSerdeError;
+use crate::error::Result;
+use crate::metadata::JsonSerde;
+use serde::{Deserialize, Serialize};
+use serde_json::{Value, json};
+use std::collections::HashMap;
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct DatabaseDescriptor {
+ comment: Option<String>,
+ custom_properties: HashMap<String, String>,
+}
+
+#[derive(Debug, Clone)]
+pub struct DatabaseInfo {
+ database_name: String,
+ database_descriptor: DatabaseDescriptor,
+ created_time: i64,
+ modified_time: i64,
+}
+
+impl DatabaseInfo {
+ pub fn new(
+ database_name: String,
+ database_descriptor: DatabaseDescriptor,
+ created_time: i64,
+ modified_time: i64,
+ ) -> Self {
+ Self {
+ database_name,
+ database_descriptor,
+ created_time,
+ modified_time,
+ }
+ }
+
+ pub fn database_name(&self) -> &str {
+ &self.database_name
+ }
+
+ pub fn database_descriptor(&self) -> &DatabaseDescriptor {
+ &self.database_descriptor
+ }
+
+ pub fn created_time(&self) -> i64 {
+ self.created_time
+ }
+
+ pub fn modified_time(&self) -> i64 {
+ self.modified_time
+ }
+}
+
+#[derive(Debug, Default)]
+pub struct DatabaseDescriptorBuilder {
+ comment: Option<String>,
+ custom_properties: HashMap<String, String>,
+}
+
+impl DatabaseDescriptor {
+ pub fn builder() -> DatabaseDescriptorBuilder {
+ DatabaseDescriptorBuilder::default()
+ }
+
+ pub fn comment(&self) -> Option<&str> {
+ self.comment.as_deref()
+ }
+
+ pub fn custom_properties(&self) -> &HashMap<String, String> {
+ &self.custom_properties
+ }
+}
+
+impl DatabaseDescriptorBuilder {
+ pub fn comment(mut self, comment: &str) -> Self {
+ self.comment = Some(comment.to_string());
+ self
+ }
+
+ pub fn custom_properties(mut self, properties: HashMap<String, String>) ->
Self {
+ self.custom_properties = properties;
+ self
+ }
+
+ pub fn custom_property(mut self, key: &str, value: &str) -> Self {
+ self.custom_properties
+ .insert(key.to_string(), value.to_string());
+ self
+ }
+
+ pub fn build(self) -> Result<DatabaseDescriptor> {
+ Ok(DatabaseDescriptor {
+ comment: self.comment,
+ custom_properties: self.custom_properties,
+ })
+ }
+}
+
+impl DatabaseDescriptor {
+ const CUSTOM_PROPERTIES_NAME: &'static str = "custom_properties";
+ const COMMENT_NAME: &'static str = "comment";
+ const VERSION_KEY: &'static str = "version";
+ const VERSION: u32 = 1;
+}
+
+impl JsonSerde for DatabaseDescriptor {
+ fn serialize_json(&self) -> Result<Value> {
+ let mut obj = serde_json::Map::new();
+
+ // Serialize version
+ obj.insert(Self::VERSION_KEY.to_string(), json!(Self::VERSION));
+
+ // Serialize comment if present
+ if let Some(comment) = self.comment() {
+ obj.insert(Self::COMMENT_NAME.to_string(), json!(comment));
+ }
+
+ // Serialize custom properties
+ obj.insert(
+ Self::CUSTOM_PROPERTIES_NAME.to_string(),
+ json!(self.custom_properties()),
+ );
+
+ Ok(Value::Object(obj))
+ }
+
+ fn deserialize_json(node: &Value) -> Result<Self> {
+ let mut builder = DatabaseDescriptor::builder();
+
+ // Deserialize comment if present
+ if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
+ let comment = comment_node
+ .as_str()
+ .ok_or_else(|| {
+ JsonSerdeError(format!("{} should be a string",
Self::COMMENT_NAME))
+ })?
+ .to_owned();
+ builder = builder.comment(&comment);
+ }
+
+ // Deserialize custom properties directly
+ let custom_properties = if let Some(props_node) =
node.get(Self::CUSTOM_PROPERTIES_NAME) {
+ let obj = props_node.as_object().ok_or_else(|| {
+ JsonSerdeError("Custom properties should be an
object".to_string())
+ })?;
+
+ let mut properties = HashMap::with_capacity(obj.len());
+ for (key, value) in obj {
+ properties.insert(
+ key.clone(),
+ value
+ .as_str()
+ .ok_or_else(|| {
+ JsonSerdeError("Property value should be a
string".to_string())
+ })?
+ .to_owned(),
+ );
+ }
+ properties
+ } else {
+ HashMap::new()
+ };
+ builder = builder.custom_properties(custom_properties);
+
+ builder.build()
+ }
+}
+
+impl DatabaseDescriptor {
+ /// Create DatabaseDescriptor from JSON bytes (equivalent to Java's
fromJsonBytes)
+ pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+ let json_value: Value = serde_json::from_slice(bytes)
+ .map_err(|e| JsonSerdeError(format!("Failed to parse JSON: {}",
e)))?;
+ Self::deserialize_json(&json_value)
+ }
+
+ /// Convert DatabaseDescriptor to JSON bytes
+ pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
+ let json_value = self.serialize_json()?;
+ serde_json::to_vec(&json_value)
+ .map_err(|e| JsonSerdeError(format!("Failed to serialize to JSON:
{}", e)))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_database_descriptor_json_serde() {
+ let mut custom_props = HashMap::new();
+ custom_props.insert("key1".to_string(), "value1".to_string());
+ custom_props.insert("key2".to_string(), "value2".to_string());
+
+ let descriptor = DatabaseDescriptor::builder()
+ .comment("Test database")
+ .custom_properties(custom_props)
+ .build()
+ .unwrap();
+
+ // Test serialization
+ let json_bytes = descriptor.to_json_bytes().unwrap();
+ println!("Serialized JSON: {}", String::from_utf8_lossy(&json_bytes));
+
+ // Test deserialization
+ let deserialized =
DatabaseDescriptor::from_json_bytes(&json_bytes).unwrap();
+ assert_eq!(descriptor, deserialized);
+ }
+
+ #[test]
+ fn test_empty_database_descriptor() {
+ let descriptor = DatabaseDescriptor::builder().build().unwrap();
+ let json_bytes = descriptor.to_json_bytes().unwrap();
+ let deserialized =
DatabaseDescriptor::from_json_bytes(&json_bytes).unwrap();
+ assert_eq!(descriptor, deserialized);
+ }
+}
diff --git a/crates/fluss/src/metadata/datatype.rs
b/crates/fluss/src/metadata/datatype.rs
index 0c00c6f..09ca0c2 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -591,6 +591,10 @@ impl ArrayType {
element_type: self.element_type.clone(),
}
}
+
+ pub fn get_element_type(&self) -> &DataType {
+ &self.element_type
+ }
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Hash)]
@@ -620,6 +624,14 @@ impl MapType {
value_type: self.value_type.clone(),
}
}
+
+ pub fn key_type(&self) -> &DataType {
+ &self.key_type
+ }
+
+ pub fn value_type(&self) -> &DataType {
+ &self.value_type
+ }
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Hash)]
diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs
index 7946547..8754007 100644
--- a/crates/fluss/src/metadata/mod.rs
+++ b/crates/fluss/src/metadata/mod.rs
@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+mod database;
mod datatype;
-pub use datatype::*;
mod json_serde;
mod table;
+pub use database::*;
+pub use datatype::*;
pub use json_serde::*;
pub use table::*;
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index a5ab61d..90e3573 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -584,6 +584,16 @@ impl Display for LogFormat {
}
}
+impl LogFormat {
+ pub fn parse(s: &str) -> Result<Self> {
+ match s.to_uppercase().as_str() {
+ "ARROW" => Ok(LogFormat::ARROW),
+ "INDEXED" => Ok(LogFormat::INDEXED),
+ _ => Err(InvalidTableError(format!("Unknown log format: {}", s))),
+ }
+ }
+}
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum KvFormat {
INDEXED,
@@ -600,6 +610,16 @@ impl Display for KvFormat {
}
}
+impl KvFormat {
+ pub fn parse(s: &str) -> Result<Self> {
+ match s.to_uppercase().as_str() {
+ "INDEXED" => Ok(KvFormat::INDEXED),
+ "COMPACTED" => Ok(KvFormat::COMPACTED),
+ _ => Err(InvalidTableError(format!("Unknown kv format: {}", s))),
+ }
+ }
+}
+
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct TablePath {
database: String,
@@ -631,6 +651,28 @@ impl TablePath {
}
}
+#[derive(Debug, Clone)]
+pub struct PhysicalTablePath {
+ table_path: TablePath,
+ #[allow(dead_code)]
+ partition: Option<String>,
+}
+
+impl PhysicalTablePath {
+ pub fn of(table_path: TablePath) -> Self {
+ Self {
+ table_path,
+ partition: None,
+ }
+ }
+
+ // TODO: support partition
+
+ pub fn get_table_path(&self) -> &TablePath {
+ &self.table_path
+ }
+}
+
#[derive(Debug, Clone)]
pub struct TableInfo {
pub table_path: TablePath,
@@ -918,3 +960,26 @@ impl TableBucket {
self.partition_id
}
}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LakeSnapshot {
+ pub snapshot_id: i64,
+ pub table_buckets_offset: HashMap<TableBucket, i64>,
+}
+
+impl LakeSnapshot {
+ pub fn new(snapshot_id: i64, table_buckets_offset: HashMap<TableBucket,
i64>) -> Self {
+ Self {
+ snapshot_id,
+ table_buckets_offset,
+ }
+ }
+
+ pub fn snapshot_id(&self) -> i64 {
+ self.snapshot_id
+ }
+
+ pub fn table_buckets_offset(&self) -> &HashMap<TableBucket, i64> {
+ &self.table_buckets_offset
+ }
+}
diff --git a/crates/fluss/src/proto/fluss_api.proto
b/crates/fluss/src/proto/fluss_api.proto
index 195b8f8..d71197b 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -123,6 +123,21 @@ message CreateTableRequest {
message CreateTableResponse {
}
+message DropTableRequest {
+ required PbTablePath table_path = 1;
+ required bool ignore_if_not_exists = 2;
+}
+
+message DropTableResponse {
+}
+
+message TableExistsRequest {
+ required PbTablePath table_path = 1;
+}
+
+message TableExistsResponse {
+ required bool exists = 1;
+}
message GetTableInfoRequest {
required PbTablePath table_path = 1;
@@ -136,6 +151,57 @@ message GetTableInfoResponse {
required int64 modified_time = 5;
}
+message ListTablesRequest {
+ required string database_name = 1;
+}
+
+message ListTablesResponse {
+ repeated string table_name = 1;
+}
+
+message CreateDatabaseRequest {
+ required string database_name = 1;
+ required bool ignore_if_exists = 2;
+ optional bytes database_json = 3;
+}
+
+message CreateDatabaseResponse {
+}
+
+message GetDatabaseInfoRequest {
+ required string database_name = 1;
+}
+
+message GetDatabaseInfoResponse {
+ required bytes database_json = 3;
+ required int64 created_time = 4;
+ required int64 modified_time = 5;
+}
+
+message DropDatabaseRequest {
+ required string database_name = 1;
+ required bool ignore_if_not_exists = 2;
+ required bool cascade = 3;
+}
+
+message DropDatabaseResponse {
+}
+
+message DatabaseExistsRequest {
+ required string database_name = 1;
+}
+
+message DatabaseExistsResponse {
+ required bool exists = 1;
+}
+
+message ListDatabasesRequest {
+}
+
+message ListDatabasesResponse {
+ repeated string database_name = 1;
+}
+
// fetch log request and response
message FetchLogRequest {
@@ -194,4 +260,21 @@ message PbRemoteLogSegment {
required int64 remote_log_start_offset = 2;
required int64 remote_log_end_offset = 3;
required int32 segment_size_in_bytes = 4;
+}
+
+// fetch latest lake snapshot
+message GetLatestLakeSnapshotRequest {
+ required PbTablePath table_path = 1;
+}
+
+message GetLatestLakeSnapshotResponse {
+ required int64 table_id = 1;
+ required int64 snapshotId = 2;
+ repeated PbLakeSnapshotForBucket bucket_snapshots = 3;
+}
+
+message PbLakeSnapshotForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+ optional int64 log_offset = 3;
}
\ No newline at end of file
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 2f595d0..fa63b00 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -16,9 +16,9 @@
// under the License.
use arrow::array::{
- ArrayBuilder, ArrayRef, BooleanBuilder, Float32Builder, Float64Builder,
Int8Builder,
- Int16Builder, Int32Builder, Int64Builder, StringBuilder, UInt8Builder,
UInt16Builder,
- UInt32Builder, UInt64Builder,
+ ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Float32Builder,
Float64Builder,
+ Int8Builder, Int16Builder, Int32Builder, Int64Builder, StringBuilder,
UInt8Builder,
+ UInt16Builder, UInt32Builder, UInt64Builder,
};
use arrow::{
array::RecordBatch,
@@ -224,6 +224,7 @@ impl MemoryLogRecordsArrowBuilder {
arrow_schema::DataType::Float64 => Box::new(Float64Builder::new()),
arrow_schema::DataType::Boolean => Box::new(BooleanBuilder::new()),
arrow_schema::DataType::Utf8 => Box::new(StringBuilder::new()),
+ arrow_schema::DataType::Binary => Box::new(BinaryBuilder::new()),
dt => panic!("Unsupported data type: {dt:?}"),
}
}
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 3c65a7d..d8c4f74 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -19,7 +19,10 @@ use chrono::Datelike;
use crate::error::Error::RowConvertError;
use crate::error::Result;
-use arrow::array::{ArrayBuilder, Int8Builder, Int16Builder, Int32Builder,
StringBuilder};
+use arrow::array::{
+ ArrayBuilder, BinaryBuilder, BooleanBuilder, Float32Builder,
Float64Builder, Int8Builder,
+ Int16Builder, Int32Builder, Int64Builder, StringBuilder,
+};
use chrono::NaiveDate;
use ordered_float::OrderedFloat;
use parse_display::Display;
@@ -47,6 +50,8 @@ pub enum Datum<'a> {
#[display("{0}")]
Int64(i64),
#[display("{0}")]
+ Float32(F32),
+ #[display("{0}")]
Float64(F64),
#[display("'{0}'")]
String(&'a str),
@@ -96,6 +101,20 @@ impl From<Option<&()>> for Datum<'_> {
}
}
+impl<'a> From<f32> for Datum<'a> {
+ #[inline]
+ fn from(f: f32) -> Datum<'a> {
+ Datum::Float32(F32::from(f))
+ }
+}
+
+impl<'a> From<f64> for Datum<'a> {
+ #[inline]
+ fn from(f: f64) -> Datum<'a> {
+ Datum::Float64(F64::from(f))
+ }
+}
+
impl TryFrom<&Datum<'_>> for i32 {
type Error = ();
@@ -126,45 +145,56 @@ pub trait ToArrow {
impl Datum<'_> {
pub fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()> {
+ macro_rules! append_null_to_arrow {
+ ($builder_type:ty) => {
+ if let Some(b) =
builder.as_any_mut().downcast_mut::<$builder_type>() {
+ b.append_null();
+ return Ok(());
+ }
+ };
+ }
+
+ macro_rules! append_value_to_arrow {
+ ($builder_type:ty, $value:expr) => {
+ if let Some(b) =
builder.as_any_mut().downcast_mut::<$builder_type>() {
+ b.append_value($value);
+ return Ok(());
+ }
+ };
+ }
+
match self {
Datum::Null => {
- todo!()
- }
- Datum::Bool(_v) => {
- todo!()
- }
- Datum::Int16(_v) => {
- todo!()
- }
- Datum::Int32(v) => {
- v.append_to(builder)?;
- }
- Datum::Int64(_v) => {
- todo!()
- }
- Datum::Float64(_v) => {
- todo!()
- }
- Datum::String(v) => {
- v.append_to(builder)?;
+ append_null_to_arrow!(BooleanBuilder);
+ append_null_to_arrow!(Int16Builder);
+ append_null_to_arrow!(Int32Builder);
+ append_null_to_arrow!(Int64Builder);
+ append_null_to_arrow!(Float32Builder);
+ append_null_to_arrow!(Float64Builder);
+ append_null_to_arrow!(StringBuilder);
+ append_null_to_arrow!(BinaryBuilder);
}
- Datum::Blob(_v) => {
- todo!()
- }
- Datum::Decimal(_v) => {
- todo!()
- }
- Datum::Date(_v) => {
- todo!()
- }
- Datum::Timestamp(_v) => {
- todo!()
- }
- Datum::TimestampTz(_v) => {
- todo!()
+ Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v),
+ Datum::Int16(v) => append_value_to_arrow!(Int16Builder, *v),
+ Datum::Int32(v) => append_value_to_arrow!(Int32Builder, *v),
+ Datum::Int64(v) => append_value_to_arrow!(Int64Builder, *v),
+ Datum::Float32(v) => append_value_to_arrow!(Float32Builder,
v.into_inner()),
+ Datum::Float64(v) => append_value_to_arrow!(Float64Builder,
v.into_inner()),
+ Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
+ Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder,
v.as_ref()),
+ Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) |
Datum::TimestampTz(_) => {
+ return Err(RowConvertError(format!(
+ "Type {:?} is not yet supported for Arrow conversion",
+ std::mem::discriminant(self)
+ )));
}
}
- Ok(())
+
+ Err(RowConvertError(format!(
+ "Cannot append {:?} to builder of type {}",
+ self,
+ std::any::type_name_of_val(builder)
+ )))
}
}
@@ -190,9 +220,10 @@ macro_rules! impl_to_arrow {
impl_to_arrow!(i8, Int8Builder);
impl_to_arrow!(i16, Int16Builder);
impl_to_arrow!(i32, Int32Builder);
+impl_to_arrow!(f32, Float32Builder);
+impl_to_arrow!(f64, Float64Builder);
impl_to_arrow!(&str, StringBuilder);
-#[allow(dead_code)]
pub type F32 = OrderedFloat<f32>;
pub type F64 = OrderedFloat<f64>;
#[allow(dead_code)]
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index ead6ff0..b900cb5 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -15,13 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use crate::row::datum::Datum;
-
mod column;
mod datum;
pub use column::*;
+pub use datum::*;
pub trait InternalRow {
/// Returns the number of fields in this row
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 4928208..18ce44f 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -19,22 +19,40 @@ use crate::rpc::api_key::ApiKey::Unknown;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub enum ApiKey {
+ CreateDatabase,
+ DropDatabase,
+ ListDatabases,
+ DatabaseExists,
CreateTable,
+ DropTable,
+ GetTable,
+ ListTables,
+ TableExists,
+ MetaData,
ProduceLog,
FetchLog,
- MetaData,
- GetTable,
+ GetDatabaseInfo,
+ GetLatestLakeSnapshot,
Unknown(i16),
}
impl From<i16> for ApiKey {
fn from(key: i16) -> Self {
match key {
+ 1001 => ApiKey::CreateDatabase,
+ 1002 => ApiKey::DropDatabase,
+ 1003 => ApiKey::ListDatabases,
+ 1004 => ApiKey::DatabaseExists,
1005 => ApiKey::CreateTable,
+ 1006 => ApiKey::DropTable,
+ 1007 => ApiKey::GetTable,
+ 1008 => ApiKey::ListTables,
+ 1010 => ApiKey::TableExists,
+ 1012 => ApiKey::MetaData,
1014 => ApiKey::ProduceLog,
1015 => ApiKey::FetchLog,
- 1012 => ApiKey::MetaData,
- 1007 => ApiKey::GetTable,
+ 1032 => ApiKey::GetLatestLakeSnapshot,
+ 1035 => ApiKey::GetDatabaseInfo,
_ => Unknown(key),
}
}
@@ -43,11 +61,20 @@ impl From<i16> for ApiKey {
impl From<ApiKey> for i16 {
fn from(key: ApiKey) -> Self {
match key {
+ ApiKey::CreateDatabase => 1001,
+ ApiKey::DropDatabase => 1002,
+ ApiKey::ListDatabases => 1003,
+ ApiKey::DatabaseExists => 1004,
ApiKey::CreateTable => 1005,
- ApiKey::ProduceLog => 1014,
- ApiKey::MetaData => 1012,
+ ApiKey::DropTable => 1006,
ApiKey::GetTable => 1007,
+ ApiKey::ListTables => 1008,
+ ApiKey::TableExists => 1010,
+ ApiKey::MetaData => 1012,
+ ApiKey::ProduceLog => 1014,
ApiKey::FetchLog => 1015,
+ ApiKey::GetLatestLakeSnapshot => 1032,
+ ApiKey::GetDatabaseInfo => 1035,
Unknown(x) => x,
}
}
diff --git a/crates/fluss/src/rpc/message/create_database.rs
b/crates/fluss/src/rpc/message/create_database.rs
new file mode 100644
index 0000000..e4052ef
--- /dev/null
+++ b/crates/fluss/src/rpc/message/create_database.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::DatabaseDescriptor;
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::error::Result as FlussResult;
+use crate::proto::CreateDatabaseResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct CreateDatabaseRequest {
+ pub inner_request: proto::CreateDatabaseRequest,
+}
+
+impl CreateDatabaseRequest {
+ pub fn new(
+ database_name: &str,
+ ignore_if_exists: bool,
+ database_descriptor: Option<&DatabaseDescriptor>,
+ ) -> FlussResult<Self> {
+ let database_json = if let Some(descriptor) = database_descriptor {
+ Some(descriptor.to_json_bytes()?)
+ } else {
+ None
+ };
+
+ Ok(CreateDatabaseRequest {
+ inner_request: proto::CreateDatabaseRequest {
+ database_name: database_name.to_string(),
+ ignore_if_exists,
+ database_json,
+ },
+ })
+ }
+}
+
+impl RequestBody for CreateDatabaseRequest {
+ type ResponseBody = CreateDatabaseResponse;
+
+ const API_KEY: ApiKey = ApiKey::CreateDatabase;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(CreateDatabaseRequest);
+impl_read_version_type!(CreateDatabaseResponse);
diff --git a/crates/fluss/src/rpc/message/database_exists.rs
b/crates/fluss/src/rpc/message/database_exists.rs
new file mode 100644
index 0000000..795eea1
--- /dev/null
+++ b/crates/fluss/src/rpc/message/database_exists.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct DatabaseExistsRequest {
+ pub inner_request: proto::DatabaseExistsRequest,
+}
+
+impl DatabaseExistsRequest {
+ pub fn new(database_name: &str) -> Self {
+ DatabaseExistsRequest {
+ inner_request: proto::DatabaseExistsRequest {
+ database_name: database_name.to_string(),
+ },
+ }
+ }
+}
+
+impl RequestBody for DatabaseExistsRequest {
+ type ResponseBody = proto::DatabaseExistsResponse;
+
+ const API_KEY: ApiKey = ApiKey::DatabaseExists;
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(DatabaseExistsRequest);
+impl_read_version_type!(proto::DatabaseExistsResponse);
diff --git a/crates/fluss/src/rpc/message/drop_database.rs
b/crates/fluss/src/rpc/message/drop_database.rs
new file mode 100644
index 0000000..49cbfaf
--- /dev/null
+++ b/crates/fluss/src/rpc/message/drop_database.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct DropDatabaseRequest {
+ pub inner_request: proto::DropDatabaseRequest,
+}
+
+impl DropDatabaseRequest {
+ pub fn new(database_name: &str, ignore_if_not_exists: bool, cascade: bool)
-> Self {
+ DropDatabaseRequest {
+ inner_request: proto::DropDatabaseRequest {
+ database_name: database_name.to_string(),
+ ignore_if_not_exists,
+ cascade,
+ },
+ }
+ }
+}
+
+impl RequestBody for DropDatabaseRequest {
+ type ResponseBody = proto::DropDatabaseResponse;
+
+ const API_KEY: ApiKey = ApiKey::DropDatabase;
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(DropDatabaseRequest);
+impl_read_version_type!(proto::DropDatabaseResponse);
diff --git a/crates/fluss/src/rpc/message/drop_table.rs
b/crates/fluss/src/rpc/message/drop_table.rs
new file mode 100644
index 0000000..0dbc21b
--- /dev/null
+++ b/crates/fluss/src/rpc/message/drop_table.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::TablePath;
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::proto::DropTableResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct DropTableRequest {
+ pub inner_request: proto::DropTableRequest,
+}
+
+impl DropTableRequest {
+ pub fn new(table_path: &TablePath, ignore_if_not_exists: bool) -> Self {
+ DropTableRequest {
+ inner_request: proto::DropTableRequest {
+ table_path: to_table_path(table_path),
+ ignore_if_not_exists,
+ },
+ }
+ }
+}
+
+impl RequestBody for DropTableRequest {
+ type ResponseBody = DropTableResponse;
+
+ const API_KEY: ApiKey = ApiKey::DropTable;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(DropTableRequest);
+impl_read_version_type!(DropTableResponse);
diff --git a/crates/fluss/src/rpc/message/get_database_info.rs
b/crates/fluss/src/rpc/message/get_database_info.rs
new file mode 100644
index 0000000..85492a8
--- /dev/null
+++ b/crates/fluss/src/rpc/message/get_database_info.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct GetDatabaseInfoRequest {
+ pub inner_request: proto::GetDatabaseInfoRequest,
+}
+
+impl GetDatabaseInfoRequest {
+ pub fn new(database_name: &str) -> Self {
+ GetDatabaseInfoRequest {
+ inner_request: proto::GetDatabaseInfoRequest {
+ database_name: database_name.to_string(),
+ },
+ }
+ }
+}
+
+impl RequestBody for GetDatabaseInfoRequest {
+ type ResponseBody = proto::GetDatabaseInfoResponse;
+
+ const API_KEY: ApiKey = ApiKey::GetDatabaseInfo;
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(GetDatabaseInfoRequest);
+impl_read_version_type!(proto::GetDatabaseInfoResponse);
diff --git a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
new file mode 100644
index 0000000..a0e186e
--- /dev/null
+++ b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto;
+use crate::proto::PbTablePath;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use crate::metadata::TablePath;
+use crate::{impl_read_version_type, impl_write_version_type};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct GetLatestLakeSnapshotRequest {
+ pub inner_request: proto::GetLatestLakeSnapshotRequest,
+}
+
+impl GetLatestLakeSnapshotRequest {
+ pub fn new(table_path: &TablePath) -> Self {
+ let inner_request = proto::GetLatestLakeSnapshotRequest {
+ table_path: PbTablePath {
+ database_name: table_path.database().to_string(),
+ table_name: table_path.table().to_string(),
+ },
+ };
+
+ Self { inner_request }
+ }
+}
+
+impl RequestBody for GetLatestLakeSnapshotRequest {
+ type ResponseBody = proto::GetLatestLakeSnapshotResponse;
+ const API_KEY: ApiKey = ApiKey::GetLatestLakeSnapshot;
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(GetLatestLakeSnapshotRequest);
+impl_read_version_type!(proto::GetLatestLakeSnapshotResponse);
diff --git a/crates/fluss/src/rpc/message/list_databases.rs
b/crates/fluss/src/rpc/message/list_databases.rs
new file mode 100644
index 0000000..ce5a091
--- /dev/null
+++ b/crates/fluss/src/rpc/message/list_databases.rs
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug, Default)]
+pub struct ListDatabasesRequest {
+ pub inner_request: proto::ListDatabasesRequest,
+}
+
+impl ListDatabasesRequest {
+ pub fn new() -> Self {
+ ListDatabasesRequest {
+ inner_request: proto::ListDatabasesRequest {},
+ }
+ }
+}
+
+impl RequestBody for ListDatabasesRequest {
+ type ResponseBody = proto::ListDatabasesResponse;
+
+ const API_KEY: ApiKey = ApiKey::ListDatabases;
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(ListDatabasesRequest);
+impl_read_version_type!(proto::ListDatabasesResponse);
diff --git a/crates/fluss/src/rpc/message/list_tables.rs
b/crates/fluss/src/rpc/message/list_tables.rs
new file mode 100644
index 0000000..daf57ea
--- /dev/null
+++ b/crates/fluss/src/rpc/message/list_tables.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::proto::ListTablesResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct ListTablesRequest {
+ pub inner_request: proto::ListTablesRequest,
+}
+
+impl ListTablesRequest {
+ pub fn new(database_name: &str) -> Self {
+ ListTablesRequest {
+ inner_request: proto::ListTablesRequest {
+ database_name: database_name.to_string(),
+ },
+ }
+ }
+}
+
+impl RequestBody for ListTablesRequest {
+ type ResponseBody = ListTablesResponse;
+
+ const API_KEY: ApiKey = ApiKey::ListTables;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(ListTablesRequest);
+impl_read_version_type!(ListTablesResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs
b/crates/fluss/src/rpc/message/mod.rs
index 742c393..d5f8ebd 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -20,18 +20,36 @@ use crate::rpc::api_version::ApiVersion;
use crate::rpc::frame::{ReadError, WriteError};
use bytes::{Buf, BufMut};
+mod create_database;
mod create_table;
+mod database_exists;
+mod drop_database;
+mod drop_table;
mod fetch;
+mod get_database_info;
+mod get_latest_lake_snapshot;
mod get_table;
mod header;
+mod list_databases;
+mod list_tables;
mod produce_log;
+mod table_exists;
mod update_metadata;
+pub use create_database::*;
pub use create_table::*;
+pub use database_exists::*;
+pub use drop_database::*;
+pub use drop_table::*;
pub use fetch::*;
+pub use get_database_info::*;
+pub use get_latest_lake_snapshot::*;
pub use get_table::*;
pub use header::*;
+pub use list_databases::*;
+pub use list_tables::*;
pub use produce_log::*;
+pub use table_exists::*;
pub use update_metadata::*;
pub trait RequestBody {
diff --git a/crates/fluss/src/rpc/message/table_exists.rs
b/crates/fluss/src/rpc/message/table_exists.rs
new file mode 100644
index 0000000..3b71f47
--- /dev/null
+++ b/crates/fluss/src/rpc/message/table_exists.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::TablePath;
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::proto::TableExistsResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct TableExistsRequest {
+ pub inner_request: proto::TableExistsRequest,
+}
+
+impl TableExistsRequest {
+ pub fn new(table_path: &TablePath) -> Self {
+ TableExistsRequest {
+ inner_request: proto::TableExistsRequest {
+ table_path: to_table_path(table_path),
+ },
+ }
+ }
+}
+
+impl RequestBody for TableExistsRequest {
+ type ResponseBody = TableExistsResponse;
+
+ const API_KEY: ApiKey = ApiKey::TableExists;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(TableExistsRequest);
+impl_read_version_type!(TableExistsResponse);
diff --git a/crates/fluss/src/rpc/server_connection.rs
b/crates/fluss/src/rpc/server_connection.rs
index a102aa3..4eeda46 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -72,7 +72,6 @@ impl RpcClient {
return Ok(connection.clone());
}
}
-
let new_server = self.connect(server_node).await?;
self.connections
.write()