This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 57adf0b2853 Fixed thread local _sentinel.callers defect and added test 
cases (#44646) (#46280)
57adf0b2853 is described below

commit 57adf0b285331cfff587c2be77abadc4aed45b9d
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Thu Jan 30 15:12:20 2025 +0530

    Fixed thread local _sentinel.callers defect and added test cases (#44646) 
(#46280)
    
    * Update base.py
    
    * Update base.py
    
    * Update base.py
    
    * Update baseoperator.py
    
    * Update base.py
    
    * Update base.py
    
    * Update baseoperator.py
    
    * Update baseoperator.py
    
    * Fixed thread local _sentinel.callers defect and added test cases
    
    * Fixed issue
    
    * Fixed issue
    
    * Fixed issue
    
    * Fixed issue
    
    * Fixed issue
    
    * Fixed issue
    
    * Fixed issue
    
    ---------
    
    Co-authored-by: Rahul Goyal <[email protected]>
    (cherry picked from commit a77fca25518b07d7d75c3301244d21f6a97c3947)
    
    Co-authored-by: rahulgoyal2987 <[email protected]>
---
 airflow/models/baseoperator.py        |  2 ++
 tests/models/test_baseoperatormeta.py | 18 ++++++++++++++++++
 2 files changed, 20 insertions(+)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 65900276271..1b1b22c7be4 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -410,6 +410,8 @@ class ExecutorSafeguard:
             sentinel = kwargs.pop(sentinel_key, None)
 
             if sentinel:
+                if not getattr(cls._sentinel, "callers", None):
+                    cls._sentinel.callers = {}
                 cls._sentinel.callers[sentinel_key] = sentinel
             else:
                 sentinel = 
cls._sentinel.callers.pop(f"{func.__qualname__.split('.')[0]}__sentinel", None)
diff --git a/tests/models/test_baseoperatormeta.py 
b/tests/models/test_baseoperatormeta.py
index 5244e86b2c3..52e45dd1cf3 100644
--- a/tests/models/test_baseoperatormeta.py
+++ b/tests/models/test_baseoperatormeta.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import datetime
+import threading
 from typing import TYPE_CHECKING, Any
 from unittest.mock import patch
 
@@ -211,3 +212,20 @@ class TestExecutorSafeguard:
         mock_log.warning.assert_called_once_with(
             "HelloWorldOperator.execute cannot be called outside TaskInstance!"
         )
+
+    def test_thread_local_executor_safeguard(self):
+        class TestExecutorSafeguardThread(threading.Thread):
+            def __init__(self):
+                threading.Thread.__init__(self)
+                self.executor_safeguard = ExecutorSafeguard()
+
+            def run(self):
+                class Wrapper:
+                    def wrapper_test_func(self, *args, **kwargs):
+                        print("test")
+
+                wrap_func = 
self.executor_safeguard.decorator(Wrapper.wrapper_test_func)
+                wrap_func(Wrapper(), Wrapper__sentinel="abc")
+
+        # Test thread local caller value is set properly
+        TestExecutorSafeguardThread().start()

Reply via email to