This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cbb6727e9c Decouple ``NotMapped`` exception from Task SDK (#55108)
7cbb6727e9c is described below

commit 7cbb6727e9c08f0cc5cf1646018165ab7f9dccda
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Sep 1 22:54:53 2025 +0100

    Decouple ``NotMapped`` exception from Task SDK (#55108)
    
    Move the NotMapped exception from `airflow.sdk` to `airflow.exceptions` to
    eliminate the SDK dependency in airflow-core. This enables independent
    deployment of airflow-core without requiring Task SDK modules.
    
    This supports the broader SDK decoupling architecture where airflow-core
    can operate independently of Task SDK implementations (part of
    https://github.com/apache/airflow/issues/52141).
---
 airflow-core/src/airflow/exceptions.py             |  4 ++++
 airflow-core/src/airflow/models/dagrun.py          |  3 +--
 airflow-core/src/airflow/models/mappedoperator.py  |  5 +----
 airflow-core/src/airflow/models/taskinstance.py    |  2 +-
 .../airflow/serialization/serialized_objects.py    |  2 +-
 .../src/airflow/ti_deps/deps/trigger_rule_dep.py   |  2 +-
 airflow-core/tests/unit/models/test_dagrun.py      | 24 ++++++++++++----------
 7 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/airflow-core/src/airflow/exceptions.py 
b/airflow-core/src/airflow/exceptions.py
index 150bbc3ffb3..95da650a8ab 100644
--- a/airflow-core/src/airflow/exceptions.py
+++ b/airflow-core/src/airflow/exceptions.py
@@ -276,6 +276,10 @@ class TaskInstanceNotFound(AirflowNotFoundException):
     """Raise when a task instance is not available in the system."""
 
 
+class NotMapped(Exception):
+    """Raise if a task is neither mapped nor has any parent mapped groups."""
+
+
 class PoolNotFound(AirflowNotFoundException):
     """Raise when a Pool is not available in the system."""
 
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index 2b79da56423..bc781271c89 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -58,7 +58,7 @@ from sqlalchemy_utils import UUIDType
 from airflow._shared.timezones import timezone
 from airflow.callbacks.callback_requests import DagCallbackRequest, 
DagRunContext
 from airflow.configuration import conf as airflow_conf
-from airflow.exceptions import AirflowException, TaskNotFound
+from airflow.exceptions import AirflowException, NotMapped, TaskNotFound
 from airflow.listeners.listener import get_listener_manager
 from airflow.models import Deadline, Log
 from airflow.models.backfill import Backfill
@@ -67,7 +67,6 @@ from airflow.models.taskinstance import TaskInstance as TI
 from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
 from airflow.models.tasklog import LogTemplate
 from airflow.models.taskmap import TaskMap
-from airflow.sdk.definitions._internal.abstractoperator import NotMapped
 from airflow.sdk.definitions.deadline import DeadlineReference
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import DepContext
diff --git a/airflow-core/src/airflow/models/mappedoperator.py 
b/airflow-core/src/airflow/models/mappedoperator.py
index 6e086beeaaa..3158901e907 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -27,11 +27,8 @@ import methodtools
 import structlog
 from sqlalchemy.orm import Session
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, NotMapped
 from airflow.sdk import BaseOperator as TaskSDKBaseOperator
-from airflow.sdk.definitions._internal.abstractoperator import (
-    NotMapped,
-)
 from airflow.sdk.definitions._internal.node import DAGNode
 from airflow.sdk.definitions.mappedoperator import MappedOperator as 
TaskSDKMappedOperator
 from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index e81b1691f70..bc940c18163 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1741,13 +1741,13 @@ class TaskInstance(Base, LoggingMixin):
         if not session:
             session = settings.Session()
 
+        from airflow.exceptions import NotMapped
         from airflow.models.mappedoperator import get_mapped_ti_count
         from airflow.sdk.api.datamodels._generated import (
             DagRun as DagRunSDK,
             PrevSuccessfulDagRunResponse,
             TIRunContext,
         )
-        from airflow.sdk.definitions._internal.abstractoperator import 
NotMapped
         from airflow.sdk.definitions.param import process_params
         from airflow.sdk.execution_time.context import InletEventsAccessors
         from airflow.utils.context import (
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py 
b/airflow-core/src/airflow/serialization/serialized_objects.py
index 0a33a1ecf25..fa11f7b95df 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -2238,7 +2238,7 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
             mapped task groups.
         :return: Total number of mapped TIs this task should have.
         """
-        from airflow.sdk.definitions._internal.abstractoperator import 
NotMapped
+        from airflow.exceptions import NotMapped
 
         group = self.get_closest_mapped_task_group()
         if group is None:
diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index ea53a582dcf..1298518b96e 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -129,9 +129,9 @@ class TriggerRuleDep(BaseTIDep):
         :param dep_context: The current dependency context.
         :param session: Database session.
         """
+        from airflow.exceptions import NotMapped
         from airflow.models.expandinput import NotFullyPopulated
         from airflow.models.taskinstance import TaskInstance
-        from airflow.sdk.definitions._internal.abstractoperator import 
NotMapped
 
         @functools.lru_cache
         def _get_expanded_ti_count() -> int:
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index 9c85dc041e6..90cffbe4de0 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -876,29 +876,32 @@ class TestDagRun:
     def test_removed_task_instances_can_be_restored(self, dag_maker, session):
         def with_all_tasks_removed(dag):
             with dag_maker(
-                dag_id=dag.dag_id, schedule=datetime.timedelta(days=1), 
start_date=dag.start_date
+                dag_id=dag.dag_id,
+                schedule=datetime.timedelta(days=1),
+                start_date=dag.start_date,
             ) as dag:
                 pass
             return dag
 
         with dag_maker(
-            "test_task_restoration", schedule=datetime.timedelta(days=1), 
start_date=DEFAULT_DATE
-        ) as dag:
-            ...
-        dag.add_task(EmptyOperator(task_id="flaky_task", owner="test"))
+            "test_task_restoration",
+            schedule=datetime.timedelta(days=1),
+            start_date=DEFAULT_DATE,
+        ) as ori_dag:
+            EmptyOperator(task_id="flaky_task", owner="test")
 
-        dagrun = self.create_dag_run(dag, session=session)
+        dagrun = self.create_dag_run(ori_dag, session=session)
         flaky_ti = dagrun.get_task_instances()[0]
         assert flaky_ti.task_id == "flaky_task"
         assert flaky_ti.state is None
 
-        dagrun.dag = with_all_tasks_removed(dag)
-        dag_version_id = DagVersion.get_latest_version(dag.dag_id, 
session=session).id
+        dagrun.dag = with_all_tasks_removed(ori_dag)
+        dag_version_id = DagVersion.get_latest_version(ori_dag.dag_id, 
session=session).id
         dagrun.verify_integrity(dag_version_id=dag_version_id)
         flaky_ti.refresh_from_db()
         assert flaky_ti.state is None
 
-        dagrun.dag.add_task(EmptyOperator(task_id="flaky_task", owner="test"))
+        dagrun.dag.add_task(ori_dag.task_dict["flaky_task"])
 
         dagrun.verify_integrity(dag_version_id=dag_version_id)
         flaky_ti.refresh_from_db()
@@ -942,8 +945,7 @@ class TestDagRun:
             schedule=datetime.timedelta(days=1),
             start_date=DEFAULT_DATE,
         ) as dag:
-            ...
-        dag.add_task(EmptyOperator(task_id="task_to_mutate", owner="test", 
queue="queue1"))
+            EmptyOperator(task_id="task_to_mutate", owner="test", 
queue="queue1")
 
         dagrun = self.create_dag_run(dag, session=session)
         task = dagrun.get_task_instances()[0]

Reply via email to