Felix-neko opened a new issue, #35529:
URL: https://github.com/apache/airflow/issues/35529

   ### Apache Airflow version
   
   2.7.3
   
   ### What happened
   
   Hi folks!
   
   I have to use the `PythonVirtualenvOperator` and pass it `{{ dag_run }}`, `{{ task_instance }}` and other Airflow context variables. Sometimes it crashes with the following error:
   
   ```
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO - Traceback (most recent call last):
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/script.py", line 17, in <module>
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     arg_dict = dill.load(file)
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/lib/python3.10/site-packages/dill/_dill.py", line 373, in load
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     return Unpickler(file, ignore=ignore, **kwds).load()
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/lib/python3.10/site-packages/dill/_dill.py", line 646, in load
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     obj = StockUnpickler.load(self)
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqbspm8nx/lib/python3.10/site-packages/dill/_dill.py", line 636, in find_class
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO -     return StockUnpickler.find_class(self, module, name)
   [2023-11-08, 11:30:20 UTC] {process_utils.py:190} INFO - ModuleNotFoundError: No module named 'unusual_prefix_7da5b81975a8caeba2f4e2b91b352e55493c2e25_dag'
   [2023-11-08, 11:30:20 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
   ```
   
   After some testing I found that this error occurs if any operator in the DAG (possibly an operator other than `PythonVirtualenvOperator`) takes a function as its `python_callable` argument and that function is defined in the same Python source file as the DAG object.
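A minimal sketch of the suspected mechanism follows. It uses the standard-library `pickle` module instead of `dill`. The assumption is that, for a module-level function that is importable in the parent process, `dill` falls back to the same by-reference pickling. The module name `unusual_prefix_demo_dag` is hypothetical and only mimics the mangled name seen in the traceback. The function is stored as a module name plus a qualified name, not as code. Loading it in a process where that module does not exist therefore fails the same way the virtualenv subprocess does:

```python
import pickle
import sys
import types

# Hypothetical module name mimicking the mangled name Airflow gives a DAG file.
MODULE_NAME = "unusual_prefix_demo_dag"


def make_module_with_callable() -> types.ModuleType:
    """Create an in-memory module defining a function, like a DAG file would."""
    module = types.ModuleType(MODULE_NAME)
    # Functions defined via exec in module.__dict__ get __module__ == MODULE_NAME.
    exec("def make_foo(*args, **kwargs):\n    return 'foo'\n", module.__dict__)
    sys.modules[MODULE_NAME] = module  # pickle finds the module here when dumping
    return module


def roundtrip_without_module() -> str:
    """Pickle the function, 'lose' its module, then try to unpickle it."""
    module = make_module_with_callable()
    # Pickled by reference: only the module name and "make_foo" are stored.
    payload = pickle.dumps(module.make_foo)
    # Simulate the fresh virtualenv process, where the mangled module is unknown.
    del sys.modules[MODULE_NAME]
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return str(exc)
    return "unpickled fine"


print(roundtrip_without_module())
```

Presumably the callable ends up in the payload because `dag_run` and `task_instance` reference the DAG and, through it, every task's `python_callable`. That is inferred from the traceback, not confirmed. Moving `make_foo` into `some_moar_code.py` gives it a regular module name that the subprocess can presumably import, which would explain why the workaround avoids the error.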
   
   
   
   ### What you think should happen instead
   
   I think Airflow should check its DAGs before running (or before serialization) and give an informative error message in the following case: the DAG contains a `PythonVirtualenvOperator`, and some `python_callable` function is declared in the same Python module as the DAG itself.
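The check proposed above could look roughly like the following sketch. `find_same_module_callables` is a hypothetical helper, not an existing Airflow API. It identifies tasks by duck typing on `task_id`, `python_callable` and the operator's class name. That is an assumption made so the sketch runs without Airflow installed:

```python
from typing import Iterable, List


def find_same_module_callables(tasks: Iterable[object], dag_module: str) -> List[str]:
    """Hypothetical pre-flight check.

    Returns the task_ids whose python_callable is defined in the DAG's own
    module. It only reports anything when the DAG contains a
    PythonVirtualenvOperator, the condition described in this issue.
    """
    tasks = list(tasks)
    # Detect virtualenv tasks by class name so the sketch does not import Airflow.
    if not any(type(t).__name__ == "PythonVirtualenvOperator" for t in tasks):
        return []
    offending = []
    for task in tasks:
        func = getattr(task, "python_callable", None)
        # Such callables get pickled by reference to the DAG module's name,
        # which the virtualenv subprocess cannot import.
        if func is not None and getattr(func, "__module__", None) == dag_module:
            offending.append(task.task_id)
    return offending
```

Inside a DAG file, one might call it as `find_same_module_callables(dag.tasks, __name__)` and raise an error if the result is non-empty. There, `__name__` would be the mangled `unusual_prefix_..._dag` name, which matches the `__module__` of functions defined in that file.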
   
   In the longer term, it would be really nice if Airflow migrated to `cloudpickle`, so that such functions could be deserialized correctly.
   
   
   ### How to reproduce
   
   Here's a minimal example that triggers this error. It should be tested with `airflow standalone` using `SequentialExecutor` or `KubernetesExecutor`. It does not happen with `DebugExecutor`:
   
   ```
   import datetime
   import pendulum
   import airflow
   from airflow import DAG
   from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
   import dill
   
   
   dag = DAG(
       dag_id='strange_pickling_error_dag',
       schedule_interval='0 5 * * 1',
       start_date=datetime.datetime(2020, 1, 1),
       catchup=False,
       render_template_as_native_obj=True,
   )
   
   
   context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}
   
   
   def make_foo(*args, **kwargs):
       print("---> making foo!")
       print("make foo(...): args")
       print(args)
       print("make foo(...): kwargs")
       print(kwargs)
   
   
   make_foo_task = PythonVirtualenvOperator(
       task_id='make_foo',
       python_callable=make_foo,
       use_dill=True,
       system_site_packages=False,
       op_args=[context],
       requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}",
                     "psycopg2-binary >= 2.9, < 3",
                     f"pendulum=={pendulum.__version__}", "lazy-object-proxy"],
       dag=dag)
   ```
   
   
   
   And here's my workaround, which moves `make_foo` into a separate module:
   
   - `dags/strange_pickling_error/dag.py`:
   
   
   ```
   import datetime
   import pendulum
   import airflow
   from airflow import DAG
   from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
   import dill
   
   from strange_pickling_error.some_moar_code import make_foo
   
   
   dag = DAG(
       dag_id='strange_pickling_error_dag',
       schedule_interval='0 5 * * 1',
       start_date=datetime.datetime(2020, 1, 1),
       catchup=False,
       render_template_as_native_obj=True,
   )
   
   context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}
   
   
   make_foo_task = PythonVirtualenvOperator(
       task_id='make_foo',
       python_callable=make_foo,
       use_dill=True,
       system_site_packages=False,
       op_args=[context],
       requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}",
                     "psycopg2-binary >= 2.9, < 3",
                     f"pendulum=={pendulum.__version__}", "lazy-object-proxy"],
       dag=dag)
   ```
   
   - `dags/strange_pickling_error/some_moar_code.py`:
   
   ```
   def make_foo(*args, **kwargs):
       print("---> making foo!")
       print("make foo(...): args")
       print(args)
       print("make foo(...): kwargs")
       print(kwargs)
   ```
   
   ### Operating System
   
   Ubuntu 22.04
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-common-sql==1.8.0
   apache-airflow-providers-ftp==3.6.0
   apache-airflow-providers-google==10.11.0
   apache-airflow-providers-http==4.6.0
   apache-airflow-providers-imap==3.4.0
   apache-airflow-providers-sqlite==3.5.0
   
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   Python 3.10
   airflow==2.7.3
   dill==0.3.5.1
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.