This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new c4d44e716ec AIP84: Check standalone_dag_processor config in get_airflow_health() and update health endpoint (#44383)
c4d44e716ec is described below

commit c4d44e716ec2c92d3d032c5c97448559c23d0b0b
Author: vatsrahul1001 <43964496+vatsrahul1...@users.noreply.github.com>
AuthorDate: Fri Nov 29 20:50:08 2024 +0530

    AIP84: Check standalone_dag_processor config in get_airflow_health() and update health endpoint (#44383)

    * AIP84 health endpoint skip dag processor
    * AIP84 health endpoint skip dag processor
    * fixing test_airflow_health tests and adding new tests
    * AIP84 health endpoint skip dag processor
    * AIP84 health endpoint skip dag processor
    * fixing test_airflow_health tests and adding new tests
    * fix static tests
    * Using only one model class HealthInfoSchema
    * fixing static checks
    * implement review comments
    * fix static tests
---
 airflow/api/common/airflow_health.py               |  40 ++++---
 airflow/api_fastapi/core_api/datamodels/monitor.py |   2 +-
 .../api_fastapi/core_api/openapi/v1-generated.yaml |   5 +-
 .../api_fastapi/core_api/routes/public/monitor.py  |   4 +-
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  11 +-
 airflow/ui/openapi-gen/requests/types.gen.ts       |   2 +-
 airflow/ui/src/pages/Dashboard/Health/Health.tsx   |   5 +-
 tests/api/common/test_airflow_health.py            | 116 ++++++++++++++++++++-
 .../core_api/routes/public/test_monitor.py         |  52 +++++++++
 9 files changed, 205 insertions(+), 32 deletions(-)

diff --git a/airflow/api/common/airflow_health.py b/airflow/api/common/airflow_health.py
index 5d37de540a4..043557fa0db 100644
--- a/airflow/api/common/airflow_health.py
+++ b/airflow/api/common/airflow_health.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 from typing import Any
 
+from airflow.configuration import conf
 from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.jobs.triggerer_job_runner import
TriggererJobRunner @@ -28,13 +29,13 @@ UNHEALTHY = "unhealthy" def get_airflow_health() -> dict[str, Any]: """Get the health for Airflow metadatabase, scheduler and triggerer.""" + dag_processor_enabled = conf.getboolean("scheduler", "standalone_dag_processor") metadatabase_status = HEALTHY latest_scheduler_heartbeat = None latest_triggerer_heartbeat = None - latest_dag_processor_heartbeat = None + scheduler_status = UNHEALTHY triggerer_status: str | None = UNHEALTHY - dag_processor_status: str | None = UNHEALTHY try: latest_scheduler_job = SchedulerJobRunner.most_recent_job() @@ -58,18 +59,6 @@ def get_airflow_health() -> dict[str, Any]: except Exception: metadatabase_status = UNHEALTHY - try: - latest_dag_processor_job = DagProcessorJobRunner.most_recent_job() - - if latest_dag_processor_job: - latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat() - if latest_dag_processor_job.is_alive(): - dag_processor_status = HEALTHY - else: - dag_processor_status = None - except Exception: - metadatabase_status = UNHEALTHY - airflow_health_status = { "metadatabase": {"status": metadatabase_status}, "scheduler": { @@ -80,10 +69,27 @@ def get_airflow_health() -> dict[str, Any]: "status": triggerer_status, "latest_triggerer_heartbeat": latest_triggerer_heartbeat, }, - "dag_processor": { + } + + if dag_processor_enabled: + latest_dag_processor_heartbeat = None + dag_processor_status: str | None = UNHEALTHY + + try: + latest_dag_processor_job = DagProcessorJobRunner.most_recent_job() + + if latest_dag_processor_job: + latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat() + if latest_dag_processor_job.is_alive(): + dag_processor_status = HEALTHY + else: + dag_processor_status = None + except Exception: + metadatabase_status = UNHEALTHY + + airflow_health_status["dag_processor"] = { "status": dag_processor_status, "latest_dag_processor_heartbeat": latest_dag_processor_heartbeat, - }, - } + } return 
airflow_health_status diff --git a/airflow/api_fastapi/core_api/datamodels/monitor.py b/airflow/api_fastapi/core_api/datamodels/monitor.py index fbaf40b4e84..f4434034424 100644 --- a/airflow/api_fastapi/core_api/datamodels/monitor.py +++ b/airflow/api_fastapi/core_api/datamodels/monitor.py @@ -49,4 +49,4 @@ class HealthInfoSchema(BaseModel): metadatabase: BaseInfoSchema scheduler: SchedulerInfoSchema triggerer: TriggererInfoSchema - dag_processor: DagProcessorInfoSchema + dag_processor: DagProcessorInfoSchema | None = None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index a38fa874c08..9832c3668fe 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -7586,13 +7586,14 @@ components: triggerer: $ref: '#/components/schemas/TriggererInfoSchema' dag_processor: - $ref: '#/components/schemas/DagProcessorInfoSchema' + anyOf: + - $ref: '#/components/schemas/DagProcessorInfoSchema' + - type: 'null' type: object required: - metadatabase - scheduler - triggerer - - dag_processor title: HealthInfoSchema description: Schema for the Health endpoint. 
HistoricalMetricDataResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/monitor.py b/airflow/api_fastapi/core_api/routes/public/monitor.py index 38953a18b99..538bdcb3521 100644 --- a/airflow/api_fastapi/core_api/routes/public/monitor.py +++ b/airflow/api_fastapi/core_api/routes/public/monitor.py @@ -24,7 +24,7 @@ from airflow.api_fastapi.core_api.datamodels.monitor import HealthInfoSchema monitor_router = AirflowRouter(tags=["Monitor"], prefix="/monitor") -@monitor_router.get("/health") -def get_health() -> HealthInfoSchema: +@monitor_router.get("/health", response_model=HealthInfoSchema, response_model_exclude_unset=True) +def get_health(): airflow_health_status = get_airflow_health() return HealthInfoSchema.model_validate(airflow_health_status) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index b9289674a18..13e657d0083 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2886,11 +2886,18 @@ export const $HealthInfoSchema = { $ref: "#/components/schemas/TriggererInfoSchema", }, dag_processor: { - $ref: "#/components/schemas/DagProcessorInfoSchema", + anyOf: [ + { + $ref: "#/components/schemas/DagProcessorInfoSchema", + }, + { + type: "null", + }, + ], }, }, type: "object", - required: ["metadatabase", "scheduler", "triggerer", "dag_processor"], + required: ["metadatabase", "scheduler", "triggerer"], title: "HealthInfoSchema", description: "Schema for the Health endpoint.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 89d8ab8a145..2035b4579fe 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -705,7 +705,7 @@ export type HealthInfoSchema = { metadatabase: BaseInfoSchema; scheduler: SchedulerInfoSchema; triggerer: TriggererInfoSchema; - dag_processor: DagProcessorInfoSchema; + 
dag_processor?: DagProcessorInfoSchema | null; }; /** diff --git a/airflow/ui/src/pages/Dashboard/Health/Health.tsx b/airflow/ui/src/pages/Dashboard/Health/Health.tsx index 0694283930c..040f9a04969 100644 --- a/airflow/ui/src/pages/Dashboard/Health/Health.tsx +++ b/airflow/ui/src/pages/Dashboard/Health/Health.tsx @@ -54,15 +54,14 @@ export const Health = () => { status={data?.triggerer.status} title="Triggerer" /> - {/* TODO: Update this to match the API when we move the config check to the API level */} - {data?.dag_processor.status === undefined ? undefined : ( + {data?.dag_processor ? ( <HealthTag isLoading={isLoading} latestHeartbeat={data.dag_processor.latest_dag_processor_heartbeat} status={data.dag_processor.status} title="Dag Processor" /> - )} + ) : undefined} </HStack> </Box> ); diff --git a/tests/api/common/test_airflow_health.py b/tests/api/common/test_airflow_health.py index ebdc086c692..06bd1a7bf4c 100644 --- a/tests/api/common/test_airflow_health.py +++ b/tests/api/common/test_airflow_health.py @@ -27,14 +27,19 @@ from airflow.api.common.airflow_health import ( get_airflow_health, ) +from tests_common.test_utils.config import conf_vars + pytestmark = pytest.mark.db_test @patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None) @patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None) @patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None) +@conf_vars({("scheduler", "standalone_dag_processor"): "True"}) def test_get_airflow_health_only_metadatabase_healthy( - latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock + latest_scheduler_job_mock, + latest_triggerer_job_mock, + latest_dag_processor_job_mock, ): health_status = get_airflow_health() expected_status = { @@ -50,8 +55,11 @@ def test_get_airflow_health_only_metadatabase_healthy( 
@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=Exception) @patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=Exception) @patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=Exception) +@conf_vars({("scheduler", "standalone_dag_processor"): "True"}) def test_get_airflow_health_metadatabase_unhealthy( - latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock + latest_scheduler_job_mock, + latest_triggerer_job_mock, + latest_dag_processor_job_mock, ): health_status = get_airflow_health() @@ -65,6 +73,45 @@ def test_get_airflow_health_metadatabase_unhealthy( assert health_status == expected_status +@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None) +@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None) +@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None) +@conf_vars({("scheduler", "standalone_dag_processor"): "False"}) +def test_get_airflow_health_only_metadatabase_healthy_with_dag_processor_disabled( + latest_scheduler_job_mock, + latest_triggerer_job_mock, + latest_dag_processor_job_mock, +): + health_status = get_airflow_health() + expected_status = { + "metadatabase": {"status": HEALTHY}, + "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None}, + "triggerer": {"status": None, "latest_triggerer_heartbeat": None}, + } + + assert health_status == expected_status + + +@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=Exception) +@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=Exception) +@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=Exception) +@conf_vars({("scheduler", "standalone_dag_processor"): "False"}) +def 
test_get_airflow_health_metadatabase_unhealthy_with_dag_processor_disabled( + latest_scheduler_job_mock, + latest_triggerer_job_mock, + latest_dag_processor_job_mock, +): + health_status = get_airflow_health() + + expected_status = { + "metadatabase": {"status": UNHEALTHY}, + "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None}, + "triggerer": {"status": UNHEALTHY, "latest_triggerer_heartbeat": None}, + } + + assert health_status == expected_status + + LATEST_SCHEDULER_JOB_MOCK = MagicMock() LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat = datetime.now() LATEST_SCHEDULER_JOB_MOCK.is_alive = MagicMock(return_value=True) @@ -76,8 +123,11 @@ LATEST_SCHEDULER_JOB_MOCK.is_alive = MagicMock(return_value=True) ) @patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None) @patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None) +@conf_vars({("scheduler", "standalone_dag_processor"): "True"}) def test_get_airflow_health_scheduler_healthy_no_triggerer( - latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock + latest_scheduler_job_mock, + latest_triggerer_job_mock, + latest_dag_processor_job_mock, ): health_status = get_airflow_health() @@ -94,6 +144,32 @@ def test_get_airflow_health_scheduler_healthy_no_triggerer( assert health_status == expected_status +@patch( + "airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", + return_value=LATEST_SCHEDULER_JOB_MOCK, +) +@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None) +@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None) +@conf_vars({("scheduler", "standalone_dag_processor"): "False"}) +def test_get_airflow_health_scheduler_healthy_no_triggerer__with_dag_processor_disabled( + latest_scheduler_job_mock, + latest_triggerer_job_mock, + latest_dag_processor_job_mock, +): + health_status = 
get_airflow_health() + + expected_status = { + "metadatabase": {"status": HEALTHY}, + "scheduler": { + "status": HEALTHY, + "latest_scheduler_heartbeat": LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat.isoformat(), + }, + "triggerer": {"status": None, "latest_triggerer_heartbeat": None}, + } + + assert health_status == expected_status + + LATEST_TRIGGERER_JOB_MOCK = MagicMock() LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat = datetime.now() LATEST_TRIGGERER_JOB_MOCK.is_alive = MagicMock(return_value=True) @@ -112,8 +188,11 @@ LATEST_DAG_PROCESSOR_JOB_MOCK.is_alive = MagicMock(return_value=True) "airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=LATEST_DAG_PROCESSOR_JOB_MOCK, ) +@conf_vars({("scheduler", "standalone_dag_processor"): "True"}) def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record( - latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock + latest_scheduler_job_mock, + latest_triggerer_job_mock, + latest_dag_processor_job_mock, ): health_status = get_airflow_health() @@ -131,3 +210,32 @@ def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record( } assert health_status == expected_status + + +@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None) +@patch( + "airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", + return_value=LATEST_TRIGGERER_JOB_MOCK, +) +@patch( + "airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", + return_value=LATEST_DAG_PROCESSOR_JOB_MOCK, +) +@conf_vars({("scheduler", "standalone_dag_processor"): "False"}) +def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record_with_dag_processor_disabled( + latest_scheduler_job_mock, + latest_triggerer_job_mock, + latest_dag_processor_job_mock, +): + health_status = get_airflow_health() + + expected_status = { + "metadatabase": {"status": HEALTHY}, + "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": 
None}, + "triggerer": { + "status": HEALTHY, + "latest_triggerer_heartbeat": LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat.isoformat(), + }, + } + + assert health_status == expected_status diff --git a/tests/api_fastapi/core_api/routes/public/test_monitor.py b/tests/api_fastapi/core_api/routes/public/test_monitor.py index d736291180a..d34031f8be9 100644 --- a/tests/api_fastapi/core_api/routes/public/test_monitor.py +++ b/tests/api_fastapi/core_api/routes/public/test_monitor.py @@ -103,3 +103,55 @@ class TestGetHealth(TestMonitorEndpoint): assert body["metadatabase"]["status"] == "unhealthy" assert body["scheduler"]["latest_scheduler_heartbeat"] is None + + @mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health") + def test_health_with_dag_processor(self, mock_get_airflow_health, test_client): + mock_get_airflow_health.return_value = { + "metadatabase": {"status": HEALTHY}, + "scheduler": { + "status": HEALTHY, + "latest_scheduler_heartbeat": "2024-11-23T11:09:16.663124+00:00", + }, + "triggerer": { + "status": HEALTHY, + "latest_triggerer_heartbeat": "2024-11-23T11:09:15.815483+00:00", + }, + "dag_processor": { + "status": HEALTHY, + "latest_dag_processor_heartbeat": "2024-11-23T11:09:15.815483+00:00", + }, + } + + response = test_client.get("/public/monitor/health") + + assert response.status_code == 200 + body = response.json() + + assert "dag_processor" in body + assert body["metadatabase"]["status"] == HEALTHY + assert body["scheduler"]["status"] == HEALTHY + assert body["triggerer"]["status"] == HEALTHY + + @mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health") + def test_health_without_dag_processor(self, mock_get_airflow_health, test_client): + mock_get_airflow_health.return_value = { + "metadatabase": {"status": HEALTHY}, + "scheduler": { + "status": HEALTHY, + "latest_scheduler_heartbeat": "2024-11-23T11:09:16.663124+00:00", + }, + "triggerer": { + "status": HEALTHY, + "latest_triggerer_heartbeat": 
"2024-11-23T11:09:15.815483+00:00", + }, + } + + response = test_client.get("/public/monitor/health") + + assert response.status_code == 200 + body = response.json() + + assert "dag_processor" not in body + assert body["metadatabase"]["status"] == HEALTHY + assert body["scheduler"]["status"] == HEALTHY + assert body["triggerer"]["status"] == HEALTHY