This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 864b1782697 Tests for message type consistency between various
supervisor and task comms (#55665)
864b1782697 is described below
commit 864b1782697bba631030c250ef9870645e34fa3c
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Sep 24 18:14:26 2025 +0530
Tests for message type consistency between various supervisor and task
comms (#55665)
---
.../tests/unit/dag_processing/test_processor.py | 84 ++++++++++++++++++++++
airflow-core/tests/unit/jobs/test_triggerer_job.py | 83 +++++++++++++++++++++
2 files changed, 167 insertions(+)
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 5d3d6622d69..a90d9a0e30c 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -21,6 +21,7 @@ import inspect
import pathlib
import sys
import textwrap
+import typing
import uuid
from collections.abc import Callable
from socket import socketpair
@@ -53,6 +54,8 @@ from airflow.dag_processing.processor import (
DagFileParseRequest,
DagFileParsingResult,
DagFileProcessorProcess,
+ ToDagProcessor,
+ ToManager,
_execute_dag_callbacks,
_execute_email_callbacks,
_execute_task_callbacks,
@@ -67,6 +70,8 @@ from airflow.sdk.execution_time import comms
from airflow.sdk.execution_time.comms import (
GetXCom,
GetXComSequenceSlice,
+ ToSupervisor,
+ ToTask,
XComResult,
XComSequenceSliceResult,
)
@@ -1511,3 +1516,82 @@ class TestExecuteEmailCallbacks:
log.info.assert_called_once()
info_call = log.info.call_args[0][0]
assert "Email not sent - task configured with email_on_" in info_call
+
+
+class TestDagProcessingMessageTypes:
+ def test_message_types_in_dag_processor(self):
+ """
+ Test that every ToSupervisor type is in ToManager or explicitly excluded, and every ToTask type is in ToDagProcessor or explicitly excluded.
+
+ This test ensures that when new message types are added to
ToSupervisor or ToTask,
+ they are also properly handled in ToManager and ToDagProcessor.
+ """
+
+ def get_type_names(union_type):
+ union_args = typing.get_args(union_type.__args__[0])
+ return {arg.__name__ for arg in union_args}
+
+ supervisor_types = get_type_names(ToSupervisor)
+ task_types = get_type_names(ToTask)
+
+ manager_types = get_type_names(ToManager)
+ dag_processor_types = get_type_names(ToDagProcessor)
+
+ in_supervisor_but_not_in_manager = {
+ "DeferTask",
+ "DeleteXCom",
+ "GetAssetByName",
+ "GetAssetByUri",
+ "GetAssetEventByAsset",
+ "GetAssetEventByAssetAlias",
+ "GetDagRunState",
+ "GetDRCount",
+ "GetTaskRescheduleStartDate",
+ "GetTICount",
+ "GetTaskStates",
+ "RescheduleTask",
+ "RetryTask",
+ "SetRenderedFields",
+ "SetXCom",
+ "SkipDownstreamTasks",
+ "SucceedTask",
+ "ValidateInletsAndOutlets",
+ "TaskState",
+ "TriggerDagRun",
+ "ResendLoggingFD",
+ "CreateHITLDetailPayload",
+ "UpdateHITLDetail",
+ "GetHITLDetailResponse",
+ }
+
+ in_task_runner_but_not_in_dag_processing_process = {
+ "AssetResult",
+ "AssetEventsResult",
+ "DagRunStateResult",
+ "DRCount",
+ "SentFDs",
+ "StartupDetails",
+ "TaskRescheduleStartDate",
+ "TICount",
+ "TaskStatesResult",
+ "InactiveAssetsResult",
+ "CreateHITLDetailPayload",
+ "HITLDetailRequestResult",
+ }
+
+ supervisor_diff = supervisor_types - manager_types -
in_supervisor_but_not_in_manager
+ task_diff = task_types - dag_processor_types -
in_task_runner_but_not_in_dag_processing_process
+
+ assert not supervisor_diff, (
+ f"New message types in ToSupervisor not handled in ToManager: "
+ f"{len(supervisor_diff)} types found:\n"
+ + "\n".join(f" - {t}" for t in sorted(supervisor_diff))
+ + "\n\nEither handle these types in ToManager or update
in_supervisor_but_not_in_manager list."
+ )
+
+ assert not task_diff, (
+ f"New message types in ToTask not handled in ToDagProcessor: "
+ f"{len(task_diff)} types found:\n"
+ + "\n".join(f" - {t}" for t in sorted(task_diff))
+ + "\n\nEither handle these types in ToDagProcessor or update
in_task_runner_but_not_in_dag_processing_process list."
+ )
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index a12969ce79e..1e6eb1b9d6f 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -23,6 +23,7 @@ import itertools
import os
import selectors
import time
+import typing
from collections.abc import AsyncIterator
from socket import socket
from typing import TYPE_CHECKING, Any
@@ -37,6 +38,8 @@ from airflow._shared.timezones import timezone
from airflow.executors import workloads
from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import (
+ ToTriggerRunner,
+ ToTriggerSupervisor,
TriggerCommsDecoder,
TriggererJobRunner,
TriggerRunner,
@@ -56,6 +59,7 @@ from airflow.providers.standard.operators.python import
PythonOperator
from airflow.providers.standard.triggers.file import FileDeleteTrigger
from airflow.providers.standard.triggers.temporal import DateTimeTrigger,
TimeDeltaTrigger
from airflow.sdk import BaseHook, BaseOperator
+from airflow.sdk.execution_time.comms import ToSupervisor, ToTask
from airflow.serialization.serialized_objects import LazyDeserializedDAG
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.triggers.testing import FailureTrigger, SuccessTrigger
@@ -1134,3 +1138,82 @@ def
test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple
trigger_ids = {trigger.id for trigger in supervisor.creating_triggers}
assert trigger_orm1.id in trigger_ids
assert trigger_orm2.id in trigger_ids
+
+
+class TestTriggererMessageTypes:
+ def test_message_types_in_triggerer(self):
+ """
+ Test that every ToSupervisor type is in ToTriggerSupervisor or explicitly excluded, and every ToTask type is in ToTriggerRunner or explicitly excluded.
+
+ This test ensures that when new message types are added to
ToSupervisor or ToTask,
+ they are also properly handled in ToTriggerSupervisor and ToTriggerRunner.
+ """
+
+ def get_type_names(union_type):
+ union_args = typing.get_args(union_type.__args__[0])
+ return {arg.__name__ for arg in union_args}
+
+ supervisor_types = get_type_names(ToSupervisor)
+ task_types = get_type_names(ToTask)
+
+ trigger_supervisor_types = get_type_names(ToTriggerSupervisor)
+ trigger_runner_types = get_type_names(ToTriggerRunner)
+
+ in_supervisor_but_not_in_trigger_supervisor = {
+ "DeferTask",
+ "GetAssetByName",
+ "GetAssetByUri",
+ "GetAssetEventByAsset",
+ "GetAssetEventByAssetAlias",
+ "GetPrevSuccessfulDagRun",
+ "GetPreviousDagRun",
+ "GetTaskRescheduleStartDate",
+ "GetXComCount",
+ "GetXComSequenceItem",
+ "GetXComSequenceSlice",
+ "RescheduleTask",
+ "RetryTask",
+ "SetRenderedFields",
+ "SkipDownstreamTasks",
+ "SucceedTask",
+ "ValidateInletsAndOutlets",
+ "TaskState",
+ "TriggerDagRun",
+ "ResendLoggingFD",
+ "CreateHITLDetailPayload",
+ }
+
+ in_task_but_not_in_trigger_runner = {
+ "AssetResult",
+ "AssetEventsResult",
+ "SentFDs",
+ "StartupDetails",
+ "TaskRescheduleStartDate",
+ "InactiveAssetsResult",
+ "CreateHITLDetailPayload",
+ "PrevSuccessfulDagRunResult",
+ "XComCountResponse",
+ "XComSequenceIndexResult",
+ "XComSequenceSliceResult",
+ "PreviousDagRunResult",
+ "HITLDetailRequestResult",
+ }
+
+ supervisor_diff = (
+ supervisor_types - trigger_supervisor_types -
in_supervisor_but_not_in_trigger_supervisor
+ )
+ task_diff = task_types - trigger_runner_types -
in_task_but_not_in_trigger_runner
+
+ assert not supervisor_diff, (
+ f"New message types in ToSupervisor not handled in
ToTriggerSupervisor: "
+ f"{len(supervisor_diff)} types found:\n"
+ + "\n".join(f" - {t}" for t in sorted(supervisor_diff))
+ + "\n\nEither handle these types in ToTriggerSupervisor or update
in_supervisor_but_not_in_trigger_supervisor list."
+ )
+
+ assert not task_diff, (
+ f"New message types in ToTask not handled in ToTriggerRunner: "
+ f"{len(task_diff)} types found:\n"
+ + "\n".join(f" - {t}" for t in sorted(task_diff))
+ + "\n\nEither handle these types in ToTriggerRunner or update
in_task_but_not_in_trigger_runner list."
+ )