This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new d04d3fb  ci: parallelize cpp integration tests (#434)
d04d3fb is described below

commit d04d3fb482a0423c685a98eb5ac91234974a22dc
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Mar 8 09:43:34 2026 +0000

    ci: parallelize cpp integration tests (#434)
---
 .github/workflows/build_and_test_cpp.yml |   4 +-
 bindings/cpp/CMakeLists.txt              |  15 ++++-
 bindings/cpp/test/test_main.cpp          |  10 +++
 bindings/cpp/test/test_utils.h           | 102 +++++++++++++++++++------------
 4 files changed, 90 insertions(+), 41 deletions(-)

diff --git a/.github/workflows/build_and_test_cpp.yml b/.github/workflows/build_and_test_cpp.yml
index 5cdd14d..1931983 100644
--- a/.github/workflows/build_and_test_cpp.yml
+++ b/.github/workflows/build_and_test_cpp.yml
@@ -70,9 +70,9 @@ jobs:
           cmake -B build -DFLUSS_ENABLE_TESTING=ON -DCMAKE_BUILD_TYPE=Debug
           cmake --build build --parallel
 
-      - name: Run C++ integration tests
+      - name: Run C++ integration tests (parallel)
         working-directory: bindings/cpp
-        run: cd build && ctest --output-on-failure --timeout 300
+        run: cd build && ctest -j$(nproc) --output-on-failure --timeout 300
         env:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index 0cedf68..ac93611 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -256,6 +256,7 @@ if (FLUSS_ENABLE_TESTING)
     FetchContent_MakeAvailable(googletest)
 
     enable_testing()
+    include(GoogleTest)
 
     file(GLOB TEST_SOURCE_FILES "test/*.cpp")
     add_executable(fluss_cpp_test ${TEST_SOURCE_FILES})
@@ -267,5 +268,17 @@ if (FLUSS_ENABLE_TESTING)
         ${PROJECT_SOURCE_DIR}/test
     )
 
-    add_test(NAME fluss_cpp_integration_tests COMMAND fluss_cpp_test)
+    # Individual tests for parallel execution via ctest -j.
+    gtest_discover_tests(fluss_cpp_test
+        PROPERTIES
+            TIMEOUT 120
+            FIXTURES_REQUIRED fluss_cluster
+    )
+
+    # Cleanup: stop Docker containers after all tests finish.
+    # Mirrors Python's pytest_unconfigure and Rust's atexit cleanup.
+    add_test(NAME fluss_cluster_cleanup COMMAND fluss_cpp_test --cleanup)
+    set_tests_properties(fluss_cluster_cleanup PROPERTIES
+        FIXTURES_CLEANUP fluss_cluster
+    )
 endif()
diff --git a/bindings/cpp/test/test_main.cpp b/bindings/cpp/test/test_main.cpp
index 8c2e2d9..7b132d2 100644
--- a/bindings/cpp/test/test_main.cpp
+++ b/bindings/cpp/test/test_main.cpp
@@ -22,6 +22,16 @@
 #include "test_utils.h"
 
 int main(int argc, char** argv) {
+    // --cleanup: stop Docker containers and exit (used by ctest FIXTURES_CLEANUP).
+    for (int i = 1; i < argc; ++i) {
+        if (std::string(argv[i]) == "--cleanup") {
+            const char* env = std::getenv("FLUSS_BOOTSTRAP_SERVERS");
+            if (env && std::strlen(env) > 0) return 0;
+            fluss_test::FlussTestCluster::StopAll();
+            return 0;
+        }
+    }
+
     ::testing::InitGoogleTest(&argc, argv);
 
     // Register the global test environment (manages the Fluss cluster lifecycle).
diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h
index 05c32cf..f5b4971 100644
--- a/bindings/cpp/test/test_utils.h
+++ b/bindings/cpp/test/test_utils.h
@@ -126,29 +126,42 @@ class FlussTestCluster {
         const char* env_servers = std::getenv("FLUSS_BOOTSTRAP_SERVERS");
         if (env_servers && std::strlen(env_servers) > 0) {
             bootstrap_servers_ = env_servers;
+            const char* env_sasl = std::getenv("FLUSS_SASL_BOOTSTRAP_SERVERS");
+            if (env_sasl && std::strlen(env_sasl) > 0) {
+                sasl_bootstrap_servers_ = env_sasl;
+            }
             external_cluster_ = true;
             std::cout << "Using external cluster: " << bootstrap_servers_ << std::endl;
             return true;
         }
 
+        // Reuse cluster started by another parallel test process or previous run.
+        if (WaitForPort("127.0.0.1", kPlainClientPort, /*timeout_seconds=*/1)) {
+            SetBootstrapServers();
+            external_cluster_ = true;
+            return true;
+        }
+
         std::cout << "Starting Fluss cluster via Docker..." << std::endl;
 
-        // Create network
+        // Remove stopped (not running) containers from previous runs.
+        RunCommand(std::string("docker rm ") + kTabletServerName + " 2>/dev/null || true");
+        RunCommand(std::string("docker rm ") + kCoordinatorName + " 2>/dev/null || true");
+        RunCommand(std::string("docker rm ") + kZookeeperName + " 2>/dev/null || true");
+        RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true");
+
         RunCommand(std::string("docker network create ") + kNetworkName + " 2>/dev/null || true");
 
-        // Start ZooKeeper
         std::string zk_cmd = std::string("docker run -d --rm") + " --name " + kZookeeperName +
                              " --network " + kNetworkName + " zookeeper:3.9.2";
         if (RunCommand(zk_cmd) != 0) {
-            std::cerr << "Failed to start ZooKeeper" << std::endl;
-            return false;
+            return WaitForCluster();
         }
 
-        // Wait for ZooKeeper to be ready before starting Fluss servers
+        // Wait for ZooKeeper to be ready
         std::this_thread::sleep_for(std::chrono::seconds(5));
 
-        // Start Coordinator Server (dual listeners: CLIENT=SASL on 9123, PLAIN_CLIENT=plaintext on
-        // 9223)
+        // Coordinator Server (dual listeners: SASL on 9123, plaintext on 9223)
         std::string sasl_jaas =
             "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"
             " user_admin=\"admin-secret\" user_alice=\"alice-secret\";";
@@ -171,19 +184,15 @@ class FlussTestCluster {
         std::string coord_cmd = DockerRunCmd(kCoordinatorName, coord_props,
                                              {"9123:9123", "9223:9223"}, "coordinatorServer");
         if (RunCommand(coord_cmd) != 0) {
-            std::cerr << "Failed to start Coordinator Server" << std::endl;
-            Stop();
-            return false;
+            return WaitForCluster();
         }
 
-        // Wait for coordinator to be ready
         if (!WaitForPort("127.0.0.1", kCoordinatorPort)) {
             std::cerr << "Coordinator Server did not become ready" << std::endl;
-            Stop();
             return false;
         }
 
-        // Start Tablet Server (dual listeners: CLIENT=SASL on 9123, PLAIN_CLIENT=plaintext on 9223)
+        // Tablet Server (dual listeners: SASL on 9124, plaintext on 9224)
         std::string ts = std::string(kTabletServerName);
         std::string ts_props = JoinProps({
             "zookeeper.address: " + zk + ":2181",
@@ -205,43 +214,33 @@ class FlussTestCluster {
                                            std::to_string(kPlainClientTabletPort) + ":9223"},
                                           "tabletServer");
         if (RunCommand(ts_cmd) != 0) {
-            std::cerr << "Failed to start Tablet Server" << std::endl;
-            Stop();
-            return false;
+            return WaitForCluster();
         }
 
-        // Wait for tablet server to be ready
-        if (!WaitForPort("127.0.0.1", kTabletServerPort)) {
-            std::cerr << "Tablet Server did not become ready" << std::endl;
-            Stop();
-            return false;
-        }
-
-        // Wait for plaintext listeners
-        if (!WaitForPort("127.0.0.1", kPlainClientPort)) {
-            std::cerr << "Coordinator plaintext listener did not become ready" << std::endl;
-            Stop();
-            return false;
-        }
-        if (!WaitForPort("127.0.0.1", kPlainClientTabletPort)) {
-            std::cerr << "Tablet Server plaintext listener did not become ready" << std::endl;
-            Stop();
+        if (!WaitForPort("127.0.0.1", kTabletServerPort) ||
+            !WaitForPort("127.0.0.1", kPlainClientPort) ||
+            !WaitForPort("127.0.0.1", kPlainClientTabletPort)) {
+            std::cerr << "Cluster listeners did not become ready" << std::endl;
             return false;
         }
 
-        bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort);
-        sasl_bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort);
+        SetBootstrapServers();
         std::cout << "Fluss cluster started successfully." << std::endl;
         return true;
     }
 
     void Stop() {
         if (external_cluster_) return;
+        StopAll();
+    }
 
+    /// Unconditionally stop and remove all cluster containers and the network.
+    /// Used by the --cleanup flag from ctest FIXTURES_CLEANUP.
+    static void StopAll() {
         std::cout << "Stopping Fluss cluster..." << std::endl;
-        RunCommand(std::string("docker stop ") + kTabletServerName + " 2>/dev/null || true");
-        RunCommand(std::string("docker stop ") + kCoordinatorName + " 2>/dev/null || true");
-        RunCommand(std::string("docker stop ") + kZookeeperName + " 2>/dev/null || true");
+        RunCommand(std::string("docker rm -f ") + kTabletServerName + " 2>/dev/null || true");
+        RunCommand(std::string("docker rm -f ") + kCoordinatorName + " 2>/dev/null || true");
+        RunCommand(std::string("docker rm -f ") + kZookeeperName + " 2>/dev/null || true");
         RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true");
         std::cout << "Fluss cluster stopped." << std::endl;
     }
@@ -250,6 +249,32 @@ class FlussTestCluster {
     const std::string& GetSaslBootstrapServers() const { return sasl_bootstrap_servers_; }
 
    private:
+    void SetBootstrapServers() {
+        bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort);
+        sasl_bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort);
+    }
+
+    /// Wait for a cluster being started by another process.
+    /// Fails fast if no containers exist (real Docker failure vs race).
+    bool WaitForCluster() {
+        if (RunCommand(std::string("docker inspect ") + kZookeeperName + " >/dev/null 2>&1") != 0) {
+            std::cerr << "Failed to start cluster (docker error)" << std::endl;
+            return false;
+        }
+        std::cout << "Waiting for cluster started by another process..." << std::endl;
+        if (!WaitForPort("127.0.0.1", kPlainClientPort) ||
+            !WaitForPort("127.0.0.1", kPlainClientTabletPort) ||
+            !WaitForPort("127.0.0.1", kCoordinatorPort) ||
+            !WaitForPort("127.0.0.1", kTabletServerPort)) {
+            std::cerr << "Cluster did not become ready" << std::endl;
+            return false;
+        }
+        SetBootstrapServers();
+        external_cluster_ = true;
+        std::cout << "Cluster ready." << std::endl;
+        return true;
+    }
+
     std::string bootstrap_servers_;
     std::string sasl_bootstrap_servers_;
     bool external_cluster_{false};
@@ -291,7 +316,8 @@ class FlussTestEnvironment : public ::testing::Environment {
         GTEST_SKIP() << "Fluss cluster did not become ready within timeout.";
     }
 
-    void TearDown() override { cluster_.Stop(); }
+    // Cluster stays alive for parallel processes and subsequent runs.
+    void TearDown() override {}
 
     fluss::Connection& GetConnection() { return connection_; }
     fluss::Admin& GetAdmin() { return admin_; }

Reply via email to