This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7cbb6727e9c Decouple ``NotMapped`` exception from Task SDK (#55108)
7cbb6727e9c is described below
commit 7cbb6727e9c08f0cc5cf1646018165ab7f9dccda
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Sep 1 22:54:53 2025 +0100
Decouple ``NotMapped`` exception from Task SDK (#55108)
Move NotMapped exception from `airflow.sdk` to `airflow.exceptions` to eliminate
SDK dependency in airflow-core. This enables independent deployment of
airflow-core without requiring Task SDK modules.
This supports the broader SDK decoupling architecture where airflow-core
can operate independently of Task SDK implementations. (part of
https://github.com/apache/airflow/issues/52141)
---
airflow-core/src/airflow/exceptions.py | 4 ++++
airflow-core/src/airflow/models/dagrun.py | 3 +--
airflow-core/src/airflow/models/mappedoperator.py | 5 +----
airflow-core/src/airflow/models/taskinstance.py | 2 +-
.../airflow/serialization/serialized_objects.py | 2 +-
.../src/airflow/ti_deps/deps/trigger_rule_dep.py | 2 +-
airflow-core/tests/unit/models/test_dagrun.py | 24 ++++++++++++----------
7 files changed, 22 insertions(+), 20 deletions(-)
diff --git a/airflow-core/src/airflow/exceptions.py b/airflow-core/src/airflow/exceptions.py
index 150bbc3ffb3..95da650a8ab 100644
--- a/airflow-core/src/airflow/exceptions.py
+++ b/airflow-core/src/airflow/exceptions.py
@@ -276,6 +276,10 @@ class TaskInstanceNotFound(AirflowNotFoundException):
"""Raise when a task instance is not available in the system."""
+class NotMapped(Exception):
+ """Raise if a task is neither mapped nor has any parent mapped groups."""
+
+
class PoolNotFound(AirflowNotFoundException):
"""Raise when a Pool is not available in the system."""
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 2b79da56423..bc781271c89 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -58,7 +58,7 @@ from sqlalchemy_utils import UUIDType
from airflow._shared.timezones import timezone
from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext
from airflow.configuration import conf as airflow_conf
-from airflow.exceptions import AirflowException, TaskNotFound
+from airflow.exceptions import AirflowException, NotMapped, TaskNotFound
from airflow.listeners.listener import get_listener_manager
from airflow.models import Deadline, Log
from airflow.models.backfill import Backfill
@@ -67,7 +67,6 @@ from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.models.tasklog import LogTemplate
from airflow.models.taskmap import TaskMap
-from airflow.sdk.definitions._internal.abstractoperator import NotMapped
from airflow.sdk.definitions.deadline import DeadlineReference
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
diff --git a/airflow-core/src/airflow/models/mappedoperator.py b/airflow-core/src/airflow/models/mappedoperator.py
index 6e086beeaaa..3158901e907 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -27,11 +27,8 @@ import methodtools
import structlog
from sqlalchemy.orm import Session
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, NotMapped
from airflow.sdk import BaseOperator as TaskSDKBaseOperator
-from airflow.sdk.definitions._internal.abstractoperator import (
- NotMapped,
-)
from airflow.sdk.definitions._internal.node import DAGNode
from airflow.sdk.definitions.mappedoperator import MappedOperator as TaskSDKMappedOperator
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index e81b1691f70..bc940c18163 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1741,13 +1741,13 @@ class TaskInstance(Base, LoggingMixin):
if not session:
session = settings.Session()
+ from airflow.exceptions import NotMapped
from airflow.models.mappedoperator import get_mapped_ti_count
from airflow.sdk.api.datamodels._generated import (
DagRun as DagRunSDK,
PrevSuccessfulDagRunResponse,
TIRunContext,
)
- from airflow.sdk.definitions._internal.abstractoperator import NotMapped
from airflow.sdk.definitions.param import process_params
from airflow.sdk.execution_time.context import InletEventsAccessors
from airflow.utils.context import (
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 0a33a1ecf25..fa11f7b95df 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -2238,7 +2238,7 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
mapped task groups.
:return: Total number of mapped TIs this task should have.
"""
- from airflow.sdk.definitions._internal.abstractoperator import NotMapped
+ from airflow.exceptions import NotMapped
group = self.get_closest_mapped_task_group()
if group is None:
diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index ea53a582dcf..1298518b96e 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -129,9 +129,9 @@ class TriggerRuleDep(BaseTIDep):
:param dep_context: The current dependency context.
:param session: Database session.
"""
+ from airflow.exceptions import NotMapped
from airflow.models.expandinput import NotFullyPopulated
from airflow.models.taskinstance import TaskInstance
- from airflow.sdk.definitions._internal.abstractoperator import NotMapped
@functools.lru_cache
def _get_expanded_ti_count() -> int:
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index 9c85dc041e6..90cffbe4de0 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -876,29 +876,32 @@ class TestDagRun:
def test_removed_task_instances_can_be_restored(self, dag_maker, session):
def with_all_tasks_removed(dag):
with dag_maker(
- dag_id=dag.dag_id, schedule=datetime.timedelta(days=1), start_date=dag.start_date
+ dag_id=dag.dag_id,
+ schedule=datetime.timedelta(days=1),
+ start_date=dag.start_date,
) as dag:
pass
return dag
with dag_maker(
- "test_task_restoration", schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE
- ) as dag:
- ...
- dag.add_task(EmptyOperator(task_id="flaky_task", owner="test"))
+ "test_task_restoration",
+ schedule=datetime.timedelta(days=1),
+ start_date=DEFAULT_DATE,
+ ) as ori_dag:
+ EmptyOperator(task_id="flaky_task", owner="test")
- dagrun = self.create_dag_run(dag, session=session)
+ dagrun = self.create_dag_run(ori_dag, session=session)
flaky_ti = dagrun.get_task_instances()[0]
assert flaky_ti.task_id == "flaky_task"
assert flaky_ti.state is None
- dagrun.dag = with_all_tasks_removed(dag)
- dag_version_id = DagVersion.get_latest_version(dag.dag_id, session=session).id
+ dagrun.dag = with_all_tasks_removed(ori_dag)
+ dag_version_id = DagVersion.get_latest_version(ori_dag.dag_id, session=session).id
dagrun.verify_integrity(dag_version_id=dag_version_id)
flaky_ti.refresh_from_db()
assert flaky_ti.state is None
- dagrun.dag.add_task(EmptyOperator(task_id="flaky_task", owner="test"))
+ dagrun.dag.add_task(ori_dag.task_dict["flaky_task"])
dagrun.verify_integrity(dag_version_id=dag_version_id)
flaky_ti.refresh_from_db()
@@ -942,8 +945,7 @@ class TestDagRun:
schedule=datetime.timedelta(days=1),
start_date=DEFAULT_DATE,
) as dag:
- ...
- dag.add_task(EmptyOperator(task_id="task_to_mutate", owner="test", queue="queue1"))
+ EmptyOperator(task_id="task_to_mutate", owner="test", queue="queue1")
dagrun = self.create_dag_run(dag, session=session)
task = dagrun.get_task_instances()[0]