Copilot commented on code in PR #369:
URL: https://github.com/apache/fluss-rust/pull/369#discussion_r2870767395
##########
crates/fluss/src/client/connection.rs:
##########
@@ -76,7 +79,16 @@ impl FlussConnection {
}
pub async fn get_admin(&self) -> Result<FlussAdmin> {
- FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
+ // Lazily initialize and cache the FlussAdmin instance. The cached FlussAdmin
+ // holds a reference to RpcClient, which manages connection reuse and re-acquisition
+ // when a cached connection becomes poisoned. Subsequent calls clone cheaply —
+ // all internal fields (ServerConnection, Arc<Metadata>, Arc<RpcClient>) are
+ // Arc-backed so cloning is just a reference-count bump.
+ let admin = self
+ .admin_client
+ .get_or_try_init(|| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()))
+ .await?;
+ Ok(admin.clone())
Review Comment:
Caching `FlussAdmin` in a `tokio::sync::OnceCell` makes the admin instance
effectively permanent for the lifetime of the connection. If the coordinator
changes (metadata refresh) or the underlying `ServerConnection` becomes
poisoned, `get_admin()` will keep returning the same cached instance and
callers may be unable to recover without constructing a new `FlussConnection`.
Consider using a cache that supports invalidation/refresh (e.g., an async lock
around an `Option<FlussAdmin>`), or make `FlussAdmin` acquire/refresh its
`admin_gateway` on demand when the current connection is poisoned or when
requests return `InvalidCoordinatorException`.
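A minimal sketch of the invalidatable cache the comment suggests, assuming a synchronous initializer so it runs with only the standard library. `InvalidatableCache`, `get_or_try_init`, and `invalidate` here are hypothetical names, not fluss-rust APIs. In `get_admin()` itself the slot would more plausibly be a `tokio::sync::Mutex<Option<FlussAdmin>>`, since `FlussAdmin::new` is async and a `std::sync::MutexGuard` should not be held across an `.await`.

```rust
use std::sync::Mutex;

/// Hypothetical cache that, unlike a set-once cell, can be cleared through
/// `&self` so the next caller rebuilds the value. `T` stands in for `FlussAdmin`.
pub struct InvalidatableCache<T: Clone> {
    slot: Mutex<Option<T>>,
}

impl<T: Clone> InvalidatableCache<T> {
    pub fn new() -> Self {
        Self { slot: Mutex::new(None) }
    }

    /// Return a clone of the cached value, building it with `init` if the
    /// slot is empty. The lock is held while `init` runs, so concurrent
    /// callers cannot build two instances. On error the slot stays empty.
    pub fn get_or_try_init<E>(&self, init: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
        let mut slot = self.slot.lock().unwrap();
        if let Some(value) = &*slot {
            return Ok(value.clone());
        }
        let value = init()?;
        *slot = Some(value.clone());
        Ok(value)
    }

    /// Drop the cached value (e.g. after a poisoned connection or a
    /// coordinator change) so the next `get_or_try_init` rebuilds it.
    pub fn invalidate(&self) {
        *self.slot.lock().unwrap() = None;
    }
}
```

Deciding *when* to call `invalidate` (for example on a poisoned gateway or an `InvalidCoordinatorException`) is left to the caller in this sketch; the cache only guarantees that a cleared or failed slot is rebuilt on the next request.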
##########
crates/fluss/src/client/connection.rs:
##########
@@ -76,7 +79,16 @@ impl FlussConnection {
}
pub async fn get_admin(&self) -> Result<FlussAdmin> {
- FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
+ // Lazily initialize and cache the FlussAdmin instance. The cached FlussAdmin
+ // holds a reference to RpcClient, which manages connection reuse and re-acquisition
+ // when a cached connection becomes poisoned. Subsequent calls clone cheaply —
+ // all internal fields (ServerConnection, Arc<Metadata>, Arc<RpcClient>) are
+ // Arc-backed so cloning is just a reference-count bump.
Review Comment:
The code comment here suggests the cached `FlussAdmin` will automatically
re-acquire connections when a cached connection becomes poisoned, but
`FlussAdmin` stores a concrete `admin_gateway: ServerConnection` created once
in `FlussAdmin::new()`. `RpcClient` only re-connects when `get_connection()` is
called again, so this comment is misleading and the cached admin can keep
returning a poisoned gateway indefinitely. Please adjust the comment and/or
implement a reconnection path in `FlussAdmin`/`get_admin()` that actually
re-acquires a coordinator connection on poison/failover.
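The reconnection path the comment asks for can be sketched with simplified stand-ins. `Connection`, `RpcClient`, and `Admin` below are hypothetical models, not the fluss-rust types, and everything is synchronous for brevity. The key change is that the admin keeps the shared client and calls `get_connection()` on every request, instead of storing one gateway at construction time, so a poisoned connection is replaced on the next call.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `ServerConnection`: an id plus a poison flag.
pub struct Connection {
    pub id: u64,
    poisoned: AtomicBool,
}

impl Connection {
    fn new(id: u64) -> Self {
        Self { id, poisoned: AtomicBool::new(false) }
    }

    /// Mark the connection as unusable (e.g. after an I/O error).
    pub fn poison(&self) {
        self.poisoned.store(true, Ordering::SeqCst);
    }

    pub fn is_poisoned(&self) -> bool {
        self.poisoned.load(Ordering::SeqCst)
    }
}

/// Hypothetical stand-in for `RpcClient`: caches one connection and replaces
/// it only when `get_connection()` is called and the cached one is poisoned.
pub struct RpcClient {
    cached: Mutex<Option<Arc<Connection>>>,
    next_id: AtomicU64,
}

impl RpcClient {
    pub fn new() -> Self {
        Self { cached: Mutex::new(None), next_id: AtomicU64::new(1) }
    }

    pub fn get_connection(&self) -> Arc<Connection> {
        let mut cached = self.cached.lock().unwrap();
        if let Some(conn) = &*cached {
            if !conn.is_poisoned() {
                // Healthy connection: reuse it.
                return Arc::clone(conn);
            }
        }
        // No connection yet, or the cached one is poisoned: open a new one.
        let conn = Arc::new(Connection::new(self.next_id.fetch_add(1, Ordering::SeqCst)));
        *cached = Some(Arc::clone(&conn));
        conn
    }
}

/// Hypothetical admin that stores the shared client rather than a fixed
/// gateway, so a cached or cloned admin never pins a dead connection.
#[derive(Clone)]
pub struct Admin {
    rpc: Arc<RpcClient>,
}

impl Admin {
    pub fn new(rpc: Arc<RpcClient>) -> Self {
        Self { rpc }
    }

    /// Resolve the gateway on every request instead of once in `new()`.
    pub fn gateway(&self) -> Arc<Connection> {
        self.rpc.get_connection()
    }
}
```

This covers only the poisoned-connection case. Handling coordinator failover would additionally require refreshing metadata to locate the new coordinator before reconnecting, which the sketch does not model.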
```suggestion
// Lazily initialize and cache the FlussAdmin instance. Subsequent calls
// return a clone of the same cached admin. All internal fields
// (ServerConnection, Arc<Metadata>, Arc<RpcClient>) are Arc-backed, so
// cloning is effectively just a reference-count bump.
```
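To illustrate the cloning claim in the suggested comment, here is a minimal sketch with hypothetical stand-in types (`Metadata`, `Rpc`, and `AdminLike` are not the fluss-rust types): when every field is an `Arc`, a derived `Clone` copies pointers and increments reference counts rather than duplicating the underlying data.

```rust
use std::sync::Arc;

/// Hypothetical stand-ins; only the fact that they sit behind `Arc` matters.
pub struct Metadata {
    pub cluster_id: String,
}

pub struct Rpc;

/// Every field is an `Arc`, so the derived `Clone` copies pointers and bumps
/// reference counts instead of duplicating the underlying data.
#[derive(Clone)]
pub struct AdminLike {
    pub metadata: Arc<Metadata>,
    pub rpc: Arc<Rpc>,
}
```

The flip side, as the review comments point out, is that every clone shares the same underlying objects: cloning a cached admin never re-acquires a connection.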
--
This is an automated message from the Apache Git Service.