This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aece98b1f05 Fix DAG deserialization failure with non-default
``weight_rule`` (#55906)
aece98b1f05 is described below
commit aece98b1f055cb2cc284a9ad804a9a484c06eec5
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Sep 19 23:51:14 2025 +0100
Fix DAG deserialization failure with non-default ``weight_rule`` (#55906)
Resolves an issue where DAGs using a non-default value such as
`weight_rule='absolute'` would fail during deserialization with an
`'Unknown priority strategy'` error. The problem occurred when the
scheduler attempted to deserialize tasks whose weight rule had already
been converted to a `PriorityWeightStrategy` instance.
The `SerializedBaseOperator.weight_rule` property now properly handles
the case where `_weight_rule` already holds a `PriorityWeightStrategy`
instance, returning it directly instead of passing it to
`validate_and_load_priority_weight_strategy` for re-validation.
This fix ensures that all weight rule options (`downstream`, `upstream`,
`absolute`)
work correctly in the complete DAG processing pipeline including
serialization,
storage, and deserialization.
To reproduce the bug, run the following DAG, or run the added unit test
without the code changes:
```py
from airflow import DAG
from airflow.sdk import task
from airflow.task.weight_rule import WeightRule
with DAG("xcom_backend_dag"):
@task()
def xcom_producer():
return "my_value"
@task(weight_rule=WeightRule.ABSOLUTE)
def xcom_consumer(value):
assert value == "my_value"
xcom_consumer(xcom_producer())
```
This DAG will fail when you try to trigger it from the UI.
Fix DAG deserialization failure with weight_rule='absolute'
Resolves an issue where DAGs using weight_rule='absolute' would fail during
deserialization with an 'Unknown priority strategy' error. The problem occurred
because the SerializedBaseOperator.weight_rule property was re-validating
already-instantiated PriorityWeightStrategy instances.
During deserialization, decode_priority_weight_strategy() creates strategy
instances and stores them in _weight_rule, but the weight_rule property
was calling validate_and_load_priority_weight_strategy() which expected
a string or class, not an instance.
Fixed by checking if _weight_rule is already a PriorityWeightStrategy
instance in the property getter and returning it directly. User input
still goes through proper validation.
This targeted fix resolves the specific deserialization issue while
maintaining existing validation behavior for new operator creation.
---
.../airflow/serialization/serialized_objects.py | 2 ++
.../unit/serialization/test_dag_serialization.py | 25 ++++++++++++++++++++--
task-sdk/src/airflow/sdk/__init__.py | 2 +-
3 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py
b/airflow-core/src/airflow/serialization/serialized_objects.py
index ce1ce22d9ac..9dbe19df085 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1416,6 +1416,8 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
@property
def weight_rule(self) -> PriorityWeightStrategy:
+ if isinstance(self._weight_rule, PriorityWeightStrategy):
+ return self._weight_rule
return validate_and_load_priority_weight_strategy(self._weight_rule)
def __getattr__(self, name):
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 102b092770f..dba647a9393 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -58,7 +58,7 @@ from airflow.models.mappedoperator import MappedOperator
from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk import DAG, AssetAlias, BaseHook, teardown
+from airflow.sdk import DAG, AssetAlias, BaseHook, WeightRule, teardown
from airflow.sdk.bases.decorator import DecoratedOperator
from airflow.sdk.bases.operator import OPERATOR_DEFAULTS, BaseOperator
from airflow.sdk.definitions._internal.expandinput import EXPAND_INPUT_EMPTY
@@ -75,7 +75,7 @@ from airflow.serialization.serialized_objects import (
SerializedDAG,
XComOperatorLink,
)
-from airflow.task.priority_strategy import _DownstreamPriorityWeightStrategy
+from airflow.task.priority_strategy import _AbsolutePriorityWeightStrategy,
_DownstreamPriorityWeightStrategy
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
from airflow.timetables.simple import NullTimetable, OnceTimetable
from airflow.triggers.base import StartTriggerArgs
@@ -3802,6 +3802,27 @@ def
test_task_callback_backward_compatibility(old_callback_name, new_callback_na
assert getattr(deserialized_task_empty, new_callback_name) is False
+def test_weight_rule_absolute_serialization_deserialization():
+ """Test that weight_rule can be serialized and deserialized correctly."""
+ from airflow.sdk import task
+
+ with DAG("test_weight_rule_dag") as dag:
+
+ @task(weight_rule=WeightRule.ABSOLUTE)
+ def test_task():
+ return "test"
+
+ test_task()
+
+ serialized_dag = SerializedDAG.to_dict(dag)
+ assert serialized_dag["dag"]["tasks"][0]["__var"]["weight_rule"] ==
"absolute"
+
+ deserialized_dag = SerializedDAG.from_dict(serialized_dag)
+
+ deserialized_task = deserialized_dag.task_dict["test_task"]
+ assert isinstance(deserialized_task.weight_rule,
_AbsolutePriorityWeightStrategy)
+
+
class TestClientDefaultsGeneration:
"""Test client defaults generation functionality."""
diff --git a/task-sdk/src/airflow/sdk/__init__.py
b/task-sdk/src/airflow/sdk/__init__.py
index ff50daad139..9c411354c95 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -63,7 +63,7 @@ __all__ = [
__version__ = "1.2.0"
if TYPE_CHECKING:
- from airflow.sdk.api.datamodels._generated import DagRunState,
TaskInstanceState, TriggerRule
+ from airflow.sdk.api.datamodels._generated import DagRunState,
TaskInstanceState, TriggerRule, WeightRule
from airflow.sdk.bases.hook import BaseHook
from airflow.sdk.bases.notifier import BaseNotifier
from airflow.sdk.bases.operator import BaseOperator, chain, chain_linear,
cross_downstream