Copilot commented on code in PR #375:
URL: https://github.com/apache/fluss-rust/pull/375#discussion_r2852956180
##########
bindings/cpp/src/lib.rs:
##########
@@ -243,6 +248,9 @@ mod ffi {
type LookupResultInner;
// Connection
+ // TODO: all Result<*mut T> methods lose server error codes (mapped to CLIENT_ERROR).
+ // Fix by introducing some struct like { result: FfiResult, ptr: i64 } to preserve error
+ // codes from the server, matching how Rust and Python bindings handle errors.
fn new_connection(config: &FfiConfig) -> Result<*mut Connection>;
Review Comment:
The C++ error code TODO is accurate: the Result type for `new_connection`
loses the server error code when authentication fails. Authentication errors
will appear as CLIENT_ERROR (-2) instead of AUTHENTICATE_EXCEPTION (46). The
workaround in the test (checking for "Authentication failed" in the error
message) is brittle. This is a known limitation documented in the code, but
users may experience degraded error diagnostics. Consider prioritizing the fix
suggested in the TODO comment to preserve error codes through FFI boundaries.
```suggestion
fn new_connection(config: &FfiConfig, out_conn: &mut *mut Connection) -> FfiResult;
```
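One way the TODO's proposal could look, sketched as plain Rust rather than the actual cxx FFI declarations: return a status struct carrying the server error code, and hand the pointer back through an out-parameter. All names and the specific code values here are illustrative, not the fluss-rust API.

```rust
// Sketch: preserve server error codes across the FFI boundary instead of
// collapsing every failure into CLIENT_ERROR. Names/codes are hypothetical.

/// Hypothetical FFI status: error_code 0 means success; otherwise it carries
/// the server error code (e.g. 46 for AUTHENTICATE_EXCEPTION per the review).
#[derive(Debug, Clone, PartialEq)]
pub struct FfiResult {
    pub error_code: i32,
    pub error_message: String,
}

impl FfiResult {
    fn ok() -> Self {
        FfiResult { error_code: 0, error_message: String::new() }
    }
    fn from_code(code: i32, msg: &str) -> Self {
        FfiResult { error_code: code, error_message: msg.to_string() }
    }
}

/// Stand-in for the real connection type.
pub struct Connection;

/// Out-parameter variant of new_connection: the pointer travels through
/// `out_conn`, the status (with the original server code) through the return.
pub fn new_connection_ffi(authenticated: bool, out_conn: &mut *mut Connection) -> FfiResult {
    if authenticated {
        *out_conn = Box::into_raw(Box::new(Connection));
        FfiResult::ok()
    } else {
        *out_conn = std::ptr::null_mut();
        // 46 is the AUTHENTICATE_EXCEPTION code cited in the review comment.
        FfiResult::from_code(46, "Authentication failed")
    }
}
```

With this shape, the C++ side can branch on `error_code` directly instead of string-matching "Authentication failed" in the message.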
##########
crates/fluss/tests/integration/fluss_cluster.rs:
##########
@@ -210,42 +344,99 @@ impl FlussTestingClusterBuilder {
/// Provides an easy way to launch a Fluss cluster with coordinator and tablet servers.
#[derive(Clone)]
+#[allow(dead_code)] // Fields held for RAII (keeping Docker containers alive).
pub struct FlussTestingCluster {
zookeeper: Arc<ContainerAsync<GenericImage>>,
coordinator_server: Arc<ContainerAsync<GenericImage>>,
tablet_servers: HashMap<i32, Arc<ContainerAsync<GenericImage>>>,
+ /// Bootstrap servers for plaintext connections.
+ /// When dual listeners are configured, this points to the PLAIN_CLIENT listener.
bootstrap_servers: String,
+ /// Bootstrap servers for SASL connections (only set when dual listeners are configured).
+ sasl_bootstrap_servers: Option<String>,
remote_data_dir: Option<std::path::PathBuf>,
+ sasl_users: Vec<(String, String)>,
+ container_names: Vec<String>,
}
impl FlussTestingCluster {
- pub async fn stop(&self) {
- for tablet_server in self.tablet_servers.values() {
- tablet_server.stop().await.unwrap()
+ /// Synchronously stops and removes all Docker containers and cleans up the
+ /// remote data directory. Safe to call from non-async contexts (e.g. atexit).
+ #[allow(dead_code)]
+ pub fn stop(&self) {
+ for name in &self.container_names {
+ let _ = std::process::Command::new("docker")
+ .args(["rm", "-f", name])
+ .output();
}
- self.coordinator_server.stop().await.unwrap();
- self.zookeeper.stop().await.unwrap();
- if let Some(remote_data_dir) = &self.remote_data_dir {
- // Try to clean up the remote data directory, but don't fail if it can't be deleted.
- // This can happen in CI environments or if Docker containers are still using the directory.
- // The directory will be cleaned up by the CI system or OS eventually.
- if let Err(e) = tokio::fs::remove_dir_all(remote_data_dir).await {
- eprintln!(
- "Warning: Failed to delete remote data directory: {:?}, error: {:?}. \
- This is non-fatal and the directory may be cleaned up later.",
- remote_data_dir, e
- );
- }
- }
+ if let Some(ref dir) = self.remote_data_dir {
+ let _ = std::fs::remove_dir_all(dir);
}
}
Review Comment:
The cluster cleanup uses synchronous Docker commands in an atexit handler,
which is appropriate for process termination. However, if `stop()` is called
explicitly during test execution (e.g., in teardown), it won't wait for
containers to fully stop before returning, potentially causing issues if tests
run immediately after. Consider adding a small delay or polling mechanism to
verify containers have stopped, or document that this is intentional for fast
test execution.
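A polling mechanism along the lines suggested could stay fully synchronous. This is a sketch, not the fluss-rust API: `is_stopped` is a hypothetical closure that, in the real cluster, would wrap `docker inspect -f '{{.State.Running}}' <name>` and treat a non-zero exit (container removed) or "false" as stopped.

```rust
use std::time::{Duration, Instant};

/// Polls `is_stopped` until it reports true or `timeout` elapses.
/// Returns true if the container stopped within the deadline.
pub fn wait_until_stopped<F>(mut is_stopped: F, timeout: Duration) -> bool
where
    F: FnMut() -> bool,
{
    let deadline = Instant::now() + timeout;
    loop {
        if is_stopped() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Short sleep keeps teardown fast while avoiding a busy loop.
        std::thread::sleep(Duration::from_millis(100));
    }
}
```

Calling this after the `docker rm -f` loop would let an explicit teardown confirm shutdown, while the atexit path could skip it for fast process exit.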
##########
crates/fluss/tests/integration/fluss_cluster.rs:
##########
@@ -25,17 +25,29 @@ use testcontainers::core::ContainerPort;
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, GenericImage, ImageExt};
-const FLUSS_VERSION: &str = "0.7.0";
+const FLUSS_VERSION: &str = "0.8.0-incubating";
Review Comment:
The Fluss server version was updated from 0.7.0 to 0.8.0-incubating to
support SASL authentication. Ensure that the 0.8.0-incubating Docker image
(apache/fluss:0.8.0-incubating) is available in the Docker registry before
merging, as CI and local development will pull this image. If the image is not
yet published, tests will fail with image pull errors.
```suggestion
const FLUSS_VERSION: &str = "0.7.0";
```
##########
crates/fluss/tests/integration/utils.rs:
##########
@@ -18,20 +18,97 @@
use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
use fluss::client::FlussAdmin;
use fluss::metadata::{PartitionSpec, TableDescriptor, TablePath};
-use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
+use std::sync::LazyLock;
use std::time::Duration;
-/// Polls the cluster until CoordinatorEventProcessor is initialized and tablet server is available.
-/// Times out after 20 seconds.
-pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) {
- let timeout = Duration::from_secs(20);
+extern "C" fn cleanup_on_exit() {
+ SHARED_CLUSTER.stop();
+}
+
+/// Shared cluster with dual listeners: PLAIN_CLIENT (plaintext) on port 9223
+/// and CLIENT (SASL) on port 9123. Includes remote storage config so
+/// table_remote_scan can also use this cluster.
+static SHARED_CLUSTER: LazyLock<FlussTestingCluster> = LazyLock::new(|| {
+ std::thread::spawn(|| {
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
+ rt.block_on(async {
+ let temp_dir = std::env::current_dir()
+ .unwrap_or_else(|_| std::path::PathBuf::from("."))
+ .join("target")
+ .join(format!("test-remote-data-{}", uuid::Uuid::new_v4()));
+ let _ = std::fs::remove_dir_all(&temp_dir);
+ std::fs::create_dir_all(&temp_dir)
+ .expect("Failed to create temporary directory for remote data");
+ let temp_dir = temp_dir
+ .canonicalize()
+ .expect("Failed to canonicalize remote data directory path");
+
+ let mut cluster_conf = HashMap::new();
+ cluster_conf.insert("log.segment.file-size".to_string(), "120b".to_string());
+ cluster_conf.insert(
+ "remote.log.task-interval-duration".to_string(),
+ "1s".to_string(),
+ );
+
+ let cluster =
+ FlussTestingClusterBuilder::new_with_cluster_conf("shared-test", &cluster_conf)
+ .with_sasl(vec![
+ ("admin".to_string(), "admin-secret".to_string()),
+ ("alice".to_string(), "alice-secret".to_string()),
+ ])
+ .with_remote_data_dir(temp_dir)
+ .build()
+ .await;
+ wait_for_cluster_ready_with_sasl(&cluster).await;
+
+ // Register cleanup so containers are removed on process exit.
+ unsafe {
+ unsafe extern "C" {
+ fn atexit(f: extern "C" fn()) -> std::os::raw::c_int;
+ }
+ atexit(cleanup_on_exit);
+ }
+
+ cluster
+ })
+ })
+ .join()
+ .expect("Failed to initialize shared cluster")
+});
Review Comment:
The shared cluster is initialized in a LazyLock with a thread spawn and
runtime creation. If initialization fails (e.g., Docker not available), the
panic message from `expect("Failed to initialize shared cluster")` may not be
clear about the root cause. Consider improving error messages throughout the
initialization chain to help developers diagnose issues like "Docker not found"
or "Port 9123 already in use".
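One low-cost improvement is a preflight check before the LazyLock body spawns the runtime, so a failure panics with the root cause rather than the generic "Failed to initialize shared cluster". A sketch (the function name, parameterized binary, and messages are all illustrative, not existing fluss-rust code):

```rust
use std::process::Command;

/// Diagnoses why Docker is unusable before cluster startup. Taking the binary
/// name as a parameter is only for testability of this sketch; the real call
/// site would pass "docker".
pub fn check_docker_available(docker_bin: &str) -> Result<(), String> {
    match Command::new(docker_bin).arg("info").output() {
        // Binary missing from PATH entirely.
        Err(e) => Err(format!(
            "Docker not found on PATH ({e}); install Docker or start the daemon \
             before running integration tests"
        )),
        // CLI present but the daemon refused the connection.
        Ok(out) if !out.status.success() => Err(format!(
            "Docker CLI present but daemon unreachable: {}",
            String::from_utf8_lossy(&out.stderr).trim()
        )),
        Ok(_) => Ok(()),
    }
}
```

The LazyLock initializer could then `expect` on this result first, turning "Docker not found" into an immediate, self-explanatory panic.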
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -100,7 +134,74 @@ impl RpcClient {
self.max_message_size,
self.client_id.clone(),
);
- Ok(ServerConnection::new(messenger))
+ let connection = ServerConnection::new(messenger);
+
+ if let Some(ref sasl) = self.sasl_config {
+ Self::authenticate(&connection, &sasl.username, &sasl.password).await?;
+ }
+
+ Ok(connection)
+ }
+
+ /// Perform SASL/PLAIN authentication handshake.
+ ///
+ /// Retries on `RetriableAuthenticateException` with exponential backoff
+ /// (matching Java's unbounded retry behaviour). Non-retriable errors
+ /// (wrong password, unknown user) propagate immediately as
+ /// `Error::FlussAPIError` with the original error code.
+ async fn authenticate(
+ connection: &ServerConnection,
+ username: &str,
+ password: &str,
+ ) -> Result<(), Error> {
+ use crate::rpc::fluss_api_error::FlussError;
+ use crate::rpc::message::AuthenticateRequest;
+ use rand::Rng;
+
+ let initial_request = AuthenticateRequest::new_plain(username, password);
+ let mut retry_count: u32 = 0;
+
+ loop {
+ let request = initial_request.clone();
+ let result = connection.request(request).await;
+
+ match result {
+ Ok(response) => {
+ // Check for server challenge (multi-round auth).
+ // PLAIN mechanism never sends a challenge, but we handle it
+ // for protocol correctness matching Java's handleAuthenticateResponse.
+ if let Some(challenge) = response.challenge {
+ let challenge_req = AuthenticateRequest::from_challenge("PLAIN", challenge);
+ connection.request(challenge_req).await?;
+ }
+ return Ok(());
+ }
+ Err(Error::FlussAPIError { ref api_error })
+ if FlussError::for_code(api_error.code)
+ == FlussError::RetriableAuthenticateException =>
+ {
+ retry_count += 1;
+ // Cap the exponent like Java's ExponentialBackoff.expMax so that
+ // jitter still produces a range at steady state instead of being
+ // clamped to AUTH_MAX_BACKOFF_MS.
+ let exp_max = (AUTH_MAX_BACKOFF_MS / AUTH_INITIAL_BACKOFF_MS).log2();
+ let exp = ((retry_count as f64) - 1.0).min(exp_max);
+ let term = AUTH_INITIAL_BACKOFF_MS * AUTH_BACKOFF_MULTIPLIER.powf(exp);
+ let jitter_factor =
+ 1.0 - AUTH_JITTER + rand::rng().random::<f64>() * (2.0 * AUTH_JITTER);
Review Comment:
The jitter calculation uses `rand::rng().random::<f64>()` which returns a
value in [0, 1), but the formula applies it as `1.0 - AUTH_JITTER + random *
(2.0 * AUTH_JITTER)`. With AUTH_JITTER = 0.2, this produces a range of [0.8,
1.2). However, the similar backoff implementation in `remote_log.rs` uses
`random_range(0.75..=1.25)` which is more explicit. For consistency and
clarity, consider using `random_range` instead of the manual calculation, or
document why this specific formula is used to match Java's implementation.
```suggestion
rand::rng().random_range(1.0 - AUTH_JITTER..1.0 + AUTH_JITTER);
```
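The manual formula and `random_range` do describe the same interval: with a uniform u in [0, 1), `1.0 - J + u * 2.0 * J` lands in [1 - J, 1 + J). A quick sketch isolating the arithmetic from the diff (AUTH_JITTER = 0.2 comes from the review; the backoff constants are assumed values for illustration only):

```rust
const AUTH_JITTER: f64 = 0.2;
// Assumed values, chosen only to exercise the math below.
const AUTH_INITIAL_BACKOFF_MS: f64 = 100.0;
const AUTH_MAX_BACKOFF_MS: f64 = 10_000.0;
const AUTH_BACKOFF_MULTIPLIER: f64 = 2.0;

/// Manual jitter formula from the diff: maps u in [0, 1) to [1 - J, 1 + J),
/// i.e. the same range random_range(1.0 - J..1.0 + J) would sample.
fn jitter_manual(u: f64) -> f64 {
    1.0 - AUTH_JITTER + u * (2.0 * AUTH_JITTER)
}

/// Backoff term with the capped exponent from the diff (jitter not applied):
/// capping exp at log2(max / initial) makes the term plateau exactly at
/// AUTH_MAX_BACKOFF_MS, so jitter still spreads around it instead of being
/// clamped from above.
fn backoff_term(retry_count: u32) -> f64 {
    let exp_max = (AUTH_MAX_BACKOFF_MS / AUTH_INITIAL_BACKOFF_MS).log2();
    let exp = ((retry_count as f64) - 1.0).min(exp_max);
    AUTH_INITIAL_BACKOFF_MS * AUTH_BACKOFF_MULTIPLIER.powf(exp)
}
```

So the choice between the two spellings is purely about readability; `random_range` states the [0.8, 1.2) interval directly, while the manual form mirrors Java's ExponentialBackoff term by term.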
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]