This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new da021ad chore: make get admin sync (#450)
da021ad is described below
commit da021adda0cec127d4b899b356c175dbc3694ade
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Mar 23 10:21:48 2026 +0800
chore: make get admin sync (#450)
---
bindings/cpp/src/lib.rs | 2 +-
bindings/cpp/src/types.rs | 2 +-
bindings/python/example/example.py | 2 +-
bindings/python/fluss/__init__.pyi | 2 +-
bindings/python/src/connection.rs | 18 +++++---------
bindings/python/src/table.rs | 1 -
bindings/python/test/conftest.py | 4 +--
bindings/python/test/test_sasl_auth.py | 4 +--
crates/examples/src/example_kv_table.rs | 2 +-
.../examples/src/example_partitioned_kv_table.rs | 2 +-
crates/examples/src/example_table.rs | 2 +-
crates/fluss/README.md | 2 +-
crates/fluss/src/client/connection.rs | 2 +-
crates/fluss/src/client/table/scanner.rs | 4 +--
crates/fluss/src/client/write/sender.rs | 5 +---
crates/fluss/src/lib.rs | 2 +-
crates/fluss/src/metadata/data_lake_format.rs | 2 +-
crates/fluss/src/metadata/datatype.rs | 6 ++---
crates/fluss/tests/integration/admin.rs | 29 ++++++++--------------
crates/fluss/tests/integration/kv_table.rs | 10 ++++----
crates/fluss/tests/integration/log_table.rs | 14 +++++------
crates/fluss/tests/integration/sasl_auth.rs | 2 --
.../fluss/tests/integration/table_remote_scan.rs | 2 +-
website/docs/user-guide/python/api-reference.md | 2 +-
.../user-guide/python/example/admin-operations.md | 2 +-
website/docs/user-guide/python/example/index.md | 2 +-
.../docs/user-guide/python/example/log-tables.md | 2 +-
website/docs/user-guide/rust/api-reference.md | 2 +-
website/docs/user-guide/rust/error-handling.md | 4 +--
.../user-guide/rust/example/admin-operations.md | 2 +-
website/docs/user-guide/rust/example/index.md | 2 +-
website/docs/user-guide/rust/example/log-tables.md | 2 +-
.../user-guide/rust/example/partitioned-tables.md | 2 +-
33 files changed, 61 insertions(+), 82 deletions(-)
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 82254ea..778fef3 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -712,7 +712,7 @@ unsafe fn delete_connection(conn: *mut Connection) {
impl Connection {
fn get_admin(&self) -> ffi::FfiPtrResult {
- let admin_result = RUNTIME.block_on(async {
self.inner.get_admin().await });
+ let admin_result = self.inner.get_admin();
match admin_result {
Ok(admin) => {
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index f8efe67..3c0c6f7 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -336,7 +336,7 @@ pub fn resolve_row_types(
Some(fcore::metadata::DataType::Decimal(dt)) => {
let (precision, scale) = (dt.precision(), dt.scale());
let bd =
bigdecimal::BigDecimal::from_str(cow.as_ref()).map_err(|e| {
- anyhow!("Column {idx}: invalid decimal string
'{}': {e}", cow)
+ anyhow!("Column {idx}: invalid decimal string
'{cow}': {e}")
})?;
let decimal =
fcore::row::Decimal::from_big_decimal(bd, precision, scale)
.map_err(|e| anyhow!("Column {idx}: {e}"))?;
diff --git a/bindings/python/example/example.py
b/bindings/python/example/example.py
index 3564d91..52cefe1 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -65,7 +65,7 @@ async def main():
table_descriptor = fluss.TableDescriptor(fluss_schema)
# Get the admin for Fluss
- admin = await conn.get_admin()
+ admin = conn.get_admin()
# Create a Fluss table
table_path = fluss.TablePath("fluss", "sample_table_types")
diff --git a/bindings/python/fluss/__init__.pyi
b/bindings/python/fluss/__init__.pyi
index 95f3080..9778457 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -233,7 +233,7 @@ class Config:
class FlussConnection:
@staticmethod
async def create(config: Config) -> FlussConnection: ...
- async def get_admin(self) -> FlussAdmin: ...
+ def get_admin(self) -> FlussAdmin: ...
async def get_table(self, table_path: TablePath) -> FlussTable: ...
def close(self) -> None: ...
def __enter__(self) -> FlussConnection: ...
diff --git a/bindings/python/src/connection.rs
b/bindings/python/src/connection.rs
index 31a9505..a8d2d9e 100644
--- a/bindings/python/src/connection.rs
+++ b/bindings/python/src/connection.rs
@@ -46,19 +46,13 @@ impl FlussConnection {
}
/// Get admin interface
- fn get_admin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
- let client = self.inner.clone();
+ fn get_admin(&self, py: Python<'_>) -> PyResult<Py<FlussAdmin>> {
+ let admin = self
+ .inner
+ .get_admin()
+ .map_err(|e| FlussError::from_core_error(&e))?;
- future_into_py(py, async move {
- let admin = client
- .get_admin()
- .await
- .map_err(|e| FlussError::from_core_error(&e))?;
-
- let py_admin = FlussAdmin::from_core(admin);
-
- Python::attach(|py| Py::new(py, py_admin))
- })
+ Py::new(py, FlussAdmin::from_core(admin))
}
/// Get a table
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 8c9ea0e..cda8d3b 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -508,7 +508,6 @@ impl TableScan {
let admin = conn
.get_admin()
- .await
.map_err(|e| FlussError::from_core_error(&e))?;
let (projected_schema, projected_row_type) =
diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py
index 0e4cfe4..420747e 100644
--- a/bindings/python/test/conftest.py
+++ b/bindings/python/test/conftest.py
@@ -208,7 +208,7 @@ async def _connect_with_retry(bootstrap_servers,
timeout=60):
conn = None
try:
conn = await fluss.FlussConnection.create(config)
- admin = await conn.get_admin()
+ admin = conn.get_admin()
nodes = await admin.get_server_nodes()
if any(n.server_type == "TabletServer" for n in nodes):
return conn
@@ -281,4 +281,4 @@ def plaintext_bootstrap_servers(fluss_cluster):
@pytest_asyncio.fixture(scope="session")
async def admin(connection):
"""Session-scoped admin client."""
- return await connection.get_admin()
+ return connection.get_admin()
diff --git a/bindings/python/test/test_sasl_auth.py
b/bindings/python/test/test_sasl_auth.py
index 30fce4c..9dd2ddd 100644
--- a/bindings/python/test/test_sasl_auth.py
+++ b/bindings/python/test/test_sasl_auth.py
@@ -35,7 +35,7 @@ async def
test_sasl_connect_with_valid_credentials(sasl_bootstrap_servers):
"security.sasl.password": "admin-secret",
})
conn = await fluss.FlussConnection.create(config)
- admin = await conn.get_admin()
+ admin = conn.get_admin()
db_name = "py_sasl_test_valid_db"
db_descriptor = fluss.DatabaseDescriptor(comment="created via SASL auth")
@@ -58,7 +58,7 @@ async def
test_sasl_connect_with_second_user(sasl_bootstrap_servers):
"security.sasl.password": "alice-secret",
})
conn = await fluss.FlussConnection.create(config)
- admin = await conn.get_admin()
+ admin = conn.get_admin()
# Basic operation to confirm functional connection
assert not await admin.database_exists("some_nonexistent_db_alice")
diff --git a/crates/examples/src/example_kv_table.rs
b/crates/examples/src/example_kv_table.rs
index 8fb60ba..ad12ed7 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -43,7 +43,7 @@ pub async fn main() -> Result<()> {
let table_path = TablePath::new("fluss", "rust_upsert_lookup_example");
- let admin = conn.get_admin().await?;
+ let admin = conn.get_admin()?;
admin
.create_table(&table_path, &table_descriptor, true)
.await?;
diff --git a/crates/examples/src/example_partitioned_kv_table.rs
b/crates/examples/src/example_partitioned_kv_table.rs
index ba49934..944d8d4 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -46,7 +46,7 @@ pub async fn main() -> Result<()> {
let table_path = TablePath::new("fluss", "partitioned_kv_example");
- let admin = conn.get_admin().await?;
+ let admin = conn.get_admin()?;
admin
.create_table(&table_path, &table_descriptor, true)
.await?;
diff --git a/crates/examples/src/example_table.rs
b/crates/examples/src/example_table.rs
index 49f0ab4..1f751f3 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -49,7 +49,7 @@ pub async fn main() -> Result<()> {
let table_path = TablePath::new("fluss", "rust_test_long");
- let admin = conn.get_admin().await?;
+ let admin = conn.get_admin()?;
admin
.create_table(&table_path, &table_descriptor, true)
diff --git a/crates/fluss/README.md b/crates/fluss/README.md
index b37a75d..76dc0ec 100644
--- a/crates/fluss/README.md
+++ b/crates/fluss/README.md
@@ -23,7 +23,7 @@ async fn main() -> Result<()> {
let mut config = Config::default();
config.bootstrap_servers = "127.0.0.1:9123".to_string();
let connection = FlussConnection::new(config).await?;
- let admin = connection.get_admin().await?;
+ let admin = connection.get_admin()?;
// ---- Primary key (KV) table: upsert and lookup ----
let kv_path = TablePath::new("fluss", "users");
diff --git a/crates/fluss/src/client/connection.rs
b/crates/fluss/src/client/connection.rs
index 7dc1285..88f87e4 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -82,7 +82,7 @@ impl FlussConnection {
&self.args
}
- pub async fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
+ pub fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
// 1. Fast path: return cached instance if already initialized.
if let Some(admin) = self.admin_client.read().as_ref() {
return Ok(admin.clone());
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 4302539..ab7d441 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -94,7 +94,7 @@ impl<'a> TableScan<'a> {
/// .build()?,
/// ).build()?;
/// let table_path = TablePath::new("fluss".to_owned(),
"rust_test_long".to_owned());
- /// let admin = conn.get_admin().await?;
+ /// let admin = conn.get_admin()?;
/// admin.create_table(&table_path, &table_descriptor, true)
/// .await?;
/// let table_info = admin.get_table_info(&table_path).await?;
@@ -169,7 +169,7 @@ impl<'a> TableScan<'a> {
/// .build()?,
/// ).build()?;
/// let table_path = TablePath::new("fluss".to_owned(),
"rust_test_long".to_owned());
- /// let admin = conn.get_admin().await?;
+ /// let admin = conn.get_admin()?;
/// admin.create_table(&table_path, &table_descriptor, true)
/// .await?;
/// let table = conn.get_table(&table_path).await?;
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index dd5370d..8e738d0 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -268,10 +268,7 @@ impl Sender {
}
}
- debug!(
- "Updated metadata for unknown leader tables: {:?}",
- unknown_leaders
- );
+ debug!("Updated metadata for unknown leader tables:
{unknown_leaders:?}");
Ok(())
}
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index 13e8598..6537189 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -40,7 +40,7 @@
//! let mut config = Config::default();
//! config.bootstrap_servers = "127.0.0.1:9123".to_string();
//! let connection = FlussConnection::new(config).await?;
-//! let admin = connection.get_admin().await?;
+//! let admin = connection.get_admin()?;
//!
//! // ---- Primary key (KV) table: upsert and lookup ----
//! let kv_path = TablePath::new("fluss", "users");
diff --git a/crates/fluss/src/metadata/data_lake_format.rs
b/crates/fluss/src/metadata/data_lake_format.rs
index b0c3b0d..77e5ad3 100644
--- a/crates/fluss/src/metadata/data_lake_format.rs
+++ b/crates/fluss/src/metadata/data_lake_format.rs
@@ -54,7 +54,7 @@ mod tests {
for (raw, expected) in cases {
let parsed = raw.parse::<DataLakeFormat>().unwrap();
- assert_eq!(parsed, expected, "failed to parse: {}", raw);
+ assert_eq!(parsed, expected, "failed to parse: {raw}");
}
// negative cases
diff --git a/crates/fluss/src/metadata/datatype.rs
b/crates/fluss/src/metadata/datatype.rs
index 6d888d9..d9eb56b 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -954,7 +954,7 @@ impl RowType {
.iter()
.map(|name| {
self.get_field_index(name).ok_or_else(|| IllegalArgument {
- message: format!("Field '{}' does not exist in the row
type", name),
+ message: format!("Field '{name}' does not exist in the row
type"),
})
})
.collect::<Result<Vec<_>>>()?;
@@ -1522,7 +1522,7 @@ fn test_time_valid_precision() {
// Test all valid precision values 0 through 9
for precision in 0..=9 {
let result = TimeType::with_nullable(true, precision);
- assert!(result.is_ok(), "precision {} should be valid", precision);
+ assert!(result.is_ok(), "precision {precision} should be valid");
let time = result.unwrap();
assert_eq!(time.precision(), precision);
}
@@ -1550,7 +1550,7 @@ fn test_timestamp_valid_precision() {
// Test all valid precision values 0 through 9
for precision in 0..=9 {
let result = TimestampType::with_nullable(true, precision);
- assert!(result.is_ok(), "precision {} should be valid", precision);
+ assert!(result.is_ok(), "precision {precision} should be valid");
let timestamp_type = result.unwrap();
assert_eq!(timestamp_type.precision(), precision);
}
diff --git a/crates/fluss/tests/integration/admin.rs
b/crates/fluss/tests/integration/admin.rs
index c647704..0860cbe 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -30,7 +30,7 @@ mod admin_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("should get admin");
+ let admin = connection.get_admin().expect("should get admin");
let db_descriptor = DatabaseDescriptorBuilder::default()
.comment("test_db")
@@ -73,10 +73,7 @@ mod admin_test {
async fn test_create_table() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection
- .get_admin()
- .await
- .expect("Failed to get admin client");
+ let admin = connection.get_admin().expect("Failed to get admin
client");
let test_db_name = "test_create_table_db";
let db_descriptor = DatabaseDescriptorBuilder::default()
@@ -202,10 +199,7 @@ mod admin_test {
async fn test_partition_apis() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection
- .get_admin()
- .await
- .expect("Failed to get admin client");
+ let admin = connection.get_admin().expect("Failed to get admin
client");
let test_db_name = "test_partition_apis_db";
let db_descriptor = DatabaseDescriptorBuilder::default()
@@ -341,10 +335,7 @@ mod admin_test {
async fn test_fluss_error_response() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection
- .get_admin()
- .await
- .expect("Failed to get admin client");
+ let admin = connection.get_admin().expect("Failed to get admin
client");
let table_path = TablePath::new("fluss", "not_exist");
@@ -375,7 +366,7 @@ mod admin_test {
async fn test_error_database_not_exist() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.unwrap();
+ let admin = connection.get_admin().unwrap();
// get_database_info for non-existent database
let result = admin.get_database_info("no_such_db").await;
@@ -394,7 +385,7 @@ mod admin_test {
async fn test_error_database_already_exist() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.unwrap();
+ let admin = connection.get_admin().unwrap();
let db_name = "test_error_db_already_exist";
let descriptor = DatabaseDescriptorBuilder::default().build();
@@ -424,7 +415,7 @@ mod admin_test {
async fn test_error_table_already_exist() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.unwrap();
+ let admin = connection.get_admin().unwrap();
let db_name = "test_error_tbl_already_exist_db";
let descriptor = DatabaseDescriptorBuilder::default().build();
@@ -472,7 +463,7 @@ mod admin_test {
async fn test_error_table_not_exist() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.unwrap();
+ let admin = connection.get_admin().unwrap();
let table_path = TablePath::new("fluss", "no_such_table");
@@ -491,7 +482,7 @@ mod admin_test {
async fn test_get_server_nodes() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.unwrap();
+ let admin = connection.get_admin().unwrap();
let nodes = admin
.get_server_nodes()
@@ -534,7 +525,7 @@ mod admin_test {
async fn test_error_table_not_partitioned() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.unwrap();
+ let admin = connection.get_admin().unwrap();
let db_name = "test_error_not_partitioned_db";
let descriptor = DatabaseDescriptorBuilder::default().build();
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index f0e0c57..b942988 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -33,7 +33,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.unwrap();
+ let admin = connection.get_admin().unwrap();
let table_path = TablePath::new("fluss", "test_upsert_and_lookup");
@@ -172,7 +172,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.unwrap();
+ let admin = connection.get_admin().unwrap();
let table_path = TablePath::new("fluss", "test_composite_pk");
@@ -282,7 +282,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss", "test_partial_update");
@@ -403,7 +403,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss", "test_partitioned_kv_table");
@@ -573,7 +573,7 @@ mod kv_table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss", "test_all_datatypes");
diff --git a/crates/fluss/tests/integration/log_table.rs
b/crates/fluss/tests/integration/log_table.rs
index 4aa88ac..330e143 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -33,7 +33,7 @@ mod table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss",
"test_append_record_batch_and_scan");
@@ -143,7 +143,7 @@ mod table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss", "test_list_offsets");
@@ -296,7 +296,7 @@ mod table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss", "test_project");
@@ -451,7 +451,7 @@ mod table_test {
async fn test_poll_batches() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss", "test_poll_batches");
let schema = Schema::builder()
@@ -584,7 +584,7 @@ mod table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss", "test_log_all_datatypes");
@@ -1015,7 +1015,7 @@ mod table_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss",
"test_partitioned_log_append");
@@ -1309,7 +1309,7 @@ mod table_test {
async fn undersized_row_returns_error() {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss", "test_log_undersized_row");
diff --git a/crates/fluss/tests/integration/sasl_auth.rs
b/crates/fluss/tests/integration/sasl_auth.rs
index 878c983..439d65f 100644
--- a/crates/fluss/tests/integration/sasl_auth.rs
+++ b/crates/fluss/tests/integration/sasl_auth.rs
@@ -36,7 +36,6 @@ mod sasl_auth_test {
let admin = connection
.get_admin()
- .await
.expect("Should get admin with valid SASL credentials");
// Perform a basic operation to confirm the connection is fully
functional
@@ -69,7 +68,6 @@ mod sasl_auth_test {
let admin = connection
.get_admin()
- .await
.expect("Should get admin with alice credentials");
// Basic operation to confirm functional connection
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs
b/crates/fluss/tests/integration/table_remote_scan.rs
index 52b8974..293c1f7 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -28,7 +28,7 @@ mod table_remote_scan_test {
let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
- let admin = connection.get_admin().await.expect("Failed to get admin");
+ let admin = connection.get_admin().expect("Failed to get admin");
let table_path = TablePath::new("fluss", "test_scan_remote_log");
diff --git a/website/docs/user-guide/python/api-reference.md
b/website/docs/user-guide/python/api-reference.md
index a4b594b..1268d37 100644
--- a/website/docs/user-guide/python/api-reference.md
+++ b/website/docs/user-guide/python/api-reference.md
@@ -36,7 +36,7 @@ Complete API reference for the Fluss Python client.
| Method | Description
|
|-----------------------------------------------------------|---------------------------------------|
| `await FlussConnection.create(config) -> FlussConnection` | Connect to a
Fluss cluster |
-| `await conn.get_admin() -> FlussAdmin` | Get admin
interface |
+| `conn.get_admin() -> FlussAdmin` | Get admin
interface |
| `await conn.get_table(table_path) -> FlussTable` | Get a table for
read/write operations |
| `conn.close()` | Close the
connection |
diff --git a/website/docs/user-guide/python/example/admin-operations.md
b/website/docs/user-guide/python/example/admin-operations.md
index ba3748a..2cda6c4 100644
--- a/website/docs/user-guide/python/example/admin-operations.md
+++ b/website/docs/user-guide/python/example/admin-operations.md
@@ -4,7 +4,7 @@ sidebar_position: 3
# Admin Operations
```python
-admin = await conn.get_admin()
+admin = conn.get_admin()
```
## Databases
diff --git a/website/docs/user-guide/python/example/index.md
b/website/docs/user-guide/python/example/index.md
index 389b648..ec9fa78 100644
--- a/website/docs/user-guide/python/example/index.md
+++ b/website/docs/user-guide/python/example/index.md
@@ -14,7 +14,7 @@ async def main():
# Connect
config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})
conn = await fluss.FlussConnection.create(config)
- admin = await conn.get_admin()
+ admin = conn.get_admin()
# Create a log table
schema = fluss.Schema(pa.schema([
diff --git a/website/docs/user-guide/python/example/log-tables.md
b/website/docs/user-guide/python/example/log-tables.md
index adaa162..c320bf4 100644
--- a/website/docs/user-guide/python/example/log-tables.md
+++ b/website/docs/user-guide/python/example/log-tables.md
@@ -112,7 +112,7 @@ scanner.unsubscribe(bucket_id=0)
To only consume new records (skip existing data), first resolve the current
latest offset via `list_offsets`, then subscribe at that offset:
```python
-admin = await conn.get_admin()
+admin = conn.get_admin()
offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
latest = offsets[0]
diff --git a/website/docs/user-guide/rust/api-reference.md
b/website/docs/user-guide/rust/api-reference.md
index fbe3428..a4befa5 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -35,7 +35,7 @@ Complete API reference for the Fluss Rust client.
| Method
| Description |
|-------------------------------------------------------------------------------|------------------------------------------------|
| `async fn new(config: Config) -> Result<Self>`
| Create a new connection to a Fluss cluster |
-| `async fn get_admin(&self) -> Result<FlussAdmin>`
| Get the admin interface for cluster management |
+| `fn get_admin(&self) -> Result<Arc<FlussAdmin>>`
| Get the admin interface for cluster management |
| `async fn get_table(&self, table_path: &TablePath) ->
Result<FlussTable<'_>>` | Get a table for read/write operations |
| `fn config(&self) -> &Config`
| Get a reference to the connection config |
diff --git a/website/docs/user-guide/rust/error-handling.md
b/website/docs/user-guide/rust/error-handling.md
index b7e4b45..4966428 100644
--- a/website/docs/user-guide/rust/error-handling.md
+++ b/website/docs/user-guide/rust/error-handling.md
@@ -12,7 +12,7 @@ use fluss::error::{Error, Result};
// All operations return Result<T>
let conn = FlussConnection::new(config).await?;
-let admin = conn.get_admin().await?;
+let admin = conn.get_admin()?;
let table = conn.get_table(&table_path).await?;
```
@@ -217,7 +217,7 @@ use fluss::error::Result;
async fn my_pipeline() -> Result<()> {
let conn = FlussConnection::new(config).await?;
- let admin = conn.get_admin().await?;
+ let admin = conn.get_admin()?;
let table = conn.get_table(&table_path).await?;
let writer = table.new_append()?.create_writer()?;
writer.append(&row)?;
diff --git a/website/docs/user-guide/rust/example/admin-operations.md
b/website/docs/user-guide/rust/example/admin-operations.md
index d18ad0e..3975275 100644
--- a/website/docs/user-guide/rust/example/admin-operations.md
+++ b/website/docs/user-guide/rust/example/admin-operations.md
@@ -6,7 +6,7 @@ sidebar_position: 3
## Get Admin Interface
```rust
-let admin = conn.get_admin().await?;
+let admin = conn.get_admin()?;
```
## Database Operations
diff --git a/website/docs/user-guide/rust/example/index.md
b/website/docs/user-guide/rust/example/index.md
index e35c8dc..f1d5a68 100644
--- a/website/docs/user-guide/rust/example/index.md
+++ b/website/docs/user-guide/rust/example/index.md
@@ -19,7 +19,7 @@ async fn main() -> Result<()> {
let mut config = Config::default();
config.bootstrap_servers = "127.0.0.1:9123".to_string();
let conn = FlussConnection::new(config).await?;
- let admin = conn.get_admin().await?;
+ let admin = conn.get_admin()?;
// Create a log table
let table_path = TablePath::new("fluss", "quickstart_rust");
diff --git a/website/docs/user-guide/rust/example/log-tables.md
b/website/docs/user-guide/rust/example/log-tables.md
index 7c01cf1..0485779 100644
--- a/website/docs/user-guide/rust/example/log-tables.md
+++ b/website/docs/user-guide/rust/example/log-tables.md
@@ -106,7 +106,7 @@ To start reading only new records, first resolve the
current latest offset via `
```rust
use fluss::rpc::message::OffsetSpec;
-let admin = conn.get_admin().await?;
+let admin = conn.get_admin()?;
let offsets = admin.list_offsets(&table_path, &[0], OffsetSpec::Latest).await?;
let latest = offsets[&0];
log_scanner.subscribe(0, latest).await?;
diff --git a/website/docs/user-guide/rust/example/partitioned-tables.md
b/website/docs/user-guide/rust/example/partitioned-tables.md
index 40bd4d6..38517e1 100644
--- a/website/docs/user-guide/rust/example/partitioned-tables.md
+++ b/website/docs/user-guide/rust/example/partitioned-tables.md
@@ -65,7 +65,7 @@ For partitioned tables, use partition-aware subscribe methods.
use std::time::Duration;
let table = conn.get_table(&table_path).await?;
-let admin = conn.get_admin().await?;
+let admin = conn.get_admin()?;
let partitions = admin.list_partition_infos(&table_path).await?;
let log_scanner = table.new_scan().create_log_scanner()?;