This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch make_get_admin_sync in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
commit 1c763e8cc3e9df428c36ef3f43fc575bc8d4e1d8 Author: luoyuxia <[email protected]> AuthorDate: Mon Mar 23 10:00:53 2026 +0800 chore: make get admin sync --- bindings/cpp/src/lib.rs | 2 +- bindings/cpp/src/types.rs | 2 +- bindings/python/example/example.py | 2 +- bindings/python/fluss/__init__.pyi | 2 +- bindings/python/src/connection.rs | 18 +++++--------- bindings/python/src/table.rs | 1 - bindings/python/test/conftest.py | 4 +-- bindings/python/test/test_sasl_auth.py | 4 +-- crates/examples/src/example_kv_table.rs | 2 +- .../examples/src/example_partitioned_kv_table.rs | 2 +- crates/examples/src/example_table.rs | 2 +- crates/fluss/README.md | 2 +- crates/fluss/src/client/connection.rs | 2 +- crates/fluss/src/client/table/scanner.rs | 4 +-- crates/fluss/src/client/write/sender.rs | 3 +-- crates/fluss/src/lib.rs | 2 +- crates/fluss/src/metadata/data_lake_format.rs | 2 +- crates/fluss/src/metadata/datatype.rs | 6 ++--- crates/fluss/tests/integration/admin.rs | 29 ++++++++-------------- crates/fluss/tests/integration/kv_table.rs | 10 ++++---- crates/fluss/tests/integration/log_table.rs | 14 +++++------ crates/fluss/tests/integration/sasl_auth.rs | 2 -- .../fluss/tests/integration/table_remote_scan.rs | 2 +- website/docs/user-guide/python/api-reference.md | 2 +- .../user-guide/python/example/admin-operations.md | 2 +- website/docs/user-guide/python/example/index.md | 2 +- .../docs/user-guide/python/example/log-tables.md | 2 +- website/docs/user-guide/rust/api-reference.md | 2 +- website/docs/user-guide/rust/error-handling.md | 4 +-- .../user-guide/rust/example/admin-operations.md | 2 +- website/docs/user-guide/rust/example/index.md | 2 +- website/docs/user-guide/rust/example/log-tables.md | 2 +- .../user-guide/rust/example/partitioned-tables.md | 2 +- 33 files changed, 61 insertions(+), 80 deletions(-) diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 82254ea..778fef3 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -712,7 +712,7 @@ unsafe fn delete_connection(conn: *mut Connection) { impl Connection { fn get_admin(&self) -> ffi::FfiPtrResult { - let admin_result = RUNTIME.block_on(async { self.inner.get_admin().await }); + let admin_result = self.inner.get_admin(); match admin_result { Ok(admin) => { diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index f8efe67..3c0c6f7 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -336,7 +336,7 @@ pub fn resolve_row_types( Some(fcore::metadata::DataType::Decimal(dt)) => { let (precision, scale) = (dt.precision(), dt.scale()); let bd = bigdecimal::BigDecimal::from_str(cow.as_ref()).map_err(|e| { - anyhow!("Column {idx}: invalid decimal string '{}': {e}", cow) + anyhow!("Column {idx}: invalid decimal string '{cow}': {e}") })?; let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale) .map_err(|e| anyhow!("Column {idx}: {e}"))?; diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 3564d91..52cefe1 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -65,7 +65,7 @@ async def main(): table_descriptor = fluss.TableDescriptor(fluss_schema) # Get the admin for Fluss - admin = await conn.get_admin() + admin = conn.get_admin() # Create a Fluss table table_path = fluss.TablePath("fluss", "sample_table_types") diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 95f3080..9778457 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -233,7 +233,7 @@ class Config: class FlussConnection: @staticmethod async def create(config: Config) -> FlussConnection: ... - async def get_admin(self) -> FlussAdmin: ... + def get_admin(self) -> FlussAdmin: ... async def get_table(self, table_path: TablePath) -> FlussTable: ... def close(self) -> None: ... def __enter__(self) -> FlussConnection: ... diff --git a/bindings/python/src/connection.rs b/bindings/python/src/connection.rs index 31a9505..a8d2d9e 100644 --- a/bindings/python/src/connection.rs +++ b/bindings/python/src/connection.rs @@ -46,19 +46,13 @@ impl FlussConnection { } /// Get admin interface - fn get_admin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { - let client = self.inner.clone(); + fn get_admin(&self, py: Python<'_>) -> PyResult<Py<FlussAdmin>> { + let admin = self + .inner + .get_admin() + .map_err(|e| FlussError::from_core_error(&e))?; - future_into_py(py, async move { - let admin = client - .get_admin() - .await - .map_err(|e| FlussError::from_core_error(&e))?; - - let py_admin = FlussAdmin::from_core(admin); - - Python::attach(|py| Py::new(py, py_admin)) - }) + Py::new(py, FlussAdmin::from_core(admin)) } /// Get a table diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 8c9ea0e..cda8d3b 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -508,7 +508,6 @@ impl TableScan { let admin = conn .get_admin() - .await .map_err(|e| FlussError::from_core_error(&e))?; let (projected_schema, projected_row_type) = diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py index 0e4cfe4..420747e 100644 --- a/bindings/python/test/conftest.py +++ b/bindings/python/test/conftest.py @@ -208,7 +208,7 @@ async def _connect_with_retry(bootstrap_servers, timeout=60): conn = None try: conn = await fluss.FlussConnection.create(config) - admin = await conn.get_admin() + admin = conn.get_admin() nodes = await admin.get_server_nodes() if any(n.server_type == "TabletServer" for n in nodes): return conn @@ -281,4 +281,4 @@ def plaintext_bootstrap_servers(fluss_cluster): @pytest_asyncio.fixture(scope="session") async def admin(connection): """Session-scoped admin client.""" - return await connection.get_admin() + return connection.get_admin() diff --git a/bindings/python/test/test_sasl_auth.py b/bindings/python/test/test_sasl_auth.py index 30fce4c..9dd2ddd 100644 --- a/bindings/python/test/test_sasl_auth.py +++ b/bindings/python/test/test_sasl_auth.py @@ -35,7 +35,7 @@ async def test_sasl_connect_with_valid_credentials(sasl_bootstrap_servers): "security.sasl.password": "admin-secret", }) conn = await fluss.FlussConnection.create(config) - admin = await conn.get_admin() + admin = conn.get_admin() db_name = "py_sasl_test_valid_db" db_descriptor = fluss.DatabaseDescriptor(comment="created via SASL auth") @@ -58,7 +58,7 @@ async def test_sasl_connect_with_second_user(sasl_bootstrap_servers): "security.sasl.password": "alice-secret", }) conn = await fluss.FlussConnection.create(config) - admin = await conn.get_admin() + admin = conn.get_admin() # Basic operation to confirm functional connection assert not await admin.database_exists("some_nonexistent_db_alice") diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs index 8fb60ba..ad12ed7 100644 --- a/crates/examples/src/example_kv_table.rs +++ b/crates/examples/src/example_kv_table.rs @@ -43,7 +43,7 @@ pub async fn main() -> Result<()> { let table_path = TablePath::new("fluss", "rust_upsert_lookup_example"); - let admin = conn.get_admin().await?; + let admin = conn.get_admin()?; admin .create_table(&table_path, &table_descriptor, true) .await?; diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs index ba49934..944d8d4 100644 --- a/crates/examples/src/example_partitioned_kv_table.rs +++ b/crates/examples/src/example_partitioned_kv_table.rs @@ -46,7 +46,7 @@ pub async fn main() -> Result<()> { let table_path = TablePath::new("fluss", "partitioned_kv_example"); - let admin = conn.get_admin().await?; + let admin = conn.get_admin()?; admin .create_table(&table_path, &table_descriptor, true) .await?; diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index 49f0ab4..1f751f3 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -49,7 +49,7 @@ pub async fn main() -> Result<()> { let table_path = TablePath::new("fluss", "rust_test_long"); - let admin = conn.get_admin().await?; + let admin = conn.get_admin()?; admin .create_table(&table_path, &table_descriptor, true) diff --git a/crates/fluss/README.md b/crates/fluss/README.md index b37a75d..76dc0ec 100644 --- a/crates/fluss/README.md +++ b/crates/fluss/README.md @@ -23,7 +23,7 @@ async fn main() -> Result<()> { let mut config = Config::default(); config.bootstrap_servers = "127.0.0.1:9123".to_string(); let connection = FlussConnection::new(config).await?; - let admin = connection.get_admin().await?; + let admin = connection.get_admin()?; // ---- Primary key (KV) table: upsert and lookup ---- let kv_path = TablePath::new("fluss", "users"); diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 7dc1285..88f87e4 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -82,7 +82,7 @@ impl FlussConnection { &self.args } - pub async fn get_admin(&self) -> Result<Arc<FlussAdmin>> { + pub fn get_admin(&self) -> Result<Arc<FlussAdmin>> { // 1. Fast path: return cached instance if already initialized. if let Some(admin) = self.admin_client.read().as_ref() { return Ok(admin.clone()); diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 4302539..ab7d441 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -94,7 +94,7 @@ impl<'a> TableScan<'a> { /// .build()?, /// ).build()?; /// let table_path = TablePath::new("fluss".to_owned(), "rust_test_long".to_owned()); - /// let admin = conn.get_admin().await?; + /// let admin = conn.get_admin()?; /// admin.create_table(&table_path, &table_descriptor, true) /// .await?; /// let table_info = admin.get_table_info(&table_path).await?; @@ -169,7 +169,7 @@ impl<'a> TableScan<'a> { /// .build()?, /// ).build()?; /// let table_path = TablePath::new("fluss".to_owned(), "rust_test_long".to_owned()); - /// let admin = conn.get_admin().await?; + /// let admin = conn.get_admin()?; /// admin.create_table(&table_path, &table_descriptor, true) /// .await?; /// let table = conn.get_table(&table_path).await?; diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs index dd5370d..efa3783 100644 --- a/crates/fluss/src/client/write/sender.rs +++ b/crates/fluss/src/client/write/sender.rs @@ -269,8 +269,7 @@ impl Sender { } debug!( - "Updated metadata for unknown leader tables: {:?}", - unknown_leaders + "Updated metadata for unknown leader tables: {unknown_leaders:?}" ); Ok(()) } diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs index 13e8598..6537189 100644 --- a/crates/fluss/src/lib.rs +++ b/crates/fluss/src/lib.rs @@ -40,7 +40,7 @@ //! let mut config = Config::default(); //! config.bootstrap_servers = "127.0.0.1:9123".to_string(); //! let connection = FlussConnection::new(config).await?; -//! let admin = connection.get_admin().await?; +//! let admin = connection.get_admin()?; //! //! // ---- Primary key (KV) table: upsert and lookup ---- //! let kv_path = TablePath::new("fluss", "users"); diff --git a/crates/fluss/src/metadata/data_lake_format.rs b/crates/fluss/src/metadata/data_lake_format.rs index b0c3b0d..77e5ad3 100644 --- a/crates/fluss/src/metadata/data_lake_format.rs +++ b/crates/fluss/src/metadata/data_lake_format.rs @@ -54,7 +54,7 @@ mod tests { for (raw, expected) in cases { let parsed = raw.parse::<DataLakeFormat>().unwrap(); - assert_eq!(parsed, expected, "failed to parse: {}", raw); + assert_eq!(parsed, expected, "failed to parse: {raw}"); } // negative cases diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs index 6d888d9..d9eb56b 100644 --- a/crates/fluss/src/metadata/datatype.rs +++ b/crates/fluss/src/metadata/datatype.rs @@ -954,7 +954,7 @@ impl RowType { .iter() .map(|name| { self.get_field_index(name).ok_or_else(|| IllegalArgument { - message: format!("Field '{}' does not exist in the row type", name), + message: format!("Field '{name}' does not exist in the row type"), }) }) .collect::<Result<Vec<_>>>()?; @@ -1522,7 +1522,7 @@ fn test_time_valid_precision() { // Test all valid precision values 0 through 9 for precision in 0..=9 { let result = TimeType::with_nullable(true, precision); - assert!(result.is_ok(), "precision {} should be valid", precision); + assert!(result.is_ok(), "precision {precision} should be valid"); let time = result.unwrap(); assert_eq!(time.precision(), precision); } @@ -1550,7 +1550,7 @@ fn test_timestamp_valid_precision() { // Test all valid precision values 0 through 9 for precision in 0..=9 { let result = TimestampType::with_nullable(true, precision); - assert!(result.is_ok(), "precision {} should be valid", precision); + assert!(result.is_ok(), "precision {precision} should be valid"); let timestamp_type = result.unwrap(); assert_eq!(timestamp_type.precision(), precision); } diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs index c647704..0860cbe 100644 --- a/crates/fluss/tests/integration/admin.rs +++ b/crates/fluss/tests/integration/admin.rs @@ -30,7 +30,7 @@ mod admin_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("should get admin"); + let admin = connection.get_admin().expect("should get admin"); let db_descriptor = DatabaseDescriptorBuilder::default() .comment("test_db") @@ -73,10 +73,7 @@ mod admin_test { async fn test_create_table() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection - .get_admin() - .await - .expect("Failed to get admin client"); + let admin = connection.get_admin().expect("Failed to get admin client"); let test_db_name = "test_create_table_db"; let db_descriptor = DatabaseDescriptorBuilder::default() @@ -202,10 +199,7 @@ mod admin_test { async fn test_partition_apis() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection - .get_admin() - .await - .expect("Failed to get admin client"); + let admin = connection.get_admin().expect("Failed to get admin client"); let test_db_name = "test_partition_apis_db"; let db_descriptor = DatabaseDescriptorBuilder::default() @@ -341,10 +335,7 @@ mod admin_test { async fn test_fluss_error_response() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection - .get_admin() - .await - .expect("Failed to get admin client"); + let admin = connection.get_admin().expect("Failed to get admin client"); let table_path = TablePath::new("fluss", "not_exist"); @@ -375,7 +366,7 @@ mod admin_test { async fn test_error_database_not_exist() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.unwrap(); + let admin = connection.get_admin().unwrap(); // get_database_info for non-existent database let result = admin.get_database_info("no_such_db").await; @@ -394,7 +385,7 @@ mod admin_test { async fn test_error_database_already_exist() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.unwrap(); + let admin = connection.get_admin().unwrap(); let db_name = "test_error_db_already_exist"; let descriptor = DatabaseDescriptorBuilder::default().build(); @@ -424,7 +415,7 @@ mod admin_test { async fn test_error_table_already_exist() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.unwrap(); + let admin = connection.get_admin().unwrap(); let db_name = "test_error_tbl_already_exist_db"; let descriptor = DatabaseDescriptorBuilder::default().build(); @@ -472,7 +463,7 @@ mod admin_test { async fn test_error_table_not_exist() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.unwrap(); + let admin = connection.get_admin().unwrap(); let table_path = TablePath::new("fluss", "no_such_table"); @@ -491,7 +482,7 @@ mod admin_test { async fn test_get_server_nodes() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.unwrap(); + let admin = connection.get_admin().unwrap(); let nodes = admin .get_server_nodes() @@ -534,7 +525,7 @@ mod admin_test { async fn test_error_table_not_partitioned() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.unwrap(); + let admin = connection.get_admin().unwrap(); let db_name = "test_error_not_partitioned_db"; let descriptor = DatabaseDescriptorBuilder::default().build(); diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index f0e0c57..b942988 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -33,7 +33,7 @@ mod kv_table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.unwrap(); + let admin = connection.get_admin().unwrap(); let table_path = TablePath::new("fluss", "test_upsert_and_lookup"); @@ -172,7 +172,7 @@ mod kv_table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.unwrap(); + let admin = connection.get_admin().unwrap(); let table_path = TablePath::new("fluss", "test_composite_pk"); @@ -282,7 +282,7 @@ mod kv_table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_partial_update"); @@ -403,7 +403,7 @@ mod kv_table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_partitioned_kv_table"); @@ -573,7 +573,7 @@ mod kv_table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_all_datatypes"); diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 4aa88ac..330e143 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -33,7 +33,7 @@ mod table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_append_record_batch_and_scan"); @@ -143,7 +143,7 @@ mod table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_list_offsets"); @@ -296,7 +296,7 @@ mod table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_project"); @@ -451,7 +451,7 @@ mod table_test { async fn test_poll_batches() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_poll_batches"); let schema = Schema::builder() @@ -584,7 +584,7 @@ mod table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_log_all_datatypes"); @@ -1015,7 +1015,7 @@ mod table_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_partitioned_log_append"); @@ -1309,7 +1309,7 @@ mod table_test { async fn undersized_row_returns_error() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_log_undersized_row"); diff --git a/crates/fluss/tests/integration/sasl_auth.rs b/crates/fluss/tests/integration/sasl_auth.rs index 878c983..439d65f 100644 --- a/crates/fluss/tests/integration/sasl_auth.rs +++ b/crates/fluss/tests/integration/sasl_auth.rs @@ -36,7 +36,6 @@ mod sasl_auth_test { let admin = connection .get_admin() - .await .expect("Should get admin with valid SASL credentials"); // Perform a basic operation to confirm the connection is fully functional @@ -69,7 +68,6 @@ mod sasl_auth_test { let admin = connection .get_admin() - .await .expect("Should get admin with alice credentials"); // Basic operation to confirm functional connection diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index 52b8974..293c1f7 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -28,7 +28,7 @@ mod table_remote_scan_test { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().await.expect("Failed to get admin"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_scan_remote_log"); diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index a4b594b..1268d37 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -36,7 +36,7 @@ Complete API reference for the Fluss Python client. | Method | Description | |-----------------------------------------------------------|---------------------------------------| | `await FlussConnection.create(config) -> FlussConnection` | Connect to a Fluss cluster | -| `await conn.get_admin() -> FlussAdmin` | Get admin interface | +| `conn.get_admin() -> FlussAdmin` | Get admin interface | | `await conn.get_table(table_path) -> FlussTable` | Get a table for read/write operations | | `conn.close()` | Close the connection | diff --git a/website/docs/user-guide/python/example/admin-operations.md b/website/docs/user-guide/python/example/admin-operations.md index ba3748a..2cda6c4 100644 --- a/website/docs/user-guide/python/example/admin-operations.md +++ b/website/docs/user-guide/python/example/admin-operations.md @@ -4,7 +4,7 @@ sidebar_position: 3 # Admin Operations ```python -admin = await conn.get_admin() +admin = conn.get_admin() ``` ## Databases diff --git a/website/docs/user-guide/python/example/index.md b/website/docs/user-guide/python/example/index.md index 389b648..ec9fa78 100644 --- a/website/docs/user-guide/python/example/index.md +++ b/website/docs/user-guide/python/example/index.md @@ -14,7 +14,7 @@ async def main(): # Connect config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) conn = await fluss.FlussConnection.create(config) - admin = await conn.get_admin() + admin = conn.get_admin() # Create a log table schema = fluss.Schema(pa.schema([ diff --git a/website/docs/user-guide/python/example/log-tables.md b/website/docs/user-guide/python/example/log-tables.md index adaa162..c320bf4 100644 --- a/website/docs/user-guide/python/example/log-tables.md +++ b/website/docs/user-guide/python/example/log-tables.md @@ -112,7 +112,7 @@ scanner.unsubscribe(bucket_id=0) To only consume new records (skip existing data), first resolve the current latest offset via `list_offsets`, then subscribe at that offset: ```python -admin = await conn.get_admin() +admin = conn.get_admin() offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest()) latest = offsets[0] diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index fbe3428..4408f35 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -35,7 +35,7 @@ Complete API reference for the Fluss Rust client. | Method | Description | |-------------------------------------------------------------------------------|------------------------------------------------| | `async fn new(config: Config) -> Result<Self>` | Create a new connection to a Fluss cluster | -| `async fn get_admin(&self) -> Result<FlussAdmin>` | Get the admin interface for cluster management | +| `fn get_admin(&self) -> Result<FlussAdmin>` | Get the admin interface for cluster management | | `async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>>` | Get a table for read/write operations | | `fn config(&self) -> &Config` | Get a reference to the connection config | diff --git a/website/docs/user-guide/rust/error-handling.md b/website/docs/user-guide/rust/error-handling.md index b7e4b45..4966428 100644 --- a/website/docs/user-guide/rust/error-handling.md +++ b/website/docs/user-guide/rust/error-handling.md @@ -12,7 +12,7 @@ use fluss::error::{Error, Result}; // All operations return Result<T> let conn = FlussConnection::new(config).await?; -let admin = conn.get_admin().await?; +let admin = conn.get_admin()?; let table = conn.get_table(&table_path).await?; ``` @@ -217,7 +217,7 @@ use fluss::error::Result; async fn my_pipeline() -> Result<()> { let conn = FlussConnection::new(config).await?; - let admin = conn.get_admin().await?; + let admin = conn.get_admin()?; let table = conn.get_table(&table_path).await?; let writer = table.new_append()?.create_writer()?; writer.append(&row)?; diff --git a/website/docs/user-guide/rust/example/admin-operations.md b/website/docs/user-guide/rust/example/admin-operations.md index d18ad0e..3975275 100644 --- a/website/docs/user-guide/rust/example/admin-operations.md +++ b/website/docs/user-guide/rust/example/admin-operations.md @@ -6,7 +6,7 @@ sidebar_position: 3 ## Get Admin Interface ```rust -let admin = conn.get_admin().await?; +let admin = conn.get_admin()?; ``` ## Database Operations diff --git a/website/docs/user-guide/rust/example/index.md b/website/docs/user-guide/rust/example/index.md index e35c8dc..f1d5a68 100644 --- a/website/docs/user-guide/rust/example/index.md +++ b/website/docs/user-guide/rust/example/index.md @@ -19,7 +19,7 @@ async fn main() -> Result<()> { let mut config = Config::default(); config.bootstrap_servers = "127.0.0.1:9123".to_string(); let conn = FlussConnection::new(config).await?; - let admin = conn.get_admin().await?; + let admin = conn.get_admin()?; // Create a log table let table_path = TablePath::new("fluss", "quickstart_rust"); diff --git a/website/docs/user-guide/rust/example/log-tables.md b/website/docs/user-guide/rust/example/log-tables.md index 7c01cf1..0485779 100644 --- a/website/docs/user-guide/rust/example/log-tables.md +++ b/website/docs/user-guide/rust/example/log-tables.md @@ -106,7 +106,7 @@ To start reading only new records, first resolve the current latest offset via ` ```rust use fluss::rpc::message::OffsetSpec; -let admin = conn.get_admin().await?; +let admin = conn.get_admin()?; let offsets = admin.list_offsets(&table_path, &[0], OffsetSpec::Latest).await?; let latest = offsets[&0]; log_scanner.subscribe(0, latest).await?; diff --git a/website/docs/user-guide/rust/example/partitioned-tables.md b/website/docs/user-guide/rust/example/partitioned-tables.md index 40bd4d6..38517e1 100644 --- a/website/docs/user-guide/rust/example/partitioned-tables.md +++ b/website/docs/user-guide/rust/example/partitioned-tables.md @@ -65,7 +65,7 @@ For partitioned tables, use partition-aware subscribe methods. use std::time::Duration; let table = conn.get_table(&table_path).await?; -let admin = conn.get_admin().await?; +let admin = conn.get_admin()?; let partitions = admin.list_partition_infos(&table_path).await?; let log_scanner = table.new_scan().create_log_scanner()?;
