takersk opened a new issue, #29561:
URL: https://github.com/apache/airflow/issues/29561

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   
   When Airflow runs with the CeleryKubernetesExecutor and the configuration below, only case01 behaves as expected; case02 and case03 fail as shown in their logs.

   values.yaml
   ```
   config:
     celery_kubernetes_executor:
       kubernetes_queue: 'kubernetes'
   ...
   executor: "CeleryKubernetesExecutor"
   ```
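
   For context, the CeleryKubernetesExecutor routes each task instance by its `queue`: a task whose queue equals `[celery_kubernetes_executor] kubernetes_queue` goes to the KubernetesExecutor, and every other task goes to the CeleryExecutor. Below is a minimal sketch of that routing rule against the config above (a simplified illustration, not the actual Airflow source):

   ```
   # Simplified sketch of CeleryKubernetesExecutor queue routing against the
   # values.yaml above; not the real Airflow implementation.
   KUBERNETES_QUEUE = "kubernetes"  # [celery_kubernetes_executor] kubernetes_queue

   def pick_executor(task_queue: str) -> str:
       """Return which sub-executor a task instance with this queue is sent to."""
       return "KubernetesExecutor" if task_queue == KUBERNETES_QUEUE else "CeleryExecutor"

   print(pick_executor("default"))     # case01 -> CeleryExecutor (default Celery queue)
   print(pick_executor("test"))        # case02 -> CeleryExecutor (non-default Celery queue)
   print(pick_executor("kubernetes"))  # case03 -> KubernetesExecutor
   ```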
   
   tasks (example_dag); a hedged reconstruction of these tasks follows the case03 log below.

   - case01 (expected to run on the CeleryExecutor) => OK
     `def example_task_celery(_dag):`
     Worker log:
   ```
   [2023-02-15 02:43:20,253: INFO/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[93ae0ee5-b390-4bda-b3d7-0782ccab5059] succeeded in 5.488538254052401s: None
   ```
   
   - case02 (expected to run on the CeleryExecutor) => not OK
     `@task(queue="test")`
     `def example_task_celery(_dag):`
     Scheduler log:
   ```
   [2023-02-15T04:28:23.494+0000] {base_executor.py:215} ERROR - could not queue task TaskInstanceKey(dag_id='example_dag', task_id='example_task_celery', run_id='manual__2023-02-15T02:41:33.585720+00:00', try_number=3, map_index=-1) (still running after 4 attempts)
   [2023-02-15T04:29:11.345+0000] {scheduler_job.py:1231} INFO - Resetting orphaned tasks for active dag runs
   ```
   
   - case03 (expected to run on the KubernetesExecutor) => not OK
     `@task(queue="kubernetes")`
     `def example_task_kubernetes(_dag):`

     The worker runs and immediately fails with the following error:
   ```
   [2023-02-15, 15:14:58 KST] {taskinstance.py:1581} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=cdpde
   AIRFLOW_CTX_DAG_ID=example_dag
   AIRFLOW_CTX_TASK_ID=example_task_kubernetes
   AIRFLOW_CTX_EXECUTION_DATE=2023-02-15T06:09:54.463674+00:00
   AIRFLOW_CTX_TRY_NUMBER=5
   AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-15T06:09:54.463674+00:00
   [2023-02-15, 15:14:58 KST] {taskinstance.py:1902} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 179, in execute
       return_value = super().execute(context)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 171, in execute
       return_value = self.execute_callable()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 189, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
     File "/opt/airflow/dags/repo/dags/example_dag.py", line 23, in example_task_kubernetes
       exec_trino_query_string_task = exec_trino_query_pod(
     File "/opt/airflow/dags/repo/dags/common/utils/pod.py", line 10, in exec_trino_query_pod
       return get_k8s_operator(
     File "/opt/airflow/dags/repo/dags/common/operator/operator.py", line 50, in get_k8s_operator
       return KubernetesPodOperator(
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 390, in apply_defaults
       result = func(self, **kwargs, default_args=default_args)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 227, in __init__
       super().__init__(resources=resources, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 390, in apply_defaults
       result = func(self, **kwargs, default_args=default_args)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 764, in __init__
       task_group.add(self)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/task_group.py", line 210, in add
       raise DuplicateTaskIdFound(f"{node_type} id '{key}' has already been added to the DAG")
   airflow.exceptions.DuplicateTaskIdFound: Task id 'example_task_kubernetes' has already been added to the DAG
   [2023-02-15, 15:14:58 KST] {taskinstance.py:1407} INFO - Marking task as FAILED. dag_id=example_dag, task_id=example_task_kubernetes, execution_date=20230215T060954, start_date=20230215T061457, end_date=20230215T061458
   [2023-02-15, 15:14:58 KST] {standard_task_runner.py:92} ERROR - Failed to execute job 141 for task example_task_kubernetes (Task id 'example_task_kubernetes' has already been added to the DAG; 14)
   [2023-02-15, 15:14:58 KST] {local_task_job.py:156} INFO - Task exited with return code 1
   [2023-02-15, 15:14:58 KST] {local_task_job.py:279} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
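
   To make the three cases concrete, here is a hedged reconstruction of the DAG they describe. Everything below is an assumption pieced together from the decorators shown above and the case03 traceback: the `exec_trino_query_pod` / `get_k8s_operator` helpers are not included in this issue, so the KubernetesPodOperator call is inlined, and the case01 task is renamed to avoid a duplicate id within the sketch. The traceback suggests that the operator built inside the running callable resolves to the task id `example_task_kubernetes` on the same DAG, which is exactly what makes `task_group.add()` raise `DuplicateTaskIdFound`.

   ```
   # Hedged reconstruction of example_dag; names, arguments, and the inlined
   # KubernetesPodOperator call are assumptions, not the actual DAG/helper code.
   import pendulum

   from airflow import DAG
   from airflow.decorators import task
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

   with DAG(
       dag_id="example_dag",
       start_date=pendulum.datetime(2023, 2, 1, tz="UTC"),
       schedule_interval=None,
       catchup=False,
   ) as dag:

       @task  # case01: no queue override, so it stays on Celery's default queue
       def example_task_celery_default(_dag=None):
           pass

       @task(queue="test")  # case02: non-default Celery queue
       def example_task_celery(_dag=None):
           pass

       @task(queue="kubernetes")  # case03: matches kubernetes_queue -> KubernetesExecutor
       def example_task_kubernetes(_dag=None):
           # The traceback shows a KubernetesPodOperator being constructed inside the
           # running callable. If it ends up with the task id "example_task_kubernetes"
           # on the DAG that already contains this task (as the error message indicates),
           # BaseOperator.__init__ -> task_group.add() raises DuplicateTaskIdFound.
           KubernetesPodOperator(
               task_id="example_task_kubernetes",  # colliding id, per the traceback
               name="example-pod",
               image="alpine:3.17",
               cmds=["true"],
               dag=_dag,  # apparently resolves to the parsed DAG at run time
           )

       example_task_celery_default()
       example_task_celery()
       example_task_kubernetes()
   ```

   How `get_k8s_operator` actually passes `task_id` and `dag` to the operator is not visible in the logs, so the snippet above is a reproduction sketch rather than the exact DAG.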
   
   
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" VERSION_ID="11"
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==5.0.0
   apache-airflow-providers-celery==3.0.0
   apache-airflow-providers-cncf-kubernetes==4.3.0
   apache-airflow-providers-common-sql==1.1.0
   apache-airflow-providers-docker==3.1.0
   apache-airflow-providers-elasticsearch==4.2.0
   apache-airflow-providers-ftp==3.1.0
   apache-airflow-providers-google==8.3.0
   apache-airflow-providers-grpc==3.0.0
   apache-airflow-providers-hashicorp==3.1.0
   apache-airflow-providers-http==4.0.0
   apache-airflow-providers-imap==3.0.0
   apache-airflow-providers-microsoft-azure==4.2.0
   apache-airflow-providers-mysql==3.2.0
   apache-airflow-providers-odbc==3.1.1
   apache-airflow-providers-postgres==5.2.0
   apache-airflow-providers-redis==3.0.0
   apache-airflow-providers-sendgrid==3.0.0
   apache-airflow-providers-sftp==4.0.0
   apache-airflow-providers-slack==5.1.0
   apache-airflow-providers-sqlite==3.2.0
   apache-airflow-providers-ssh==3.1.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   * helm chart
   apiVersion: v2
   name: airflow
   version: 1.7.0-dev
   appVersion: 2.3.4
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

