opeida opened a new issue, #51816:
URL: https://github.com/apache/airflow/issues/51816

   ### Apache Airflow version
   
   3.0.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   An `ImportError` is raised when a DAG run with `dag.test()` attempts to retrieve a variable from the API server. Similar issues have been opened (#48554, #51062, #51316), but the pull requests offered as fixes (#50300, #50419) were included in 3.0.2 and did not resolve the problem.
   
   ```
   Exception has occurred: ImportError
   cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
     File "/workspaces/airflow/test.py", line 7, in <module>
       x = Variable.get("my_variable")
           ^^^^^^^^^^^^^^^^^^^^^^^^
   ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
   ```
   
   ### What you think should happen instead?
   
   The variable should have been successfully retrieved without exceptions.
   
   ### How to reproduce
   
   1. Set the variable: `airflow variables set my_variable my_value`
   2. Run DAG:
   ```python
   import logging
   
   from airflow import DAG
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.sdk import Variable
   
   x = Variable.get("my_variable")
   
   def my_function(my_var: str) -> None:
       logging.getLogger(__name__).info(my_var)
   
   with DAG("test_dag") as dag:
   
       start = EmptyOperator(task_id="start")
   
       py_func = PythonOperator(
           task_id="py_func",
           python_callable=my_function,
           op_kwargs={
               "my_var": x
           }
       )
   
       end = EmptyOperator(task_id="end")
   
       start >> py_func >> end
   
   if __name__ == "__main__":
       dag.test()
   ```
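
Since the traceback points at the module-level `Variable.get(...)` call, a variant that defers the lookup to task run time may help narrow the problem down. The following is an untested sketch, not a confirmed workaround: it assumes the `{{ var.value.my_variable }}` template is rendered for `op_kwargs` when the task runs, and the names `build_dag` and `test_dag_templated` are hypothetical. Whether it avoids the `ImportError` on 3.0.2 has not been verified.

```python
import logging


def my_function(my_var: str) -> None:
    # Same callable as in the reproduction above: log the value it receives.
    logging.getLogger(__name__).info(my_var)


def build_dag():
    """Build a DAG that resolves the variable at task run time, not at parse time.

    Airflow imports are kept local to this function, so nothing touches
    Airflow (or the API server) until the function is actually called.
    """
    from airflow import DAG
    from airflow.providers.standard.operators.python import PythonOperator

    with DAG("test_dag_templated") as dag:  # hypothetical DAG id
        PythonOperator(
            task_id="py_func",
            python_callable=my_function,
            # Assumption: op_kwargs is templated, so this string is rendered
            # to the variable's value when the task executes.
            op_kwargs={"my_var": "{{ var.value.my_variable }}"},
        )
    return dag


# To run it the same way as the original script:
# build_dag().test()
```

If this variant runs cleanly while the original script fails, that would suggest the problem is limited to `Variable.get` being called while the file is parsed.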
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   Extended image based on `apache/airflow:slim-3.0.2-python3.12`
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


