howard-oc opened a new issue, #55766:
URL: https://github.com/apache/airflow/issues/55766

   ### Apache Airflow version
   
   3.0.6
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When configuring Airflow 3.0.6 with multiple executors concurrently using 
`executor: "CeleryExecutor,KubernetesExecutor"`, tasks with 
`queue='kubernetes'` are not being routed to the KubernetesExecutor. Instead, 
all tasks are being executed by the CeleryExecutor regardless of their queue 
assignment.
   
   ## Expected Behavior
   
   According to the [Airflow 
documentation](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently):
   
   - Tasks without `queue` parameter → CeleryExecutor (default)
   - Tasks with `queue='kubernetes'` → KubernetesExecutor
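   The documented expectation boils down to a simple queue-to-executor mapping. A minimal sketch of that routing (hypothetical helper, not Airflow's actual scheduler code):

```python
# Hypothetical sketch of the routing described above -- NOT Airflow's
# scheduler code. Tasks whose queue matches the configured
# kubernetes_queue should go to the KubernetesExecutor; everything else
# falls back to the default (first) executor, here CeleryExecutor.
def pick_executor(task_queue: str,
                  kubernetes_queue: str = "kubernetes",
                  default_executor: str = "CeleryExecutor") -> str:
    if task_queue == kubernetes_queue:
        return "KubernetesExecutor"
    return default_executor

print(pick_executor("default"))     # CeleryExecutor
print(pick_executor("kubernetes"))  # KubernetesExecutor
```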
   
   ## Actual Behavior
   
   - **All tasks are executed by CeleryExecutor** regardless of queue assignment
   - Tasks with `queue='kubernetes'` are being executed by 
`CeleryExecutor(parallelism=32)` instead of KubernetesExecutor
   - No Kubernetes pods are created for tasks with `queue='kubernetes'`
   
   ## Configuration
   
   ### Helm Values (airflow.cfg)
   ```yaml
   # Multiple Executors (Airflow 3.x+)
   executor: "CeleryExecutor,KubernetesExecutor"
   
   airflow:
     config:
       # Multiple Executors Concurrently (Airflow 3.0+)
       AIRFLOW__CORE__EXECUTOR: "CeleryExecutor,KubernetesExecutor"
       # Kubernetes queue configuration
       AIRFLOW__KUBERNETES__KUBERNETES_QUEUE: "kubernetes"
   ```
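   One detail worth double-checking: Airflow reads config environment variables as `AIRFLOW__{SECTION}__{KEY}`, so `AIRFLOW__KUBERNETES__KUBERNETES_QUEUE` targets a `[kubernetes]` section, while the queue option for hybrid routing historically lives under `[celery_kubernetes_executor]` (the corresponding variable would presumably be `AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE`). A stdlib sketch of the naming convention (illustrative, not Airflow's parser):

```python
# Illustrative mapping of Airflow-style config env vars -- not Airflow's
# actual parser: AIRFLOW__{SECTION}__{KEY} -> (section, key), lowercased.
def env_to_config(name: str) -> tuple[str, str]:
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        raise ValueError(f"not an Airflow config variable: {name}")
    section, _, key = name[len(prefix):].partition("__")
    return section.lower(), key.lower()

# The variable used above lands in [kubernetes], not [celery_kubernetes_executor]:
print(env_to_config("AIRFLOW__KUBERNETES__KUBERNETES_QUEUE"))
print(env_to_config("AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE"))
```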
   
   ### Generated airflow.cfg
   ```ini
   [core]
   executor = CeleryExecutor,KubernetesExecutor
   
   [celery_kubernetes_executor]
   kubernetes_queue = kubernetes
   
   [kubernetes_executor]
   multi_namespace_mode = False
   namespace = airflow
   pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
   worker_container_repository = apache/airflow
   worker_container_tag = 3.0.6
   ```
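   For reference, the comma-separated `[core] executor` value is an ordered list: the first entry is the environment's default executor, and later entries have to be targeted explicitly. A stdlib sketch of parsing the generated config above (illustrative only):

```python
import configparser

# Parse the [core] executor list from the generated airflow.cfg above.
# The first entry acts as the default executor for tasks that do not
# request anything else.
cfg_text = """
[core]
executor = CeleryExecutor,KubernetesExecutor
"""
cfg = configparser.ConfigParser()
cfg.read_string(cfg_text)
executors = [e.strip() for e in cfg.get("core", "executor").split(",")]
default_executor = executors[0]

print(executors)         # ['CeleryExecutor', 'KubernetesExecutor']
print(default_executor)  # CeleryExecutor
```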
   
   
   
   ## Environment Details
   
   - **Airflow Version**: 3.0.6
   - **Python Version**: 3.12
   - **Kubernetes Version**: 1.28+ (Kind cluster)
   - **Helm Chart**: apache-airflow/airflow 1.18.0
   - **Deployment Method**: Helm + ArgoCD
   
   ## Steps to Reproduce
   
   1. Deploy Airflow 3.0.6 with the configuration above
   2. Set `executor: "CeleryExecutor,KubernetesExecutor"` in Helm values
   3. Set `AIRFLOW__CORE__EXECUTOR: "CeleryExecutor,KubernetesExecutor"` in 
environment variables
   4. Set `AIRFLOW__KUBERNETES__KUBERNETES_QUEUE: "kubernetes"` in environment 
variables
   5. Create a DAG with tasks using `queue='kubernetes'`
   6. Trigger the DAG
   7. Observe that all tasks are executed by CeleryExecutor
   
   ## Expected vs Actual Results
   
   | Task | Queue | Expected Executor | Actual Executor | Status |
   |------|-------|-------------------|-----------------|---------|
   | celery_python_task | default | CeleryExecutor | CeleryExecutor | ✅ Working |
   | kubernetes_python_task | kubernetes | KubernetesExecutor | CeleryExecutor | ❌ **Bug** |
   | celery_bash_task | default | CeleryExecutor | CeleryExecutor | ✅ Working |
   
   ## Impact
   
   - **High Impact**: Core functionality described in the official Airflow documentation is not working
   - **Workaround**: None available; tasks cannot be routed to the KubernetesExecutor
   - **Affected Users**: All users trying to use multiple executors concurrently in Airflow 3.x
   
   ## Additional Information
   
   ### Configuration Verification
   ```bash
   # Executor configuration
   $ airflow config get-value core executor
   CeleryExecutor,KubernetesExecutor
   
   # Kubernetes queue configuration  
   $ airflow config get-value celery_kubernetes_executor kubernetes_queue
   kubernetes
   
   # Environment variable
   $ env | grep AIRFLOW__CORE__EXECUTOR
   AIRFLOW__CORE__EXECUTOR=CeleryExecutor,KubernetesExecutor
   ```
   
   ### Related Documentation
   - [Using Multiple Executors 
Concurrently](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently)
   - [CeleryKubernetesExecutor 
Deprecation](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#celerykubernetesexecutor)
   
   ## Request
   
   Please investigate why the multi-executor feature is not routing tasks with 
`queue='kubernetes'` to the KubernetesExecutor in Airflow 3.0.6. This appears to 
be either a regression or a configuration issue that prevents the documented 
functionality from working as expected.
   
   ---
   
   **Issue Log Created**: 2025-09-17  
   **Reporter**: Data Platform Team  
   **Priority**: High  
   **Status**: Open
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   ## Test Case
   
   ### DAG Configuration
   ```python
   from datetime import datetime, timedelta
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.operators.bash import BashOperator
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'start_date': datetime(2025, 9, 17),
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
   }
   
   dag = DAG(
       'hybrid_executor_test',
       default_args=default_args,
       description='Test DAG for Multiple Executors Concurrently',
       schedule=timedelta(hours=1),
       catchup=False,
       tags=['hybrid', 'test', 'executor'],
   )
   
   def celery_python_task():
       print("Hello from Celery Python Task!")
       import os
       print(f"Running in pod: {os.getenv('HOSTNAME')}")
       print(f"Executor: {os.getenv('AIRFLOW__CORE__EXECUTOR')}")
   
   def kubernetes_python_task():
       print("Hello from Kubernetes Python Task!")
       import os
       print(f"Running in pod: {os.getenv('HOSTNAME')}")
       print(f"Executor: {os.getenv('AIRFLOW__CORE__EXECUTOR')}")
   
   # Task 1: Should use CeleryExecutor (no queue specified)
   celery_task = PythonOperator(
       task_id='celery_python_task',
       python_callable=celery_python_task,
       dag=dag,
   )
   
   # Task 2: Should use KubernetesExecutor (queue='kubernetes')
   kubernetes_task = PythonOperator(
       task_id='kubernetes_python_task',
       python_callable=kubernetes_python_task,
       queue='kubernetes',  # This should route to KubernetesExecutor
       dag=dag,
   )
   
   # Task 3: Should use CeleryExecutor (no queue specified)
   celery_bash_task = BashOperator(
       task_id='celery_bash_task',
    bash_command='echo "Hello from Celery Bash Task! Running on $(hostname)" && sleep 5',
       dag=dag,
   )
   
   # Set task dependencies
   celery_task >> kubernetes_task >> celery_bash_task
   ```
   
   ## Observed Behavior
   
   ### Task Execution Logs
   ```
   [2025-09-17T11:04:30.751+0000] {scheduler_job_runner.py:879} INFO - 
TaskInstance Finished: dag_id=hybrid_executor_test, task_id=celery_python_task, 
run_id=manual__2025-09-17T11:04:23.446622+00:00_ssPeqR6R, map_index=-1, 
run_start_date=None, run_end_date=None, run_duration=None, state=queued, 
executor=CeleryExecutor(parallelism=32), executor_state=failed, try_number=1, 
max_tries=1, pool=default_pool, queue=default, priority_weight=3, 
operator=PythonOperator, queued_dttm=2025-09-17 11:04:24.733620+00:00, 
scheduled_dttm=2025-09-17 11:04:24.710596+00:00,queued_by_job_id=47, pid=None
   
   [2025-09-17T11:04:30.988+0000] {scheduler_job_runner.py:879} INFO - 
TaskInstance Finished: dag_id=hybrid_executor_test, 
task_id=kubernetes_python_task, 
run_id=manual__2025-09-17T10:29:05.176689+00:00_eqFN1IEs, map_index=-1, 
run_start_date=2025-09-17 11:04:30.785718+00:00, run_end_date=2025-09-17 
11:04:30.785718+00:00, run_duration=0.0, state=failed, 
executor=CeleryExecutor(parallelism=32), executor_state=failed, try_number=1, 
max_tries=1, pool=default_pool, queue=kubernetes, priority_weight=2, 
operator=PythonOperator, queued_dttm=2025-09-17 10:53:35.344116+00:00, 
scheduled_dttm=2025-09-17 10:53:31.646261+00:00,queued_by_job_id=47, pid=None
   ```
   
   **Key Observations:**
   1. `celery_python_task` (no queue) → 
`executor=CeleryExecutor(parallelism=32), queue=default` ✅ **Expected**
   2. `kubernetes_python_task` (queue='kubernetes') → 
`executor=CeleryExecutor(parallelism=32), queue=kubernetes` ❌ **Unexpected**
   
   ### Kubernetes Executor Status
   ```
   [2025-09-17T11:07:14.896+0000] {kubernetes_executor_utils.py:95} INFO - 
Kubernetes watch timed out waiting for events. Restarting watch.
   [2025-09-17T11:07:15.944+0000] {kubernetes_executor_utils.py:132} INFO - 
Event: and now my watch begins starting at resource_version: 0
   ```
   
   The Kubernetes executor is running and watching for events, but no pods are 
created.
   
   ### Operating System
   
   Ubuntu
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

