This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 27762afd9f01519d0dd53a769eae968b96ac2580
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Wed May 18 13:43:16 2022 -0600

    Fix auto upstream dep when expanding non-templated field (#23771)
    
    If you tried to expand via xcom into a non-templated field without
    explicitly setting the upstream task dependency, the scheduler would
    crash because the upstream task dependency wasn't being set
    automatically. It was being set only for templated fields, but now we do
    it for both.
    
    (cherry picked from commit 3849ebb8d22bbc229d464c4171c9b5ff960cd089)
---
 airflow/models/mappedoperator.py  |  3 +--
 tests/models/test_taskinstance.py | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index b63e26ec9e..c522cefb2c 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -300,8 +300,7 @@ class MappedOperator(AbstractOperator):
         if self.dag:
             self.dag.add_task(self)
         for k, v in self.mapped_kwargs.items():
-            if k in self.template_fields:
-                XComArg.apply_upstream_relationship(self, v)
+            XComArg.apply_upstream_relationship(self, v)
         for k, v in self.partial_kwargs.items():
             if k in self.template_fields:
                 XComArg.apply_upstream_relationship(self, v)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 4bb3b2bae4..c520fe70bd 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2855,3 +2855,23 @@ def test_ti_mapped_depends_on_mapped_xcom_arg_XXX(dag_maker, session):
         ti.refresh_from_task(dag.get_task("add_one"))
         with pytest.raises(XComForMappingNotPushed):
             ti.run()
+
+
+def test_expand_non_templated_field(dag_maker, session):
+    """Test expand on non-templated fields sets upstream deps properly."""
+
+    class SimpleBashOperator(BashOperator):
+        template_fields = ()
+
+    with dag_maker(dag_id="product_same_types", session=session) as dag:
+
+        @dag.task
+        def get_extra_env():
+            return [{"foo": "bar"}, {"foo": "biz"}]
+
+        SimpleBashOperator.partial(task_id="echo", bash_command="echo $FOO").expand(env=get_extra_env())
+
+    dag_maker.create_dagrun()
+
+    echo_task = dag.get_task("echo")
+    assert "get_extra_env" in echo_task.upstream_task_ids

Reply via email to