This is an automated email from the ASF dual-hosted git repository.
asorokoumov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/otava.git
The following commit(s) were added to refs/heads/master by this push:
new a68bc98 Add e2e Graphite test (#106)
a68bc98 is described below
commit a68bc98cfafd32432b52f6125a4f1b311d2c6601
Author: Alex Sorokoumov <[email protected]>
AuthorDate: Sat Dec 13 10:31:16 2025 -0800
Add e2e Graphite test (#106)
* Add e2e Graphite test
* Extract common functions into e2e_test_utils.py
---
tests/csv_e2e_test.py | 5 +-
tests/e2e_test_utils.py | 177 ++++++++++++++++++++++++++++++++++++++
tests/graphite_e2e_test.py | 210 +++++++++++++++++++++++++++++++++++++++++++++
tests/postgres_e2e_test.py | 171 +++++++++++++-----------------------
4 files changed, 449 insertions(+), 114 deletions(-)
diff --git a/tests/csv_e2e_test.py b/tests/csv_e2e_test.py
index d3b5624..33d80f4 100644
--- a/tests/csv_e2e_test.py
+++ b/tests/csv_e2e_test.py
@@ -24,6 +24,7 @@ from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
+from e2e_test_utils import _remove_trailing_whitespaces
def test_analyze_csv():
@@ -206,7 +207,3 @@ def test_regressions_csv():
)
assert _remove_trailing_whitespaces(proc.stdout) == expected_output.rstrip("\n")
-
-
-def _remove_trailing_whitespaces(s: str) -> str:
- return "\n".join(line.rstrip() for line in s.splitlines()).strip()
diff --git a/tests/e2e_test_utils.py b/tests/e2e_test_utils.py
new file mode 100644
index 0000000..439dcac
--- /dev/null
+++ b/tests/e2e_test_utils.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import shutil
+import socket
+import subprocess
+import time
+from contextlib import contextmanager
+from typing import Callable
+
+import pytest
+
+
+@contextmanager
+def container(
+ image: str,
+ *,
+ env: dict[str, str] | None = None,
+ ports: list[int] | None = None,
+ volumes: dict[str, str] | None = None,
+ readiness_check: Callable[[str, dict[int, int]], bool] | None = None,
+):
+ """
+ Generic context manager for running a Docker container.
+
+ Args:
+ image: Docker image to run (e.g., "postgres:latest").
+ env: Optional dict of environment variables to set in the container.
+        ports: Optional list of container ports to publish (will be mapped to random host ports).
+        volumes: Optional dict mapping host paths to container paths for volume mounts.
+        readiness_check: Optional callable that takes (container_id, port_map) and returns True
+            when the container is ready. port_map maps container ports to host ports.
+            If not provided, the container is considered ready once all ports accept
+            TCP connections.
+
+ Yields:
+        A tuple of (container_id, port_map) where port_map is a dict mapping container ports
+        to their assigned host ports.
+ """
+ if not shutil.which("docker"):
+ pytest.fail("docker is not available on PATH")
+
+ container_id = None
+ try:
+ # Build docker run command
+ cmd = ["docker", "run", "-d"]
+
+ # Add environment variables
+ if env:
+ for key, value in env.items():
+ cmd.extend(["--env", f"{key}={value}"])
+
+ # Add volume mounts
+ if volumes:
+ for host_path, container_path in volumes.items():
+ cmd.extend(["--volume", f"{host_path}:{container_path}"])
+
+ # Add port mappings
+ if ports:
+ for port in ports:
+ cmd.extend(["--publish", str(port)])
+
+ cmd.append(image)
+
+ # Start the container
+ proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
+ if proc.returncode != 0:
+ pytest.fail(
+ "Docker command returned non-zero exit code.\n\n"
+ f"Command: {cmd!r}\n"
+ f"Exit code: {proc.returncode}\n\n"
+ f"Stdout:\n{proc.stdout}\n\n"
+ f"Stderr:\n{proc.stderr}\n"
+ )
+ container_id = proc.stdout.strip()
+
+ # Get assigned host ports for each container port
+ port_map: dict[int, int] = {}
+ if ports:
+ for port in ports:
+ inspect_cmd = [
+ "docker",
+ "inspect",
+ "-f",
+                    f'{{{{ (index (index .NetworkSettings.Ports "{port}/tcp") 0).HostPort }}}}',
+ container_id,
+ ]
+ inspect_proc = subprocess.run(
+ inspect_cmd, capture_output=True, text=True, timeout=60
+ )
+ if inspect_proc.returncode != 0:
+ pytest.fail(
+ "Docker inspect returned non-zero exit code.\n\n"
+ f"Command: {inspect_cmd!r}\n"
+ f"Exit code: {inspect_proc.returncode}\n\n"
+ f"Stdout:\n{inspect_proc.stdout}\n\n"
+ f"Stderr:\n{inspect_proc.stderr}\n"
+ )
+ port_map[port] = int(inspect_proc.stdout.strip())
+
+ # Wait for readiness
+ deadline = time.time() + 60
+ ready = False
+ while time.time() < deadline:
+ # First check that all ports accept TCP connections
+ all_ports_ready = True
+ for host_port in port_map.values():
+ try:
+                with socket.create_connection(("localhost", host_port), timeout=1):
+ pass
+ except OSError:
+ all_ports_ready = False
+ break
+
+ if not all_ports_ready:
+ time.sleep(1)
+ continue
+
+ # If a custom readiness check is provided, use it
+ if readiness_check is not None:
+ if readiness_check(container_id, port_map):
+ ready = True
+ break
+ time.sleep(1)
+ else:
+ # No custom check, ports being open is sufficient
+ ready = True
+ break
+
+ if not ready:
+ pytest.fail("Container did not become ready within timeout.")
+
+ yield container_id, port_map
+ finally:
+ if container_id:
+ res = subprocess.run(
+                ["docker", "stop", container_id], capture_output=True, text=True, timeout=60
+ )
+ if res.returncode != 0:
+ pytest.fail(
+                    f"Docker stop returned non-zero exit code: {res.returncode}\n"
+ f"Stdout: {res.stdout}\nStderr: {res.stderr}"
+ )
+ subprocess.run(
+                ["docker", "rm", container_id], capture_output=True, text=True, timeout=60
+ )
+
+
+@contextmanager
+def graphite_container():
+ """
+ Context manager for running a Graphite container with seeded data.
+ Yields the Graphite HTTP port and ensures cleanup on exit.
+ """
+ with container(
+ "graphiteapp/graphite-statsd",
+ ports=[80, 2003],
+ ) as (container_id, port_map):
+ yield str(port_map[80])
+
+
+def _remove_trailing_whitespaces(s: str) -> str:
+ return "\n".join(line.rstrip() for line in s.splitlines())
diff --git a/tests/graphite_e2e_test.py b/tests/graphite_e2e_test.py
new file mode 100644
index 0000000..96a2d99
--- /dev/null
+++ b/tests/graphite_e2e_test.py
@@ -0,0 +1,210 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+import socket
+import subprocess
+import time
+import urllib.request
+from pathlib import Path
+
+import pytest
+from e2e_test_utils import _remove_trailing_whitespaces, container
+
+CARBON_PORT = 2003
+HTTP_PORT = 80
+
+
+def test_analyze_graphite():
+ """
+ End-to-end test for the Graphite example from docs/GRAPHITE.md.
+
+    Starts Graphite docker container, writes sample data, then runs otava analyze,
+ and verifies the output contains expected change points.
+ """
+ with container(
+ "graphiteapp/graphite-statsd",
+ ports=[HTTP_PORT, CARBON_PORT],
+ readiness_check=_graphite_readiness_check,
+ ) as (container_id, port_map):
+ # Seed data into Graphite using the same pattern as datagen.sh
+ data_points = _seed_graphite_data(port_map[CARBON_PORT])
+
+ # Wait for data to be written and available
+ _wait_for_graphite_data(
+ http_port=port_map[HTTP_PORT],
+ metric_path="performance-tests.daily.my-product.client.throughput",
+ expected_points=data_points,
+ )
+
+ # Run the Otava analysis
+ proc = subprocess.run(
+            ["uv", "run", "otava", "analyze", "my-product.test", "--since=-10m"],
+ capture_output=True,
+ text=True,
+ timeout=600,
+ env=dict(
+ os.environ,
+ OTAVA_CONFIG=str(Path("examples/graphite/config/otava.yaml")),
+ GRAPHITE_ADDRESS=f"http://localhost:{port_map[HTTP_PORT]}/",
+ GRAFANA_ADDRESS="http://localhost:3000/",
+ GRAFANA_USER="admin",
+ GRAFANA_PASSWORD="admin",
+ ),
+ )
+
+ if proc.returncode != 0:
+ pytest.fail(
+ "Command returned non-zero exit code.\n\n"
+ f"Command: {proc.args!r}\n"
+ f"Exit code: {proc.returncode}\n\n"
+ f"Stdout:\n{proc.stdout}\n\n"
+ f"Stderr:\n{proc.stderr}\n"
+ )
+
+ # Verify output contains expected columns and change point indicators
+ output = _remove_trailing_whitespaces(proc.stdout)
+
+ # Check that the header contains expected column names
+ assert "throughput" in output
+ assert "response_time" in output
+ assert "cpu_usage" in output
+
+        # Data shows throughput dropped from ~61k to ~57k (-5.6%) and cpu increased from 0.2 to 0.8 (+300%)
+ assert "-5.6%" in output # throughput change
+ assert "+300.0%" in output # cpu_usage change
+
+
+def _graphite_readiness_check(container_id: str, port_map: dict[int, int]) -> bool:
+ """
+    Check if Graphite is fully ready by writing a canary metric and verifying it's queryable.
+
+    This ensures both Carbon (write path) and Graphite-web (read path) are operational.
+ """
+ carbon_port = port_map[CARBON_PORT]
+ http_port = port_map[HTTP_PORT]
+
+ # Send a canary metric to Carbon
+ timestamp = int(time.time())
+ canary_metrics = "test.canary.readiness"
+ message = f"{canary_metrics} 1 {timestamp}\n"
+ try:
+        with socket.create_connection(("localhost", carbon_port), timeout=5) as sock:
+ sock.sendall(message.encode("utf-8"))
+ except OSError:
+ return False
+
+ # Check if the canary metric is queryable via Graphite-web
+    url = f"http://localhost:{http_port}/render?target={canary_metrics}&format=json&from=-1min"
+ try:
+ with urllib.request.urlopen(url, timeout=5) as response:
+ data = json.loads(response.read().decode("utf-8"))
+ if data and len(data) > 0:
+ datapoints = data[0].get("datapoints", [])
+ # Check if we have at least one non-null data point
+ if any(dp[0] is not None for dp in datapoints):
+ return True
+ except (urllib.error.URLError, json.JSONDecodeError, OSError):
+ pass
+
+ return False
+
+
+def _seed_graphite_data(carbon_port: int) -> int:
+ """
+    Seed Graphite with test data matching the pattern from examples/graphite/datagen/datagen.sh.
+
+ Data pattern (from newest to oldest, matching datagen.sh array order):
+ - throughput: 56950, 57980, 57123, 60960, 60160, 61160 (index 0 is newest)
+ - response_time (p50): 85, 87, 88, 89, 85, 87
+ - cpu_usage: 0.7, 0.9, 0.8, 0.1, 0.3, 0.2
+
+ When displayed chronologically (oldest to newest), this shows:
+ - throughput dropped from ~61k to ~57k (-5.6% regression)
+ - cpu increased from 0.2 to 0.8 (+300% regression)
+ """
+ throughput_path = "performance-tests.daily.my-product.client.throughput"
+ throughput_values = [56950, 57980, 57123, 60960, 60160, 61160]
+
+ p50_path = "performance-tests.daily.my-product.client.p50"
+ p50_values = [85, 87, 88, 89, 85, 87]
+
+ cpu_path = "performance-tests.daily.my-product.server.cpu"
+ cpu_values = [0.7, 0.9, 0.8, 0.1, 0.3, 0.2]
+
+ start_timestamp = int(time.time())
+ num_points = len(throughput_values)
+
+ for i in range(num_points):
+ # Data is sent from newest to oldest (same as datagen.sh)
+ timestamp = start_timestamp - (i * 60)
+        _send_to_graphite(carbon_port, throughput_path, throughput_values[i], timestamp)
+ _send_to_graphite(carbon_port, p50_path, p50_values[i], timestamp)
+ _send_to_graphite(carbon_port, cpu_path, cpu_values[i], timestamp)
+ return num_points
+
+
+def _send_to_graphite(carbon_port: int, path: str, value: float, timestamp: int):
+ """
+ Send a single metric to Graphite via the Carbon plaintext protocol.
+ """
+ message = f"{path} {value} {timestamp}\n"
+ try:
+        with socket.create_connection(("localhost", carbon_port), timeout=5) as sock:
+ sock.sendall(message.encode("utf-8"))
+ except OSError as e:
+ pytest.fail(f"Failed to send metric to Graphite: {e}")
+
+
+def _wait_for_graphite_data(
+ http_port: int,
+ metric_path: str,
+ expected_points: int,
+ timeout: float = 120,
+ poll_interval: float = 0.5,
+) -> None:
+ """
+ Wait for Graphite to have the expected data points available.
+
+ Polls the Graphite render API until the specified metric has at least
+ the expected number of non-null data points, or until the timeout expires.
+ """
+    url = f"http://localhost:{http_port}/render?target={metric_path}&format=json&from=-10min"
+ deadline = time.time() + timeout
+
+ last_observed_count = 0
+ while time.time() < deadline:
+ try:
+ with urllib.request.urlopen(url, timeout=5) as response:
+ data = json.loads(response.read().decode("utf-8"))
+ if data and len(data) > 0:
+ datapoints = data[0].get("datapoints", [])
+ # Count non-null values
+                    non_null_count = sum(1 for dp in datapoints if dp[0] is not None)
+ last_observed_count = non_null_count
+ if non_null_count >= expected_points:
+ return
+ except (urllib.error.URLError, json.JSONDecodeError, OSError):
+ pass # Retry on connection errors
+
+ time.sleep(poll_interval)
+
+ pytest.fail(
+ f"Timeout waiting for Graphite data. "
+        f"Expected {expected_points} points for metric '{metric_path}' within {timeout}s, got {last_observed_count}"
+ )
diff --git a/tests/postgres_e2e_test.py b/tests/postgres_e2e_test.py
index e14de8d..e45fa39 100644
--- a/tests/postgres_e2e_test.py
+++ b/tests/postgres_e2e_test.py
@@ -16,15 +16,14 @@
# under the License.
import os
-import shutil
-import socket
import subprocess
import textwrap
-import time
from contextlib import contextmanager
from pathlib import Path
+from typing import Callable
import pytest
+from e2e_test_utils import _remove_trailing_whitespaces, container
def test_analyze():
@@ -36,7 +35,10 @@ def test_analyze():
container, and compares stdout to the expected output (seeded data uses
deterministic 2025 timestamps).
"""
- with postgres_container() as (postgres_container_id, host_port):
+ username = "exampleuser"
+ password = "examplepassword"
+ db = "benchmark_results"
+    with postgres_container(username, password, db) as (postgres_container_id, host_port):
# Run the Otava analysis
proc = subprocess.run(
["uv", "run", "otava", "analyze", "aggregate_mem"],
@@ -48,9 +50,9 @@ def test_analyze():
OTAVA_CONFIG=Path("examples/postgresql/config/otava.yaml"),
POSTGRES_HOSTNAME="localhost",
POSTGRES_PORT=host_port,
- POSTGRES_USERNAME="exampleuser",
- POSTGRES_PASSWORD="examplepassword",
- POSTGRES_DATABASE="benchmark_results",
+ POSTGRES_USERNAME=username,
+ POSTGRES_PASSWORD=password,
+ POSTGRES_DATABASE=db,
BRANCH="trunk",
),
)
@@ -131,7 +133,11 @@ def test_analyze_and_update_postgres():
container, and compares stdout to the expected output (seeded data uses
deterministic 2025 timestamps).
"""
- with postgres_container() as (postgres_container_id, host_port):
+
+ username = "exampleuser"
+ password = "examplepassword"
+ db = "benchmark_results"
+    with postgres_container(username, password, db) as (postgres_container_id, host_port):
# Run the Otava analysis
proc = subprocess.run(
["uv", "run", "otava", "analyze", "aggregate_mem", "--update-postgres"],
@@ -143,9 +149,9 @@ def test_analyze_and_update_postgres():
OTAVA_CONFIG=Path("examples/postgresql/config/otava.yaml"),
POSTGRES_HOSTNAME="localhost",
POSTGRES_PORT=host_port,
- POSTGRES_USERNAME="exampleuser",
- POSTGRES_PASSWORD="examplepassword",
- POSTGRES_DATABASE="benchmark_results",
+ POSTGRES_USERNAME=username,
+ POSTGRES_PASSWORD=password,
+ POSTGRES_DATABASE=db,
BRANCH="trunk",
),
)
@@ -233,7 +239,10 @@ def test_regressions():
waits for Postgres to be ready, runs the otava regressions command,
and compares stdout to the expected output.
"""
- with postgres_container() as (postgres_container_id, host_port):
+ username = "exampleuser"
+ password = "examplepassword"
+ db = "benchmark_results"
+    with postgres_container(username, password, db) as (postgres_container_id, host_port):
# Run the Otava regressions command
proc = subprocess.run(
["uv", "run", "otava", "regressions", "aggregate_mem"],
@@ -245,9 +254,9 @@ def test_regressions():
OTAVA_CONFIG=Path("examples/postgresql/config/otava.yaml"),
POSTGRES_HOSTNAME="localhost",
POSTGRES_PORT=host_port,
- POSTGRES_USERNAME="exampleuser",
- POSTGRES_PASSWORD="examplepassword",
- POSTGRES_DATABASE="benchmark_results",
+ POSTGRES_USERNAME=username,
+ POSTGRES_PASSWORD=password,
+ POSTGRES_DATABASE=db,
BRANCH="trunk",
),
)
@@ -310,106 +319,48 @@ def test_regressions():
assert forward_change == backward_change == p_value == ""
-@contextmanager
-def postgres_container():
- """
- Context manager for running a PostgreSQL container.
- Yields the container ID and ensures cleanup on exit.
- """
- if not shutil.which("docker"):
- pytest.fail("docker is not available on PATH")
+def _postgres_readiness_check_f(
+ username: str, database: str
+) -> Callable[[str, dict[int, int]], bool]:
+ """Check if PostgreSQL is ready to accept connections."""
- container_id = None
- try:
- # Start postgres container
+ def _inner(
+ container_id: str,
+ port_map: dict[int, int],
+ ) -> bool:
cmd = [
"docker",
- "run",
- "-d",
- "--env",
- "POSTGRES_USER=exampleuser",
- "--env",
- "POSTGRES_PASSWORD=examplepassword",
- "--env",
- "POSTGRES_DB=benchmark_results",
- "--volume",
-            f"{Path('examples/postgresql/init-db').resolve()}:/docker-entrypoint-initdb.d",
- "--publish",
- "5432",
- "postgres:latest",
- ]
- proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
- if proc.returncode != 0:
- pytest.fail(
- "Docker command returned non-zero exit code.\n\n"
- f"Command: {cmd!r}\n"
- f"Exit code: {proc.returncode}\n\n"
- f"Stdout:\n{proc.stdout}\n\n"
- f"Stderr:\n{proc.stderr}\n"
- )
- container_id = proc.stdout.strip()
- # Determine the randomly assigned host port for 5432/tcp
- inspect_cmd = [
- "docker",
- "inspect",
- "-f",
-            '{{ (index (index .NetworkSettings.Ports "5432/tcp") 0).HostPort }}',
+ "exec",
container_id,
+ "pg_isready",
+ "-U",
+ username,
+ "-d",
+ database,
]
-        inspect_proc = subprocess.run(inspect_cmd, capture_output=True, text=True, timeout=60)
- if inspect_proc.returncode != 0:
- pytest.fail(
- "Docker inspect returned non-zero exit code.\n\n"
- f"Command: {inspect_cmd!r}\n"
- f"Exit code: {inspect_proc.returncode}\n\n"
- f"Stdout:\n{inspect_proc.stdout}\n\n"
- f"Stderr:\n{inspect_proc.stderr}\n"
- )
- host_port = inspect_proc.stdout.strip()
-
- # Wait until Postgres responds
- deadline = time.time() + 60
- ready = False
- while time.time() < deadline:
- # First ensure the assigned host port accepts TCP connections
- try:
-                with socket.create_connection(("localhost", int(host_port)), timeout=1):
- port_ready = True
- except OSError:
- port_ready = False
- continue
-
- # Then check pg_isready inside the container
- cmd = [
- "docker",
- "exec",
- container_id,
- "pg_isready",
- "-U",
- "exampleuser",
- "-d",
- "benchmark_results",
- ]
- proc = subprocess.run(cmd, capture_output=True, text=True)
- if port_ready and proc.returncode == 0:
- ready = True
- break
- time.sleep(1)
-
- if not ready:
- pytest.fail("Postgres did not become ready within timeout.")
+ proc = subprocess.run(cmd, capture_output=True, text=True)
+ return proc.returncode == 0
- yield container_id, host_port
- finally:
- if container_id:
- res = subprocess.run(
- ["docker", "stop", container_id], capture_output=True,
text=True, timeout=60
- )
- if res.returncode != 0:
- pytest.fail(
-                    f"Docker command returned non-zero exit code: {res.returncode}\nStdout: {res.stdout}\nStderr: {res.stderr}"
- )
+ return _inner
-def _remove_trailing_whitespaces(s: str) -> str:
- return "\n".join(line.rstrip() for line in s.splitlines())
+@contextmanager
+def postgres_container(username, password, database):
+ """
+ Context manager for running a PostgreSQL container.
+ Yields the container ID and ensures cleanup on exit.
+ """
+ with container(
+ "postgres:latest",
+ env={
+ "POSTGRES_USER": username,
+ "POSTGRES_PASSWORD": password,
+ "POSTGRES_DB": database,
+ },
+ ports=[5432],
+ volumes={
+        str(Path("examples/postgresql/init-db").resolve()): "/docker-entrypoint-initdb.d",
+ },
+ readiness_check=_postgres_readiness_check_f(username, database),
+ ) as (container_id, port_map):
+ yield container_id, str(port_map[5432])