thedavidnovak commented on issue #50631:
URL: https://github.com/apache/airflow/issues/50631#issuecomment-3403069369

   A working workaround might be to wrap the classic operator in a TaskFlow API 
decorator inside the task group, so the command is constructed at runtime. This 
may help in this use case on Airflow 2.8.4, since `@task.bash` is only available 
from Airflow 2.9+:
   
   ```python
   from airflow.decorators import task, task_group
   from airflow.operators.bash import BashOperator


   @task_group()
   def my_tg(project, dataset, table_name, partition_id):
       @task(task_id="bash_task")
       def bash_task_wrapped(project, dataset, table_name, partition_id, **context):
           # Instantiate the classic operator at runtime and execute it directly,
           # reusing the wrapping task's task_id from the runtime context.
           return BashOperator(
               task_id=context["task_instance"].task_id,
               bash_command=f"echo {project}.{dataset}.{table_name}${partition_id}",
               env={"MY_VAR": "Hello World"},
           ).execute(context)

       bash_task_wrapped(project, dataset, table_name, partition_id)
   ```
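   Note that the `$` in the f-string is a literal character: Python substitutes only the `{…}` fields, producing a BigQuery-style `table$partition` reference. A minimal stdlib-only sketch, using hypothetical values that mirror the log below:

   ```python
   # Hypothetical values, chosen to match the ones visible in the attached log.
   project, dataset, table_name, partition_id = (
       "my_project", "dataset_0", "my_table", "my_partition_id"
   )

   # Only the {…} fields are substituted; the `$` survives as-is.
   bash_command = f"echo {project}.{dataset}.{table_name}${partition_id}"
   print(bash_command)  # echo my_project.dataset_0.my_table$my_partition_id
   ```

   This also explains the log output: bash then treats `$my_partition_id` as an (unset) shell variable and expands it to nothing, so the echoed line is only `my_project.dataset_0.my_table`.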
   
   To Airflow, the task is then a `_PythonDecoratedOperator`, within which the 
`BashOperator` is instantiated at runtime (see the attached log below).
   
   This works for the simple case above, but it did not work for more complex 
cases such as custom Kubernetes operators.
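   For completeness, on Airflow 2.9+ the bash case itself would not need the wrapper at all, since `@task.bash` builds the command at runtime from the decorated function's return value. An untested sketch, assuming the same parameters as the example above:

   ```python
   from airflow.decorators import task


   @task.bash(env={"MY_VAR": "Hello World"})
   def bash_task(project, dataset, table_name, partition_id):
       # The returned string becomes the bash_command at runtime.
       return f"echo {project}.{dataset}.{table_name}${partition_id}"
   ```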
   
   ```log
   0564dffbaf58
   *** Found local files:
   ***   * 
/usr/local/airflow/logs/dag_id=airflow_issue_test_v1/run_id=manual__2025-10-14T18:01:14.911936+00:00/task_id=my_tg.bash_task/map_index=0/attempt=1.log
   [2025-10-14T18:01:17.319+0000] {taskinstance.py:1979} INFO - Dependencies 
all met for dep_context=non-requeueable deps ti=<TaskInstance: 
airflow_issue_test_v1.my_tg.bash_task manual__2025-10-14T18:01:14.911936+00:00 
map_index=0 [queued]>
   [2025-10-14T18:01:17.331+0000] {taskinstance.py:1979} INFO - Dependencies 
all met for dep_context=requeueable deps ti=<TaskInstance: 
airflow_issue_test_v1.my_tg.bash_task manual__2025-10-14T18:01:14.911936+00:00 
map_index=0 [queued]>
   [2025-10-14T18:01:17.331+0000] {taskinstance.py:2193} INFO - Starting 
attempt 1 of 1
   [2025-10-14T18:01:17.345+0000] {taskinstance.py:2217} INFO - Executing 
<Task(_PythonDecoratedOperator): my_tg.bash_task> on 2025-10-14 
18:01:14.911936+00:00
   [2025-10-14T18:01:17.349+0000] {standard_task_runner.py:60} INFO - Started 
process 352 to run task
   [2025-10-14T18:01:17.355+0000] {standard_task_runner.py:87} INFO - Running: 
['airflow', 'tasks', 'run', 'airflow_issue_test_v1', 'my_tg.bash_task', 
'manual__2025-10-14T18:01:14.911936+00:00', '--job-id', '179', '--raw', 
'--subdir', 'DAGS_FOLDER/airflow_issue_test_v1.py', '--cfg-path', 
'/tmp/tmpa4h28a0t', '--map-index', '0']
   [2025-10-14T18:01:17.357+0000] {standard_task_runner.py:88} INFO - Job 179: 
Subtask my_tg.bash_task
   [2025-10-14T18:01:17.427+0000] {task_command.py:423} INFO - Running 
<TaskInstance: airflow_issue_test_v1.my_tg.bash_task 
manual__2025-10-14T18:01:14.911936+00:00 map_index=0 [running]> on host 
0564dffbaf58
   [2025-10-14T18:01:17.593+0000] {taskinstance.py:2513} INFO - Exporting env 
vars: AIRFLOW_CTX_DAG_OWNER='airflow' 
AIRFLOW_CTX_DAG_ID='airflow_issue_test_v1' 
AIRFLOW_CTX_TASK_ID='my_tg.bash_task' 
AIRFLOW_CTX_EXECUTION_DATE='2025-10-14T18:01:14.911936+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2025-10-14T18:01:14.911936+00:00'
   [2025-10-14T18:01:17.595+0000] {subprocess.py:63} INFO - Tmp dir root 
location: /tmp
   [2025-10-14T18:01:17.595+0000] {subprocess.py:75} INFO - Running command: 
['/bin/bash', '-c', 'echo my_project.dataset_0.my_table$my_partition_id']
   [2025-10-14T18:01:17.602+0000] {subprocess.py:86} INFO - Output:
   [2025-10-14T18:01:17.603+0000] {subprocess.py:93} INFO - 
my_project.dataset_0.my_table
   [2025-10-14T18:01:17.603+0000] {subprocess.py:97} INFO - Command exited with 
return code 0
   [2025-10-14T18:01:17.604+0000] {python.py:202} INFO - Done. Returned value 
was: my_project.dataset_0.my_table
   [2025-10-14T18:01:17.648+0000] {taskinstance.py:1149} INFO - Marking task as 
SUCCESS. dag_id=airflow_issue_test_v1, task_id=my_tg.bash_task, map_index=0, 
execution_date=20251014T180114, start_date=20251014T180117, 
end_date=20251014T180117
   [2025-10-14T18:01:17.701+0000] {local_task_job_runner.py:234} INFO - Task 
exited with return code 0
   [2025-10-14T18:01:17.738+0000] {taskinstance.py:3312} INFO - 0 downstream 
tasks scheduled from follow-on schedule check
   ```

