takersk opened a new issue, #29561: URL: https://github.com/apache/airflow/issues/29561
### Apache Airflow version Other Airflow 2 version (please specify below) ### What happened value.yaml ``` config: celery_kubernetes_executor: kubernetes_queue: 'kubernetes' ... executor: "CeleryKubernetesExecutor" ``` tasks (example.dag) - case01 (Expect to be run is CeleryExecutor) => ok!! def example_task_celery(_dag): ``` [2023-02-15 02:43:20,253: INFO/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[93ae0ee5-b390-4bda-b3d7-0782ccab5059] succeeded in 5.488538254052401s: None ``` - case02 (Expect to be run is CeleryExecutor) => not ok!! @task(queue="test") def example_task_celery(_dag): ``` [2023-02-15T04:28:23.494+0000] {base_executor.py:215} ERROR - could not queue task TaskInstanceKey(dag_id='example_dag', task_id='example_task_celery', run_id='manual__2023-02-15T02:41:33.585720+00:00', try_number=3, map_index=-1) (still running after 4 attempts) [2023-02-15T04:29:11.345+0000] {scheduler_job.py:1231} INFO - Resetting orphaned tasks for active dag runs ``` - case03 (Expect to be run is KubernetesExecutor) => not ok!! @task(queue="kubernetes") def example_task_kubernetes(_dag): The worker runs and immediately throws an error ``` [2023-02-15, 15:14:58 KST] {taskinstance.py:1581} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=cdpde AIRFLOW_CTX_DAG_ID=example_dag AIRFLOW_CTX_TASK_ID=example_task_kubernetes AIRFLOW_CTX_EXECUTION_DATE=2023-02-15T06:09:54.463674+00:00 AIRFLOW_CTX_TRY_NUMBER=5 AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-15T06:09:54.463674+00:00 [2023-02-15, 15:14:58 KST] {taskinstance.py:1902} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 179, in execute return_value = super().execute(context) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 171, in execute return_value = self.execute_callable() File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 189, in execute_callable return self.python_callable(*self.op_args, **self.op_kwargs) File "/opt/airflow/dags/repo/dags/example_dag.py", line 23, in example_task_kubernetes exec_trino_query_string_task = exec_trino_query_pod( File "/opt/airflow/dags/repo/dags/common/utils/pod.py", line 10, in exec_trino_query_pod return get_k8s_operator( File "/opt/airflow/dags/repo/dags/common/operator/operator.py", line 50, in get_k8s_operator return KubernetesPodOperator( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 390, in apply_defaults result = func(self, **kwargs, default_args=default_args) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 227, in __init__ super().__init__(resources=resources, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 390, in apply_defaults result = func(self, **kwargs, default_args=default_args) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 764, in __init__ task_group.add(self) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/task_group.py", line 210, in add raise DuplicateTaskIdFound(f"{node_type} id '{key}' has already been added to the DAG") airflow.exceptions.DuplicateTaskIdFound: Task id 'example_task_kubernetes' has already been added to the DAG [2023-02-15, 15:14:58 KST] {taskinstance.py:1407} INFO - Marking task as FAILED. dag_id=example_dag, task_id=example_task_kubernetes, execution_date=20230215T060954, start_date=20230215T061457, end_date=20230215T061458 [2023-02-15, 15:14:58 KST] {standard_task_runner.py:92} ERROR - Failed to execute job 141 for task example_task_kubernetes (Task id 'example_task_kubernetes' has already been added to the DAG; 14) [2023-02-15, 15:14:58 KST] {local_task_job.py:156} INFO - Task exited with return code 1 [2023-02-15, 15:14:58 KST] {local_task_job.py:279} INFO - 0 downstream tasks scheduled from follow-on schedule check ``` ### What you think should happen instead _No response_ ### How to reproduce _No response_ ### Operating System PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" VERSION_ID="11" ### Versions of Apache Airflow Providers apache-airflow-providers-amazon==5.0.0 apache-airflow-providers-celery==3.0.0 apache-airflow-providers-cncf-kubernetes==4.3.0 apache-airflow-providers-common-sql==1.1.0 apache-airflow-providers-docker==3.1.0 apache-airflow-providers-elasticsearch==4.2.0 apache-airflow-providers-ftp==3.1.0 apache-airflow-providers-google==8.3.0 apache-airflow-providers-grpc==3.0.0 apache-airflow-providers-hashicorp==3.1.0 apache-airflow-providers-http==4.0.0 apache-airflow-providers-imap==3.0.0 apache-airflow-providers-microsoft-azure==4.2.0 apache-airflow-providers-mysql==3.2.0 apache-airflow-providers-odbc==3.1.1 apache-airflow-providers-postgres==5.2.0 apache-airflow-providers-redis==3.0.0 apache-airflow-providers-sendgrid==3.0.0 apache-airflow-providers-sftp==4.0.0 apache-airflow-providers-slack==5.1.0 apache-airflow-providers-sqlite==3.2.0 apache-airflow-providers-ssh==3.1.0 ### Deployment Official Apache Airflow Helm Chart ### Deployment details * helm chart apiVersion: v2 name: airflow version: 1.7.0-dev appVersion: 2.3.4 ### Anything else _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org