This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3bd6b1b chore: added check to get_admin() before creating new admin (#369)
3bd6b1b is described below
commit 3bd6b1b392b767c2760e95db671ef6aa90e3bafa
Author: Aryamaan Singh <[email protected]>
AuthorDate: Mon Mar 23 07:20:00 2026 +0530
chore: added check to get_admin() before creating new admin (#369)
---
bindings/cpp/src/lib.rs | 2 +-
bindings/cpp/test/test_utils.h | 1 -
bindings/python/src/admin.rs | 6 +-
bindings/python/src/table.rs | 4 +-
.../examples/src/example_partitioned_kv_table.rs | 10 +--
crates/fluss/src/client/admin.rs | 73 +++++++++++++---------
crates/fluss/src/client/connection.rs | 30 +++++++--
crates/fluss/tests/integration/utils.rs | 11 ++--
8 files changed, 82 insertions(+), 55 deletions(-)
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 284eec8..82254ea 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -556,7 +556,7 @@ pub struct Connection {
}
pub struct Admin {
- inner: fcore::client::FlussAdmin,
+ inner: Arc<fcore::client::FlussAdmin>,
}
pub struct Table {
diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h
index 17a1da7..1ff7e28 100644
--- a/bindings/cpp/test/test_utils.h
+++ b/bindings/cpp/test/test_utils.h
@@ -306,7 +306,6 @@ class FlussTestEnvironment : public ::testing::Environment {
if (result.Ok()) {
auto admin_result = connection_.GetAdmin(admin_);
if (admin_result.Ok()) {
- // check tablet server is available
std::vector<fluss::ServerNode> nodes;
auto nodes_result = admin_.GetServerNodes(nodes);
if (nodes_result.Ok() &&
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index 703b133..5f4e45d 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -532,10 +532,8 @@ impl FlussAdmin {
impl FlussAdmin {
// Internal method to create FlussAdmin from core admin
- pub fn from_core(admin: fcore::client::FlussAdmin) -> Self {
- Self {
- __admin: Arc::new(admin),
- }
+ pub fn from_core(admin: Arc<fcore::client::FlussAdmin>) -> Self {
+ Self { __admin: admin }
}
}
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 660cd6b..8c9ea0e 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -1902,7 +1902,7 @@ macro_rules! with_scanner {
#[pyclass]
pub struct LogScanner {
scanner: ScannerKind,
- admin: fcore::client::FlussAdmin,
+ admin: Arc<fcore::client::FlussAdmin>,
table_info: fcore::metadata::TableInfo,
/// The projected Arrow schema to use for empty table creation
projected_schema: SchemaRef,
@@ -2207,7 +2207,7 @@ impl LogScanner {
impl LogScanner {
fn new(
scanner: ScannerKind,
- admin: fcore::client::FlussAdmin,
+ admin: Arc<fcore::client::FlussAdmin>,
table_info: fcore::metadata::TableInfo,
projected_schema: SchemaRef,
projected_row_type: fcore::metadata::RowType,
diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs
index 9cd2e7d..ba49934 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -46,7 +46,7 @@ pub async fn main() -> Result<()> {
let table_path = TablePath::new("fluss", "partitioned_kv_example");
- let mut admin = conn.get_admin().await?;
+ let admin = conn.get_admin().await?;
admin
.create_table(&table_path, &table_descriptor, true)
.await?;
@@ -55,9 +55,9 @@ pub async fn main() -> Result<()> {
admin.get_table_info(&table_path).await?
);
- create_partition(&table_path, &mut admin, "APAC", 1).await;
- create_partition(&table_path, &mut admin, "EMEA", 2).await;
- create_partition(&table_path, &mut admin, "US", 3).await;
+ create_partition(&table_path, &admin, "APAC", 1).await;
+ create_partition(&table_path, &admin, "EMEA", 2).await;
+ create_partition(&table_path, &admin, "US", 3).await;
let table = conn.get_table(&table_path).await?;
let table_upsert = table.new_upsert()?;
@@ -129,7 +129,7 @@ pub async fn main() -> Result<()> {
Ok(())
}
-async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, region: &str, zone: i64) {
+async fn create_partition(table_path: &TablePath, admin: &FlussAdmin, region: &str, zone: i64) {
let mut partition_values = HashMap::new();
partition_values.insert("region".to_string(), region.to_string());
partition_values.insert("zone".to_string(), zone.to_string());
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 7a79e5e..7f1f64e 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -38,30 +38,28 @@ use std::sync::Arc;
use tokio::task::JoinHandle;
pub struct FlussAdmin {
- admin_gateway: ServerConnection,
- #[allow(dead_code)]
metadata: Arc<Metadata>,
- #[allow(dead_code)]
rpc_client: Arc<RpcClient>,
}
impl FlussAdmin {
-    pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Result<Self> {
- let admin_con =
- connections
-                .get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
- || Error::UnexpectedError {
-                    message: "Coordinator server not found in cluster metadata".to_string(),
- source: None,
- },
- )?)
- .await?;
-
- Ok(FlussAdmin {
- admin_gateway: admin_con,
+ pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+ FlussAdmin {
metadata,
rpc_client: connections,
- })
+ }
+ }
+
+ async fn admin_gateway(&self) -> Result<ServerConnection> {
+ let cluster = self.metadata.get_cluster();
+ let coordinator =
+ cluster
+ .get_coordinator_server()
+ .ok_or_else(|| Error::UnexpectedError {
+                    message: "Coordinator server not found in cluster metadata".to_string(),
+ source: None,
+ })?;
+ self.rpc_client.get_connection(coordinator).await
}
pub async fn create_database(
@@ -71,7 +69,8 @@ impl FlussAdmin {
ignore_if_exists: bool,
) -> Result<()> {
let _response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(CreateDatabaseRequest::new(
database_name,
database_descriptor,
@@ -88,7 +87,8 @@ impl FlussAdmin {
ignore_if_exists: bool,
) -> Result<()> {
let _response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(CreateTableRequest::new(
table_path,
table_descriptor,
@@ -104,7 +104,8 @@ impl FlussAdmin {
ignore_if_not_exists: bool,
) -> Result<()> {
let _response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(DropTableRequest::new(table_path, ignore_if_not_exists))
.await?;
Ok(())
@@ -112,7 +113,8 @@ impl FlussAdmin {
pub async fn get_table_info(&self, table_path: &TablePath) -> Result<TableInfo> {
let response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(GetTableRequest::new(table_path))
.await?;
@@ -144,7 +146,8 @@ impl FlussAdmin {
/// List all tables in the given database
pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
let response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(ListTablesRequest::new(database_name))
.await?;
Ok(response.table_name)
@@ -162,7 +165,8 @@ impl FlussAdmin {
partial_partition_spec: Option<&PartitionSpec>,
) -> Result<Vec<PartitionInfo>> {
let response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(ListPartitionInfosRequest::new(
table_path,
partial_partition_spec,
@@ -179,7 +183,8 @@ impl FlussAdmin {
ignore_if_exists: bool,
) -> Result<()> {
let _response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(CreatePartitionRequest::new(
table_path,
partition_spec,
@@ -197,7 +202,8 @@ impl FlussAdmin {
ignore_if_not_exists: bool,
) -> Result<()> {
let _response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(DropPartitionRequest::new(
table_path,
partition_spec,
@@ -210,7 +216,8 @@ impl FlussAdmin {
/// Check if a table exists
pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
let response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(TableExistsRequest::new(table_path))
.await?;
Ok(response.exists)
@@ -224,7 +231,8 @@ impl FlussAdmin {
cascade: bool,
) -> Result<()> {
let _response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(DropDatabaseRequest::new(
database_name,
ignore_if_not_exists,
@@ -237,7 +245,8 @@ impl FlussAdmin {
/// List all databases
pub async fn list_databases(&self) -> Result<Vec<String>> {
let response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(ListDatabasesRequest::new())
.await?;
Ok(response.database_name)
@@ -246,7 +255,8 @@ impl FlussAdmin {
/// Check if a database exists
pub async fn database_exists(&self, database_name: &str) -> Result<bool> {
let response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(DatabaseExistsRequest::new(database_name))
.await?;
Ok(response.exists)
@@ -255,7 +265,7 @@ impl FlussAdmin {
/// Get database information
pub async fn get_database_info(&self, database_name: &str) -> Result<DatabaseInfo> {
let request = GetDatabaseInfoRequest::new(database_name);
- let response = self.admin_gateway.request(request).await?;
+ let response = self.admin_gateway().await?.request(request).await?;
// Convert proto response to DatabaseInfo
let database_descriptor =
DatabaseDescriptor::from_json_bytes(&response.database_json)?;
@@ -278,7 +288,8 @@ impl FlussAdmin {
/// Get the latest lake snapshot for a table
pub async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot> {
let response = self
- .admin_gateway
+ .admin_gateway()
+ .await?
.request(GetLatestLakeSnapshotRequest::new(table_path))
.await?;
diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs
index 78e9362..7dc1285 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -20,14 +20,13 @@ use crate::client::admin::FlussAdmin;
use crate::client::metadata::Metadata;
use crate::client::table::FlussTable;
use crate::config::Config;
+use crate::error::{Error, FlussError, Result};
+use crate::metadata::TablePath;
use crate::rpc::RpcClient;
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::Duration;
-use crate::error::{Error, FlussError, Result};
-use crate::metadata::TablePath;
-
// TODO: implement `close(&self, timeout: Duration)` to gracefully shut down the
// writer client (drain pending batches, then force-close on timeout).
// Java's FlussConnection.close() calls writerClient.close(Long.MAX_VALUE).
@@ -37,6 +36,7 @@ pub struct FlussConnection {
network_connects: Arc<RpcClient>,
args: Config,
writer_client: RwLock<Option<Arc<WriterClient>>>,
+ admin_client: RwLock<Option<Arc<FlussAdmin>>>,
}
impl FlussConnection {
@@ -66,6 +66,7 @@ impl FlussConnection {
network_connects: connections.clone(),
args: arg.clone(),
writer_client: Default::default(),
+ admin_client: RwLock::new(None),
})
}
@@ -81,8 +82,27 @@ impl FlussConnection {
&self.args
}
- pub async fn get_admin(&self) -> Result<FlussAdmin> {
-        FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
+ pub async fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
+ // 1. Fast path: return cached instance if already initialized.
+ if let Some(admin) = self.admin_client.read().as_ref() {
+ return Ok(admin.clone());
+ }
+
+ // 2. Slow path: acquire write lock.
+ let mut admin_guard = self.admin_client.write();
+
+        // 3. Double-check: another thread may have initialized while we waited.
+ if let Some(admin) = admin_guard.as_ref() {
+ return Ok(admin.clone());
+ }
+
+ // 4. Initialize and cache.
+ let admin = Arc::new(FlussAdmin::new(
+ self.network_connects.clone(),
+ self.metadata.clone(),
+ ));
+ *admin_guard = Some(admin.clone());
+ Ok(admin)
}
pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {
diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs
index b53abc8..970b84a 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -109,12 +109,11 @@ pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) {
let connection = cluster
.get_fluss_connection_with_sasl(username, password)
.await;
- if connection.get_admin().await.is_ok()
- && connection
- .get_metadata()
- .get_cluster()
- .get_one_available_server()
- .is_some()
+ if connection
+ .get_metadata()
+ .get_cluster()
+ .get_one_available_server()
+ .is_some()
{
return;
}