This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 186ded1  [feat] Add more functions to the Rust client Admin (#5)
186ded1 is described below

commit 186ded18baf6cd84d72bac2bc6cd7c870c44f318
Author: naivedogger <[email protected]>
AuthorDate: Thu Sep 11 16:25:12 2025 +0800

    [feat] Add more functions to the Rust client Admin (#5)
---
 .github/{dependabot.yml  => dependabot.yml}        |   0
 crates/fluss/src/client/admin.rs                   | 133 +++++++++++-
 crates/fluss/src/client/mod.rs                     |   6 +-
 crates/fluss/src/client/table/mod.rs               |   5 +-
 crates/fluss/src/client/write/accumulator.rs       |   6 +-
 crates/fluss/src/cluster/cluster.rs                |   1 +
 crates/fluss/src/metadata/database.rs              | 234 +++++++++++++++++++++
 crates/fluss/src/metadata/datatype.rs              |  12 ++
 crates/fluss/src/metadata/mod.rs                   |   4 +-
 crates/fluss/src/metadata/table.rs                 |  65 ++++++
 crates/fluss/src/proto/fluss_api.proto             |  83 ++++++++
 crates/fluss/src/record/arrow.rs                   |   7 +-
 crates/fluss/src/row/datum.rs                      | 103 +++++----
 crates/fluss/src/row/mod.rs                        |   3 +-
 crates/fluss/src/rpc/api_key.rs                    |  39 +++-
 crates/fluss/src/rpc/message/create_database.rs    |  67 ++++++
 crates/fluss/src/rpc/message/database_exists.rs    |  49 +++++
 crates/fluss/src/rpc/message/drop_database.rs      |  51 +++++
 crates/fluss/src/rpc/message/drop_table.rs         |  56 +++++
 crates/fluss/src/rpc/message/get_database_info.rs  |  49 +++++
 .../src/rpc/message/get_latest_lake_snapshot.rs    |  55 +++++
 crates/fluss/src/rpc/message/list_databases.rs     |  47 +++++
 crates/fluss/src/rpc/message/list_tables.rs        |  53 +++++
 crates/fluss/src/rpc/message/mod.rs                |  18 ++
 crates/fluss/src/rpc/message/table_exists.rs       |  55 +++++
 crates/fluss/src/rpc/server_connection.rs          |   1 -
 26 files changed, 1144 insertions(+), 58 deletions(-)

diff --git a/.github/dependabot.yml  b/.github/dependabot.yml
similarity index 100%
rename from .github/dependabot.yml 
rename to .github/dependabot.yml
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 8688a2d..2584034 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -16,18 +16,28 @@
 // under the License.
 
 use crate::client::metadata::Metadata;
-use crate::metadata::{JsonSerde, TableDescriptor, TableInfo, TablePath};
-use crate::rpc::message::{CreateTableRequest, GetTableRequest};
+use crate::metadata::{
+    DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, TableBucket, 
TableDescriptor,
+    TableInfo, TablePath,
+};
+use crate::rpc::message::{
+    CreateDatabaseRequest, CreateTableRequest, DatabaseExistsRequest, 
DropDatabaseRequest,
+    DropTableRequest, GetDatabaseInfoRequest, GetLatestLakeSnapshotRequest, 
GetTableRequest,
+    ListDatabasesRequest, ListTablesRequest, TableExistsRequest,
+};
 use crate::rpc::{RpcClient, ServerConnection};
+
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use crate::error::Result;
 use crate::proto::GetTableInfoResponse;
 
-#[allow(dead_code)]
 pub struct FlussAdmin {
     admin_gateway: ServerConnection,
+    #[allow(dead_code)]
     metadata: Arc<Metadata>,
+    #[allow(dead_code)]
     rpc_client: Arc<RpcClient>,
 }
 
@@ -49,6 +59,23 @@ impl FlussAdmin {
         })
     }
 
+    pub async fn create_database(
+        &self,
+        database_name: &str,
+        ignore_if_exists: bool,
+        database_descriptor: Option<&DatabaseDescriptor>,
+    ) -> Result<()> {
+        let _response = self
+            .admin_gateway
+            .request(CreateDatabaseRequest::new(
+                database_name,
+                ignore_if_exists,
+                database_descriptor,
+            )?)
+            .await?;
+        Ok(())
+    }
+
     pub async fn create_table(
         &self,
         table_path: &TablePath,
@@ -66,6 +93,14 @@ impl FlussAdmin {
         Ok(())
     }
 
+    pub async fn drop_table(&self, table_path: &TablePath, ignore_if_exists: 
bool) -> Result<()> {
+        let _response = self
+            .admin_gateway
+            .request(DropTableRequest::new(table_path, ignore_if_exists))
+            .await?;
+        Ok(())
+    }
+
     pub async fn get_table(&self, table_path: &TablePath) -> Result<TableInfo> 
{
         let response = self
             .admin_gateway
@@ -90,4 +125,96 @@ impl FlussAdmin {
             modified_time,
         ))
     }
+
+    /// List all tables in the given database
+    pub async fn list_tables(&self, database_name: &str) -> 
Result<Vec<String>> {
+        let response = self
+            .admin_gateway
+            .request(ListTablesRequest::new(database_name))
+            .await?;
+        Ok(response.table_name)
+    }
+
+    /// Check if a table exists
+    pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
+        let response = self
+            .admin_gateway
+            .request(TableExistsRequest::new(table_path))
+            .await?;
+        Ok(response.exists)
+    }
+
+    /// Drop a database
+    pub async fn drop_database(
+        &self,
+        database_name: &str,
+        ignore_if_not_exists: bool,
+        cascade: bool,
+    ) -> Result<()> {
+        let _response = self
+            .admin_gateway
+            .request(DropDatabaseRequest::new(
+                database_name,
+                ignore_if_not_exists,
+                cascade,
+            ))
+            .await?;
+        Ok(())
+    }
+
+    /// List all databases
+    pub async fn list_databases(&self) -> Result<Vec<String>> {
+        let response = self
+            .admin_gateway
+            .request(ListDatabasesRequest::new())
+            .await?;
+        Ok(response.database_name)
+    }
+
+    /// Check if a database exists
+    pub async fn database_exists(&self, database_name: &str) -> Result<bool> {
+        let response = self
+            .admin_gateway
+            .request(DatabaseExistsRequest::new(database_name))
+            .await?;
+        Ok(response.exists)
+    }
+
+    /// Get database information
+    pub async fn get_database_info(&self, database_name: &str) -> 
Result<DatabaseInfo> {
+        let request = GetDatabaseInfoRequest::new(database_name);
+        let response = self.admin_gateway.request(request).await?;
+
+        // Convert proto response to DatabaseInfo
+        let database_descriptor = 
DatabaseDescriptor::from_json_bytes(&response.database_json)?;
+
+        Ok(DatabaseInfo::new(
+            database_name.to_string(),
+            database_descriptor,
+            response.created_time,
+            response.modified_time,
+        ))
+    }
+
+    /// Get the latest lake snapshot for a table
+    pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> 
Result<LakeSnapshot> {
+        let response = self
+            .admin_gateway
+            .request(GetLatestLakeSnapshotRequest::new(table_path))
+            .await?;
+
+        // Convert proto response to LakeSnapshot
+        let mut table_buckets_offset = HashMap::new();
+        for bucket_snapshot in response.bucket_snapshots {
+            let table_bucket = TableBucket::new(response.table_id, 
bucket_snapshot.bucket_id);
+            if let Some(log_offset) = bucket_snapshot.log_offset {
+                table_buckets_offset.insert(table_bucket, log_offset);
+            }
+        }
+
+        Ok(LakeSnapshot::new(
+            response.snapshot_id,
+            table_buckets_offset,
+        ))
+    }
 }
diff --git a/crates/fluss/src/client/mod.rs b/crates/fluss/src/client/mod.rs
index 5b6908e..a971439 100644
--- a/crates/fluss/src/client/mod.rs
+++ b/crates/fluss/src/client/mod.rs
@@ -17,10 +17,12 @@
 
 mod admin;
 mod connection;
+mod metadata;
 mod table;
 mod write;
 
+pub use admin::*;
 pub use connection::*;
-mod metadata;
-
+pub use metadata::*;
+pub use table::*;
 pub use write::*;
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index 503a1ed..4d6f8f0 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -17,8 +17,6 @@
 
 use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
-use crate::client::table::append::TableAppend;
-use crate::client::table::scanner::TableScan;
 use crate::metadata::{TableInfo, TablePath};
 use std::sync::Arc;
 
@@ -29,6 +27,9 @@ mod append;
 mod scanner;
 mod writer;
 
+pub use append::TableAppend;
+pub use scanner::TableScan;
+
 #[allow(dead_code)]
 pub struct FlussTable<'a> {
     conn: &'a FlussConnection,
diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index 0b77894..32622c7 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -93,13 +93,15 @@ impl RecordAccumulator {
         }
 
         let table_path = &record.table_path;
-
+        let table_info = cluster.get_table(table_path);
         let row_type = &cluster.get_table(table_path).row_type;
 
+        let schema_id = table_info.schema_id;
+
         let mut batch = ArrowLog(ArrowLogWriteBatch::new(
             self.batch_id.fetch_add(1, Ordering::Relaxed),
             table_path.as_ref().clone(),
-            0,
+            schema_id,
             row_type,
             bucket_id,
             current_time_ms(),
diff --git a/crates/fluss/src/cluster/cluster.rs 
b/crates/fluss/src/cluster/cluster.rs
index 1f8341d..a6f20a8 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -126,6 +126,7 @@ impl Cluster {
                 table_metadata.modified_time,
             );
             table_info_by_path.insert(table_path.clone(), table_info);
+            table_id_by_path.insert(table_path.clone(), table_id);
 
             // now, get bucket matadata
             let mut found_unavailable_bucket = false;
diff --git a/crates/fluss/src/metadata/database.rs 
b/crates/fluss/src/metadata/database.rs
new file mode 100644
index 0000000..2649421
--- /dev/null
+++ b/crates/fluss/src/metadata/database.rs
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::JsonSerdeError;
+use crate::error::Result;
+use crate::metadata::JsonSerde;
+use serde::{Deserialize, Serialize};
+use serde_json::{Value, json};
+use std::collections::HashMap;
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct DatabaseDescriptor {
+    comment: Option<String>,
+    custom_properties: HashMap<String, String>,
+}
+
+#[derive(Debug, Clone)]
+pub struct DatabaseInfo {
+    database_name: String,
+    database_descriptor: DatabaseDescriptor,
+    created_time: i64,
+    modified_time: i64,
+}
+
+impl DatabaseInfo {
+    pub fn new(
+        database_name: String,
+        database_descriptor: DatabaseDescriptor,
+        created_time: i64,
+        modified_time: i64,
+    ) -> Self {
+        Self {
+            database_name,
+            database_descriptor,
+            created_time,
+            modified_time,
+        }
+    }
+
+    pub fn database_name(&self) -> &str {
+        &self.database_name
+    }
+
+    pub fn database_descriptor(&self) -> &DatabaseDescriptor {
+        &self.database_descriptor
+    }
+
+    pub fn created_time(&self) -> i64 {
+        self.created_time
+    }
+
+    pub fn modified_time(&self) -> i64 {
+        self.modified_time
+    }
+}
+
+#[derive(Debug, Default)]
+pub struct DatabaseDescriptorBuilder {
+    comment: Option<String>,
+    custom_properties: HashMap<String, String>,
+}
+
+impl DatabaseDescriptor {
+    pub fn builder() -> DatabaseDescriptorBuilder {
+        DatabaseDescriptorBuilder::default()
+    }
+
+    pub fn comment(&self) -> Option<&str> {
+        self.comment.as_deref()
+    }
+
+    pub fn custom_properties(&self) -> &HashMap<String, String> {
+        &self.custom_properties
+    }
+}
+
+impl DatabaseDescriptorBuilder {
+    pub fn comment(mut self, comment: &str) -> Self {
+        self.comment = Some(comment.to_string());
+        self
+    }
+
+    pub fn custom_properties(mut self, properties: HashMap<String, String>) -> 
Self {
+        self.custom_properties = properties;
+        self
+    }
+
+    pub fn custom_property(mut self, key: &str, value: &str) -> Self {
+        self.custom_properties
+            .insert(key.to_string(), value.to_string());
+        self
+    }
+
+    pub fn build(self) -> Result<DatabaseDescriptor> {
+        Ok(DatabaseDescriptor {
+            comment: self.comment,
+            custom_properties: self.custom_properties,
+        })
+    }
+}
+
+impl DatabaseDescriptor {
+    const CUSTOM_PROPERTIES_NAME: &'static str = "custom_properties";
+    const COMMENT_NAME: &'static str = "comment";
+    const VERSION_KEY: &'static str = "version";
+    const VERSION: u32 = 1;
+}
+
+impl JsonSerde for DatabaseDescriptor {
+    fn serialize_json(&self) -> Result<Value> {
+        let mut obj = serde_json::Map::new();
+
+        // Serialize version
+        obj.insert(Self::VERSION_KEY.to_string(), json!(Self::VERSION));
+
+        // Serialize comment if present
+        if let Some(comment) = self.comment() {
+            obj.insert(Self::COMMENT_NAME.to_string(), json!(comment));
+        }
+
+        // Serialize custom properties
+        obj.insert(
+            Self::CUSTOM_PROPERTIES_NAME.to_string(),
+            json!(self.custom_properties()),
+        );
+
+        Ok(Value::Object(obj))
+    }
+
+    fn deserialize_json(node: &Value) -> Result<Self> {
+        let mut builder = DatabaseDescriptor::builder();
+
+        // Deserialize comment if present
+        if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
+            let comment = comment_node
+                .as_str()
+                .ok_or_else(|| {
+                    JsonSerdeError(format!("{} should be a string", 
Self::COMMENT_NAME))
+                })?
+                .to_owned();
+            builder = builder.comment(&comment);
+        }
+
+        // Deserialize custom properties directly
+        let custom_properties = if let Some(props_node) = 
node.get(Self::CUSTOM_PROPERTIES_NAME) {
+            let obj = props_node.as_object().ok_or_else(|| {
+                JsonSerdeError("Custom properties should be an 
object".to_string())
+            })?;
+
+            let mut properties = HashMap::with_capacity(obj.len());
+            for (key, value) in obj {
+                properties.insert(
+                    key.clone(),
+                    value
+                        .as_str()
+                        .ok_or_else(|| {
+                            JsonSerdeError("Property value should be a 
string".to_string())
+                        })?
+                        .to_owned(),
+                );
+            }
+            properties
+        } else {
+            HashMap::new()
+        };
+        builder = builder.custom_properties(custom_properties);
+
+        builder.build()
+    }
+}
+
+impl DatabaseDescriptor {
+    /// Create DatabaseDescriptor from JSON bytes (equivalent to Java's 
fromJsonBytes)
+    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+        let json_value: Value = serde_json::from_slice(bytes)
+            .map_err(|e| JsonSerdeError(format!("Failed to parse JSON: {}", 
e)))?;
+        Self::deserialize_json(&json_value)
+    }
+
+    /// Convert DatabaseDescriptor to JSON bytes
+    pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
+        let json_value = self.serialize_json()?;
+        serde_json::to_vec(&json_value)
+            .map_err(|e| JsonSerdeError(format!("Failed to serialize to JSON: 
{}", e)))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_database_descriptor_json_serde() {
+        let mut custom_props = HashMap::new();
+        custom_props.insert("key1".to_string(), "value1".to_string());
+        custom_props.insert("key2".to_string(), "value2".to_string());
+
+        let descriptor = DatabaseDescriptor::builder()
+            .comment("Test database")
+            .custom_properties(custom_props)
+            .build()
+            .unwrap();
+
+        // Test serialization
+        let json_bytes = descriptor.to_json_bytes().unwrap();
+        println!("Serialized JSON: {}", String::from_utf8_lossy(&json_bytes));
+
+        // Test deserialization
+        let deserialized = 
DatabaseDescriptor::from_json_bytes(&json_bytes).unwrap();
+        assert_eq!(descriptor, deserialized);
+    }
+
+    #[test]
+    fn test_empty_database_descriptor() {
+        let descriptor = DatabaseDescriptor::builder().build().unwrap();
+        let json_bytes = descriptor.to_json_bytes().unwrap();
+        let deserialized = 
DatabaseDescriptor::from_json_bytes(&json_bytes).unwrap();
+        assert_eq!(descriptor, deserialized);
+    }
+}
diff --git a/crates/fluss/src/metadata/datatype.rs 
b/crates/fluss/src/metadata/datatype.rs
index 0c00c6f..09ca0c2 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -591,6 +591,10 @@ impl ArrayType {
             element_type: self.element_type.clone(),
         }
     }
+
+    pub fn get_element_type(&self) -> &DataType {
+        &self.element_type
+    }
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Hash)]
@@ -620,6 +624,14 @@ impl MapType {
             value_type: self.value_type.clone(),
         }
     }
+
+    pub fn key_type(&self) -> &DataType {
+        &self.key_type
+    }
+
+    pub fn value_type(&self) -> &DataType {
+        &self.value_type
+    }
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Hash)]
diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs
index 7946547..8754007 100644
--- a/crates/fluss/src/metadata/mod.rs
+++ b/crates/fluss/src/metadata/mod.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod database;
 mod datatype;
-pub use datatype::*;
 mod json_serde;
 mod table;
 
+pub use database::*;
+pub use datatype::*;
 pub use json_serde::*;
 pub use table::*;
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index a5ab61d..90e3573 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -584,6 +584,16 @@ impl Display for LogFormat {
     }
 }
 
+impl LogFormat {
+    pub fn parse(s: &str) -> Result<Self> {
+        match s.to_uppercase().as_str() {
+            "ARROW" => Ok(LogFormat::ARROW),
+            "INDEXED" => Ok(LogFormat::INDEXED),
+            _ => Err(InvalidTableError(format!("Unknown log format: {}", s))),
+        }
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub enum KvFormat {
     INDEXED,
@@ -600,6 +610,16 @@ impl Display for KvFormat {
     }
 }
 
+impl KvFormat {
+    pub fn parse(s: &str) -> Result<Self> {
+        match s.to_uppercase().as_str() {
+            "INDEXED" => Ok(KvFormat::INDEXED),
+            "COMPACTED" => Ok(KvFormat::COMPACTED),
+            _ => Err(InvalidTableError(format!("Unknown kv format: {}", s))),
+        }
+    }
+}
+
 #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
 pub struct TablePath {
     database: String,
@@ -631,6 +651,28 @@ impl TablePath {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct PhysicalTablePath {
+    table_path: TablePath,
+    #[allow(dead_code)]
+    partition: Option<String>,
+}
+
+impl PhysicalTablePath {
+    pub fn of(table_path: TablePath) -> Self {
+        Self {
+            table_path,
+            partition: None,
+        }
+    }
+
+    // TODO: support partition
+
+    pub fn get_table_path(&self) -> &TablePath {
+        &self.table_path
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct TableInfo {
     pub table_path: TablePath,
@@ -918,3 +960,26 @@ impl TableBucket {
         self.partition_id
     }
 }
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LakeSnapshot {
+    pub snapshot_id: i64,
+    pub table_buckets_offset: HashMap<TableBucket, i64>,
+}
+
+impl LakeSnapshot {
+    pub fn new(snapshot_id: i64, table_buckets_offset: HashMap<TableBucket, 
i64>) -> Self {
+        Self {
+            snapshot_id,
+            table_buckets_offset,
+        }
+    }
+
+    pub fn snapshot_id(&self) -> i64 {
+        self.snapshot_id
+    }
+
+    pub fn table_buckets_offset(&self) -> &HashMap<TableBucket, i64> {
+        &self.table_buckets_offset
+    }
+}
diff --git a/crates/fluss/src/proto/fluss_api.proto 
b/crates/fluss/src/proto/fluss_api.proto
index 195b8f8..d71197b 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -123,6 +123,21 @@ message CreateTableRequest {
 message CreateTableResponse {
 }
 
+message DropTableRequest {
+  required PbTablePath table_path = 1;
+  required bool ignore_if_not_exists = 2;
+}
+
+message DropTableResponse {
+}
+
+message TableExistsRequest {
+  required PbTablePath table_path = 1;
+}
+
+message TableExistsResponse {
+  required bool exists = 1;
+}
 
 message GetTableInfoRequest {
   required PbTablePath table_path = 1;
@@ -136,6 +151,57 @@ message GetTableInfoResponse {
   required int64 modified_time = 5;
 }
 
+message ListTablesRequest {
+  required string database_name = 1;
+}
+
+message ListTablesResponse {
+  repeated string table_name = 1;
+}
+
+message CreateDatabaseRequest {
+  required string database_name = 1;
+  required bool ignore_if_exists = 2;
+  optional bytes database_json = 3;
+}
+
+message CreateDatabaseResponse {
+}
+
+message GetDatabaseInfoRequest {
+  required string database_name = 1;
+}
+
+message GetDatabaseInfoResponse {
+  required bytes database_json = 3;
+  required int64 created_time = 4;
+  required int64 modified_time = 5;
+}
+
+message DropDatabaseRequest {
+  required string database_name = 1;
+  required bool ignore_if_not_exists = 2;
+  required bool cascade = 3;
+}
+
+message DropDatabaseResponse {
+}
+
+message DatabaseExistsRequest {
+  required string database_name = 1;
+}
+
+message DatabaseExistsResponse {
+  required bool exists = 1;
+}
+
+message ListDatabasesRequest {
+}
+
+message ListDatabasesResponse {
+  repeated string database_name = 1;
+}
+
 
 // fetch log request and response
 message FetchLogRequest {
@@ -194,4 +260,21 @@ message PbRemoteLogSegment {
   required int64 remote_log_start_offset = 2;
   required int64 remote_log_end_offset = 3;
   required int32 segment_size_in_bytes = 4;
+}
+
+// fetch latest lake snapshot
+message GetLatestLakeSnapshotRequest {
+  required PbTablePath table_path = 1;
+}
+
+message GetLatestLakeSnapshotResponse {
+  required int64 table_id = 1;
+  required int64 snapshotId = 2;
+  repeated PbLakeSnapshotForBucket bucket_snapshots = 3;
+}
+
+message PbLakeSnapshotForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  optional int64 log_offset = 3;
 }
\ No newline at end of file
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 2f595d0..fa63b00 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -16,9 +16,9 @@
 // under the License.
 
 use arrow::array::{
-    ArrayBuilder, ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, 
Int8Builder,
-    Int16Builder, Int32Builder, Int64Builder, StringBuilder, UInt8Builder, 
UInt16Builder,
-    UInt32Builder, UInt64Builder,
+    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Float32Builder, 
Float64Builder,
+    Int8Builder, Int16Builder, Int32Builder, Int64Builder, StringBuilder, 
UInt8Builder,
+    UInt16Builder, UInt32Builder, UInt64Builder,
 };
 use arrow::{
     array::RecordBatch,
@@ -224,6 +224,7 @@ impl MemoryLogRecordsArrowBuilder {
             arrow_schema::DataType::Float64 => Box::new(Float64Builder::new()),
             arrow_schema::DataType::Boolean => Box::new(BooleanBuilder::new()),
             arrow_schema::DataType::Utf8 => Box::new(StringBuilder::new()),
+            arrow_schema::DataType::Binary => Box::new(BinaryBuilder::new()),
             dt => panic!("Unsupported data type: {dt:?}"),
         }
     }
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 3c65a7d..d8c4f74 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -19,7 +19,10 @@ use chrono::Datelike;
 
 use crate::error::Error::RowConvertError;
 use crate::error::Result;
-use arrow::array::{ArrayBuilder, Int8Builder, Int16Builder, Int32Builder, 
StringBuilder};
+use arrow::array::{
+    ArrayBuilder, BinaryBuilder, BooleanBuilder, Float32Builder, 
Float64Builder, Int8Builder,
+    Int16Builder, Int32Builder, Int64Builder, StringBuilder,
+};
 use chrono::NaiveDate;
 use ordered_float::OrderedFloat;
 use parse_display::Display;
@@ -47,6 +50,8 @@ pub enum Datum<'a> {
     #[display("{0}")]
     Int64(i64),
     #[display("{0}")]
+    Float32(F32),
+    #[display("{0}")]
     Float64(F64),
     #[display("'{0}'")]
     String(&'a str),
@@ -96,6 +101,20 @@ impl From<Option<&()>> for Datum<'_> {
     }
 }
 
+impl<'a> From<f32> for Datum<'a> {
+    #[inline]
+    fn from(f: f32) -> Datum<'a> {
+        Datum::Float32(F32::from(f))
+    }
+}
+
+impl<'a> From<f64> for Datum<'a> {
+    #[inline]
+    fn from(f: f64) -> Datum<'a> {
+        Datum::Float64(F64::from(f))
+    }
+}
+
 impl TryFrom<&Datum<'_>> for i32 {
     type Error = ();
 
@@ -126,45 +145,56 @@ pub trait ToArrow {
 
 impl Datum<'_> {
     pub fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()> {
+        macro_rules! append_null_to_arrow {
+            ($builder_type:ty) => {
+                if let Some(b) = 
builder.as_any_mut().downcast_mut::<$builder_type>() {
+                    b.append_null();
+                    return Ok(());
+                }
+            };
+        }
+
+        macro_rules! append_value_to_arrow {
+            ($builder_type:ty, $value:expr) => {
+                if let Some(b) = 
builder.as_any_mut().downcast_mut::<$builder_type>() {
+                    b.append_value($value);
+                    return Ok(());
+                }
+            };
+        }
+
         match self {
             Datum::Null => {
-                todo!()
-            }
-            Datum::Bool(_v) => {
-                todo!()
-            }
-            Datum::Int16(_v) => {
-                todo!()
-            }
-            Datum::Int32(v) => {
-                v.append_to(builder)?;
-            }
-            Datum::Int64(_v) => {
-                todo!()
-            }
-            Datum::Float64(_v) => {
-                todo!()
-            }
-            Datum::String(v) => {
-                v.append_to(builder)?;
+                append_null_to_arrow!(BooleanBuilder);
+                append_null_to_arrow!(Int16Builder);
+                append_null_to_arrow!(Int32Builder);
+                append_null_to_arrow!(Int64Builder);
+                append_null_to_arrow!(Float32Builder);
+                append_null_to_arrow!(Float64Builder);
+                append_null_to_arrow!(StringBuilder);
+                append_null_to_arrow!(BinaryBuilder);
             }
-            Datum::Blob(_v) => {
-                todo!()
-            }
-            Datum::Decimal(_v) => {
-                todo!()
-            }
-            Datum::Date(_v) => {
-                todo!()
-            }
-            Datum::Timestamp(_v) => {
-                todo!()
-            }
-            Datum::TimestampTz(_v) => {
-                todo!()
+            Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v),
+            Datum::Int16(v) => append_value_to_arrow!(Int16Builder, *v),
+            Datum::Int32(v) => append_value_to_arrow!(Int32Builder, *v),
+            Datum::Int64(v) => append_value_to_arrow!(Int64Builder, *v),
+            Datum::Float32(v) => append_value_to_arrow!(Float32Builder, 
v.into_inner()),
+            Datum::Float64(v) => append_value_to_arrow!(Float64Builder, 
v.into_inner()),
+            Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
+            Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, 
v.as_ref()),
+            Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | 
Datum::TimestampTz(_) => {
+                return Err(RowConvertError(format!(
+                    "Type {:?} is not yet supported for Arrow conversion",
+                    std::mem::discriminant(self)
+                )));
             }
         }
-        Ok(())
+
+        Err(RowConvertError(format!(
+            "Cannot append {:?} to builder of type {}",
+            self,
+            std::any::type_name_of_val(builder)
+        )))
     }
 }
 
@@ -190,9 +220,10 @@ macro_rules! impl_to_arrow {
 impl_to_arrow!(i8, Int8Builder);
 impl_to_arrow!(i16, Int16Builder);
 impl_to_arrow!(i32, Int32Builder);
+impl_to_arrow!(f32, Float32Builder);
+impl_to_arrow!(f64, Float64Builder);
 impl_to_arrow!(&str, StringBuilder);
 
-#[allow(dead_code)]
 pub type F32 = OrderedFloat<f32>;
 pub type F64 = OrderedFloat<f64>;
 #[allow(dead_code)]
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index ead6ff0..b900cb5 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::row::datum::Datum;
-
 mod column;
 
 mod datum;
 
 pub use column::*;
+pub use datum::*;
 
 pub trait InternalRow {
     /// Returns the number of fields in this row
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 4928208..18ce44f 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -19,22 +19,40 @@ use crate::rpc::api_key::ApiKey::Unknown;
 
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 pub enum ApiKey {
+    CreateDatabase,
+    DropDatabase,
+    ListDatabases,
+    DatabaseExists,
     CreateTable,
+    DropTable,
+    GetTable,
+    ListTables,
+    TableExists,
+    MetaData,
     ProduceLog,
     FetchLog,
-    MetaData,
-    GetTable,
+    GetDatabaseInfo,
+    GetLatestLakeSnapshot,
     Unknown(i16),
 }
 
 impl From<i16> for ApiKey {
     fn from(key: i16) -> Self {
         match key {
+            1001 => ApiKey::CreateDatabase,
+            1002 => ApiKey::DropDatabase,
+            1003 => ApiKey::ListDatabases,
+            1004 => ApiKey::DatabaseExists,
             1005 => ApiKey::CreateTable,
+            1006 => ApiKey::DropTable,
+            1007 => ApiKey::GetTable,
+            1008 => ApiKey::ListTables,
+            1010 => ApiKey::TableExists,
+            1012 => ApiKey::MetaData,
             1014 => ApiKey::ProduceLog,
             1015 => ApiKey::FetchLog,
-            1012 => ApiKey::MetaData,
-            1007 => ApiKey::GetTable,
+            1032 => ApiKey::GetLatestLakeSnapshot,
+            1035 => ApiKey::GetDatabaseInfo,
             _ => Unknown(key),
         }
     }
@@ -43,11 +61,20 @@ impl From<i16> for ApiKey {
 impl From<ApiKey> for i16 {
     fn from(key: ApiKey) -> Self {
         match key {
+            ApiKey::CreateDatabase => 1001,
+            ApiKey::DropDatabase => 1002,
+            ApiKey::ListDatabases => 1003,
+            ApiKey::DatabaseExists => 1004,
             ApiKey::CreateTable => 1005,
-            ApiKey::ProduceLog => 1014,
-            ApiKey::MetaData => 1012,
+            ApiKey::DropTable => 1006,
             ApiKey::GetTable => 1007,
+            ApiKey::ListTables => 1008,
+            ApiKey::TableExists => 1010,
+            ApiKey::MetaData => 1012,
+            ApiKey::ProduceLog => 1014,
             ApiKey::FetchLog => 1015,
+            ApiKey::GetLatestLakeSnapshot => 1032,
+            ApiKey::GetDatabaseInfo => 1035,
             Unknown(x) => x,
         }
     }
diff --git a/crates/fluss/src/rpc/message/create_database.rs b/crates/fluss/src/rpc/message/create_database.rs
new file mode 100644
index 0000000..e4052ef
--- /dev/null
+++ b/crates/fluss/src/rpc/message/create_database.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::DatabaseDescriptor;
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::error::Result as FlussResult;
+use crate::proto::CreateDatabaseResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct CreateDatabaseRequest {
+    pub inner_request: proto::CreateDatabaseRequest,
+}
+
+impl CreateDatabaseRequest {
+    pub fn new(
+        database_name: &str,
+        ignore_if_exists: bool,
+        database_descriptor: Option<&DatabaseDescriptor>,
+    ) -> FlussResult<Self> {
+        let database_json = if let Some(descriptor) = database_descriptor {
+            Some(descriptor.to_json_bytes()?)
+        } else {
+            None
+        };
+
+        Ok(CreateDatabaseRequest {
+            inner_request: proto::CreateDatabaseRequest {
+                database_name: database_name.to_string(),
+                ignore_if_exists,
+                database_json,
+            },
+        })
+    }
+}
+
+impl RequestBody for CreateDatabaseRequest {
+    type ResponseBody = CreateDatabaseResponse;
+
+    const API_KEY: ApiKey = ApiKey::CreateDatabase;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(CreateDatabaseRequest);
+impl_read_version_type!(CreateDatabaseResponse);
diff --git a/crates/fluss/src/rpc/message/database_exists.rs b/crates/fluss/src/rpc/message/database_exists.rs
new file mode 100644
index 0000000..795eea1
--- /dev/null
+++ b/crates/fluss/src/rpc/message/database_exists.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct DatabaseExistsRequest {
+    pub inner_request: proto::DatabaseExistsRequest,
+}
+
+impl DatabaseExistsRequest {
+    pub fn new(database_name: &str) -> Self {
+        DatabaseExistsRequest {
+            inner_request: proto::DatabaseExistsRequest {
+                database_name: database_name.to_string(),
+            },
+        }
+    }
+}
+
+impl RequestBody for DatabaseExistsRequest {
+    type ResponseBody = proto::DatabaseExistsResponse;
+
+    const API_KEY: ApiKey = ApiKey::DatabaseExists;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(DatabaseExistsRequest);
+impl_read_version_type!(proto::DatabaseExistsResponse);
diff --git a/crates/fluss/src/rpc/message/drop_database.rs b/crates/fluss/src/rpc/message/drop_database.rs
new file mode 100644
index 0000000..49cbfaf
--- /dev/null
+++ b/crates/fluss/src/rpc/message/drop_database.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct DropDatabaseRequest {
+    pub inner_request: proto::DropDatabaseRequest,
+}
+
+impl DropDatabaseRequest {
+    pub fn new(database_name: &str, ignore_if_not_exists: bool, cascade: bool) -> Self {
+        DropDatabaseRequest {
+            inner_request: proto::DropDatabaseRequest {
+                database_name: database_name.to_string(),
+                ignore_if_not_exists,
+                cascade,
+            },
+        }
+    }
+}
+
+impl RequestBody for DropDatabaseRequest {
+    type ResponseBody = proto::DropDatabaseResponse;
+
+    const API_KEY: ApiKey = ApiKey::DropDatabase;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(DropDatabaseRequest);
+impl_read_version_type!(proto::DropDatabaseResponse);
diff --git a/crates/fluss/src/rpc/message/drop_table.rs b/crates/fluss/src/rpc/message/drop_table.rs
new file mode 100644
index 0000000..0dbc21b
--- /dev/null
+++ b/crates/fluss/src/rpc/message/drop_table.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::TablePath;
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::proto::DropTableResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct DropTableRequest {
+    pub inner_request: proto::DropTableRequest,
+}
+
+impl DropTableRequest {
+    pub fn new(table_path: &TablePath, ignore_if_not_exists: bool) -> Self {
+        DropTableRequest {
+            inner_request: proto::DropTableRequest {
+                table_path: to_table_path(table_path),
+                ignore_if_not_exists,
+            },
+        }
+    }
+}
+
+impl RequestBody for DropTableRequest {
+    type ResponseBody = DropTableResponse;
+
+    const API_KEY: ApiKey = ApiKey::DropTable;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(DropTableRequest);
+impl_read_version_type!(DropTableResponse);
diff --git a/crates/fluss/src/rpc/message/get_database_info.rs b/crates/fluss/src/rpc/message/get_database_info.rs
new file mode 100644
index 0000000..85492a8
--- /dev/null
+++ b/crates/fluss/src/rpc/message/get_database_info.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct GetDatabaseInfoRequest {
+    pub inner_request: proto::GetDatabaseInfoRequest,
+}
+
+impl GetDatabaseInfoRequest {
+    pub fn new(database_name: &str) -> Self {
+        GetDatabaseInfoRequest {
+            inner_request: proto::GetDatabaseInfoRequest {
+                database_name: database_name.to_string(),
+            },
+        }
+    }
+}
+
+impl RequestBody for GetDatabaseInfoRequest {
+    type ResponseBody = proto::GetDatabaseInfoResponse;
+
+    const API_KEY: ApiKey = ApiKey::GetDatabaseInfo;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(GetDatabaseInfoRequest);
+impl_read_version_type!(proto::GetDatabaseInfoResponse);
diff --git a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
new file mode 100644
index 0000000..a0e186e
--- /dev/null
+++ b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto;
+use crate::proto::PbTablePath;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use crate::metadata::TablePath;
+use crate::{impl_read_version_type, impl_write_version_type};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct GetLatestLakeSnapshotRequest {
+    pub inner_request: proto::GetLatestLakeSnapshotRequest,
+}
+
+impl GetLatestLakeSnapshotRequest {
+    pub fn new(table_path: &TablePath) -> Self {
+        let inner_request = proto::GetLatestLakeSnapshotRequest {
+            table_path: PbTablePath {
+                database_name: table_path.database().to_string(),
+                table_name: table_path.table().to_string(),
+            },
+        };
+
+        Self { inner_request }
+    }
+}
+
+impl RequestBody for GetLatestLakeSnapshotRequest {
+    type ResponseBody = proto::GetLatestLakeSnapshotResponse;
+    const API_KEY: ApiKey = ApiKey::GetLatestLakeSnapshot;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(GetLatestLakeSnapshotRequest);
+impl_read_version_type!(proto::GetLatestLakeSnapshotResponse);
diff --git a/crates/fluss/src/rpc/message/list_databases.rs b/crates/fluss/src/rpc/message/list_databases.rs
new file mode 100644
index 0000000..ce5a091
--- /dev/null
+++ b/crates/fluss/src/rpc/message/list_databases.rs
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug, Default)]
+pub struct ListDatabasesRequest {
+    pub inner_request: proto::ListDatabasesRequest,
+}
+
+impl ListDatabasesRequest {
+    pub fn new() -> Self {
+        ListDatabasesRequest {
+            inner_request: proto::ListDatabasesRequest {},
+        }
+    }
+}
+
+impl RequestBody for ListDatabasesRequest {
+    type ResponseBody = proto::ListDatabasesResponse;
+
+    const API_KEY: ApiKey = ApiKey::ListDatabases;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(ListDatabasesRequest);
+impl_read_version_type!(proto::ListDatabasesResponse);
diff --git a/crates/fluss/src/rpc/message/list_tables.rs b/crates/fluss/src/rpc/message/list_tables.rs
new file mode 100644
index 0000000..daf57ea
--- /dev/null
+++ b/crates/fluss/src/rpc/message/list_tables.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::proto::ListTablesResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct ListTablesRequest {
+    pub inner_request: proto::ListTablesRequest,
+}
+
+impl ListTablesRequest {
+    pub fn new(database_name: &str) -> Self {
+        ListTablesRequest {
+            inner_request: proto::ListTablesRequest {
+                database_name: database_name.to_string(),
+            },
+        }
+    }
+}
+
+impl RequestBody for ListTablesRequest {
+    type ResponseBody = ListTablesResponse;
+
+    const API_KEY: ApiKey = ApiKey::ListTables;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(ListTablesRequest);
+impl_read_version_type!(ListTablesResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index 742c393..d5f8ebd 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -20,18 +20,36 @@ use crate::rpc::api_version::ApiVersion;
 use crate::rpc::frame::{ReadError, WriteError};
 use bytes::{Buf, BufMut};
 
+mod create_database;
 mod create_table;
+mod database_exists;
+mod drop_database;
+mod drop_table;
 mod fetch;
+mod get_database_info;
+mod get_latest_lake_snapshot;
 mod get_table;
 mod header;
+mod list_databases;
+mod list_tables;
 mod produce_log;
+mod table_exists;
 mod update_metadata;
 
+pub use create_database::*;
 pub use create_table::*;
+pub use database_exists::*;
+pub use drop_database::*;
+pub use drop_table::*;
 pub use fetch::*;
+pub use get_database_info::*;
+pub use get_latest_lake_snapshot::*;
 pub use get_table::*;
 pub use header::*;
+pub use list_databases::*;
+pub use list_tables::*;
 pub use produce_log::*;
+pub use table_exists::*;
 pub use update_metadata::*;
 
 pub trait RequestBody {
diff --git a/crates/fluss/src/rpc/message/table_exists.rs b/crates/fluss/src/rpc/message/table_exists.rs
new file mode 100644
index 0000000..3b71f47
--- /dev/null
+++ b/crates/fluss/src/rpc/message/table_exists.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::TablePath;
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+
+use crate::proto::TableExistsResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct TableExistsRequest {
+    pub inner_request: proto::TableExistsRequest,
+}
+
+impl TableExistsRequest {
+    pub fn new(table_path: &TablePath) -> Self {
+        TableExistsRequest {
+            inner_request: proto::TableExistsRequest {
+                table_path: to_table_path(table_path),
+            },
+        }
+    }
+}
+
+impl RequestBody for TableExistsRequest {
+    type ResponseBody = TableExistsResponse;
+
+    const API_KEY: ApiKey = ApiKey::TableExists;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(TableExistsRequest);
+impl_read_version_type!(TableExistsResponse);
diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index a102aa3..4eeda46 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -72,7 +72,6 @@ impl RpcClient {
                 return Ok(connection.clone());
             }
         }
-
         let new_server = self.connect(server_node).await?;
         self.connections
             .write()

Reply via email to