zhangw opened a new issue, #35998:
URL: https://github.com/apache/airflow/issues/35998

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   ## Airflow version 2.1.2
   
   The DAG file is simple but uses the XComArgs feature, and I noticed that the dag_hash changes every time the DAG is parsed and serialized. I would expect the hash to be stable in this case.
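
   As far as I can tell, the `dag_hash` column is derived from the serialized JSON, roughly like the sketch below (a simplified version of my understanding of `SerializedDagModel`, not the exact Airflow code), so any byte-level difference in the serialized payload yields a different hash:
   
   ```python
   import hashlib
   import json
   
   def compute_dag_hash(serialized_dag_data: dict) -> str:
       """Hash the canonical (sorted-key) JSON dump of the serialized DAG."""
       blob = json.dumps(serialized_dag_data, sort_keys=True).encode("utf-8")
       return hashlib.md5(blob).hexdigest()
   ```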
   
   ## The DAG file for testing
   
   ```python
   import logging
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.operators.bash import BashOperator
   from airflow.operators.python import PythonOperator, get_current_context
   from airflow.utils.dates import days_ago
   
   log = logging.getLogger(__name__)
   
   
   def generate_value():
       """Dummy function"""
       return "Bring me a shrubbery!"
   
   
   @task()
   def print_value(value):
       """Dummy function"""
       ctx = get_current_context()
       log.info("The knights of Ni say: %s (at %s)", value, ctx['ts'])
   
   
   with DAG(
       dag_id='example_xcom_args',
       default_args={'owner': 'airflow'},
       start_date=days_ago(2),
       schedule_interval=None,
       tags=['example'],
   ) as dag:
       task1 = PythonOperator(
           task_id='generate_value',
           python_callable=generate_value,
       )
   
       print_value(task1.output)
   
   
   with DAG(
       "example_xcom_args_with_operators",
       default_args={'owner': 'airflow'},
       start_date=days_ago(2),
       schedule_interval=None,
       tags=['example'],
   ) as dag2:
       bash_op1 = BashOperator(task_id="c", bash_command="echo c")
       bash_op2 = BashOperator(task_id="d", bash_command="echo c")
       xcom_args_a = print_value("first!")
       xcom_args_b = print_value("second!")
   
       bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2
   ```
   
   ## One of the serialized payloads
   
   ```json
   {"dag": {"tags": ["example"], "tasks": [{"pool": "default_pool", "label": 
"generate_value", "owner": "airflow", "_inlets": [], "op_args": [], "task_id": 
"generate_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, 
"op_kwargs": {}, "_task_type": "PythonOperator", "ui_fgcolor": "#000", 
"_task_module": "airflow.operators.python", "template_fields": 
["templates_dict", "op_args", "op_kwargs"], "_downstream_task_ids": 
["print_value"], "template_fields_renderers": {"op_args": "py", "op_kwargs": 
"py", "templates_dict": "json"}}, {"pool": "default_pool", "label": 
"print_value", "owner": "airflow", "doc_md": "Dummy function", "_inlets": [], 
"op_args": "(<airflow.models.xcom_arg.XComArg object at 0x107415d30>,)", 
"task_id": "print_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": 
false, "op_kwargs": {}, "_task_type": "_PythonDecoratedOperator", "ui_fgcolor": 
"#000", "_task_module": "airflow.decorators.python", "template_fields": 
["op_args", "op_kwargs"], "_downstrea
 m_task_ids": [], "template_fields_renderers": {"op_args": "py", "op_kwargs": 
"py"}}], "_dag_id": "example_xcom_args", "fileloc": 
"/Users/vincent/Codes/Workspace/webull/airflow/airflow/airflow/example_dags/example_xcomargs.py",
 "timezone": "UTC", "edge_info": {}, "start_date": 1701216000.0, "_task_group": 
{"tooltip": "", "children": {"print_value": ["operator", "print_value"], 
"generate_value": ["operator", "generate_value"]}, "ui_color": 
"CornflowerBlue", "_group_id": null, "ui_fgcolor": "#000", "prefix_group_id": 
true, "upstream_task_ids": [], "upstream_group_ids": [], "downstream_task_ids": 
[], "downstream_group_ids": []}, "default_args": {"__var": {"owner": 
"airflow"}, "__type": "dict"}, "dag_dependencies": [], "schedule_interval": 
null}, "__version": 1}
   ```
   
   ## Another serialized payload
   
   ```json
   {"dag": {"tags": ["example"], "tasks": [{"pool": "default_pool", "label": 
"generate_value", "owner": "airflow", "_inlets": [], "op_args": [], "task_id": 
"generate_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, 
"op_kwargs": {}, "_task_type": "PythonOperator", "ui_fgcolor": "#000", 
"_task_module": "airflow.operators.python", "template_fields": 
["templates_dict", "op_args", "op_kwargs"], "_downstream_task_ids": 
["print_value"], "template_fields_renderers": {"op_args": "py", "op_kwargs": 
"py", "templates_dict": "json"}}, {"pool": "default_pool", "label": 
"print_value", "owner": "airflow", "doc_md": "Dummy function", "_inlets": [], 
"op_args": "(<airflow.models.xcom_arg.XComArg object at 0x112a51d60>,)", 
"task_id": "print_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": 
false, "op_kwargs": {}, "_task_type": "_PythonDecoratedOperator", "ui_fgcolor": 
"#000", "_task_module": "airflow.decorators.python", "template_fields": 
["op_args", "op_kwargs"], "_downstrea
 m_task_ids": [], "template_fields_renderers": {"op_args": "py", "op_kwargs": 
"py"}}], "_dag_id": "example_xcom_args", "fileloc": 
"/Users/vincent/Codes/Workspace/webull/airflow/airflow/airflow/example_dags/example_xcomargs.py",
 "timezone": "UTC", "edge_info": {}, "start_date": 1701216000.0, "_task_group": 
{"tooltip": "", "children": {"print_value": ["operator", "print_value"], 
"generate_value": ["operator", "generate_value"]}, "ui_color": 
"CornflowerBlue", "_group_id": null, "ui_fgcolor": "#000", "prefix_group_id": 
true, "upstream_task_ids": [], "upstream_group_ids": [], "downstream_task_ids": 
[], "downstream_group_ids": []}, "default_args": {"__var": {"owner": 
"airflow"}, "__type": "dict"}, "dag_dependencies": [], "schedule_interval": 
null}, "__version": 1}
   ```
   
   ## The only difference between them
   
   The value of `op_args`: it is serialized as the default `repr()` of the `XComArg` object, which embeds the object's memory address (`0x107415d30` in the first payload vs `0x112a51d60` in the second), so the serialized JSON, and therefore the dag_hash, changes on every parse.
   
   
![image-20231201173700350](https://github.com/apache/airflow/assets/196561/ae1ebf10-4a76-448d-a0a2-551d60f7a666)
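
   My reading is that every parse creates a new `XComArg` object at a new address, and its default `repr()` leaks into the serialized `op_args`, so the hash can never be stable. A minimal sketch outside Airflow that shows the mechanism (the class here is made up for illustration only):
   
   ```python
   import hashlib
   import json
   
   class FakeXComArg:
       """Stand-in for any object whose default repr embeds id(), like XComArg above."""
   
   # Two live objects can never share an address, so their reprs always differ.
   a, b = FakeXComArg(), FakeXComArg()
   first = json.dumps({"op_args": str((a,))}, sort_keys=True)
   second = json.dumps({"op_args": str((b,))}, sort_keys=True)
   
   print(first)   # "... object at 0x...": the address is baked into the payload
   print(hashlib.md5(first.encode("utf-8")).hexdigest())
   print(hashlib.md5(second.encode("utf-8")).hexdigest())  # differs from the hash above
   ```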
   
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   1. Copy the above DAG file into your Airflow environment and start the scheduler.
   2. Run the SQL below several times, roughly 30 seconds apart (the exact interval depends on your `min_serialized_dag_update_interval` setting):
   
   ```sql
   select `dag_id`, `dag_hash`, `last_updated`, `data` from `serialized_dag` 
where `dag_id` = 'example_xcom_args';
   ```
   
   Then compare the rows returned by these executions: the `dag_hash` value keeps changing even though the DAG file has not changed.
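
   It should also be reproducible in a single Python process, without waiting for the scheduler, by parsing and serializing the DAG file twice and hashing each result. A rough sketch (this assumes `DagBag` re-executes the file on each instantiation and that the hash formula above matches what Airflow stores; the file path is a placeholder):
   
   ```python
   import hashlib
   import json
   
   from airflow.models.dagbag import DagBag
   from airflow.serialization.serialized_objects import SerializedDAG
   
   DAG_FILE = "/path/to/example_xcomargs.py"  # placeholder path to the DAG file above
   
   def hash_of(dag) -> str:
       # Same idea as the dag_hash stored in serialized_dag (my assumption, see above).
       data = SerializedDAG.to_dict(dag)
       return hashlib.md5(json.dumps(data, sort_keys=True).encode("utf-8")).hexdigest()
   
   # Parse the same file twice; keeping both DagBags alive means the two parses
   # hold distinct in-memory XComArg objects at different addresses.
   bag1 = DagBag(dag_folder=DAG_FILE, include_examples=False)
   bag2 = DagBag(dag_folder=DAG_FILE, include_examples=False)
   
   print(hash_of(bag1.get_dag("example_xcom_args")))
   print(hash_of(bag2.get_dag("example_xcom_args")))  # differs if the XComArg repr leaks into op_args
   ```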
   
   ### Operating System
   
   macOS 14.1.1 (23B81), Apple M1 chipset
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

