This is an automated email from the ASF dual-hosted git repository. amoghdesai pushed a commit to branch backport-864b178-v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2550d9969a309c9319abe8f8e2d039276d109401 Author: Amogh Desai <[email protected]> AuthorDate: Wed Sep 24 18:14:26 2025 +0530 [v3-1-test] Tests for message type consistency between various supervisor and task comms (#55665) (cherry picked from commit 864b1782697bba631030c250ef9870645e34fa3c) Co-authored-by: Amogh Desai <[email protected]> --- .../tests/unit/dag_processing/test_processor.py | 84 ++++++++++++++++++++++ airflow-core/tests/unit/jobs/test_triggerer_job.py | 83 +++++++++++++++++++++ 2 files changed, 167 insertions(+) diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index bde6a631c31..f3e6064a4f8 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -21,6 +21,7 @@ import inspect import pathlib import sys import textwrap +import typing import uuid from collections.abc import Callable from socket import socketpair @@ -52,6 +53,8 @@ from airflow.dag_processing.processor import ( DagFileParseRequest, DagFileParsingResult, DagFileProcessorProcess, + ToDagProcessor, + ToManager, _execute_dag_callbacks, _execute_email_callbacks, _execute_task_callbacks, @@ -66,6 +69,8 @@ from airflow.sdk.execution_time import comms from airflow.sdk.execution_time.comms import ( GetXCom, GetXComSequenceSlice, + ToSupervisor, + ToTask, XComResult, XComSequenceSliceResult, ) @@ -1510,3 +1515,82 @@ class TestExecuteEmailCallbacks: log.info.assert_called_once() info_call = log.info.call_args[0][0] assert "Email not sent - task configured with email_on_" in info_call + + +class TestDagProcessingMessageTypes: + def test_message_types_in_dag_processor(self): + """ + Test that ToSupervisor is a superset of ToManager and ToTask is a superset of ToDagProcessor. + + This test ensures that when new message types are added to ToSupervisor or ToTask, + they are also properly handled in ToManager and ToDagProcessor. 
+ """ + + def get_type_names(union_type): + union_args = typing.get_args(union_type.__args__[0]) + return {arg.__name__ for arg in union_args} + + supervisor_types = get_type_names(ToSupervisor) + task_types = get_type_names(ToTask) + + manager_types = get_type_names(ToManager) + dag_processor_types = get_type_names(ToDagProcessor) + + in_supervisor_but_not_in_manager = { + "DeferTask", + "DeleteXCom", + "GetAssetByName", + "GetAssetByUri", + "GetAssetEventByAsset", + "GetAssetEventByAssetAlias", + "GetDagRunState", + "GetDRCount", + "GetTaskRescheduleStartDate", + "GetTICount", + "GetTaskStates", + "RescheduleTask", + "RetryTask", + "SetRenderedFields", + "SetXCom", + "SkipDownstreamTasks", + "SucceedTask", + "ValidateInletsAndOutlets", + "TaskState", + "TriggerDagRun", + "ResendLoggingFD", + "CreateHITLDetailPayload", + "UpdateHITLDetail", + "GetHITLDetailResponse", + } + + in_task_runner_but_not_in_dag_processing_process = { + "AssetResult", + "AssetEventsResult", + "DagRunStateResult", + "DRCount", + "SentFDs", + "StartupDetails", + "TaskRescheduleStartDate", + "TICount", + "TaskStatesResult", + "InactiveAssetsResult", + "CreateHITLDetailPayload", + "HITLDetailRequestResult", + } + + supervisor_diff = supervisor_types - manager_types - in_supervisor_but_not_in_manager + task_diff = task_types - dag_processor_types - in_task_runner_but_not_in_dag_processing_process + + assert not supervisor_diff, ( + f"New message types in ToSupervisor not handled in ToManager: " + f"{len(supervisor_diff)} types found:\n" + + "\n".join(f" - {t}" for t in sorted(supervisor_diff)) + + "\n\nEither handle these types in ToManager or update in_supervisor_but_not_in_manager list." + ) + + assert not task_diff, ( + f"New message types in ToTask not handled in ToDagProcessor: " + f"{len(task_diff)} types found:\n" + + "\n".join(f" - {t}" for t in sorted(task_diff)) + + "\n\nEither handle these types in ToDagProcessor or update in_task_runner_but_not_in_dag_processing_process list." 
+ ) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index a12969ce79e..1e6eb1b9d6f 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -23,6 +23,7 @@ import itertools import os import selectors import time +import typing from collections.abc import AsyncIterator from socket import socket from typing import TYPE_CHECKING, Any @@ -37,6 +38,8 @@ from airflow._shared.timezones import timezone from airflow.executors import workloads from airflow.jobs.job import Job from airflow.jobs.triggerer_job_runner import ( + ToTriggerRunner, + ToTriggerSupervisor, TriggerCommsDecoder, TriggererJobRunner, TriggerRunner, @@ -56,6 +59,7 @@ from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.triggers.file import FileDeleteTrigger from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.sdk import BaseHook, BaseOperator +from airflow.sdk.execution_time.comms import ToSupervisor, ToTask from airflow.serialization.serialized_objects import LazyDeserializedDAG from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.triggers.testing import FailureTrigger, SuccessTrigger @@ -1134,3 +1138,82 @@ def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple trigger_ids = {trigger.id for trigger in supervisor.creating_triggers} assert trigger_orm1.id in trigger_ids assert trigger_orm2.id in trigger_ids + + +class TestTriggererMessageTypes: + def test_message_types_in_triggerer(self): + """ + Test that ToSupervisor is a superset of ToTriggerSupervisor and ToTask is a superset of ToTriggerRunner. + + This test ensures that when new message types are added to ToSupervisor or ToTask, + they are also properly handled in ToTriggerSupervisor and ToTriggerRunner. 
+ """ + + def get_type_names(union_type): + union_args = typing.get_args(union_type.__args__[0]) + return {arg.__name__ for arg in union_args} + + supervisor_types = get_type_names(ToSupervisor) + task_types = get_type_names(ToTask) + + trigger_supervisor_types = get_type_names(ToTriggerSupervisor) + trigger_runner_types = get_type_names(ToTriggerRunner) + + in_supervisor_but_not_in_trigger_supervisor = { + "DeferTask", + "GetAssetByName", + "GetAssetByUri", + "GetAssetEventByAsset", + "GetAssetEventByAssetAlias", + "GetPrevSuccessfulDagRun", + "GetPreviousDagRun", + "GetTaskRescheduleStartDate", + "GetXComCount", + "GetXComSequenceItem", + "GetXComSequenceSlice", + "RescheduleTask", + "RetryTask", + "SetRenderedFields", + "SkipDownstreamTasks", + "SucceedTask", + "ValidateInletsAndOutlets", + "TaskState", + "TriggerDagRun", + "ResendLoggingFD", + "CreateHITLDetailPayload", + } + + in_task_but_not_in_trigger_runner = { + "AssetResult", + "AssetEventsResult", + "SentFDs", + "StartupDetails", + "TaskRescheduleStartDate", + "InactiveAssetsResult", + "CreateHITLDetailPayload", + "PrevSuccessfulDagRunResult", + "XComCountResponse", + "XComSequenceIndexResult", + "XComSequenceSliceResult", + "PreviousDagRunResult", + "HITLDetailRequestResult", + } + + supervisor_diff = ( + supervisor_types - trigger_supervisor_types - in_supervisor_but_not_in_trigger_supervisor + ) + task_diff = task_types - trigger_runner_types - in_task_but_not_in_trigger_runner + + assert not supervisor_diff, ( + f"New message types in ToSupervisor not handled in ToTriggerSupervisor: " + f"{len(supervisor_diff)} types found:\n" + + "\n".join(f" - {t}" for t in sorted(supervisor_diff)) + + "\n\nEither handle these types in ToTriggerSupervisor or update in_supervisor_but_not_in_trigger_supervisor list." 
+ ) + + assert not task_diff, ( + f"New message types in ToTask not handled in ToTriggerRunner: " + f"{len(task_diff)} types found:\n" + + "\n".join(f" - {t}" for t in sorted(task_diff)) + + "\n\nEither handle these types in ToTriggerRunner or update in_task_but_not_in_trigger_runner list." + )
