gecko655 opened a new issue, #32804:
URL: https://github.com/apache/airflow/issues/32804

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### Airflow version
   2.5.3+composer (The latest available airflow version in Cloud Composer)
   
   
   ### What happened
   
   `DagRun.find()` (the `find` method of the SQLAlchemy model `DagRun`) fails with the error message `TypeError: 'DateTime' object is not iterable` when it is passed an `execution_date` obtained directly from the task context:
   ```py
   def delete_previous_dagrun_func(dag_to_delete, **context):
       execution_date = context.get('execution_date')
       dagruns_today = DagRun.find(dag_id=dag_to_delete.dag_id,
                                   execution_date=execution_date)
   ```
   
   ### What is the cause
   On closer inspection, `execution_date` is of type `lazy_object_proxy.Proxy`, and the `is_container()` function used in `DagRun.find()` decides whether a value is a "container" by checking for an `__iter__` attribute.
   `lazy_object_proxy.Proxy` defines `__iter__`, so `execution_date` is treated as a container and passed to SQLAlchemy's `in_()` as if it were already a list of dates. SQLAlchemy then calls `list()` on it, which raises the "not iterable" error.
   
   
https://github.com/apache/airflow/blob/6313e5293280773aed7598e1befb8d371e8f5614/airflow/models/dagrun.py#L406-L409
   
   
https://github.com/apache/airflow/blob/6313e5293280773aed7598e1befb8d371e8f5614/airflow/utils/helpers.py#L117-L120
   
   I think `is_container()` should have another conditional branch to deal with 
`lazy_object_proxy.Proxy`.
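
The misclassification can be reproduced without Airflow. The sketch below is only an illustration under stated assumptions: `FakeProxy` is a hypothetical stand-in for `lazy_object_proxy.Proxy` that forwards `__iter__` to the wrapped object, the first `is_container()` mirrors the `__iter__` check described above rather than copying Airflow's source, and `is_container_unwrapping()` is one possible shape for the extra branch, not a committed fix.

```python
from datetime import datetime


class FakeProxy:
    """Hypothetical stand-in for lazy_object_proxy.Proxy (not the real class)."""

    def __init__(self, factory):
        self._factory = factory

    @property
    def __wrapped__(self):
        # Build the wrapped object on access, as a lazy proxy would.
        return self._factory()

    def __iter__(self):
        # Defined on the proxy itself, whether or not the target is iterable.
        return iter(self.__wrapped__)


def is_container(obj):
    # Mirrors the check described in the issue: any non-str with __iter__.
    return hasattr(obj, "__iter__") and not isinstance(obj, str)


def is_container_unwrapping(obj):
    # Possible extra branch: inspect the wrapped object instead of the proxy.
    if isinstance(obj, FakeProxy):
        obj = obj.__wrapped__
    return hasattr(obj, "__iter__") and not isinstance(obj, str)


execution_date = FakeProxy(lambda: datetime(2023, 7, 24))

print(is_container(execution_date))             # True: the proxy is misclassified
print(is_container_unwrapping(execution_date))  # False: a datetime is not a container

try:
    list(execution_date)  # what SQLAlchemy effectively does with an in_() argument
except TypeError as exc:
    print("TypeError:", exc)
```

With the unwrapping variant, `DagRun.find()` would take the single-value path for a proxied date instead of the `in_()` path.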
   
   
   ### Workaround
   
   Unwrapping the `lazy_object_proxy.Proxy` before passing it to `DagRun.find()` works:
   
   ```py
   def delete_previous_dagrun_func(dag_to_delete, **context):
       execution_date = context.get('execution_date')
       dagruns_today = DagRun.find(dag_id=dag_to_delete.dag_id,
                                   execution_date=execution_date.__wrapped__)
   ```
   
   
https://github.com/ionelmc/python-lazy-object-proxy/blob/c56c68bda23b8957abbc2fef3d21f32dd44b7f93/src/lazy_object_proxy/simple.py#L76-L83
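
If the same callable may receive either a proxied or a plain value, the unwrapping can be made tolerant of both. This is a minimal sketch: `unwrap()` and `FakeProxy` are hypothetical names introduced here, and the only assumption about the real proxy is the `__wrapped__` attribute already used in the workaround.

```python
from datetime import datetime


class FakeProxy:
    """Hypothetical stand-in for lazy_object_proxy.Proxy (exposes __wrapped__)."""

    def __init__(self, factory):
        self._factory = factory

    @property
    def __wrapped__(self):
        return self._factory()


def unwrap(value):
    # Hypothetical helper: return the proxied object if there is one,
    # otherwise return the value unchanged.
    return getattr(value, "__wrapped__", value)


plain = datetime(2023, 7, 24)
proxied = FakeProxy(lambda: plain)

print(unwrap(proxied) is plain)  # True: the proxy is unwrapped
print(unwrap(plain) is plain)    # True: plain values pass through
```

Other objects can also carry a `__wrapped__` attribute (for example functions decorated with `functools.wraps`), so a helper like this is best kept to values taken from the task context.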
   
   
   
   
   
   
   ### What you think should happen instead
   
   The value returned by `context.get('execution_date')` should be usable directly as an argument to `DagRun.find()`.
   
   
   ### How to reproduce
   
   
   ```py
   from airflow import DAG
   from airflow.models import DagRun
   from airflow.operators.python import PythonOperator

   dag_to_delete = DAG('DAG_TO_DELETE')
   dag = DAG('DAG')


   def delete_previous_dagrun_func(dag_to_delete, **context):
       # In Airflow 2.x this value is a lazy_object_proxy.Proxy, not a plain datetime
       execution_date = context.get('execution_date')
       dagruns_today = DagRun.find(dag_id=dag_to_delete.dag_id,
                                   execution_date=execution_date)
       # do something for `dagruns_today` (Delete the dagruns for today)


   op = PythonOperator(
       task_id='Delete_Previous_DagRun',
       python_callable=delete_previous_dagrun_func,
       op_args=[dag_to_delete],
       provide_context=True,
       dag=dag)
   ```
   
   This code produces the following error at the `DagRun.find()` call:
   
   ```
   [2023-07-24, 19:09:06 JST] {taskinstance.py:1778} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 210, in execute
       branch = super().execute(context)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
       return_value = self.execute_callable()
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
     File "/home/airflow/gcs/dags/*******.py", line ***, in delete_previous_dagrun_func
       dagruns_today = DagRun.find(dag_id=dag_to_delete.dag_id,
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 75, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/dagrun.py", line 386, in find
       qry = qry.filter(cls.execution_date.in_(execution_date))
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 641, in in_
       return self.operate(in_op, other)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 317, in operate
       return op(self.comparator, *other, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 1423, in in_op
       return a.in_(b)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 641, in in_
       return self.operate(in_op, other)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/orm/properties.py", line 426, in operate
       return op(self.__clause_element__(), *other, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 1423, in in_op
       return a.in_(b)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 641, in in_
       return self.operate(in_op, other)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 870, in operate
       return op(self.comparator, *other, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 1423, in in_op
       return a.in_(b)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 641, in in_
       return self.operate(in_op, other)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/type_api.py", line 1373, in operate
       return super(TypeDecorator.Comparator, self).operate(
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/type_api.py", line 77, in operate
       return o[0](self.expr, op, *(other + o[1:]), **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/default_comparator.py", line 159, in _in_impl
       seq_or_selectable = coercions.expect(
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/coercions.py", line 193, in expect
       resolved = impl._literal_coercion(
     File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/coercions.py", line 573, in _literal_coercion
       element = list(element)
   TypeError: 'DateTime' object is not iterable
   ```
   
   
   
   ### Operating System
   
   Google Cloud Composer (Ubuntu 20.04.6 LTS on Kubernetes)
   
   ### Versions of Apache Airflow Providers
   
   No relevant providers
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Google Cloud Composer image version: `composer-2.3.4-airflow-2.5.3`
   
    
   
   ### Anything else
   
   I have recently begun preparing to upgrade Airflow from 1.10 to the 2.x series.
   The code in the reproduction section still works in an Airflow 1.10 environment.
   I would like to know whether this is an intentional or an accidental incompatibility between 1.10 and 2.x.
   
   I am willing to submit a PR if the fix is very simple (such as a one-line change).
   If it needs a larger change, I do not think I will have enough time to submit a PR in a timely manner.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

