This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 864b1782697 Tests for message type consistency between various 
supervisor and task comms (#55665)
864b1782697 is described below

commit 864b1782697bba631030c250ef9870645e34fa3c
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Sep 24 18:14:26 2025 +0530

    Tests for message type consistency between various supervisor and task 
comms (#55665)
---
 .../tests/unit/dag_processing/test_processor.py    | 84 ++++++++++++++++++++++
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 83 +++++++++++++++++++++
 2 files changed, 167 insertions(+)

diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py 
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 5d3d6622d69..a90d9a0e30c 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -21,6 +21,7 @@ import inspect
 import pathlib
 import sys
 import textwrap
+import typing
 import uuid
 from collections.abc import Callable
 from socket import socketpair
@@ -53,6 +54,8 @@ from airflow.dag_processing.processor import (
     DagFileParseRequest,
     DagFileParsingResult,
     DagFileProcessorProcess,
+    ToDagProcessor,
+    ToManager,
     _execute_dag_callbacks,
     _execute_email_callbacks,
     _execute_task_callbacks,
@@ -67,6 +70,8 @@ from airflow.sdk.execution_time import comms
 from airflow.sdk.execution_time.comms import (
     GetXCom,
     GetXComSequenceSlice,
+    ToSupervisor,
+    ToTask,
     XComResult,
     XComSequenceSliceResult,
 )
@@ -1511,3 +1516,82 @@ class TestExecuteEmailCallbacks:
         log.info.assert_called_once()
         info_call = log.info.call_args[0][0]
         assert "Email not sent - task configured with email_on_" in info_call
+
+
+class TestDagProcessingMessageTypes:
+    def test_message_types_in_dag_processor(self):
+        """
+        Test that ToSupervisor is a superset of ToManager and ToTask is a 
superset of ToDagProcessor.
+
+        This test ensures that when new message types are added to 
ToSupervisor or ToTask,
+        they are also properly handled in ToManager and ToDagProcessor.
+        """
+
+        def get_type_names(union_type):
+            union_args = typing.get_args(union_type.__args__[0])
+            return {arg.__name__ for arg in union_args}
+
+        supervisor_types = get_type_names(ToSupervisor)
+        task_types = get_type_names(ToTask)
+
+        manager_types = get_type_names(ToManager)
+        dag_processor_types = get_type_names(ToDagProcessor)
+
+        in_supervisor_but_not_in_manager = {
+            "DeferTask",
+            "DeleteXCom",
+            "GetAssetByName",
+            "GetAssetByUri",
+            "GetAssetEventByAsset",
+            "GetAssetEventByAssetAlias",
+            "GetDagRunState",
+            "GetDRCount",
+            "GetTaskRescheduleStartDate",
+            "GetTICount",
+            "GetTaskStates",
+            "RescheduleTask",
+            "RetryTask",
+            "SetRenderedFields",
+            "SetXCom",
+            "SkipDownstreamTasks",
+            "SucceedTask",
+            "ValidateInletsAndOutlets",
+            "TaskState",
+            "TriggerDagRun",
+            "ResendLoggingFD",
+            "CreateHITLDetailPayload",
+            "UpdateHITLDetail",
+            "GetHITLDetailResponse",
+        }
+
+        in_task_runner_but_not_in_dag_processing_process = {
+            "AssetResult",
+            "AssetEventsResult",
+            "DagRunStateResult",
+            "DRCount",
+            "SentFDs",
+            "StartupDetails",
+            "TaskRescheduleStartDate",
+            "TICount",
+            "TaskStatesResult",
+            "InactiveAssetsResult",
+            "CreateHITLDetailPayload",
+            "HITLDetailRequestResult",
+        }
+
+        supervisor_diff = supervisor_types - manager_types - 
in_supervisor_but_not_in_manager
+        task_diff = task_types - dag_processor_types - 
in_task_runner_but_not_in_dag_processing_process
+
+        assert not supervisor_diff, (
+            f"New message types in ToSupervisor not handled in ToManager: "
+            f"{len(supervisor_diff)} types found:\n"
+            + "\n".join(f"  - {t}" for t in sorted(supervisor_diff))
+            + "\n\nEither handle these types in ToManager or update 
in_supervisor_but_not_in_manager list."
+        )
+
+        assert not task_diff, (
+            f"New message types in ToTask not handled in ToDagProcessor: "
+            f"{len(task_diff)} types found:\n"
+            + "\n".join(f"  - {t}" for t in sorted(task_diff))
+            + "\n\nEither handle these types in ToDagProcessor or update 
in_task_runner_but_not_in_dag_processing_process list."
+        )
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py 
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index a12969ce79e..1e6eb1b9d6f 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -23,6 +23,7 @@ import itertools
 import os
 import selectors
 import time
+import typing
 from collections.abc import AsyncIterator
 from socket import socket
 from typing import TYPE_CHECKING, Any
@@ -37,6 +38,8 @@ from airflow._shared.timezones import timezone
 from airflow.executors import workloads
 from airflow.jobs.job import Job
 from airflow.jobs.triggerer_job_runner import (
+    ToTriggerRunner,
+    ToTriggerSupervisor,
     TriggerCommsDecoder,
     TriggererJobRunner,
     TriggerRunner,
@@ -56,6 +59,7 @@ from airflow.providers.standard.operators.python import 
PythonOperator
 from airflow.providers.standard.triggers.file import FileDeleteTrigger
 from airflow.providers.standard.triggers.temporal import DateTimeTrigger, 
TimeDeltaTrigger
 from airflow.sdk import BaseHook, BaseOperator
+from airflow.sdk.execution_time.comms import ToSupervisor, ToTask
 from airflow.serialization.serialized_objects import LazyDeserializedDAG
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.triggers.testing import FailureTrigger, SuccessTrigger
@@ -1134,3 +1138,82 @@ def 
test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple
     trigger_ids = {trigger.id for trigger in supervisor.creating_triggers}
     assert trigger_orm1.id in trigger_ids
     assert trigger_orm2.id in trigger_ids
+
+
+class TestTriggererMessageTypes:
+    def test_message_types_in_triggerer(self):
+        """
+        Test that ToSupervisor is a superset of ToTriggerSupervisor and ToTask 
is a superset of ToTriggerRunner.
+
+        This test ensures that when new message types are added to 
ToSupervisor or ToTask,
+        they are also properly handled in ToTriggerSupervisor and 
ToTriggerRunner.
+        """
+
+        def get_type_names(union_type):
+            union_args = typing.get_args(union_type.__args__[0])
+            return {arg.__name__ for arg in union_args}
+
+        supervisor_types = get_type_names(ToSupervisor)
+        task_types = get_type_names(ToTask)
+
+        trigger_supervisor_types = get_type_names(ToTriggerSupervisor)
+        trigger_runner_types = get_type_names(ToTriggerRunner)
+
+        in_supervisor_but_not_in_trigger_supervisor = {
+            "DeferTask",
+            "GetAssetByName",
+            "GetAssetByUri",
+            "GetAssetEventByAsset",
+            "GetAssetEventByAssetAlias",
+            "GetPrevSuccessfulDagRun",
+            "GetPreviousDagRun",
+            "GetTaskRescheduleStartDate",
+            "GetXComCount",
+            "GetXComSequenceItem",
+            "GetXComSequenceSlice",
+            "RescheduleTask",
+            "RetryTask",
+            "SetRenderedFields",
+            "SkipDownstreamTasks",
+            "SucceedTask",
+            "ValidateInletsAndOutlets",
+            "TaskState",
+            "TriggerDagRun",
+            "ResendLoggingFD",
+            "CreateHITLDetailPayload",
+        }
+
+        in_task_but_not_in_trigger_runner = {
+            "AssetResult",
+            "AssetEventsResult",
+            "SentFDs",
+            "StartupDetails",
+            "TaskRescheduleStartDate",
+            "InactiveAssetsResult",
+            "CreateHITLDetailPayload",
+            "PrevSuccessfulDagRunResult",
+            "XComCountResponse",
+            "XComSequenceIndexResult",
+            "XComSequenceSliceResult",
+            "PreviousDagRunResult",
+            "HITLDetailRequestResult",
+        }
+
+        supervisor_diff = (
+            supervisor_types - trigger_supervisor_types - 
in_supervisor_but_not_in_trigger_supervisor
+        )
+        task_diff = task_types - trigger_runner_types - 
in_task_but_not_in_trigger_runner
+
+        assert not supervisor_diff, (
+            f"New message types in ToSupervisor not handled in 
ToTriggerSupervisor: "
+            f"{len(supervisor_diff)} types found:\n"
+            + "\n".join(f"  - {t}" for t in sorted(supervisor_diff))
+            + "\n\nEither handle these types in ToTriggerSupervisor or update 
in_supervisor_but_not_in_trigger_supervisor list."
+        )
+
+        assert not task_diff, (
+            f"New message types in ToTask not handled in ToTriggerRunner: "
+            f"{len(task_diff)} types found:\n"
+            + "\n".join(f"  - {t}" for t in sorted(task_diff))
+            + "\n\nEither handle these types in ToTriggerRunner or update 
in_task_but_not_in_trigger_runner list."
+        )

Reply via email to