This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bb6933a ci: parallelize python integration tests (#435)
bb6933a is described below
commit bb6933a5b01e1226c8345ffaf06388322eb640a4
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Mar 8 10:37:15 2026 +0000
ci: parallelize python integration tests (#435)
---
.github/workflows/build_and_test_python.yml | 4 +-
.gitignore | 2 +
bindings/python/pyproject.toml | 1 +
bindings/python/test/conftest.py | 218 +++++++++++++++++++++-------
4 files changed, 170 insertions(+), 55 deletions(-)
diff --git a/.github/workflows/build_and_test_python.yml b/.github/workflows/build_and_test_python.yml
index efb5caa..39dfa98 100644
--- a/.github/workflows/build_and_test_python.yml
+++ b/.github/workflows/build_and_test_python.yml
@@ -73,9 +73,9 @@ jobs:
uv sync --extra dev
uv run maturin develop
- - name: Run Python integration tests
+ - name: Run Python integration tests (parallel)
working-directory: bindings/python
- run: uv run pytest test/ -v
+ run: uv run pytest test/ -v -n auto
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
diff --git a/.gitignore b/.gitignore
index f251aab..5d11a1c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,6 +25,8 @@ __pycache__/
*.py[cod]
*$py.class
*.so
+*.dylib
+*.dSYM/
*.egg-info/
dist/
build/
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
index f5b0b68..63af88e 100644
--- a/bindings/python/pyproject.toml
+++ b/bindings/python/pyproject.toml
@@ -50,6 +50,7 @@ dev = [
"mypy>=1.17.1",
"pytest>=8.3.5",
"pytest-asyncio>=0.25.3",
+ "pytest-xdist>=3.5.0",
"ruff>=0.9.10",
"maturin>=1.8.2",
"testcontainers>=4.0.0",
diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py
index bb8d18b..0e4cfe4 100644
--- a/bindings/python/test/conftest.py
+++ b/bindings/python/test/conftest.py
@@ -20,14 +20,27 @@
If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster.
Otherwise, a Fluss cluster is started automatically via testcontainers.
+The first pytest-xdist worker to run starts the cluster; other workers
+detect it via port check and reuse it (matching the C++ test pattern).
+Containers are cleaned up after all workers finish via pytest_unconfigure.
+
Run with:
- uv run maturin develop && uv run pytest test/ -v
+ uv run maturin develop && uv run pytest test/ -v -n auto
"""
+import asyncio
import os
import socket
+import subprocess
import time
+# Disable testcontainers Ryuk reaper for xdist runs — it would kill
+# containers when the first worker exits, while others are still running.
+# We handle cleanup ourselves in pytest_unconfigure.
+# In single-process mode, keep Ryuk as a safety net for hard crashes.
+if "PYTEST_XDIST_WORKER" in os.environ:
+ os.environ.setdefault("TESTCONTAINERS_RYUK_DISABLED", "true")
+
import pytest
import pytest_asyncio
@@ -37,6 +50,20 @@ FLUSS_IMAGE = "apache/fluss"
FLUSS_VERSION = "0.9.0-incubating"
BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
+# Container / network names
+NETWORK_NAME = "fluss-python-test-network"
+ZOOKEEPER_NAME = "zookeeper-python-test"
+COORDINATOR_NAME = "coordinator-server-python-test"
+TABLET_SERVER_NAME = "tablet-server-python-test"
+
+# Fixed host ports (must match across workers)
+COORDINATOR_PORT = 9123
+TABLET_SERVER_PORT = 9124
+PLAIN_CLIENT_PORT = 9223
+PLAIN_CLIENT_TABLET_PORT = 9224
+
+ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT]
+
def _wait_for_port(host, port, timeout=60):
"""Wait for a TCP port to become available."""
@@ -44,40 +71,56 @@ def _wait_for_port(host, port, timeout=60):
while time.time() - start < timeout:
try:
with socket.create_connection((host, port), timeout=1):
- return
+ return True
except (ConnectionRefusedError, TimeoutError, OSError):
time.sleep(1)
- raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+ return False
[email protected](scope="session")
-def fluss_cluster():
- """Start a Fluss cluster using testcontainers, or use an existing one."""
- if BOOTSTRAP_SERVERS_ENV:
- yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
+def _all_ports_ready(timeout=60):
+ """Wait for all cluster ports to become available."""
+ deadline = time.time() + timeout
+ for port in ALL_PORTS:
+ remaining = deadline - time.time()
+ if remaining <= 0 or not _wait_for_port("localhost", port, timeout=remaining):
+ return False
+ return True
+
+
+def _run_cmd(cmd):
+ """Run a command (list form), return exit code."""
+ return subprocess.run(cmd, capture_output=True).returncode
+
+
+def _start_cluster():
+ """Start the Fluss Docker cluster via testcontainers.
+
+ If another worker already started the cluster (detected via port check),
+ reuse it. If container creation fails (name conflict from a racing worker),
+ wait for the other worker's cluster to become ready.
+ """
+ # Reuse cluster started by another parallel worker or previous run.
+ if _wait_for_port("localhost", PLAIN_CLIENT_PORT, timeout=1):
+ print("Reusing existing cluster via port check.")
return
from testcontainers.core.container import DockerContainer
- from testcontainers.core.network import Network
- network = Network()
- network.create()
+ print("Starting Fluss cluster via testcontainers...")
- zookeeper = (
- DockerContainer("zookeeper:3.9.2")
- .with_network(network)
- .with_name("zookeeper-python-test")
- )
+ # Create a named network via Docker CLI (idempotent, avoids orphaned
+ # random-named networks when multiple xdist workers race).
+ _run_cmd(["docker", "network", "create", NETWORK_NAME])
sasl_jaas = (
"org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"
' user_admin="admin-secret" user_alice="alice-secret";'
)
coordinator_props = "\n".join([
- "zookeeper.address: zookeeper-python-test:2181",
- "bind.listeners: INTERNAL://coordinator-server-python-test:0,"
- " CLIENT://coordinator-server-python-test:9123,"
- " PLAIN_CLIENT://coordinator-server-python-test:9223",
+ f"zookeeper.address: {ZOOKEEPER_NAME}:2181",
+ f"bind.listeners: INTERNAL://{COORDINATOR_NAME}:0,"
+ f" CLIENT://{COORDINATOR_NAME}:9123,"
+ f" PLAIN_CLIENT://{COORDINATOR_NAME}:9223",
"advertised.listeners: CLIENT://localhost:9123,"
" PLAIN_CLIENT://localhost:9223",
"internal.listener.name: INTERNAL",
@@ -87,21 +130,11 @@ def fluss_cluster():
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3",
])
- coordinator = (
- DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
- .with_network(network)
- .with_name("coordinator-server-python-test")
- .with_bind_ports(9123, 9123)
- .with_bind_ports(9223, 9223)
- .with_command("coordinatorServer")
- .with_env("FLUSS_PROPERTIES", coordinator_props)
- )
-
tablet_props = "\n".join([
- "zookeeper.address: zookeeper-python-test:2181",
- "bind.listeners: INTERNAL://tablet-server-python-test:0,"
- " CLIENT://tablet-server-python-test:9123,"
- " PLAIN_CLIENT://tablet-server-python-test:9223",
+ f"zookeeper.address: {ZOOKEEPER_NAME}:2181",
+ f"bind.listeners: INTERNAL://{TABLET_SERVER_NAME}:0,"
+ f" CLIENT://{TABLET_SERVER_NAME}:9123,"
+ f" PLAIN_CLIENT://{TABLET_SERVER_NAME}:9223",
"advertised.listeners: CLIENT://localhost:9124,"
" PLAIN_CLIENT://localhost:9224",
"internal.listener.name: INTERNAL",
@@ -112,42 +145,121 @@ def fluss_cluster():
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3",
])
+
+ zookeeper = (
+ DockerContainer("zookeeper:3.9.2")
+ .with_kwargs(network=NETWORK_NAME)
+ .with_name(ZOOKEEPER_NAME)
+ )
+ coordinator = (
+ DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
+ .with_kwargs(network=NETWORK_NAME)
+ .with_name(COORDINATOR_NAME)
+ .with_bind_ports(9123, 9123)
+ .with_bind_ports(9223, 9223)
+ .with_command("coordinatorServer")
+ .with_env("FLUSS_PROPERTIES", coordinator_props)
+ )
tablet_server = (
DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
- .with_network(network)
- .with_name("tablet-server-python-test")
+ .with_kwargs(network=NETWORK_NAME)
+ .with_name(TABLET_SERVER_NAME)
.with_bind_ports(9123, 9124)
.with_bind_ports(9223, 9224)
.with_command("tabletServer")
.with_env("FLUSS_PROPERTIES", tablet_props)
)
- zookeeper.start()
- coordinator.start()
- tablet_server.start()
+ try:
+ zookeeper.start()
+ coordinator.start()
+ tablet_server.start()
+ except Exception as e:
+ # Another worker may have started containers with the same names.
+ # Wait for the cluster to become ready instead of failing.
+ print(f"Container start failed ({e}), waiting for cluster from another worker...")
+ if _all_ports_ready():
+ return
+ raise
- _wait_for_port("localhost", 9123)
- _wait_for_port("localhost", 9124)
- _wait_for_port("localhost", 9223)
- _wait_for_port("localhost", 9224)
- # Extra wait for cluster to fully initialize
- time.sleep(10)
+ if not _all_ports_ready():
+ raise RuntimeError("Cluster listeners did not become ready")
- # (plaintext_bootstrap, sasl_bootstrap)
- yield ("127.0.0.1:9223", "127.0.0.1:9123")
+ print("Fluss cluster started successfully.")
+
+
+def _stop_cluster():
+ """Stop and remove the Fluss Docker cluster containers."""
+ for name in [TABLET_SERVER_NAME, COORDINATOR_NAME, ZOOKEEPER_NAME]:
+ subprocess.run(["docker", "rm", "-f", name], capture_output=True)
+ subprocess.run(["docker", "network", "rm", NETWORK_NAME], capture_output=True)
+
+
+async def _connect_with_retry(bootstrap_servers, timeout=60):
+ """Connect to the Fluss cluster with retries until it's fully ready.
+
+ Waits until both the coordinator and at least one tablet server are
+ available, matching the Rust wait_for_cluster_ready pattern.
+ """
+ config = fluss.Config({"bootstrap.servers": bootstrap_servers})
+ start = time.time()
+ last_err = None
+ while time.time() - start < timeout:
+ conn = None
+ try:
+ conn = await fluss.FlussConnection.create(config)
+ admin = await conn.get_admin()
+ nodes = await admin.get_server_nodes()
+ if any(n.server_type == "TabletServer" for n in nodes):
+ return conn
+ last_err = RuntimeError("No TabletServer available yet")
+ except Exception as e:
+ last_err = e
+ if conn is not None:
+ conn.close()
+ await asyncio.sleep(1)
+ raise RuntimeError(
+ f"Could not connect to cluster after {timeout}s: {last_err}"
+ )
- tablet_server.stop()
- coordinator.stop()
- zookeeper.stop()
- network.remove()
+
+def pytest_unconfigure(config):
+ """Clean up Docker containers after all xdist workers finish.
+
+ Runs once on the controller process (or the single process when
+ not using xdist). Workers are identified by the 'workerinput' attr.
+ """
+ if BOOTSTRAP_SERVERS_ENV:
+ return
+ if hasattr(config, "workerinput"):
+ return # This is a worker, skip
+ _stop_cluster()
+
+
[email protected](scope="session")
+def fluss_cluster():
+ """Start a Fluss cluster using testcontainers, or use an existing one."""
+ if BOOTSTRAP_SERVERS_ENV:
+ sasl_env = os.environ.get(
+ "FLUSS_SASL_BOOTSTRAP_SERVERS", BOOTSTRAP_SERVERS_ENV
+ )
+ yield (BOOTSTRAP_SERVERS_ENV, sasl_env)
+ return
+
+ _start_cluster()
+
+ # (plaintext_bootstrap, sasl_bootstrap)
+ yield (
+ f"127.0.0.1:{PLAIN_CLIENT_PORT}",
+ f"127.0.0.1:{COORDINATOR_PORT}",
+ )
@pytest_asyncio.fixture(scope="session")
async def connection(fluss_cluster):
"""Session-scoped connection to the Fluss cluster (plaintext)."""
plaintext_addr, _sasl_addr = fluss_cluster
- config = fluss.Config({"bootstrap.servers": plaintext_addr})
- conn = await fluss.FlussConnection.create(config)
+ conn = await _connect_with_retry(plaintext_addr)
yield conn
conn.close()