This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c4d44e716ec AIP84:  Check standalone_dag_processor config in get_airflow_health()  and update health endpoint (#44383)
c4d44e716ec is described below

commit c4d44e716ec2c92d3d032c5c97448559c23d0b0b
Author: vatsrahul1001 <43964496+vatsrahul1...@users.noreply.github.com>
AuthorDate: Fri Nov 29 20:50:08 2024 +0530

    AIP84:  Check standalone_dag_processor config in get_airflow_health()  and update health endpoint (#44383)
    
    * AIP84 health endpoint skip dag processor
    
    * AIP84 health endpoint skip dag processor
    
    * fixing test_airflow_health tests and adding new tests
    
    * AIP84 health endpoint skip dag processor
    
    * AIP84 health endpoint skip dag processor
    
    * fixing test_airflow_health tests and adding new tests
    
    * fix static tests
    
    * Using only one model class HealthInfoSchema
    
    * fixing static checks
    
    * implement review comments
    
    * fix static tests
---
 airflow/api/common/airflow_health.py               |  40 ++++---
 airflow/api_fastapi/core_api/datamodels/monitor.py |   2 +-
 .../api_fastapi/core_api/openapi/v1-generated.yaml |   5 +-
 .../api_fastapi/core_api/routes/public/monitor.py  |   4 +-
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  11 +-
 airflow/ui/openapi-gen/requests/types.gen.ts       |   2 +-
 airflow/ui/src/pages/Dashboard/Health/Health.tsx   |   5 +-
 tests/api/common/test_airflow_health.py            | 116 ++++++++++++++++++++-
 .../core_api/routes/public/test_monitor.py         |  52 +++++++++
 9 files changed, 205 insertions(+), 32 deletions(-)

diff --git a/airflow/api/common/airflow_health.py b/airflow/api/common/airflow_health.py
index 5d37de540a4..043557fa0db 100644
--- a/airflow/api/common/airflow_health.py
+++ b/airflow/api/common/airflow_health.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 from typing import Any
 
+from airflow.configuration import conf
 from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
@@ -28,13 +29,13 @@ UNHEALTHY = "unhealthy"
 
 def get_airflow_health() -> dict[str, Any]:
     """Get the health for Airflow metadatabase, scheduler and triggerer."""
+    dag_processor_enabled = conf.getboolean("scheduler", "standalone_dag_processor")
     metadatabase_status = HEALTHY
     latest_scheduler_heartbeat = None
     latest_triggerer_heartbeat = None
-    latest_dag_processor_heartbeat = None
+
     scheduler_status = UNHEALTHY
     triggerer_status: str | None = UNHEALTHY
-    dag_processor_status: str | None = UNHEALTHY
 
     try:
         latest_scheduler_job = SchedulerJobRunner.most_recent_job()
@@ -58,18 +59,6 @@ def get_airflow_health() -> dict[str, Any]:
     except Exception:
         metadatabase_status = UNHEALTHY
 
-    try:
-        latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()
-
-        if latest_dag_processor_job:
-            latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
-            if latest_dag_processor_job.is_alive():
-                dag_processor_status = HEALTHY
-        else:
-            dag_processor_status = None
-    except Exception:
-        metadatabase_status = UNHEALTHY
-
     airflow_health_status = {
         "metadatabase": {"status": metadatabase_status},
         "scheduler": {
@@ -80,10 +69,27 @@ def get_airflow_health() -> dict[str, Any]:
             "status": triggerer_status,
             "latest_triggerer_heartbeat": latest_triggerer_heartbeat,
         },
-        "dag_processor": {
+    }
+
+    if dag_processor_enabled:
+        latest_dag_processor_heartbeat = None
+        dag_processor_status: str | None = UNHEALTHY
+
+        try:
+            latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()
+
+            if latest_dag_processor_job:
+                latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
+                if latest_dag_processor_job.is_alive():
+                    dag_processor_status = HEALTHY
+            else:
+                dag_processor_status = None
+        except Exception:
+            metadatabase_status = UNHEALTHY
+
+        airflow_health_status["dag_processor"] = {
             "status": dag_processor_status,
             "latest_dag_processor_heartbeat": latest_dag_processor_heartbeat,
-        },
-    }
+        }
 
     return airflow_health_status
diff --git a/airflow/api_fastapi/core_api/datamodels/monitor.py b/airflow/api_fastapi/core_api/datamodels/monitor.py
index fbaf40b4e84..f4434034424 100644
--- a/airflow/api_fastapi/core_api/datamodels/monitor.py
+++ b/airflow/api_fastapi/core_api/datamodels/monitor.py
@@ -49,4 +49,4 @@ class HealthInfoSchema(BaseModel):
     metadatabase: BaseInfoSchema
     scheduler: SchedulerInfoSchema
     triggerer: TriggererInfoSchema
-    dag_processor: DagProcessorInfoSchema
+    dag_processor: DagProcessorInfoSchema | None = None
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index a38fa874c08..9832c3668fe 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -7586,13 +7586,14 @@ components:
         triggerer:
           $ref: '#/components/schemas/TriggererInfoSchema'
         dag_processor:
-          $ref: '#/components/schemas/DagProcessorInfoSchema'
+          anyOf:
+          - $ref: '#/components/schemas/DagProcessorInfoSchema'
+          - type: 'null'
       type: object
       required:
       - metadatabase
       - scheduler
       - triggerer
-      - dag_processor
       title: HealthInfoSchema
       description: Schema for the Health endpoint.
     HistoricalMetricDataResponse:
diff --git a/airflow/api_fastapi/core_api/routes/public/monitor.py b/airflow/api_fastapi/core_api/routes/public/monitor.py
index 38953a18b99..538bdcb3521 100644
--- a/airflow/api_fastapi/core_api/routes/public/monitor.py
+++ b/airflow/api_fastapi/core_api/routes/public/monitor.py
@@ -24,7 +24,7 @@ from airflow.api_fastapi.core_api.datamodels.monitor import HealthInfoSchema
 monitor_router = AirflowRouter(tags=["Monitor"], prefix="/monitor")
 
 
-@monitor_router.get("/health")
-def get_health() -> HealthInfoSchema:
+@monitor_router.get("/health", response_model=HealthInfoSchema, response_model_exclude_unset=True)
+def get_health():
     airflow_health_status = get_airflow_health()
     return HealthInfoSchema.model_validate(airflow_health_status)
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index b9289674a18..13e657d0083 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2886,11 +2886,18 @@ export const $HealthInfoSchema = {
       $ref: "#/components/schemas/TriggererInfoSchema",
     },
     dag_processor: {
-      $ref: "#/components/schemas/DagProcessorInfoSchema",
+      anyOf: [
+        {
+          $ref: "#/components/schemas/DagProcessorInfoSchema",
+        },
+        {
+          type: "null",
+        },
+      ],
     },
   },
   type: "object",
-  required: ["metadatabase", "scheduler", "triggerer", "dag_processor"],
+  required: ["metadatabase", "scheduler", "triggerer"],
   title: "HealthInfoSchema",
   description: "Schema for the Health endpoint.",
 } as const;
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 89d8ab8a145..2035b4579fe 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -705,7 +705,7 @@ export type HealthInfoSchema = {
   metadatabase: BaseInfoSchema;
   scheduler: SchedulerInfoSchema;
   triggerer: TriggererInfoSchema;
-  dag_processor: DagProcessorInfoSchema;
+  dag_processor?: DagProcessorInfoSchema | null;
 };
 
 /**
diff --git a/airflow/ui/src/pages/Dashboard/Health/Health.tsx b/airflow/ui/src/pages/Dashboard/Health/Health.tsx
index 0694283930c..040f9a04969 100644
--- a/airflow/ui/src/pages/Dashboard/Health/Health.tsx
+++ b/airflow/ui/src/pages/Dashboard/Health/Health.tsx
@@ -54,15 +54,14 @@ export const Health = () => {
           status={data?.triggerer.status}
           title="Triggerer"
         />
-        {/* TODO: Update this to match the API when we move the config check to the API level */}
-        {data?.dag_processor.status === undefined ? undefined : (
+        {data?.dag_processor ? (
           <HealthTag
             isLoading={isLoading}
             latestHeartbeat={data.dag_processor.latest_dag_processor_heartbeat}
             status={data.dag_processor.status}
             title="Dag Processor"
           />
-        )}
+        ) : undefined}
       </HStack>
     </Box>
   );
diff --git a/tests/api/common/test_airflow_health.py b/tests/api/common/test_airflow_health.py
index ebdc086c692..06bd1a7bf4c 100644
--- a/tests/api/common/test_airflow_health.py
+++ b/tests/api/common/test_airflow_health.py
@@ -27,14 +27,19 @@ from airflow.api.common.airflow_health import (
     get_airflow_health,
 )
 
+from tests_common.test_utils.config import conf_vars
+
 pytestmark = pytest.mark.db_test
 
 
 @patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", 
return_value=None)
 @patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", 
return_value=None)
 
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
 return_value=None)
+@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
 def test_get_airflow_health_only_metadatabase_healthy(
-    latest_scheduler_job_mock, latest_triggerer_job_mock, 
latest_dag_processor_job_mock
+    latest_scheduler_job_mock,
+    latest_triggerer_job_mock,
+    latest_dag_processor_job_mock,
 ):
     health_status = get_airflow_health()
     expected_status = {
@@ -50,8 +55,11 @@ def test_get_airflow_health_only_metadatabase_healthy(
 @patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", 
return_value=Exception)
 @patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", 
return_value=Exception)
 
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
 return_value=Exception)
+@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
 def test_get_airflow_health_metadatabase_unhealthy(
-    latest_scheduler_job_mock, latest_triggerer_job_mock, 
latest_dag_processor_job_mock
+    latest_scheduler_job_mock,
+    latest_triggerer_job_mock,
+    latest_dag_processor_job_mock,
 ):
     health_status = get_airflow_health()
 
@@ -65,6 +73,45 @@ def test_get_airflow_health_metadatabase_unhealthy(
     assert health_status == expected_status
 
 
+@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", 
return_value=None)
+@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", 
return_value=None)
+@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
 return_value=None)
+@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+def 
test_get_airflow_health_only_metadatabase_healthy_with_dag_processor_disabled(
+    latest_scheduler_job_mock,
+    latest_triggerer_job_mock,
+    latest_dag_processor_job_mock,
+):
+    health_status = get_airflow_health()
+    expected_status = {
+        "metadatabase": {"status": HEALTHY},
+        "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
+        "triggerer": {"status": None, "latest_triggerer_heartbeat": None},
+    }
+
+    assert health_status == expected_status
+
+
+@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", 
return_value=Exception)
+@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", 
return_value=Exception)
+@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
 return_value=Exception)
+@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+def test_get_airflow_health_metadatabase_unhealthy_with_dag_processor_disabled(
+    latest_scheduler_job_mock,
+    latest_triggerer_job_mock,
+    latest_dag_processor_job_mock,
+):
+    health_status = get_airflow_health()
+
+    expected_status = {
+        "metadatabase": {"status": UNHEALTHY},
+        "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
+        "triggerer": {"status": UNHEALTHY, "latest_triggerer_heartbeat": None},
+    }
+
+    assert health_status == expected_status
+
+
 LATEST_SCHEDULER_JOB_MOCK = MagicMock()
 LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat = datetime.now()
 LATEST_SCHEDULER_JOB_MOCK.is_alive = MagicMock(return_value=True)
@@ -76,8 +123,11 @@ LATEST_SCHEDULER_JOB_MOCK.is_alive = 
MagicMock(return_value=True)
 )
 @patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", 
return_value=None)
 
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
 return_value=None)
+@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
 def test_get_airflow_health_scheduler_healthy_no_triggerer(
-    latest_scheduler_job_mock, latest_triggerer_job_mock, 
latest_dag_processor_job_mock
+    latest_scheduler_job_mock,
+    latest_triggerer_job_mock,
+    latest_dag_processor_job_mock,
 ):
     health_status = get_airflow_health()
 
@@ -94,6 +144,32 @@ def test_get_airflow_health_scheduler_healthy_no_triggerer(
     assert health_status == expected_status
 
 
+@patch(
+    "airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job",
+    return_value=LATEST_SCHEDULER_JOB_MOCK,
+)
+@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", 
return_value=None)
+@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
 return_value=None)
+@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+def 
test_get_airflow_health_scheduler_healthy_no_triggerer__with_dag_processor_disabled(
+    latest_scheduler_job_mock,
+    latest_triggerer_job_mock,
+    latest_dag_processor_job_mock,
+):
+    health_status = get_airflow_health()
+
+    expected_status = {
+        "metadatabase": {"status": HEALTHY},
+        "scheduler": {
+            "status": HEALTHY,
+            "latest_scheduler_heartbeat": 
LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat.isoformat(),
+        },
+        "triggerer": {"status": None, "latest_triggerer_heartbeat": None},
+    }
+
+    assert health_status == expected_status
+
+
 LATEST_TRIGGERER_JOB_MOCK = MagicMock()
 LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat = datetime.now()
 LATEST_TRIGGERER_JOB_MOCK.is_alive = MagicMock(return_value=True)
@@ -112,8 +188,11 @@ LATEST_DAG_PROCESSOR_JOB_MOCK.is_alive = 
MagicMock(return_value=True)
     "airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
     return_value=LATEST_DAG_PROCESSOR_JOB_MOCK,
 )
+@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
 def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record(
-    latest_scheduler_job_mock, latest_triggerer_job_mock, 
latest_dag_processor_job_mock
+    latest_scheduler_job_mock,
+    latest_triggerer_job_mock,
+    latest_dag_processor_job_mock,
 ):
     health_status = get_airflow_health()
 
@@ -131,3 +210,32 @@ def 
test_get_airflow_health_triggerer_healthy_no_scheduler_job_record(
     }
 
     assert health_status == expected_status
+
+
+@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", 
return_value=None)
+@patch(
+    "airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
+    return_value=LATEST_TRIGGERER_JOB_MOCK,
+)
+@patch(
+    "airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
+    return_value=LATEST_DAG_PROCESSOR_JOB_MOCK,
+)
+@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+def 
test_get_airflow_health_triggerer_healthy_no_scheduler_job_record_with_dag_processor_disabled(
+    latest_scheduler_job_mock,
+    latest_triggerer_job_mock,
+    latest_dag_processor_job_mock,
+):
+    health_status = get_airflow_health()
+
+    expected_status = {
+        "metadatabase": {"status": HEALTHY},
+        "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
+        "triggerer": {
+            "status": HEALTHY,
+            "latest_triggerer_heartbeat": 
LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat.isoformat(),
+        },
+    }
+
+    assert health_status == expected_status
diff --git a/tests/api_fastapi/core_api/routes/public/test_monitor.py b/tests/api_fastapi/core_api/routes/public/test_monitor.py
index d736291180a..d34031f8be9 100644
--- a/tests/api_fastapi/core_api/routes/public/test_monitor.py
+++ b/tests/api_fastapi/core_api/routes/public/test_monitor.py
@@ -103,3 +103,55 @@ class TestGetHealth(TestMonitorEndpoint):
 
         assert body["metadatabase"]["status"] == "unhealthy"
         assert body["scheduler"]["latest_scheduler_heartbeat"] is None
+
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health")
+    def test_health_with_dag_processor(self, mock_get_airflow_health, 
test_client):
+        mock_get_airflow_health.return_value = {
+            "metadatabase": {"status": HEALTHY},
+            "scheduler": {
+                "status": HEALTHY,
+                "latest_scheduler_heartbeat": 
"2024-11-23T11:09:16.663124+00:00",
+            },
+            "triggerer": {
+                "status": HEALTHY,
+                "latest_triggerer_heartbeat": 
"2024-11-23T11:09:15.815483+00:00",
+            },
+            "dag_processor": {
+                "status": HEALTHY,
+                "latest_dag_processor_heartbeat": 
"2024-11-23T11:09:15.815483+00:00",
+            },
+        }
+
+        response = test_client.get("/public/monitor/health")
+
+        assert response.status_code == 200
+        body = response.json()
+
+        assert "dag_processor" in body
+        assert body["metadatabase"]["status"] == HEALTHY
+        assert body["scheduler"]["status"] == HEALTHY
+        assert body["triggerer"]["status"] == HEALTHY
+
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health")
+    def test_health_without_dag_processor(self, mock_get_airflow_health, 
test_client):
+        mock_get_airflow_health.return_value = {
+            "metadatabase": {"status": HEALTHY},
+            "scheduler": {
+                "status": HEALTHY,
+                "latest_scheduler_heartbeat": 
"2024-11-23T11:09:16.663124+00:00",
+            },
+            "triggerer": {
+                "status": HEALTHY,
+                "latest_triggerer_heartbeat": 
"2024-11-23T11:09:15.815483+00:00",
+            },
+        }
+
+        response = test_client.get("/public/monitor/health")
+
+        assert response.status_code == 200
+        body = response.json()
+
+        assert "dag_processor" not in body
+        assert body["metadatabase"]["status"] == HEALTHY
+        assert body["scheduler"]["status"] == HEALTHY
+        assert body["triggerer"]["status"] == HEALTHY

Reply via email to