This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1dd1d2b [chore] Introduce IT infra and add IT for database operations
in admin (#28)
1dd1d2b is described below
commit 1dd1d2b1e42edda7ea953a2bc5f1b182a6498fdf
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Oct 15 18:02:18 2025 +0800
[chore] Introduce IT infra and add IT for database operations in admin (#28)
---
.github/workflows/ci.yml | 6 +-
crates/fluss/Cargo.toml | 4 +
crates/fluss/src/client/admin.rs | 5 +-
crates/fluss/src/metadata/database.rs | 19 ++-
crates/fluss/src/metadata/table.rs | 4 +-
crates/fluss/tests/integration/admin.rs | 131 ++++++++++++++++
crates/fluss/tests/integration/client/mod.rs | 21 ---
crates/fluss/tests/integration/fluss_cluster.rs | 192 ++++++++++++++++++++++++
crates/fluss/tests/test_fluss.rs | 4 +-
9 files changed, 347 insertions(+), 39 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 2661629..73e2b3f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -88,7 +88,11 @@ jobs:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
- name: Integration Test
- run: cargo test --features integration_tests --all-targets --workspace
+      # Only run integration tests on Linux, since macOS runners have no Docker by default
+ run: |
+ if [ "$RUNNER_OS" == "Linux" ]; then
+ cargo test --features integration_tests --all-targets --workspace
+ fi
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
\ No newline at end of file
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index cc26014..a728bd7 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -46,6 +46,10 @@ parse-display = "0.10"
ref-cast = "1.0"
chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+[dev-dependencies]
+testcontainers = "0.25.0"
+once_cell = "1.19"
+test-env-helpers = "0.2.2"
[features]
integration_tests = []
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 2584034..fd0f316 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -150,7 +150,7 @@ impl FlussAdmin {
database_name: &str,
ignore_if_not_exists: bool,
cascade: bool,
- ) -> Result<()> {
+ ) {
let _response = self
.admin_gateway
.request(DropDatabaseRequest::new(
@@ -158,8 +158,7 @@ impl FlussAdmin {
ignore_if_not_exists,
cascade,
))
- .await?;
- Ok(())
+ .await;
}
/// List all databases
diff --git a/crates/fluss/src/metadata/database.rs
b/crates/fluss/src/metadata/database.rs
index 2649421..8eaa4d3 100644
--- a/crates/fluss/src/metadata/database.rs
+++ b/crates/fluss/src/metadata/database.rs
@@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::HashMap;
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DatabaseDescriptor {
comment: Option<String>,
custom_properties: HashMap<String, String>,
@@ -105,11 +105,11 @@ impl DatabaseDescriptorBuilder {
self
}
- pub fn build(self) -> Result<DatabaseDescriptor> {
- Ok(DatabaseDescriptor {
+ pub fn build(self) -> DatabaseDescriptor {
+ DatabaseDescriptor {
comment: self.comment,
custom_properties: self.custom_properties,
- })
+ }
}
}
@@ -179,7 +179,7 @@ impl JsonSerde for DatabaseDescriptor {
};
builder = builder.custom_properties(custom_properties);
- builder.build()
+ Ok(builder.build())
}
}
@@ -187,7 +187,7 @@ impl DatabaseDescriptor {
/// Create DatabaseDescriptor from JSON bytes (equivalent to Java's
fromJsonBytes)
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
let json_value: Value = serde_json::from_slice(bytes)
- .map_err(|e| JsonSerdeError(format!("Failed to parse JSON: {}",
e)))?;
+ .map_err(|e| JsonSerdeError(format!("Failed to parse JSON:
{e}")))?;
Self::deserialize_json(&json_value)
}
@@ -195,7 +195,7 @@ impl DatabaseDescriptor {
pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
let json_value = self.serialize_json()?;
serde_json::to_vec(&json_value)
- .map_err(|e| JsonSerdeError(format!("Failed to serialize to JSON:
{}", e)))
+ .map_err(|e| JsonSerdeError(format!("Failed to serialize to JSON:
{e}")))
}
}
@@ -212,8 +212,7 @@ mod tests {
let descriptor = DatabaseDescriptor::builder()
.comment("Test database")
.custom_properties(custom_props)
- .build()
- .unwrap();
+ .build();
// Test serialization
let json_bytes = descriptor.to_json_bytes().unwrap();
@@ -226,7 +225,7 @@ mod tests {
#[test]
fn test_empty_database_descriptor() {
- let descriptor = DatabaseDescriptor::builder().build().unwrap();
+ let descriptor = DatabaseDescriptor::builder().build();
let json_bytes = descriptor.to_json_bytes().unwrap();
let deserialized =
DatabaseDescriptor::from_json_bytes(&json_bytes).unwrap();
assert_eq!(descriptor, deserialized);
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index 90e3573..2b48ec6 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -589,7 +589,7 @@ impl LogFormat {
match s.to_uppercase().as_str() {
"ARROW" => Ok(LogFormat::ARROW),
"INDEXED" => Ok(LogFormat::INDEXED),
- _ => Err(InvalidTableError(format!("Unknown log format: {}", s))),
+ _ => Err(InvalidTableError(format!("Unknown log format: {s}"))),
}
}
}
@@ -615,7 +615,7 @@ impl KvFormat {
match s.to_uppercase().as_str() {
"INDEXED" => Ok(KvFormat::INDEXED),
"COMPACTED" => Ok(KvFormat::COMPACTED),
- _ => Err(InvalidTableError(format!("Unknown kv format: {}", s))),
+ _ => Err(InvalidTableError(format!("Unknown kv format: {s}"))),
}
}
}
diff --git a/crates/fluss/tests/integration/admin.rs
b/crates/fluss/tests/integration/admin.rs
new file mode 100644
index 0000000..73f52db
--- /dev/null
+++ b/crates/fluss/tests/integration/admin.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::integration::fluss_cluster::FlussTestingCluster;
+use once_cell::sync::Lazy;
+use parking_lot::RwLock;
+use std::sync::Arc;
+
+#[cfg(test)]
+use test_env_helpers::*;
+
+// Module-level shared cluster instance (only for this test file)
+static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
+ Lazy::new(|| Arc::new(RwLock::new(None)));
+
+#[cfg(test)]
+#[before_all]
+#[after_all]
+mod admin_test {
+ use super::SHARED_FLUSS_CLUSTER;
+ use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
+ use fluss::metadata::DatabaseDescriptorBuilder;
+ use std::sync::Arc;
+
+ fn before_all() {
+ // Create a new tokio runtime in a separate thread
+ let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+ std::thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
+ rt.block_on(async {
+ let cluster = FlussTestingClusterBuilder::new().build().await;
+ let mut guard = cluster_guard.write();
+ *guard = Some(cluster);
+ });
+ })
+ .join()
+ .expect("Failed to create cluster");
+ }
+
+ fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
+ let cluster_guard = SHARED_FLUSS_CLUSTER.read();
+ if cluster_guard.is_none() {
+ panic!("Fluss cluster not initialized. Make sure before_all() was
called.");
+ }
+ Arc::new(cluster_guard.as_ref().unwrap().clone())
+ }
+
+ fn after_all() {
+ // Create a new tokio runtime in a separate thread
+ let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+ std::thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
+ rt.block_on(async {
+ let mut guard = cluster_guard.write();
+ if let Some(cluster) = guard.take() {
+ cluster.stop().await;
+ }
+ });
+ })
+ .join()
+ .expect("Failed to cleanup cluster");
+ }
+
+ #[tokio::test]
+ async fn test_create_database() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("should get admin");
+
+ let db_descriptor = DatabaseDescriptorBuilder::default()
+ .comment("test_db")
+ .custom_properties(
+ [
+ ("k1".to_string(), "v1".to_string()),
+ ("k2".to_string(), "v2".to_string()),
+ ]
+ .into(),
+ )
+ .build();
+
+ let db_name = "test_create_database";
+
+ assert_eq!(admin.database_exists(db_name).await.unwrap(), false);
+
+ // create database
+ admin
+ .create_database(db_name, false, Some(&db_descriptor))
+ .await
+ .expect("should create database");
+
+ // database should exist
+ assert_eq!(admin.database_exists(db_name).await.unwrap(), true);
+
+ // get database
+ let db_info = admin
+ .get_database_info(db_name)
+ .await
+ .expect("should get database info");
+
+ assert_eq!(db_info.database_name(), db_name);
+ assert_eq!(db_info.database_descriptor(), &db_descriptor);
+
+ // drop database
+ admin.drop_database(db_name, false, true).await;
+
+ // database shouldn't exist now
+ assert_eq!(admin.database_exists(db_name).await.unwrap(), false);
+
+ // Note: We don't stop the shared cluster here as it's used by other
tests
+ }
+
+ #[tokio::test]
+ async fn test_create_table() {
+ // todo
+ }
+}
diff --git a/crates/fluss/tests/integration/client/mod.rs
b/crates/fluss/tests/integration/client/mod.rs
deleted file mode 100644
index 567c358..0000000
--- a/crates/fluss/tests/integration/client/mod.rs
+++ /dev/null
@@ -1,21 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[test]
-fn test() {
- println!("Running integration tests");
-}
diff --git a/crates/fluss/tests/integration/fluss_cluster.rs
b/crates/fluss/tests/integration/fluss_cluster.rs
new file mode 100644
index 0000000..83a4795
--- /dev/null
+++ b/crates/fluss/tests/integration/fluss_cluster.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use std::collections::HashMap;
+use std::string::ToString;
+use std::sync::Arc;
+use std::time::Duration;
+use testcontainers::core::ContainerPort;
+use testcontainers::runners::AsyncRunner;
+use testcontainers::{ContainerAsync, GenericImage, ImageExt};
+
+const FLUSS_VERSION: &str = "0.7.0";
+
+pub struct FlussTestingClusterBuilder {
+ number_of_tablet_servers: usize,
+ network: &'static str,
+ cluster_conf: HashMap<String, String>,
+}
+
+impl FlussTestingClusterBuilder {
+ pub fn new() -> Self {
+        // Reduce resource usage so tests run on constrained CI machines
+ let mut cluster_conf = HashMap::new();
+ cluster_conf.insert(
+ "netty.server.num-network-threads".to_string(),
+ "1".to_string(),
+ );
+ cluster_conf.insert(
+ "netty.server.num-worker-threads".to_string(),
+ "3".to_string(),
+ );
+
+ FlussTestingClusterBuilder {
+ number_of_tablet_servers: 1,
+ cluster_conf,
+ network: "fluss-cluster-network",
+ }
+ }
+
+ pub async fn build(&mut self) -> FlussTestingCluster {
+ let zookeeper = Arc::new(
+ GenericImage::new("zookeeper", "3.9.2")
+ .with_network(self.network)
+ .with_container_name("zookeeper")
+ .start()
+ .await
+ .unwrap(),
+ );
+
+ let coordinator_server =
Arc::new(self.start_coordinator_server().await);
+
+ let mut tablet_servers = HashMap::new();
+ for server_id in 0..self.number_of_tablet_servers {
+ tablet_servers.insert(
+ server_id,
+ Arc::new(self.start_tablet_server(server_id).await),
+ );
+ }
+
+ FlussTestingCluster {
+ zookeeper,
+ coordinator_server,
+ tablet_servers,
+ bootstrap_servers: "127.0.0.1:9123".to_string(),
+ }
+ }
+
+ async fn start_coordinator_server(&mut self) ->
ContainerAsync<GenericImage> {
+ let mut coordinator_confs = HashMap::new();
+ coordinator_confs.insert("zookeeper.address", "zookeeper:2181");
+ coordinator_confs.insert(
+ "bind.listeners",
+ "INTERNAL://coordinator-server:0,
CLIENT://coordinator-server:9123",
+ );
+ coordinator_confs.insert("advertised.listeners",
"CLIENT://localhost:9123");
+ coordinator_confs.insert("internal.listener.name", "INTERNAL");
+ GenericImage::new("fluss/fluss", FLUSS_VERSION)
+ .with_container_name("coordinator-server")
+ .with_mapped_port(9123, ContainerPort::Tcp(9123))
+ .with_network(self.network)
+ .with_cmd(vec!["coordinatorServer"])
+ .with_env_var(
+ "FLUSS_PROPERTIES",
+ self.to_fluss_properties_with(coordinator_confs),
+ )
+ .start()
+ .await
+ .unwrap()
+ }
+
+ async fn start_tablet_server(&self, server_id: usize) ->
ContainerAsync<GenericImage> {
+ let mut tablet_server_confs = HashMap::new();
+ let bind_listeners = format!(
+ "INTERNAL://tablet-server-{}:0, CLIENT://tablet-server-{}:9123",
+ server_id, server_id
+ );
+ let expose_host_port = 9124 + server_id;
+ let advertised_listeners = format!("CLIENT://localhost:{}",
expose_host_port);
+ let tablet_server_id = format!("{}", server_id);
+ tablet_server_confs.insert("zookeeper.address", "zookeeper:2181");
+ tablet_server_confs.insert("bind.listeners", bind_listeners.as_str());
+ tablet_server_confs.insert("advertised.listeners",
advertised_listeners.as_str());
+ tablet_server_confs.insert("internal.listener.name", "INTERNAL");
+ tablet_server_confs.insert("tablet-server.id",
tablet_server_id.as_str());
+
+ GenericImage::new("fluss/fluss", FLUSS_VERSION)
+ .with_cmd(vec!["tabletServer"])
+ .with_mapped_port(expose_host_port as u16,
ContainerPort::Tcp(9123))
+ .with_network(self.network)
+ .with_container_name(format!("tablet-server-{}", server_id))
+ .with_env_var(
+ "FLUSS_PROPERTIES",
+ self.to_fluss_properties_with(tablet_server_confs),
+ )
+ .start()
+ .await
+ .unwrap()
+ }
+
+ fn to_fluss_properties_with(&self, extra_properties: HashMap<&str, &str>)
-> String {
+ let mut fluss_properties = Vec::new();
+ for (k, v) in self.cluster_conf.iter() {
+ fluss_properties.push(format!("{}: {}", k, v));
+ }
+ for (k, v) in extra_properties.iter() {
+ fluss_properties.push(format!("{}: {}", k, v));
+ }
+ fluss_properties.join("\n")
+ }
+}
+
+/// Provides an easy way to launch a Fluss cluster with coordinator and tablet
servers.
+#[derive(Clone)]
+pub struct FlussTestingCluster {
+ zookeeper: Arc<ContainerAsync<GenericImage>>,
+ coordinator_server: Arc<ContainerAsync<GenericImage>>,
+ tablet_servers: HashMap<usize, Arc<ContainerAsync<GenericImage>>>,
+ bootstrap_servers: String,
+}
+
+impl FlussTestingCluster {
+ pub async fn stop(&self) {
+ for tablet_server in self.tablet_servers.values() {
+ tablet_server.stop().await.unwrap()
+ }
+ self.coordinator_server.stop().await.unwrap();
+ self.zookeeper.stop().await.unwrap();
+ }
+
+ pub async fn get_fluss_connection(&self) -> FlussConnection {
+ let mut config = Config::default();
+ config.bootstrap_server = Some(self.bootstrap_servers.clone());
+
+        // Retry connecting for up to 1 minute while the cluster starts up
+ let max_retries = 60; // 60 retry attempts
+ let retry_interval = Duration::from_secs(1); // 1 second interval
between retries
+
+ for attempt in 1..=max_retries {
+ match FlussConnection::new(config.clone()).await {
+ Ok(connection) => {
+ return connection;
+ }
+ Err(e) => {
+ if attempt == max_retries {
+ panic!(
+ "Failed to connect to Fluss cluster after {}
attempts: {}",
+ max_retries, e
+ );
+ }
+ tokio::time::sleep(retry_interval).await;
+ }
+ }
+ }
+ unreachable!()
+ }
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index 7840638..28b9bef 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -20,6 +20,6 @@ extern crate fluss;
#[cfg(feature = "integration_tests")]
mod integration {
-
- mod client;
+ mod admin;
+ mod fluss_cluster;
}