This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 95c46ec1353 FIX: Don't raise a warning in ExecutorSafeguard when
execute is called from an extended operator (#42849)
95c46ec1353 is described below
commit 95c46ec135349c8e8d3150d16f18ab65f8240f3e
Author: David Blain <[email protected]>
AuthorDate: Sun Oct 13 00:17:37 2024 +0200
FIX: Don't raise a warning in ExecutorSafeguard when execute is called from
an extended operator (#42849)
* refactor: Don't raise a warning when execute is called from an extended
operator, as this should always be allowed.
* refactored: Fixed import of test_utils in test_dag_run
---------
Co-authored-by: David Blain <[email protected]>
---
airflow/models/baseoperator.py | 11 ++++++++++-
tests/models/test_baseoperatormeta.py | 24 +++++++++++++++++++++++-
2 files changed, 33 insertions(+), 2 deletions(-)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 7b5c8e7c4f8..0c3d119be19 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -34,6 +34,7 @@ import sys
import warnings
from datetime import datetime, timedelta
from functools import total_ordering, wraps
+from threading import local
from types import FunctionType
from typing import (
TYPE_CHECKING,
@@ -392,6 +393,8 @@ class ExecutorSafeguard:
"""
test_mode = conf.getboolean("core", "unit_test_mode")
+ _sentinel = local()
+ _sentinel.callers = {}
@classmethod
def decorator(cls, func):
@@ -399,7 +402,13 @@ class ExecutorSafeguard:
def wrapper(self, *args, **kwargs):
from airflow.decorators.base import DecoratedOperator
- sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None)
+ sentinel_key = f"{self.__class__.__name__}__sentinel"
+ sentinel = kwargs.pop(sentinel_key, None)
+
+ if sentinel:
+ cls._sentinel.callers[sentinel_key] = sentinel
+ else:
+ sentinel =
cls._sentinel.callers.pop(f"{func.__qualname__.split('.')[0]}__sentinel", None)
if not cls.test_mode and not sentinel == _sentinel and not
isinstance(self, DecoratedOperator):
message = f"{self.__class__.__name__}.{func.__name__} cannot
be called outside TaskInstance!"
diff --git a/tests/models/test_baseoperatormeta.py
b/tests/models/test_baseoperatormeta.py
index 6c6567b2389..5244e86b2c3 100644
--- a/tests/models/test_baseoperatormeta.py
+++ b/tests/models/test_baseoperatormeta.py
@@ -40,6 +40,11 @@ class HelloWorldOperator(BaseOperator):
return f"Hello {self.owner}!"
+class ExtendedHelloWorldOperator(HelloWorldOperator):
+ def execute(self, context: Context) -> Any:
+ return super().execute(context)
+
+
class TestExecutorSafeguard:
def setup_method(self):
ExecutorSafeguard.test_mode = False
@@ -49,12 +54,29 @@ class TestExecutorSafeguard:
@pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode
@pytest.mark.db_test
- def test_executor_when_classic_operator_called_from_dag(self, dag_maker):
+ @patch.object(HelloWorldOperator, "log")
+ def test_executor_when_classic_operator_called_from_dag(self, mock_log,
dag_maker):
with dag_maker() as dag:
HelloWorldOperator(task_id="hello_operator")
dag_run = dag.test()
assert dag_run.state == DagRunState.SUCCESS
+ mock_log.warning.assert_not_called()
+
+ @pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode
+ @pytest.mark.db_test
+ @patch.object(HelloWorldOperator, "log")
+ def test_executor_when_extended_classic_operator_called_from_dag(
+ self,
+ mock_log,
+ dag_maker,
+ ):
+ with dag_maker() as dag:
+ ExtendedHelloWorldOperator(task_id="hello_operator")
+
+ dag_run = dag.test()
+ assert dag_run.state == DagRunState.SUCCESS
+ mock_log.warning.assert_not_called()
@pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode
@pytest.mark.parametrize(