This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d04d3fb ci: parallelize cpp integration tests (#434)
d04d3fb is described below
commit d04d3fb482a0423c685a98eb5ac91234974a22dc
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Mar 8 09:43:34 2026 +0000
ci: parallelize cpp integration tests (#434)
---
.github/workflows/build_and_test_cpp.yml | 4 +-
bindings/cpp/CMakeLists.txt | 15 ++++-
bindings/cpp/test/test_main.cpp | 10 +++
bindings/cpp/test/test_utils.h | 102 +++++++++++++++++++------------
4 files changed, 90 insertions(+), 41 deletions(-)
diff --git a/.github/workflows/build_and_test_cpp.yml b/.github/workflows/build_and_test_cpp.yml
index 5cdd14d..1931983 100644
--- a/.github/workflows/build_and_test_cpp.yml
+++ b/.github/workflows/build_and_test_cpp.yml
@@ -70,9 +70,9 @@ jobs:
cmake -B build -DFLUSS_ENABLE_TESTING=ON -DCMAKE_BUILD_TYPE=Debug
cmake --build build --parallel
- - name: Run C++ integration tests
+ - name: Run C++ integration tests (parallel)
working-directory: bindings/cpp
- run: cd build && ctest --output-on-failure --timeout 300
+ run: cd build && ctest -j$(nproc) --output-on-failure --timeout 300
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index 0cedf68..ac93611 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -256,6 +256,7 @@ if (FLUSS_ENABLE_TESTING)
FetchContent_MakeAvailable(googletest)
enable_testing()
+ include(GoogleTest)
file(GLOB TEST_SOURCE_FILES "test/*.cpp")
add_executable(fluss_cpp_test ${TEST_SOURCE_FILES})
@@ -267,5 +268,17 @@ if (FLUSS_ENABLE_TESTING)
${PROJECT_SOURCE_DIR}/test
)
- add_test(NAME fluss_cpp_integration_tests COMMAND fluss_cpp_test)
+ # Individual tests for parallel execution via ctest -j.
+ gtest_discover_tests(fluss_cpp_test
+ PROPERTIES
+ TIMEOUT 120
+ FIXTURES_REQUIRED fluss_cluster
+ )
+
+ # Cleanup: stop Docker containers after all tests finish.
+ # Mirrors Python's pytest_unconfigure and Rust's atexit cleanup.
+ add_test(NAME fluss_cluster_cleanup COMMAND fluss_cpp_test --cleanup)
+ set_tests_properties(fluss_cluster_cleanup PROPERTIES
+ FIXTURES_CLEANUP fluss_cluster
+ )
endif()
diff --git a/bindings/cpp/test/test_main.cpp b/bindings/cpp/test/test_main.cpp
index 8c2e2d9..7b132d2 100644
--- a/bindings/cpp/test/test_main.cpp
+++ b/bindings/cpp/test/test_main.cpp
@@ -22,6 +22,16 @@
#include "test_utils.h"
int main(int argc, char** argv) {
+ // --cleanup: stop Docker containers and exit (used by ctest FIXTURES_CLEANUP).
+ for (int i = 1; i < argc; ++i) {
+ if (std::string(argv[i]) == "--cleanup") {
+ const char* env = std::getenv("FLUSS_BOOTSTRAP_SERVERS");
+ if (env && std::strlen(env) > 0) return 0;
+ fluss_test::FlussTestCluster::StopAll();
+ return 0;
+ }
+ }
+
::testing::InitGoogleTest(&argc, argv);
// Register the global test environment (manages the Fluss cluster lifecycle).
diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h
index 05c32cf..f5b4971 100644
--- a/bindings/cpp/test/test_utils.h
+++ b/bindings/cpp/test/test_utils.h
@@ -126,29 +126,42 @@ class FlussTestCluster {
const char* env_servers = std::getenv("FLUSS_BOOTSTRAP_SERVERS");
if (env_servers && std::strlen(env_servers) > 0) {
bootstrap_servers_ = env_servers;
+ const char* env_sasl = std::getenv("FLUSS_SASL_BOOTSTRAP_SERVERS");
+ if (env_sasl && std::strlen(env_sasl) > 0) {
+ sasl_bootstrap_servers_ = env_sasl;
+ }
external_cluster_ = true;
std::cout << "Using external cluster: " << bootstrap_servers_ << std::endl;
return true;
}
+ // Reuse cluster started by another parallel test process or previous run.
+ if (WaitForPort("127.0.0.1", kPlainClientPort, /*timeout_seconds=*/1)) {
+ SetBootstrapServers();
+ external_cluster_ = true;
+ return true;
+ }
+
std::cout << "Starting Fluss cluster via Docker..." << std::endl;
- // Create network
+ // Remove stopped (not running) containers from previous runs.
+ RunCommand(std::string("docker rm ") + kTabletServerName + " 2>/dev/null || true");
+ RunCommand(std::string("docker rm ") + kCoordinatorName + " 2>/dev/null || true");
+ RunCommand(std::string("docker rm ") + kZookeeperName + " 2>/dev/null || true");
+ RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true");
+
RunCommand(std::string("docker network create ") + kNetworkName + " 2>/dev/null || true");
- // Start ZooKeeper
std::string zk_cmd = std::string("docker run -d --rm") + " --name " + kZookeeperName +
" --network " + kNetworkName + " zookeeper:3.9.2";
if (RunCommand(zk_cmd) != 0) {
- std::cerr << "Failed to start ZooKeeper" << std::endl;
- return false;
+ return WaitForCluster();
}
- // Wait for ZooKeeper to be ready before starting Fluss servers
+ // Wait for ZooKeeper to be ready
std::this_thread::sleep_for(std::chrono::seconds(5));
- // Start Coordinator Server (dual listeners: CLIENT=SASL on 9123, PLAIN_CLIENT=plaintext on
- // 9223)
+ // Coordinator Server (dual listeners: SASL on 9123, plaintext on 9223)
std::string sasl_jaas =
"org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"
" user_admin=\"admin-secret\" user_alice=\"alice-secret\";";
@@ -171,19 +184,15 @@ class FlussTestCluster {
std::string coord_cmd = DockerRunCmd(kCoordinatorName, coord_props,
{"9123:9123", "9223:9223"},
"coordinatorServer");
if (RunCommand(coord_cmd) != 0) {
- std::cerr << "Failed to start Coordinator Server" << std::endl;
- Stop();
- return false;
+ return WaitForCluster();
}
- // Wait for coordinator to be ready
if (!WaitForPort("127.0.0.1", kCoordinatorPort)) {
std::cerr << "Coordinator Server did not become ready" << std::endl;
- Stop();
return false;
}
- // Start Tablet Server (dual listeners: CLIENT=SASL on 9123, PLAIN_CLIENT=plaintext on 9223)
+ // Tablet Server (dual listeners: SASL on 9124, plaintext on 9224)
std::string ts = std::string(kTabletServerName);
std::string ts_props = JoinProps({
"zookeeper.address: " + zk + ":2181",
@@ -205,43 +214,33 @@ class FlussTestCluster {
std::to_string(kPlainClientTabletPort) + ":9223"},
"tabletServer");
if (RunCommand(ts_cmd) != 0) {
- std::cerr << "Failed to start Tablet Server" << std::endl;
- Stop();
- return false;
+ return WaitForCluster();
}
- // Wait for tablet server to be ready
- if (!WaitForPort("127.0.0.1", kTabletServerPort)) {
- std::cerr << "Tablet Server did not become ready" << std::endl;
- Stop();
- return false;
- }
-
- // Wait for plaintext listeners
- if (!WaitForPort("127.0.0.1", kPlainClientPort)) {
- std::cerr << "Coordinator plaintext listener did not become ready" << std::endl;
- Stop();
- return false;
- }
- if (!WaitForPort("127.0.0.1", kPlainClientTabletPort)) {
- std::cerr << "Tablet Server plaintext listener did not become ready" << std::endl;
- Stop();
+ if (!WaitForPort("127.0.0.1", kTabletServerPort) ||
+ !WaitForPort("127.0.0.1", kPlainClientPort) ||
+ !WaitForPort("127.0.0.1", kPlainClientTabletPort)) {
+ std::cerr << "Cluster listeners did not become ready" << std::endl;
return false;
}
- bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort);
- sasl_bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort);
+ SetBootstrapServers();
std::cout << "Fluss cluster started successfully." << std::endl;
return true;
}
void Stop() {
if (external_cluster_) return;
+ StopAll();
+ }
+ /// Unconditionally stop and remove all cluster containers and the network.
+ /// Used by the --cleanup flag from ctest FIXTURES_CLEANUP.
+ static void StopAll() {
std::cout << "Stopping Fluss cluster..." << std::endl;
- RunCommand(std::string("docker stop ") + kTabletServerName + " 2>/dev/null || true");
- RunCommand(std::string("docker stop ") + kCoordinatorName + " 2>/dev/null || true");
- RunCommand(std::string("docker stop ") + kZookeeperName + " 2>/dev/null || true");
+ RunCommand(std::string("docker rm -f ") + kTabletServerName + " 2>/dev/null || true");
+ RunCommand(std::string("docker rm -f ") + kCoordinatorName + " 2>/dev/null || true");
+ RunCommand(std::string("docker rm -f ") + kZookeeperName + " 2>/dev/null || true");
RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true");
std::cout << "Fluss cluster stopped." << std::endl;
}
@@ -250,6 +249,32 @@ class FlussTestCluster {
const std::string& GetSaslBootstrapServers() const { return sasl_bootstrap_servers_; }
private:
+ void SetBootstrapServers() {
+ bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort);
+ sasl_bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort);
+ }
+
+ /// Wait for a cluster being started by another process.
+ /// Fails fast if no containers exist (real Docker failure vs race).
+ bool WaitForCluster() {
+ if (RunCommand(std::string("docker inspect ") + kZookeeperName + " >/dev/null 2>&1") != 0) {
+ std::cerr << "Failed to start cluster (docker error)" << std::endl;
+ return false;
+ }
+ std::cout << "Waiting for cluster started by another process..." << std::endl;
+ if (!WaitForPort("127.0.0.1", kPlainClientPort) ||
+ !WaitForPort("127.0.0.1", kPlainClientTabletPort) ||
+ !WaitForPort("127.0.0.1", kCoordinatorPort) ||
+ !WaitForPort("127.0.0.1", kTabletServerPort)) {
+ std::cerr << "Cluster did not become ready" << std::endl;
+ return false;
+ }
+ SetBootstrapServers();
+ external_cluster_ = true;
+ std::cout << "Cluster ready." << std::endl;
+ return true;
+ }
+
std::string bootstrap_servers_;
std::string sasl_bootstrap_servers_;
bool external_cluster_{false};
@@ -291,7 +316,8 @@ class FlussTestEnvironment : public ::testing::Environment {
GTEST_SKIP() << "Fluss cluster did not become ready within timeout.";
}
- void TearDown() override { cluster_.Stop(); }
+ // Cluster stays alive for parallel processes and subsequent runs.
+ void TearDown() override {}
fluss::Connection& GetConnection() { return connection_; }
fluss::Admin& GetAdmin() { return admin_; }