This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3f43bc4a5f4 partial kwargs deserialized MappedOperator set on unmapped Operator (#42563)
3f43bc4a5f4 is described below
commit 3f43bc4a5f4b033d1562e32d78801a6406761e47
Author: Fred Thomsen <[email protected]>
AuthorDate: Mon Nov 18 02:08:44 2024 -0500
partial kwargs deserialized MappedOperator set on unmapped Operator (#42563)
Forwarding the partial kwargs to the underlying operator is already done in
the standard (non-serialization) case, so it should be done here as well:
parts of the webserver rely on these fixed attributes being set on the
unmapped operator.
An example is the `Triggered DAG` link for the `TriggerDagRunOperator`.
---
airflow/models/mappedoperator.py | 2 ++
tests/serialization/test_dag_serialization.py | 6 +++++-
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 925acfc16f0..9a5c1b563ce 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -821,6 +821,8 @@ class MappedOperator(AbstractOperator):
from airflow.serialization.serialized_objects import SerializedBaseOperator
op = SerializedBaseOperator(task_id=self.task_id, params=self.params, _airflow_from_mapped=True)
+ for partial_attr, value in self.partial_kwargs.items():
+ setattr(op, partial_attr, value)
SerializedBaseOperator.populate_operator(op, self.operator_class)
if self.dag is not None: # For Mypy; we only serialize tasks in a DAG so the check always satisfies.
SerializedBaseOperator.set_task_dag_references(op, self.dag)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index d10984556ff..f04ff3e2568 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -2529,7 +2529,11 @@ def test_operator_expand_deserialized_unmap():
ser_normal = BaseSerialization.serialize(normal)
deser_normal = BaseSerialization.deserialize(ser_normal)
deser_normal.dag = None
- assert deser_mapped.unmap(None) == deser_normal
+ unmapped_deser_mapped = deser_mapped.unmap(None)
+
+ assert type(unmapped_deser_mapped) is type(deser_normal) is SerializedBaseOperator
+ assert unmapped_deser_mapped.task_id == deser_normal.task_id == "a"
+ assert unmapped_deser_mapped.executor_config == deser_normal.executor_config == {"a": "b"}
@pytest.mark.db_test