josh-fell opened a new issue, #35433:
URL: https://github.com/apache/airflow/issues/35433

   ### Body
   
   Several Databricks operators have a `json` parameter that accepts a JSON 
object containing any number of API parameters. In the constructors of these 
operators, the `json` value is modified in place to merge in other named 
parameters passed to the operator, such as `name`, `tags`, etc.
   
   Since `json` is a templated field, modifying it in the constructor fails or 
does not work as expected when the input arg is a string (e.g. "{{ 
var.json....}}") or an XComArg (meaning it's the output of a previous task), 
because template fields are not rendered until just before the `execute` method 
is called.
   
   To illustrate the point, let's use this example DAG where we define the 
`json` arg in a previous task and use its output:
   ```py
   from __future__ import annotations
   
   from pendulum import datetime
   from typing import TYPE_CHECKING, Sequence
   
   from airflow.decorators import dag, task
   from airflow.models.baseoperator import BaseOperator
   
   if TYPE_CHECKING:
       from airflow.utils.context import Context
   
   
   class DatabricksCreateJobsOperator(BaseOperator):
       template_fields: Sequence[str] = ("json", "databricks_conn_id")
   
       def __init__(
           self,
           *,
           json: dict | None = None,
           name: str | None = None,
           tags: dict[str, str] | None = None,
           **kwargs
       ):
           super().__init__(**kwargs)
           self.json = json or {}
           if name is not None:
               self.json["name"] = name
           if tags is not None:
               self.json["tags"] = tags
   
       def execute(self, context: Context) -> None:
           pass
   
   @dag(start_date=datetime(2023, 1, 1), schedule=None)
   def derived_template_fields():
       @task
       def push_json() -> dict[str, str]:
           return {"key1": "val1", "key2": "val2"}
   
       json = push_json()
   
       DatabricksCreateJobsOperator(
           task_id="create_job_w_json", json=json, name="some_name", tags={"key3": "value3"}
       )
   
   
   derived_template_fields()
   ```
   DAG parsing fails with:
   ```
   Running: airflow dags reserialize
   [2023-08-31T14:29:57.796+0000] {utils.py:430} WARNING - No module named 
'paramiko'
   [2023-08-31T14:29:57.816+0000] {utils.py:430} WARNING - No module named 
'airflow.providers.dbt'
   [2023-08-31T14:29:58.533+0000] {dagbag.py:539} INFO - Filling up the DagBag 
from /usr/local/airflow/dags
   [2023-08-31T14:29:58.615+0000] {dagbag.py:347} ERROR - Failed to import: 
/usr/local/airflow/dags/derived_template_fields.py
   Traceback (most recent call last):
     File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", 
line 343, in parse
       loader.exec_module(new_module)
     File "<frozen importlib._bootstrap_external>", line 940, in exec_module
     File "<frozen importlib._bootstrap>", line 241, in 
_call_with_frames_removed
     File "/usr/local/airflow/dags/derived_template_fields.py", line 50, in 
<module>
       derived_template_fields()
     File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 
3798, in factory
       f(**f_kwargs)
     File "/usr/local/airflow/dags/derived_template_fields.py", line 41, in 
derived_template_fields
       DatabricksCreateJobsOperator(
     File 
"/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 
436, in apply_defaults
       result = func(self, **kwargs, default_args=default_args)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in 
__init__
       self.json["name"] = name
       ~~~~~~~~~^^^^^^^^
   TypeError: 'PlainXComArg' object does not support item assignment
   ```
   Even if we change the `json` arg assignment to use the classic XCom Jinja 
template approach (i.e. `json = "{{ ti.xcom_pull(task_ids='push_json') }}"`), 
the DAG fails to parse:
   ```
   Running: airflow dags reserialize
   [2023-08-31T14:32:01.553+0000] {utils.py:430} WARNING - No module named 
'paramiko'
   [2023-08-31T14:32:01.574+0000] {utils.py:430} WARNING - No module named 
'airflow.providers.dbt'
   [2023-08-31T14:32:02.341+0000] {dagbag.py:539} INFO - Filling up the DagBag 
from /usr/local/airflow/dags
   [2023-08-31T14:32:02.415+0000] {dagbag.py:347} ERROR - Failed to import: 
/usr/local/airflow/dags/derived_template_fields.py
   Traceback (most recent call last):
     File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", 
line 343, in parse
       loader.exec_module(new_module)
     File "<frozen importlib._bootstrap_external>", line 940, in exec_module
     File "<frozen importlib._bootstrap>", line 241, in 
_call_with_frames_removed
     File "/usr/local/airflow/dags/derived_template_fields.py", line 51, in 
<module>
       derived_template_fields()
     File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 
3798, in factory
       f(**f_kwargs)
     File "/usr/local/airflow/dags/derived_template_fields.py", line 42, in 
derived_template_fields
       DatabricksCreateJobsOperator(
     File 
"/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 
436, in apply_defaults
       result = func(self, **kwargs, default_args=default_args)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in 
__init__
       self.json["name"] = name
       ~~~~~~~~~^^^^^^^^
   TypeError: 'str' object does not support item assignment
   ```
   
   It would be best to move any modification of `json` (and, more generally, 
of any template field) into the `execute` method, where the field has already 
been rendered.
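
   As a rough sketch of that suggestion (not the actual provider code): the 
constructor only stores its inputs, and the merge happens inside `execute`. 
The class name `CreateJobsOperatorSketch` is hypothetical, and it deliberately 
does not subclass `BaseOperator` so the snippet runs without Airflow installed. 
Rendering is simulated by assigning the rendered value by hand, and the sketch 
assumes the rendered `json` is a mapping.

```python
from __future__ import annotations

from typing import Any, Sequence


class CreateJobsOperatorSketch:
    """Hypothetical stand-in for a BaseOperator subclass; runs without Airflow."""

    template_fields: Sequence[str] = ("json",)

    def __init__(
        self,
        *,
        json: Any = None,
        name: str | None = None,
        tags: dict[str, str] | None = None,
    ) -> None:
        # Store the inputs untouched: at this point `json` may still be a
        # Jinja string or an XComArg, so it must not be indexed or mutated.
        self.json = json or {}
        self.name = name
        self.tags = tags

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        # In Airflow, template fields have been rendered before execute() runs.
        # Assumption: the rendered `json` is a mapping; copy it before merging.
        merged = dict(self.json)
        if self.name is not None:
            merged["name"] = self.name
        if self.tags is not None:
            merged["tags"] = self.tags
        return merged


# Construction no longer raises for a templated string.
op = CreateJobsOperatorSketch(
    json="{{ ti.xcom_pull(task_ids='push_json') }}",
    name="some_name",
    tags={"key3": "value3"},
)

# Simulate the rendering step by assigning the rendered value by hand.
op.json = {"key1": "val1", "key2": "val2"}
result = op.execute(context={})
print(result)
```

   If the rendered value were still a string rather than a mapping, it would 
need to be parsed before merging, which this sketch does not handle.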
   
   
   ### Committer
   
   - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.

