This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new bb6933a  ci: parallelize python integration tests (#435)
bb6933a is described below

commit bb6933a5b01e1226c8345ffaf06388322eb640a4
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Mar 8 10:37:15 2026 +0000

    ci: parallelize python integration tests (#435)
---
 .github/workflows/build_and_test_python.yml |   4 +-
 .gitignore                                  |   2 +
 bindings/python/pyproject.toml              |   1 +
 bindings/python/test/conftest.py            | 218 +++++++++++++++++++++-------
 4 files changed, 170 insertions(+), 55 deletions(-)

diff --git a/.github/workflows/build_and_test_python.yml b/.github/workflows/build_and_test_python.yml
index efb5caa..39dfa98 100644
--- a/.github/workflows/build_and_test_python.yml
+++ b/.github/workflows/build_and_test_python.yml
@@ -73,9 +73,9 @@ jobs:
           uv sync --extra dev
           uv run maturin develop
 
-      - name: Run Python integration tests
+      - name: Run Python integration tests (parallel)
         working-directory: bindings/python
-        run: uv run pytest test/ -v
+        run: uv run pytest test/ -v -n auto
         env:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
diff --git a/.gitignore b/.gitignore
index f251aab..5d11a1c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,6 +25,8 @@ __pycache__/
 *.py[cod]
 *$py.class
 *.so
+*.dylib
+*.dSYM/
 *.egg-info/
 dist/
 build/
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
index f5b0b68..63af88e 100644
--- a/bindings/python/pyproject.toml
+++ b/bindings/python/pyproject.toml
@@ -50,6 +50,7 @@ dev = [
     "mypy>=1.17.1",
     "pytest>=8.3.5",
     "pytest-asyncio>=0.25.3",
+    "pytest-xdist>=3.5.0",
     "ruff>=0.9.10",
     "maturin>=1.8.2",
     "testcontainers>=4.0.0",
diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py
index bb8d18b..0e4cfe4 100644
--- a/bindings/python/test/conftest.py
+++ b/bindings/python/test/conftest.py
@@ -20,14 +20,27 @@
 If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster.
 Otherwise, a Fluss cluster is started automatically via testcontainers.
 
+The first pytest-xdist worker to run starts the cluster; other workers
+detect it via port check and reuse it (matching the C++ test pattern).
+Containers are cleaned up after all workers finish via pytest_unconfigure.
+
 Run with:
-    uv run maturin develop && uv run pytest test/ -v
+    uv run maturin develop && uv run pytest test/ -v -n auto
 """
 
+import asyncio
 import os
 import socket
+import subprocess
 import time
 
+# Disable testcontainers Ryuk reaper for xdist runs — it would kill
+# containers when the first worker exits, while others are still running.
+# We handle cleanup ourselves in pytest_unconfigure.
+# In single-process mode, keep Ryuk as a safety net for hard crashes.
+if "PYTEST_XDIST_WORKER" in os.environ:
+    os.environ.setdefault("TESTCONTAINERS_RYUK_DISABLED", "true")
+
 import pytest
 import pytest_asyncio
 
@@ -37,6 +50,20 @@ FLUSS_IMAGE = "apache/fluss"
 FLUSS_VERSION = "0.9.0-incubating"
 BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
 
+# Container / network names
+NETWORK_NAME = "fluss-python-test-network"
+ZOOKEEPER_NAME = "zookeeper-python-test"
+COORDINATOR_NAME = "coordinator-server-python-test"
+TABLET_SERVER_NAME = "tablet-server-python-test"
+
+# Fixed host ports (must match across workers)
+COORDINATOR_PORT = 9123
+TABLET_SERVER_PORT = 9124
+PLAIN_CLIENT_PORT = 9223
+PLAIN_CLIENT_TABLET_PORT = 9224
+
+ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT]
+
 
 def _wait_for_port(host, port, timeout=60):
     """Wait for a TCP port to become available."""
@@ -44,40 +71,56 @@ def _wait_for_port(host, port, timeout=60):
     while time.time() - start < timeout:
         try:
             with socket.create_connection((host, port), timeout=1):
-                return
+                return True
         except (ConnectionRefusedError, TimeoutError, OSError):
             time.sleep(1)
-    raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+    return False
 
 
[email protected](scope="session")
-def fluss_cluster():
-    """Start a Fluss cluster using testcontainers, or use an existing one."""
-    if BOOTSTRAP_SERVERS_ENV:
-        yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
+def _all_ports_ready(timeout=60):
+    """Wait for all cluster ports to become available."""
+    deadline = time.time() + timeout
+    for port in ALL_PORTS:
+        remaining = deadline - time.time()
+        if remaining <= 0 or not _wait_for_port("localhost", port, timeout=remaining):
+            return False
+    return True
+
+
+def _run_cmd(cmd):
+    """Run a command (list form), return exit code."""
+    return subprocess.run(cmd, capture_output=True).returncode
+
+
+def _start_cluster():
+    """Start the Fluss Docker cluster via testcontainers.
+
+    If another worker already started the cluster (detected via port check),
+    reuse it. If container creation fails (name conflict from a racing worker),
+    wait for the other worker's cluster to become ready.
+    """
+    # Reuse cluster started by another parallel worker or previous run.
+    if _wait_for_port("localhost", PLAIN_CLIENT_PORT, timeout=1):
+        print("Reusing existing cluster via port check.")
         return
 
     from testcontainers.core.container import DockerContainer
-    from testcontainers.core.network import Network
 
-    network = Network()
-    network.create()
+    print("Starting Fluss cluster via testcontainers...")
 
-    zookeeper = (
-        DockerContainer("zookeeper:3.9.2")
-        .with_network(network)
-        .with_name("zookeeper-python-test")
-    )
+    # Create a named network via Docker CLI (idempotent, avoids orphaned
+    # random-named networks when multiple xdist workers race).
+    _run_cmd(["docker", "network", "create", NETWORK_NAME])
 
     sasl_jaas = (
         "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"
         ' user_admin="admin-secret" user_alice="alice-secret";'
     )
     coordinator_props = "\n".join([
-        "zookeeper.address: zookeeper-python-test:2181",
-        "bind.listeners: INTERNAL://coordinator-server-python-test:0,"
-        " CLIENT://coordinator-server-python-test:9123,"
-        " PLAIN_CLIENT://coordinator-server-python-test:9223",
+        f"zookeeper.address: {ZOOKEEPER_NAME}:2181",
+        f"bind.listeners: INTERNAL://{COORDINATOR_NAME}:0,"
+        f" CLIENT://{COORDINATOR_NAME}:9123,"
+        f" PLAIN_CLIENT://{COORDINATOR_NAME}:9223",
         "advertised.listeners: CLIENT://localhost:9123,"
         " PLAIN_CLIENT://localhost:9223",
         "internal.listener.name: INTERNAL",
@@ -87,21 +130,11 @@ def fluss_cluster():
         "netty.server.num-network-threads: 1",
         "netty.server.num-worker-threads: 3",
     ])
-    coordinator = (
-        DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
-        .with_network(network)
-        .with_name("coordinator-server-python-test")
-        .with_bind_ports(9123, 9123)
-        .with_bind_ports(9223, 9223)
-        .with_command("coordinatorServer")
-        .with_env("FLUSS_PROPERTIES", coordinator_props)
-    )
-
     tablet_props = "\n".join([
-        "zookeeper.address: zookeeper-python-test:2181",
-        "bind.listeners: INTERNAL://tablet-server-python-test:0,"
-        " CLIENT://tablet-server-python-test:9123,"
-        " PLAIN_CLIENT://tablet-server-python-test:9223",
+        f"zookeeper.address: {ZOOKEEPER_NAME}:2181",
+        f"bind.listeners: INTERNAL://{TABLET_SERVER_NAME}:0,"
+        f" CLIENT://{TABLET_SERVER_NAME}:9123,"
+        f" PLAIN_CLIENT://{TABLET_SERVER_NAME}:9223",
         "advertised.listeners: CLIENT://localhost:9124,"
         " PLAIN_CLIENT://localhost:9224",
         "internal.listener.name: INTERNAL",
@@ -112,42 +145,121 @@ def fluss_cluster():
         "netty.server.num-network-threads: 1",
         "netty.server.num-worker-threads: 3",
     ])
+
+    zookeeper = (
+        DockerContainer("zookeeper:3.9.2")
+        .with_kwargs(network=NETWORK_NAME)
+        .with_name(ZOOKEEPER_NAME)
+    )
+    coordinator = (
+        DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
+        .with_kwargs(network=NETWORK_NAME)
+        .with_name(COORDINATOR_NAME)
+        .with_bind_ports(9123, 9123)
+        .with_bind_ports(9223, 9223)
+        .with_command("coordinatorServer")
+        .with_env("FLUSS_PROPERTIES", coordinator_props)
+    )
     tablet_server = (
         DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
-        .with_network(network)
-        .with_name("tablet-server-python-test")
+        .with_kwargs(network=NETWORK_NAME)
+        .with_name(TABLET_SERVER_NAME)
         .with_bind_ports(9123, 9124)
         .with_bind_ports(9223, 9224)
         .with_command("tabletServer")
         .with_env("FLUSS_PROPERTIES", tablet_props)
     )
 
-    zookeeper.start()
-    coordinator.start()
-    tablet_server.start()
+    try:
+        zookeeper.start()
+        coordinator.start()
+        tablet_server.start()
+    except Exception as e:
+        # Another worker may have started containers with the same names.
+        # Wait for the cluster to become ready instead of failing.
+        print(f"Container start failed ({e}), waiting for cluster from another worker...")
+        if _all_ports_ready():
+            return
+        raise
 
-    _wait_for_port("localhost", 9123)
-    _wait_for_port("localhost", 9124)
-    _wait_for_port("localhost", 9223)
-    _wait_for_port("localhost", 9224)
-    # Extra wait for cluster to fully initialize
-    time.sleep(10)
+    if not _all_ports_ready():
+        raise RuntimeError("Cluster listeners did not become ready")
 
-    # (plaintext_bootstrap, sasl_bootstrap)
-    yield ("127.0.0.1:9223", "127.0.0.1:9123")
+    print("Fluss cluster started successfully.")
+
+
+def _stop_cluster():
+    """Stop and remove the Fluss Docker cluster containers."""
+    for name in [TABLET_SERVER_NAME, COORDINATOR_NAME, ZOOKEEPER_NAME]:
+        subprocess.run(["docker", "rm", "-f", name], capture_output=True)
+    subprocess.run(["docker", "network", "rm", NETWORK_NAME], capture_output=True)
+
+
+async def _connect_with_retry(bootstrap_servers, timeout=60):
+    """Connect to the Fluss cluster with retries until it's fully ready.
+
+    Waits until both the coordinator and at least one tablet server are
+    available, matching the Rust wait_for_cluster_ready pattern.
+    """
+    config = fluss.Config({"bootstrap.servers": bootstrap_servers})
+    start = time.time()
+    last_err = None
+    while time.time() - start < timeout:
+        conn = None
+        try:
+            conn = await fluss.FlussConnection.create(config)
+            admin = await conn.get_admin()
+            nodes = await admin.get_server_nodes()
+            if any(n.server_type == "TabletServer" for n in nodes):
+                return conn
+            last_err = RuntimeError("No TabletServer available yet")
+        except Exception as e:
+            last_err = e
+        if conn is not None:
+            conn.close()
+        await asyncio.sleep(1)
+    raise RuntimeError(
+        f"Could not connect to cluster after {timeout}s: {last_err}"
+    )
 
-    tablet_server.stop()
-    coordinator.stop()
-    zookeeper.stop()
-    network.remove()
+
+def pytest_unconfigure(config):
+    """Clean up Docker containers after all xdist workers finish.
+
+    Runs once on the controller process (or the single process when
+    not using xdist). Workers are identified by the 'workerinput' attr.
+    """
+    if BOOTSTRAP_SERVERS_ENV:
+        return
+    if hasattr(config, "workerinput"):
+        return  # This is a worker, skip
+    _stop_cluster()
+
+
[email protected](scope="session")
+def fluss_cluster():
+    """Start a Fluss cluster using testcontainers, or use an existing one."""
+    if BOOTSTRAP_SERVERS_ENV:
+        sasl_env = os.environ.get(
+            "FLUSS_SASL_BOOTSTRAP_SERVERS", BOOTSTRAP_SERVERS_ENV
+        )
+        yield (BOOTSTRAP_SERVERS_ENV, sasl_env)
+        return
+
+    _start_cluster()
+
+    # (plaintext_bootstrap, sasl_bootstrap)
+    yield (
+        f"127.0.0.1:{PLAIN_CLIENT_PORT}",
+        f"127.0.0.1:{COORDINATOR_PORT}",
+    )
 
 
 @pytest_asyncio.fixture(scope="session")
 async def connection(fluss_cluster):
     """Session-scoped connection to the Fluss cluster (plaintext)."""
     plaintext_addr, _sasl_addr = fluss_cluster
-    config = fluss.Config({"bootstrap.servers": plaintext_addr})
-    conn = await fluss.FlussConnection.create(config)
+    conn = await _connect_with_retry(plaintext_addr)
     yield conn
     conn.close()
 

Reply via email to