This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aece98b1f05 Fix DAG deserialization failure with non-default ``weight_rule`` (#55906)
aece98b1f05 is described below

commit aece98b1f055cb2cc284a9ad804a9a484c06eec5
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Sep 19 23:51:14 2025 +0100

    Fix DAG deserialization failure with non-default ``weight_rule`` (#55906)
    
    Resolves an issue where DAGs using a non-default value such as
    `weight_rule='absolute'` would fail during deserialization with an
    `'Unknown priority strategy'` error. The problem occurred when the
    scheduler attempted to deserialize tasks that had already been
    converted to `PriorityWeightStrategy` instances.
    
    The `SerializedBaseOperator.weight_rule` property now properly handles
    the case where `_weight_rule` already holds a `PriorityWeightStrategy`
    instance, returning it directly instead of passing it to
    `validate_and_load_priority_weight_strategy` for re-validation.
    
    This fix ensures that all weight rule options (`downstream`, `upstream`,
    `absolute`) work correctly in the complete DAG processing pipeline,
    including serialization, storage, and deserialization.
    
    To reproduce the bug, run the following DAG, or simply run the added
    unit test without the code changes:
    
    ```py
    from airflow import DAG
    from airflow.sdk import task
    from airflow.task.weight_rule import WeightRule
    
    with DAG("xcom_backend_dag"):
    
        @task()
        def xcom_producer():
            return "my_value"
    
        @task(weight_rule=WeightRule.ABSOLUTE)
        def xcom_consumer(value):
            assert value == "my_value"
    
        xcom_consumer(xcom_producer())
    
    ```
    
    This DAG will fail when you try to trigger it from the UI.
    
    Fix DAG deserialization failure with weight_rule='absolute'
    
    Resolves an issue where DAGs using weight_rule='absolute' would fail during
    deserialization with 'Unknown priority strategy' error. The problem occurred
    when the SerializedBaseOperator.weight_rule property was re-validating
    already-instantiated PriorityWeightStrategy instances.
    
    During deserialization, decode_priority_weight_strategy() creates strategy
    instances and stores them in _weight_rule, but the weight_rule property
    was calling validate_and_load_priority_weight_strategy() which expected
    a string or class, not an instance.
    
    Fixed by checking if _weight_rule is already a PriorityWeightStrategy
    instance in the property getter and returning it directly. User input
    still goes through proper validation.
    
    This targeted fix resolves the specific deserialization issue while
    maintaining existing validation behavior for new operator creation.
---
 .../airflow/serialization/serialized_objects.py    |  2 ++
 .../unit/serialization/test_dag_serialization.py   | 25 ++++++++++++++++++++--
 task-sdk/src/airflow/sdk/__init__.py               |  2 +-
 3 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py 
b/airflow-core/src/airflow/serialization/serialized_objects.py
index ce1ce22d9ac..9dbe19df085 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1416,6 +1416,8 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
 
     @property
     def weight_rule(self) -> PriorityWeightStrategy:
+        if isinstance(self._weight_rule, PriorityWeightStrategy):
+            return self._weight_rule
         return validate_and_load_priority_weight_strategy(self._weight_rule)
 
     def __getattr__(self, name):
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py 
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 102b092770f..dba647a9393 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -58,7 +58,7 @@ from airflow.models.mappedoperator import MappedOperator
 from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
 from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk import DAG, AssetAlias, BaseHook, teardown
+from airflow.sdk import DAG, AssetAlias, BaseHook, WeightRule, teardown
 from airflow.sdk.bases.decorator import DecoratedOperator
 from airflow.sdk.bases.operator import OPERATOR_DEFAULTS, BaseOperator
 from airflow.sdk.definitions._internal.expandinput import EXPAND_INPUT_EMPTY
@@ -75,7 +75,7 @@ from airflow.serialization.serialized_objects import (
     SerializedDAG,
     XComOperatorLink,
 )
-from airflow.task.priority_strategy import _DownstreamPriorityWeightStrategy
+from airflow.task.priority_strategy import _AbsolutePriorityWeightStrategy, 
_DownstreamPriorityWeightStrategy
 from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
 from airflow.timetables.simple import NullTimetable, OnceTimetable
 from airflow.triggers.base import StartTriggerArgs
@@ -3802,6 +3802,27 @@ def 
test_task_callback_backward_compatibility(old_callback_name, new_callback_na
     assert getattr(deserialized_task_empty, new_callback_name) is False
 
 
+def test_weight_rule_absolute_serialization_deserialization():
+    """Test that weight_rule can be serialized and deserialized correctly."""
+    from airflow.sdk import task
+
+    with DAG("test_weight_rule_dag") as dag:
+
+        @task(weight_rule=WeightRule.ABSOLUTE)
+        def test_task():
+            return "test"
+
+        test_task()
+
+    serialized_dag = SerializedDAG.to_dict(dag)
+    assert serialized_dag["dag"]["tasks"][0]["__var"]["weight_rule"] == 
"absolute"
+
+    deserialized_dag = SerializedDAG.from_dict(serialized_dag)
+
+    deserialized_task = deserialized_dag.task_dict["test_task"]
+    assert isinstance(deserialized_task.weight_rule, 
_AbsolutePriorityWeightStrategy)
+
+
 class TestClientDefaultsGeneration:
     """Test client defaults generation functionality."""
 
diff --git a/task-sdk/src/airflow/sdk/__init__.py 
b/task-sdk/src/airflow/sdk/__init__.py
index ff50daad139..9c411354c95 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -63,7 +63,7 @@ __all__ = [
 __version__ = "1.2.0"
 
 if TYPE_CHECKING:
-    from airflow.sdk.api.datamodels._generated import DagRunState, 
TaskInstanceState, TriggerRule
+    from airflow.sdk.api.datamodels._generated import DagRunState, 
TaskInstanceState, TriggerRule, WeightRule
     from airflow.sdk.bases.hook import BaseHook
     from airflow.sdk.bases.notifier import BaseNotifier
     from airflow.sdk.bases.operator import BaseOperator, chain, chain_linear, 
cross_downstream

Reply via email to