This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bd6b1b  chore: added check to get_admin() before creating new admin 
(#369)
3bd6b1b is described below

commit 3bd6b1b392b767c2760e95db671ef6aa90e3bafa
Author: Aryamaan Singh <[email protected]>
AuthorDate: Mon Mar 23 07:20:00 2026 +0530

    chore: added check to get_admin() before creating new admin (#369)
---
 bindings/cpp/src/lib.rs                            |  2 +-
 bindings/cpp/test/test_utils.h                     |  1 -
 bindings/python/src/admin.rs                       |  6 +-
 bindings/python/src/table.rs                       |  4 +-
 .../examples/src/example_partitioned_kv_table.rs   | 10 +--
 crates/fluss/src/client/admin.rs                   | 73 +++++++++++++---------
 crates/fluss/src/client/connection.rs              | 30 +++++++--
 crates/fluss/tests/integration/utils.rs            | 11 ++--
 8 files changed, 82 insertions(+), 55 deletions(-)

diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 284eec8..82254ea 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -556,7 +556,7 @@ pub struct Connection {
 }
 
 pub struct Admin {
-    inner: fcore::client::FlussAdmin,
+    inner: Arc<fcore::client::FlussAdmin>,
 }
 
 pub struct Table {
diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h
index 17a1da7..1ff7e28 100644
--- a/bindings/cpp/test/test_utils.h
+++ b/bindings/cpp/test/test_utils.h
@@ -306,7 +306,6 @@ class FlussTestEnvironment : public ::testing::Environment {
             if (result.Ok()) {
                 auto admin_result = connection_.GetAdmin(admin_);
                 if (admin_result.Ok()) {
-                    // check tablet server is available
                     std::vector<fluss::ServerNode> nodes;
                     auto nodes_result = admin_.GetServerNodes(nodes);
                     if (nodes_result.Ok() &&
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index 703b133..5f4e45d 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -532,10 +532,8 @@ impl FlussAdmin {
 
 impl FlussAdmin {
     // Internal method to create FlussAdmin from core admin
-    pub fn from_core(admin: fcore::client::FlussAdmin) -> Self {
-        Self {
-            __admin: Arc::new(admin),
-        }
+    pub fn from_core(admin: Arc<fcore::client::FlussAdmin>) -> Self {
+        Self { __admin: admin }
     }
 }
 
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 660cd6b..8c9ea0e 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -1902,7 +1902,7 @@ macro_rules! with_scanner {
 #[pyclass]
 pub struct LogScanner {
     scanner: ScannerKind,
-    admin: fcore::client::FlussAdmin,
+    admin: Arc<fcore::client::FlussAdmin>,
     table_info: fcore::metadata::TableInfo,
     /// The projected Arrow schema to use for empty table creation
     projected_schema: SchemaRef,
@@ -2207,7 +2207,7 @@ impl LogScanner {
 impl LogScanner {
     fn new(
         scanner: ScannerKind,
-        admin: fcore::client::FlussAdmin,
+        admin: Arc<fcore::client::FlussAdmin>,
         table_info: fcore::metadata::TableInfo,
         projected_schema: SchemaRef,
         projected_row_type: fcore::metadata::RowType,
diff --git a/crates/examples/src/example_partitioned_kv_table.rs 
b/crates/examples/src/example_partitioned_kv_table.rs
index 9cd2e7d..ba49934 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -46,7 +46,7 @@ pub async fn main() -> Result<()> {
 
     let table_path = TablePath::new("fluss", "partitioned_kv_example");
 
-    let mut admin = conn.get_admin().await?;
+    let admin = conn.get_admin().await?;
     admin
         .create_table(&table_path, &table_descriptor, true)
         .await?;
@@ -55,9 +55,9 @@ pub async fn main() -> Result<()> {
         admin.get_table_info(&table_path).await?
     );
 
-    create_partition(&table_path, &mut admin, "APAC", 1).await;
-    create_partition(&table_path, &mut admin, "EMEA", 2).await;
-    create_partition(&table_path, &mut admin, "US", 3).await;
+    create_partition(&table_path, &admin, "APAC", 1).await;
+    create_partition(&table_path, &admin, "EMEA", 2).await;
+    create_partition(&table_path, &admin, "US", 3).await;
 
     let table = conn.get_table(&table_path).await?;
     let table_upsert = table.new_upsert()?;
@@ -129,7 +129,7 @@ pub async fn main() -> Result<()> {
     Ok(())
 }
 
-async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, 
region: &str, zone: i64) {
+async fn create_partition(table_path: &TablePath, admin: &FlussAdmin, region: 
&str, zone: i64) {
     let mut partition_values = HashMap::new();
     partition_values.insert("region".to_string(), region.to_string());
     partition_values.insert("zone".to_string(), zone.to_string());
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 7a79e5e..7f1f64e 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -38,30 +38,28 @@ use std::sync::Arc;
 use tokio::task::JoinHandle;
 
 pub struct FlussAdmin {
-    admin_gateway: ServerConnection,
-    #[allow(dead_code)]
     metadata: Arc<Metadata>,
-    #[allow(dead_code)]
     rpc_client: Arc<RpcClient>,
 }
 
 impl FlussAdmin {
-    pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> 
Result<Self> {
-        let admin_con =
-            connections
-                
.get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
-                    || Error::UnexpectedError {
-                        message: "Coordinator server not found in cluster 
metadata".to_string(),
-                        source: None,
-                    },
-                )?)
-                .await?;
-
-        Ok(FlussAdmin {
-            admin_gateway: admin_con,
+    pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+        FlussAdmin {
             metadata,
             rpc_client: connections,
-        })
+        }
+    }
+
+    async fn admin_gateway(&self) -> Result<ServerConnection> {
+        let cluster = self.metadata.get_cluster();
+        let coordinator =
+            cluster
+                .get_coordinator_server()
+                .ok_or_else(|| Error::UnexpectedError {
+                    message: "Coordinator server not found in cluster 
metadata".to_string(),
+                    source: None,
+                })?;
+        self.rpc_client.get_connection(coordinator).await
     }
 
     pub async fn create_database(
@@ -71,7 +69,8 @@ impl FlussAdmin {
         ignore_if_exists: bool,
     ) -> Result<()> {
         let _response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(CreateDatabaseRequest::new(
                 database_name,
                 database_descriptor,
@@ -88,7 +87,8 @@ impl FlussAdmin {
         ignore_if_exists: bool,
     ) -> Result<()> {
         let _response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(CreateTableRequest::new(
                 table_path,
                 table_descriptor,
@@ -104,7 +104,8 @@ impl FlussAdmin {
         ignore_if_not_exists: bool,
     ) -> Result<()> {
         let _response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(DropTableRequest::new(table_path, ignore_if_not_exists))
             .await?;
         Ok(())
@@ -112,7 +113,8 @@ impl FlussAdmin {
 
     pub async fn get_table_info(&self, table_path: &TablePath) -> 
Result<TableInfo> {
         let response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(GetTableRequest::new(table_path))
             .await?;
 
@@ -144,7 +146,8 @@ impl FlussAdmin {
     /// List all tables in the given database
     pub async fn list_tables(&self, database_name: &str) -> 
Result<Vec<String>> {
         let response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(ListTablesRequest::new(database_name))
             .await?;
         Ok(response.table_name)
@@ -162,7 +165,8 @@ impl FlussAdmin {
         partial_partition_spec: Option<&PartitionSpec>,
     ) -> Result<Vec<PartitionInfo>> {
         let response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(ListPartitionInfosRequest::new(
                 table_path,
                 partial_partition_spec,
@@ -179,7 +183,8 @@ impl FlussAdmin {
         ignore_if_exists: bool,
     ) -> Result<()> {
         let _response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(CreatePartitionRequest::new(
                 table_path,
                 partition_spec,
@@ -197,7 +202,8 @@ impl FlussAdmin {
         ignore_if_not_exists: bool,
     ) -> Result<()> {
         let _response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(DropPartitionRequest::new(
                 table_path,
                 partition_spec,
@@ -210,7 +216,8 @@ impl FlussAdmin {
     /// Check if a table exists
     pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
         let response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(TableExistsRequest::new(table_path))
             .await?;
         Ok(response.exists)
@@ -224,7 +231,8 @@ impl FlussAdmin {
         cascade: bool,
     ) -> Result<()> {
         let _response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(DropDatabaseRequest::new(
                 database_name,
                 ignore_if_not_exists,
@@ -237,7 +245,8 @@ impl FlussAdmin {
     /// List all databases
     pub async fn list_databases(&self) -> Result<Vec<String>> {
         let response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(ListDatabasesRequest::new())
             .await?;
         Ok(response.database_name)
@@ -246,7 +255,8 @@ impl FlussAdmin {
     /// Check if a database exists
     pub async fn database_exists(&self, database_name: &str) -> Result<bool> {
         let response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(DatabaseExistsRequest::new(database_name))
             .await?;
         Ok(response.exists)
@@ -255,7 +265,7 @@ impl FlussAdmin {
     /// Get database information
     pub async fn get_database_info(&self, database_name: &str) -> 
Result<DatabaseInfo> {
         let request = GetDatabaseInfoRequest::new(database_name);
-        let response = self.admin_gateway.request(request).await?;
+        let response = self.admin_gateway().await?.request(request).await?;
 
         // Convert proto response to DatabaseInfo
         let database_descriptor = 
DatabaseDescriptor::from_json_bytes(&response.database_json)?;
@@ -278,7 +288,8 @@ impl FlussAdmin {
     /// Get the latest lake snapshot for a table
     pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> 
Result<LakeSnapshot> {
         let response = self
-            .admin_gateway
+            .admin_gateway()
+            .await?
             .request(GetLatestLakeSnapshotRequest::new(table_path))
             .await?;
 
diff --git a/crates/fluss/src/client/connection.rs 
b/crates/fluss/src/client/connection.rs
index 78e9362..7dc1285 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -20,14 +20,13 @@ use crate::client::admin::FlussAdmin;
 use crate::client::metadata::Metadata;
 use crate::client::table::FlussTable;
 use crate::config::Config;
+use crate::error::{Error, FlussError, Result};
+use crate::metadata::TablePath;
 use crate::rpc::RpcClient;
 use parking_lot::RwLock;
 use std::sync::Arc;
 use std::time::Duration;
 
-use crate::error::{Error, FlussError, Result};
-use crate::metadata::TablePath;
-
 // TODO: implement `close(&self, timeout: Duration)` to gracefully shut down 
the
 // writer client (drain pending batches, then force-close on timeout).
 // Java's FlussConnection.close() calls writerClient.close(Long.MAX_VALUE).
@@ -37,6 +36,7 @@ pub struct FlussConnection {
     network_connects: Arc<RpcClient>,
     args: Config,
     writer_client: RwLock<Option<Arc<WriterClient>>>,
+    admin_client: RwLock<Option<Arc<FlussAdmin>>>,
 }
 
 impl FlussConnection {
@@ -66,6 +66,7 @@ impl FlussConnection {
             network_connects: connections.clone(),
             args: arg.clone(),
             writer_client: Default::default(),
+            admin_client: RwLock::new(None),
         })
     }
 
@@ -81,8 +82,27 @@ impl FlussConnection {
         &self.args
     }
 
-    pub async fn get_admin(&self) -> Result<FlussAdmin> {
-        FlussAdmin::new(self.network_connects.clone(), 
self.metadata.clone()).await
+    pub async fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
+        // 1. Fast path: return cached instance if already initialized.
+        if let Some(admin) = self.admin_client.read().as_ref() {
+            return Ok(admin.clone());
+        }
+
+        // 2. Slow path: acquire write lock.
+        let mut admin_guard = self.admin_client.write();
+
+        // 3. Double-check: another thread may have initialized while we 
waited.
+        if let Some(admin) = admin_guard.as_ref() {
+            return Ok(admin.clone());
+        }
+
+        // 4. Initialize and cache.
+        let admin = Arc::new(FlussAdmin::new(
+            self.network_connects.clone(),
+            self.metadata.clone(),
+        ));
+        *admin_guard = Some(admin.clone());
+        Ok(admin)
     }
 
     pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {
diff --git a/crates/fluss/tests/integration/utils.rs 
b/crates/fluss/tests/integration/utils.rs
index b53abc8..970b84a 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -109,12 +109,11 @@ pub async fn wait_for_cluster_ready_with_sasl(cluster: 
&FlussTestingCluster) {
         let connection = cluster
             .get_fluss_connection_with_sasl(username, password)
             .await;
-        if connection.get_admin().await.is_ok()
-            && connection
-                .get_metadata()
-                .get_cluster()
-                .get_one_available_server()
-                .is_some()
+        if connection
+            .get_metadata()
+            .get_cluster()
+            .get_one_available_server()
+            .is_some()
         {
             return;
         }

Reply via email to