This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ac14b470d25 Add task SDK integration test foundation with JWT auth and session fixtures (#56139)
ac14b470d25 is described below
commit ac14b470d2534edc5cc7d50877cf1aa41bfe65f9
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Oct 14 21:56:51 2025 +0530
Add task SDK integration test foundation with JWT auth and session fixtures (#56139)
---
.../constants.py => dags/test_dag.py} | 36 ++-
task-sdk-tests/docker/docker-compose.yaml | 18 +-
task-sdk-tests/tests/task_sdk_tests/__init__.py | 8 +
task-sdk-tests/tests/task_sdk_tests/conftest.py | 245 ++++++++++++++++++++-
task-sdk-tests/tests/task_sdk_tests/constants.py | 21 +-
task-sdk-tests/tests/task_sdk_tests/jwt_plugin.py | 107 +++++++++
.../tests/task_sdk_tests/test_task_sdk_health.py | 130 ++---------
7 files changed, 430 insertions(+), 135 deletions(-)
diff --git a/task-sdk-tests/tests/task_sdk_tests/constants.py b/task-sdk-tests/dags/test_dag.py
similarity index 53%
copy from task-sdk-tests/tests/task_sdk_tests/constants.py
copy to task-sdk-tests/dags/test_dag.py
index 4b65352c6c4..c8f19597ba7 100644
--- a/task-sdk-tests/tests/task_sdk_tests/constants.py
+++ b/task-sdk-tests/dags/test_dag.py
@@ -16,18 +16,32 @@
# under the License.
from __future__ import annotations
-import os
-from pathlib import Path
+import time
-AIRFLOW_ROOT_PATH = Path(__file__).resolve().parents[3]
-TASK_SDK_TESTS_ROOT = Path(__file__).resolve().parents[2]
+from airflow.sdk import DAG, task
-DEFAULT_PYTHON_MAJOR_MINOR_VERSION = "3.10"
-DEFAULT_DOCKER_IMAGE = f"ghcr.io/apache/airflow/main/prod/python{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}:latest"
-DOCKER_IMAGE = os.environ.get("DOCKER_IMAGE") or DEFAULT_DOCKER_IMAGE
+dag = DAG("test_dag", description="Test DAG for Task SDK testing with long-running task", schedule=None)
-DOCKER_COMPOSE_HOST_PORT = os.environ.get("HOST_PORT", "localhost:8080")
-TASK_SDK_HOST_PORT = os.environ.get("TASK_SDK_HOST_PORT", "localhost:8080")
-TASK_SDK_API_VERSION = "2025-08-10"
-DOCKER_COMPOSE_FILE_PATH = TASK_SDK_TESTS_ROOT / "docker" / "docker-compose.yaml"
+@task(dag=dag)
+def get_task_instance_id(ti=None):
+ """Task that returns its own task instance ID"""
+ return str(ti.id)
+
+
+@task(dag=dag)
+def long_running_task(ti=None):
+ """Long-running task that sleeps for 5 minutes to allow testing"""
+ print(f"Starting long-running task with TI ID: {ti.id}")
+ print("This task will run for 5 minutes to allow API testing...")
+
+ time.sleep(3000)
+
+ print("Long-running task completed!")
+ return "test completed"
+
+
+get_ti_id = get_task_instance_id()
+long_task = long_running_task()
+
+get_ti_id >> long_task
diff --git a/task-sdk-tests/docker/docker-compose.yaml b/task-sdk-tests/docker/docker-compose.yaml
index 29ece3d5316..1bf57694709 100644
--- a/task-sdk-tests/docker/docker-compose.yaml
+++ b/task-sdk-tests/docker/docker-compose.yaml
@@ -22,12 +22,20 @@ x-airflow-common:
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: LocalExecutor
- AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
+ # yamllint disable rule:line-length
+ AIRFLOW__CORE__AUTH_MANAGER: 'airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager'
+ AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS: 'true'
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
+ AIRFLOW__CORE__DAGS_FOLDER: '/opt/airflow/dags'
AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
+ AIRFLOW__API__BASE_URL: 'http://airflow-apiserver:8080/'
+ AIRFLOW__API_AUTH__JWT_SECRET: 'test-secret-key-for-testing'
user: "${AIRFLOW_UID:-50000}:0"
+ volumes:
+ - ${PWD}/dags:/opt/airflow/dags
+ - ${PWD}/logs:/opt/airflow/logs
depends_on:
&airflow-common-depends-on
postgres:
@@ -103,3 +111,11 @@ services:
condition: service_healthy
airflow-init:
condition: service_completed_successfully
+
+ airflow-dag-processor:
+ <<: *airflow-common
+ command: dag-processor
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
diff --git a/task-sdk-tests/tests/task_sdk_tests/__init__.py b/task-sdk-tests/tests/task_sdk_tests/__init__.py
index 13a83393a91..973b8fdce34 100644
--- a/task-sdk-tests/tests/task_sdk_tests/__init__.py
+++ b/task-sdk-tests/tests/task_sdk_tests/__init__.py
@@ -14,3 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
+from rich.console import Console
+
+console = Console(width=400, color_system="standard")
+
+
+__all__ = ["console"]
diff --git a/task-sdk-tests/tests/task_sdk_tests/conftest.py b/task-sdk-tests/tests/task_sdk_tests/conftest.py
index 47aed12906f..2b2e8274330 100644
--- a/task-sdk-tests/tests/task_sdk_tests/conftest.py
+++ b/task-sdk-tests/tests/task_sdk_tests/conftest.py
@@ -19,16 +19,121 @@ from __future__ import annotations
import os
import subprocess
import sys
+from pathlib import Path
-from task_sdk_tests.constants import AIRFLOW_ROOT_PATH
+import pytest
+from task_sdk_tests import console
+from task_sdk_tests.constants import (
+ AIRFLOW_ROOT_PATH,
+ DOCKER_COMPOSE_FILE_PATH,
+ DOCKER_IMAGE,
+ TASK_SDK_HOST_PORT,
+)
-def pytest_sessionstart(session):
- """Install Task SDK at the very start of the pytest session."""
- from rich.console import Console
- console = Console(width=400, color_system="standard")
+def print_diagnostics(compose, compose_version, docker_version):
+ """Print diagnostic information when test fails."""
+ console.print("[red]=== DIAGNOSTIC INFORMATION ===[/]")
+ console.print(f"Docker version: {docker_version}")
+ console.print(f"Docker Compose version: {compose_version}")
+ console.print("\n[yellow]Container Status:[/]")
+ try:
+ containers = compose.compose.ps()
+ for container in containers:
+ console.print(f" {container.name}: {container.state}")
+ except Exception as e:
+ console.print(f" Error getting container status: {e}")
+
+ console.print("\n[yellow]Container Logs:[/]")
+ try:
+ logs = compose.compose.logs()
+ console.print(logs)
+ except Exception as e:
+ console.print(f" Error getting logs: {e}")
+
+
+def debug_environment():
+ """Debug the Python environment setup in CI."""
+
+ import os
+ import subprocess
+ import sys
+
+ console.print("[yellow]===== CI ENVIRONMENT DEBUG =====")
+ console.print(f"[blue]Python executable: {sys.executable}")
+ console.print(f"[blue]Python version: {sys.version}")
+ console.print(f"[blue]Working directory: {os.getcwd()}")
+ console.print(f"[blue]VIRTUAL_ENV: {os.environ.get('VIRTUAL_ENV', 'Not set')}")
+ console.print(f"[blue]PYTHONPATH: {os.environ.get('PYTHONPATH', 'Not set')}")
+
+ console.print(f"[blue]Python executable exists: {Path(sys.executable).exists()}")
+ if Path(sys.executable).is_symlink():
+ console.print(f"[blue]Python executable is symlink to: {Path(sys.executable).readlink()}")
+
+ try:
+ uv_python = subprocess.check_output(["uv", "python", "find"], text=True).strip()
+ console.print(f"[cyan]UV Python: {uv_python}")
+ console.print(f"[green]Match: {uv_python == sys.executable}")
+
+ console.print(f"[cyan]UV Python exists: {Path(uv_python).exists()}")
+ if Path(uv_python).is_symlink():
+ console.print(f"[cyan]UV Python is symlink to: {Path(uv_python).readlink()}")
+ except Exception as e:
+ console.print(f"[red]UV Python error: {e}")
+
+ # Check what's installed in current environment
+ try:
+ import airflow
+
+ console.print(f"[green]✅ airflow already available: {airflow.__file__}")
+ except ImportError:
+ console.print("[red]❌ airflow not available in current environment")
+
+ console.print("[yellow]================================")
+
[email protected](scope="session")
+def docker_compose_setup(tmp_path_factory):
+ """Start docker-compose once per session."""
+ import os
+ from shutil import copyfile
+
+ from python_on_whales import DockerClient, docker
+
+ # Create temp directory for docker-compose
+ tmp_dir = tmp_path_factory.mktemp("airflow-task-sdk-test")
+ tmp_docker_compose_file = tmp_dir / "docker-compose.yaml"
+ copyfile(DOCKER_COMPOSE_FILE_PATH, tmp_docker_compose_file)
+
+ # Set environment variables
+ os.environ["AIRFLOW_IMAGE_NAME"] = DOCKER_IMAGE
+ os.environ["TASK_SDK_VERSION"] = os.environ.get("TASK_SDK_VERSION", "1.1.0")
+
+ compose = DockerClient(compose_files=[str(tmp_docker_compose_file)])
+
+ try:
+ console.print("[yellow]Starting docker-compose for session...")
+ compose.compose.up(detach=True, wait=True)
+ console.print("[green]Docker compose started successfully!\n")
+
+ yield compose
+ except Exception as e:
+ console.print(f"[red]❌ Docker compose failed to start: {e}")
+
+ debug_environment()
+ print_diagnostics(compose, compose.version(), docker.version())
+
+ raise
+ finally:
+ if not os.environ.get("SKIP_DOCKER_COMPOSE_DELETION"):
+ console.print("[yellow]Cleaning up docker-compose...")
+ compose.compose.down(remove_orphans=True, volumes=True, quiet=True)
+ console.print("[green]Docker compose cleaned up")
+
+
+def pytest_sessionstart(session):
+ """Install Task SDK at the very start of the pytest session."""
task_sdk_version = os.environ.get("TASK_SDK_VERSION", "1.1.0")
console.print(
f"[yellow]Installing apache-airflow-task-sdk=={task_sdk_version} via pytest_sessionstart..."
@@ -69,3 +174,133 @@ def pytest_sessionstart(session):
console.print(f"[red]Stdout: {e.stdout}")
console.print(f"[red]Stderr: {e.stderr}")
raise
+
+
[email protected](scope="session")
+def airflow_test_setup(docker_compose_setup):
+ """Fixed session-scoped fixture that matches UI behavior."""
+ import time
+
+ import requests
+
+ from airflow.sdk.api.client import Client
+ from airflow.sdk.timezone import utcnow
+ from task_sdk_tests.jwt_plugin import generate_jwt_token
+
+ time.sleep(15)
+
+ # Step 1: Get auth token
+ auth_url = "http://localhost:8080/auth/token"
+ try:
+ auth_response = requests.get(auth_url, timeout=10)
+ auth_response.raise_for_status()
+ auth_token = auth_response.json()["access_token"]
+ console.print("[green]✅ Got auth token")
+ except Exception as e:
+ raise e
+
+ # Step 2: Check and unpause DAG
+ headers = {"Authorization": f"Bearer {auth_token}", "Content-Type": "application/json"}
+
+ console.print("[yellow]Checking DAG status...")
+ dag_response = requests.get("http://localhost:8080/api/v2/dags/test_dag", headers=headers)
+ dag_response.raise_for_status()
+ dag_data = dag_response.json()
+
+ if dag_data.get("is_paused", True):
+ console.print("[yellow]Unpausing DAG...")
+ unpause_response = requests.patch(
+ "http://localhost:8080/api/v2/dags/test_dag", json={"is_paused": False}, headers=headers
+ )
+ unpause_response.raise_for_status()
+ console.print("[green]✅ DAG unpaused")
+ logical_date = utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-3] + "Z"
+ payload = {"conf": {}, "logical_date": logical_date}
+
+ trigger_response = requests.post(
+ "http://localhost:8080/api/v2/dags/test_dag/dagRuns", json=payload, headers=headers, timeout=30
+ )
+
+ console.print(f"[blue]Trigger DAG Run response status: {trigger_response.status_code}")
+ console.print(f"[blue]Trigger DAG Run response: {trigger_response.text}")
+
+ trigger_response.raise_for_status()
+ dag_run_data = trigger_response.json()
+ dag_run_id = dag_run_data["dag_run_id"]
+
+ console.print(f"[green]✅ DAG triggered: {dag_run_id}")
+
+ # Step 4: Get task instance for testing
+ console.print("[yellow]Waiting for any task instance...")
+ ti_id = None
+
+ for attempt in range(20):
+ try:
+ ti_url = f"http://localhost:8080/api/v2/dags/test_dag/dagRuns/{dag_run_id}/taskInstances"
+ ti_response = requests.get(ti_url, headers=headers, timeout=10)
+ ti_response.raise_for_status()
+
+ task_instances = ti_response.json().get("task_instances", [])
+
+ if task_instances:
+ first_ti = task_instances[0]
+ ti_id = first_ti.get("id")
+
+ if ti_id:
+ console.print(f"[green]✅ Using task instance from '{first_ti.get('task_id')}'")
+ console.print(f"[green] State: {first_ti.get('state')}")
+ console.print(f"[green] Instance ID: {ti_id}")
+ break
+ else:
+ console.print(f"[blue]Waiting for tasks (attempt {attempt + 1}/20)")
+
+ except Exception as e:
+ console.print(f"[yellow]Task check failed: {e}")
+
+ time.sleep(2)
+
+ if not ti_id:
+ console.print("[red]❌ Task instances never appeared. Final debug info:")
+ raise TimeoutError("No task instance found within timeout period")
+
+ # Step 5: Create SDK client
+ jwt_token = generate_jwt_token(ti_id)
+ sdk_client = Client(base_url=f"http://{TASK_SDK_HOST_PORT}/execution", token=jwt_token)
+
+ return {
+ "auth_token": auth_token,
+ "dag_info": {"dag_id": "test_dag", "dag_run_id": dag_run_id, "logical_date": logical_date},
+ "task_instance_id": ti_id,
+ "sdk_client": sdk_client,
+ "core_api_headers": headers,
+ }
+
+
[email protected](scope="session")
+def auth_token(airflow_test_setup):
+ """Get the auth token from setup."""
+ return airflow_test_setup["auth_token"]
+
+
[email protected](scope="session")
+def dag_info(airflow_test_setup):
+ """Get DAG information from setup."""
+ return airflow_test_setup["dag_info"]
+
+
[email protected](scope="session")
+def task_instance_id(airflow_test_setup):
+ """Get task instance ID from setup."""
+ return airflow_test_setup["task_instance_id"]
+
+
[email protected](scope="session")
+def sdk_client(airflow_test_setup):
+ """Get authenticated Task SDK client from setup."""
+ return airflow_test_setup["sdk_client"]
+
+
[email protected](scope="session")
+def core_api_headers(airflow_test_setup):
+ """Get Core API headers from setup."""
+ return airflow_test_setup["core_api_headers"]
diff --git a/task-sdk-tests/tests/task_sdk_tests/constants.py b/task-sdk-tests/tests/task_sdk_tests/constants.py
index 4b65352c6c4..1e35e0106c2 100644
--- a/task-sdk-tests/tests/task_sdk_tests/constants.py
+++ b/task-sdk-tests/tests/task_sdk_tests/constants.py
@@ -28,6 +28,25 @@ DOCKER_IMAGE = os.environ.get("DOCKER_IMAGE") or DEFAULT_DOCKER_IMAGE
DOCKER_COMPOSE_HOST_PORT = os.environ.get("HOST_PORT", "localhost:8080")
TASK_SDK_HOST_PORT = os.environ.get("TASK_SDK_HOST_PORT", "localhost:8080")
-TASK_SDK_API_VERSION = "2025-08-10"
+
+
+# This represents the Execution API schema version, NOT the Task SDK package version.
+#
+# Purpose:
+# - Defines the API contract between Task SDK and Airflow's Execution API
+# - Enables backward compatibility when API schemas evolve
+# - Uses calver format (YYYY-MM-DD) based on expected release dates
+#
+# Usage:
+# - Sent as "Airflow-API-Version" header with every API request
+# - Server uses this to determine which schema version to serve
+# - Allows older Task SDK versions to work with newer Airflow servers
+#
+# Version vs Package Version:
+# - API Version: "2025-09-23" (schema compatibility)
+# - Package Version: "1.1.0" (Task SDK release version)
+#
+# Keep this in sync with: task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+TASK_SDK_API_VERSION = "2025-09-23"
DOCKER_COMPOSE_FILE_PATH = TASK_SDK_TESTS_ROOT / "docker" / "docker-compose.yaml"
diff --git a/task-sdk-tests/tests/task_sdk_tests/jwt_plugin.py b/task-sdk-tests/tests/task_sdk_tests/jwt_plugin.py
new file mode 100644
index 00000000000..3caf4193eea
--- /dev/null
+++ b/task-sdk-tests/tests/task_sdk_tests/jwt_plugin.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""JWT Token Plugin for Task SDK Integration Tests."""
+
+from __future__ import annotations
+
+import os
+import uuid
+from datetime import datetime, timezone
+from typing import Any
+
+import jwt
+
+
+class JWTTokenGenerator:
+ """Generator for JWT tokens used in Task SDK API authentication."""
+
+ def __init__(self):
+ """Initialize JWT configuration from environment variables."""
+ self.secret = os.getenv("AIRFLOW__API_AUTH__JWT_SECRET", "test-secret-key-for-testing")
+ self.issuer = os.getenv("AIRFLOW__API_AUTH__JWT_ISSUER", "airflow-test")
+ self.audience = os.getenv("AIRFLOW__API_AUTH__JWT_AUDIENCE", "urn:airflow.apache.org:task")
+ self.algorithm = os.getenv("AIRFLOW__API_AUTH__JWT_ALGORITHM", "HS512")
+ self.kid = os.getenv("AIRFLOW__API_AUTH__JWT_KID", "test-key-id")
+
+ def generate_token(
+ self,
+ task_instance_id: str,
+ expires_in_seconds: int = 3600,
+ extra_claims: dict[str, Any] | None = None,
+ extra_headers: dict[str, Any] | None = None,
+ ) -> str:
+ """
+ Generate a JWT token for task instance authentication.
+
+ Args:
+ task_instance_id: The task instance ID to use as the 'sub' claim
+ expires_in_seconds: Token expiration time in seconds (default: 1 hour)
+ extra_claims: Additional claims to include in the token
+ extra_headers: Additional headers to include in the token
+
+ Returns:
+ JWT token as a string
+ """
+ now = int(datetime.now(timezone.utc).timestamp())
+
+ claims = {
+ "jti": uuid.uuid4().hex,
+ "iss": self.issuer,
+ "aud": self.audience,
+ "nbf": now,
+ "exp": now + expires_in_seconds,
+ "iat": now,
+ "sub": task_instance_id,
+ }
+
+ # Remove audience if not set
+ if not claims.get("aud"):
+ del claims["aud"]
+
+ # Add extra claims if provided
+ if extra_claims:
+ claims.update(extra_claims)
+
+ # Base JWT headers
+ headers = {
+ "alg": self.algorithm,
+ "kid": self.kid,
+ }
+
+ # Add extra headers if provided
+ if extra_headers:
+ headers.update(extra_headers)
+
+ # Generate and return the token
+ token = jwt.encode(claims, self.secret, algorithm=self.algorithm, headers=headers)
+ return token
+
+
+def generate_jwt_token(task_instance_id: str, expires_in_seconds: int = 3600) -> str:
+ """
+ Convenience function to generate a JWT token.
+
+ Args:
+ task_instance_id: The task instance ID to use as the 'sub' claim
+ expires_in_seconds: Token expiration time in seconds (default: 1 hour)
+
+ Returns:
+ JWT token as a string
+ """
+ generator = JWTTokenGenerator()
+ return generator.generate_token(task_instance_id, expires_in_seconds)
diff --git a/task-sdk-tests/tests/task_sdk_tests/test_task_sdk_health.py b/task-sdk-tests/tests/task_sdk_tests/test_task_sdk_health.py
index d4b3b1ba94d..578068c9c6a 100644
--- a/task-sdk-tests/tests/task_sdk_tests/test_task_sdk_health.py
+++ b/task-sdk-tests/tests/task_sdk_tests/test_task_sdk_health.py
@@ -16,129 +16,25 @@
# under the License.
from __future__ import annotations
-import os
-from pathlib import Path
-from shutil import copyfile
-
-from python_on_whales import DockerClient, docker
-from rich.console import Console
-
+from task_sdk_tests import console
from task_sdk_tests.constants import (
- DOCKER_COMPOSE_FILE_PATH,
- DOCKER_IMAGE,
TASK_SDK_API_VERSION,
- TASK_SDK_HOST_PORT,
)
-console = Console(width=400, color_system="standard")
-
-
-def print_diagnostics(compose, compose_version, docker_version):
- """Print diagnostic information when test fails."""
- console.print("[red]=== DIAGNOSTIC INFORMATION ===[/]")
- console.print(f"Docker version: {docker_version}")
- console.print(f"Docker Compose version: {compose_version}")
- console.print("\n[yellow]Container Status:[/]")
- try:
- containers = compose.compose.ps()
- for container in containers:
- console.print(f" {container.name}: {container.state}")
- except Exception as e:
- console.print(f" Error getting container status: {e}")
-
- console.print("\n[yellow]Container Logs:[/]")
- try:
- logs = compose.compose.logs()
- console.print(logs)
- except Exception as e:
- console.print(f" Error getting logs: {e}")
-
-
-def debug_environment():
- """Debug the Python environment setup in CI."""
-
- import os
- import subprocess
- import sys
-
- console.print("[yellow]===== CI ENVIRONMENT DEBUG =====")
- console.print(f"[blue]Python executable: {sys.executable}")
- console.print(f"[blue]Python version: {sys.version}")
- console.print(f"[blue]Working directory: {os.getcwd()}")
- console.print(f"[blue]VIRTUAL_ENV: {os.environ.get('VIRTUAL_ENV', 'Not set')}")
- console.print(f"[blue]PYTHONPATH: {os.environ.get('PYTHONPATH', 'Not set')}")
-
- console.print(f"[blue]Python executable exists: {Path(sys.executable).exists()}")
- if Path(sys.executable).is_symlink():
- console.print(f"[blue]Python executable is symlink to: {Path(sys.executable).readlink()}")
-
- try:
- uv_python = subprocess.check_output(["uv", "python", "find"], text=True).strip()
- console.print(f"[cyan]UV Python: {uv_python}")
- console.print(f"[green]Match: {uv_python == sys.executable}")
-
- console.print(f"[cyan]UV Python exists: {Path(uv_python).exists()}")
- if Path(uv_python).is_symlink():
- console.print(f"[cyan]UV Python is symlink to: {Path(uv_python).readlink()}")
- except Exception as e:
- console.print(f"[red]UV Python error: {e}")
-
- # Check what's installed in current environment
- try:
- import airflow
-
- console.print(f"[green]✅ airflow already available: {airflow.__file__}")
- except ImportError:
- console.print("[red]❌ airflow not available in current environment")
-
- console.print("[yellow]================================")
-
-
-def test_task_sdk_health(tmp_path_factory, monkeypatch):
- """Test Task SDK health check using docker-compose environment."""
- tmp_dir = tmp_path_factory.mktemp("airflow-task-sdk-test")
- console.print(f"[yellow]Tests are run in {tmp_dir}")
-
- # Copy docker-compose.yaml to temp directory
- tmp_docker_compose_file = tmp_dir / "docker-compose.yaml"
- copyfile(DOCKER_COMPOSE_FILE_PATH, tmp_docker_compose_file)
-
- # Set environment variables for the test
- monkeypatch.setenv("AIRFLOW_IMAGE_NAME", DOCKER_IMAGE)
- monkeypatch.setenv("TASK_SDK_VERSION", os.environ.get("TASK_SDK_VERSION", "1.0.3"))
-
- # Initialize Docker client
- compose = DockerClient(compose_files=[str(tmp_docker_compose_file)])
-
- try:
- compose.compose.up(detach=True, wait=True)
- console.print("[green]Docker compose started for task SDK test\n")
-
- try:
- from airflow.sdk.api.client import Client
-
- console.print("[green]✅ Task SDK client imported successfully!")
- except ImportError as e:
- console.print(f"[red]❌ Failed to import Task SDK client: {e}")
- raise
- client = Client(base_url=f"http://{TASK_SDK_HOST_PORT}/execution", token="not-a-token")
+def test_task_sdk_health(sdk_client):
+ """Test Task SDK health check using session setup."""
+ client = sdk_client
- console.print("[yellow]Making health check request...")
- response = client.get("health/ping", headers={"Airflow-API-Version": TASK_SDK_API_VERSION})
+ console.print("[yellow]Making health check request...")
+ response = client.get("health/ping", headers={"Airflow-API-Version": TASK_SDK_API_VERSION})
- console.print(" Health Check Response ".center(72, "="))
- console.print(f"[bright_blue]Status Code:[/] {response.status_code}")
- console.print(f"[bright_blue]Response:[/] {response.json()}")
- console.print("=" * 72)
+ console.print(" Health Check Response ".center(72, "="))
+ console.print(f"[bright_blue]Status Code:[/] {response.status_code}")
+ console.print(f"[bright_blue]Response:[/] {response.json()}")
+ console.print("=" * 72)
- assert response.status_code == 200
- assert response.json() == {"ok": ["airflow.api_fastapi.auth.tokens.JWTValidator"], "failing": {}}
+ assert response.status_code == 200
+ assert response.json() == {"ok": ["airflow.api_fastapi.auth.tokens.JWTValidator"], "failing": {}}
- except Exception:
- print_diagnostics(compose, compose.version(), docker.version())
- raise
- finally:
- if not os.environ.get("SKIP_DOCKER_COMPOSE_DELETION"):
- compose.compose.down(remove_orphans=True, volumes=True, quiet=True)
- console.print("[green]Docker compose instance deleted")
+ console.print("[green]✅ Task SDK health check passed!")