Neon-Face opened a new issue, #54223:
URL: https://github.com/apache/airflow/issues/54223

   ### Apache Airflow version
   
   3.0.3
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Since I want to pass Airflow Variables from a regular `@task` into a 
`@task.virtualenv` task, I first created a normal task that fetches the variables 
and then passed the resulting dictionary to the virtualenv task.
   
   Here's the DAG code:
   **Path:** `dags/test/dag_test.py`
   (*Please note the file paths — I'm not entirely sure if separating DAG and 
task definitions like this is supported.*)
   
   ``` python
   from airflow.sdk import dag, Param
   
   @dag(
       dag_id="test",
       params={
           "P": Param("HI", type="string")
       }
   )
   def test():
       from utils.common_tools import clear_dag_xcom, load_vars_for_venv
       from utils.test import virtual_task
   
       variables = load_vars_for_venv(var_list=["DATASETTE_DB_SECRET"])
       t = virtual_task(variables=variables)
       c = clear_dag_xcom()
   
       t >> c
   
   test()
   ```
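On the file-layout question: as far as I can tell, Airflow adds the configured dags folder to `sys.path` while parsing DAG files, which is why a `utils/` package living under the dags folder can be imported from a DAG module. Here is a self-contained sketch of that import mechanism, with a temporary directory standing in for the real dags folder and a stub `common_tools` module (both hypothetical, just to illustrate):

```python
import importlib
import pathlib
import sys
import tempfile
import textwrap

# Recreate the layout from this issue inside a temp dir that stands in
# for the dags folder; Airflow's DAG processor similarly puts the dags
# folder on sys.path while parsing, which is what lets
# "utils.common_tools" resolve from a DAG file.
root = pathlib.Path(tempfile.mkdtemp())
(root / "utils").mkdir()
(root / "utils" / "__init__.py").write_text("")
(root / "utils" / "common_tools.py").write_text(
    textwrap.dedent(
        """
        # Stub of the real helper, only to demonstrate the import mechanics.
        def load_vars_for_venv(var_list):
            return {name: f"value-of-{name}" for name in var_list}
        """
    )
)

# With the "dags folder" on sys.path, the package import works.
sys.path.insert(0, str(root))
common_tools = importlib.import_module("utils.common_tools")
print(common_tools.load_vars_for_venv(["DATASETTE_DB_SECRET"]))
# → {'DATASETTE_DB_SECRET': 'value-of-DATASETTE_DB_SECRET'}
```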
   
   Here is my task code:
   **Path**: `utils/test.py`
   
   ```python
   from airflow.sdk import task
   
   @task.virtualenv(
       python_version="3.9",
       requirements="requirements_test.txt",
       system_site_packages=True,
       venv_cache_path="venvs/test",
       inherit_env=True,
   )
   def virtual_task(variables: dict, logical_date=None, params=None):
       print(variables)
       print(f"This is param: {params} !!!!!") 
       print(f"This is logical_date: {logical_date} !!!!!") 
       return
   ```
   
   And here are my shared task definitions (kept separate for reusability):
   **Path**: `utils/common_tools.py`
   
   ```python
   from airflow.sdk import task
   
   @task(task_id="load_vars_for_venv")
   def load_vars_for_venv(var_list: list):
       from airflow.sdk import Variable
       variables = {}
       for variable in var_list:
           variables[variable] = Variable.get(variable)
       return variables
   
   @task(task_id="clear_dag_xcom")
   def clear_dag_xcom(run_id=None):
       # I also created this to clean up XCom data once the DAG succeeds
       import os
       from airflow.sdk import Variable
       airflow_db_path = Variable.get("AIRFLOW_DB_SECRET")
       sql = f"DELETE FROM xcom WHERE run_id='{run_id}';"
       print(sql)
       os.system(f'sqlite3 {airflow_db_path} "{sql}"')
   ```
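As a side note, interpolating `run_id` directly into the SQL string and shelling out to the `sqlite3` binary is fragile (quoting problems, and SQL injection if `run_id` ever contains a quote). A rough sketch of the same cleanup done in-process with Python's built-in `sqlite3` module and a parameterized query, assuming the same `xcom` table with a `run_id` column (`clear_xcom_rows` is a hypothetical helper name):

```python
import sqlite3


def clear_xcom_rows(db_path: str, run_id: str) -> int:
    """Delete the XCom rows for one run with a parameterized query
    instead of building the SQL string by hand; returns the number
    of rows deleted."""
    with sqlite3.connect(db_path) as conn:
        # The "?" placeholder lets sqlite handle quoting/escaping,
        # so no shell process or manual escaping is needed.
        cur = conn.execute("DELETE FROM xcom WHERE run_id = ?", (run_id,))
        return cur.rowcount
```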
   
   However, the `load_vars_for_venv` task is quite unstable: it only works 
intermittently, and when it fails there are no logs shown, which makes it 
difficult to debug.
   <img width="1920" height="998" alt="Image" src="https://github.com/user-attachments/assets/5095c637-71fa-4a39-b398-4150a1164c7a" />
   
   
   and the scheduler shows:
   ```
   2025-08-07 12:02:42 [info] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1
   [2025-08-07T12:02:42.594+0000] {_client.py:1026} INFO - HTTP Request: PATCH http://localhost:8080/execution/task-instances/01988469-7ada-7fee-b011-bba401a05876/run "HTTP/1.1 409 Conflict"
   2025-08-07 12:02:42 [warning] Server error [airflow.sdk.api.client] detail={'detail': {'reason': 'invalid_state', 'message': 'TI was not in a state where it could be marked as running', 'previous_state': 'scheduled'}}
   2025-08-07 12:02:42 [info] Process exited [supervisor] exit_code=<Negsignal.SIGKILL: -9> pid=516354 signal_sent=SIGKILL
   ```
   and
   
   ```
   [2025-08-07T12:02:42.611+0000] {local_executor.py:96} ERROR - uhoh
   .......
     File "/home/....../Airflow/.venv/lib64/python3.9/site-packages/airflow/sdk/api/client.py", line 110, in get_json_error
       raise err
   airflow.sdk.api.client.ServerResponseError: Server returned error
   ```
   
   However, when I run the DAG using `airflow dags test test`, it consistently 
succeeds and produces the expected logs.
   
   ```
   [2025-08-07T12:25:13.682+0000] {process_utils.py:196} INFO - {'DATASETTE_DB_SECRET': '......./datasette/insights.db'}
   [2025-08-07T12:25:13.683+0000] {process_utils.py:196} INFO - This is param: {'P': 'HI'} !!!!!
   [2025-08-07T12:25:13.683+0000] {process_utils.py:196} INFO - This is logical_date: 2025-08-07 12:25:10.918707+00:00 !!!!!
   ```
   You can see it marked as successful in the Airflow UI as well, but when I 
try to read the log in the UI, it shows "Could not read served logs: 404 Client 
Error: NOT FOUND for url......"
   
   <img width="1920" height="999" alt="Image" src="https://github.com/user-attachments/assets/c2ddba76-3d3b-4441-abd9-8f064e936189" />
   
   
   **So my questions are:**
   
   1. Why does the `load_vars_for_venv` task behave inconsistently? Sometimes 
it works, other times it silently fails without any logs.
   2. What are the exact differences between triggering a DAG via the **Airflow 
UI** and using the CLI command `airflow dags test`? Why does the CLI test 
consistently succeed, while the UI-triggered runs fail or get stuck?
   
   Thanks for helping!
   
   
   
   
   ### What you think should happen instead?
   
   The `variables` dict should be passed to the `@task.virtualenv` task and be 
   usable inside the virtualenv.
   
   ### How to reproduce
   
   Copying the code above with the same file structure should reproduce the 
   problem.
   
   ### Operating System
   
   Oracle Linux Server
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-celery==3.12.0
   apache-airflow-providers-common-compat==1.7.1
   apache-airflow-providers-common-io==1.6.0
   apache-airflow-providers-common-sql==1.27.2
   apache-airflow-providers-fab==2.2.1
   apache-airflow-providers-http==5.3.1
   apache-airflow-providers-smtp==2.1.0
   apache-airflow-providers-standard==1.3.0
   ```
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   I installed locally using `uv`.
   
   ```
   uv add "apache-airflow[celery,pandas,fab]==3.0.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.3/constraints-3.9.txt"
   uv add ruff
   uv add apache-airflow-providers-http
   ```
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
