MaksYermak opened a new issue, #37180: URL: https://github.com/apache/airflow/issues/37180
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.6.3

### What happened?

When running multiple instances of `GKEStartPodOperator` in deferrable mode with 2 or more triggerers, the same trigger is started on multiple triggerers at the same time. As a result, we have a race condition.

I think this issue is similar to https://github.com/apache/airflow/issues/27057

**Logs:**

![image](https://github.com/apache/airflow/assets/12415200/d5a0c8a2-b168-41a3-82a1-afc475031558)

![image](https://github.com/apache/airflow/assets/12415200/a6d5e84e-80a3-4ce7-bdb3-9ef361257cfa)

### What you think should happen instead?

Airflow should run each trigger in exactly one triggerer process when the user has multiple triggerers.

### How to reproduce

Run this script in an environment with several triggerers. I used Composer with Airflow 2.6.3 and 2 triggerers.

```python
from __future__ import annotations

import os
from datetime import datetime, timedelta

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator,
    GKEDeleteClusterOperator,
    GKEStartPodOperator,
)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "kubernetes_engine_async_loop"
GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")

GCP_LOCATION = "europe-west1"
CLUSTER_NAME = f"example-cluster-defer-{ENV_ID}".replace("_", "-")

CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}

with DAG(
    DAG_ID,
    schedule="@once",  # Override to match your needs
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    create_cluster = GKECreateClusterOperator(
        task_id="create_cluster",
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        body=CLUSTER,
        deferrable=True,
    )

    # 19 deferrable pod tasks, each sleeping for a random duration,
    # so that many triggers are active at the same time.
    pods_tasks = [
        GKEStartPodOperator(
            project_id=GCP_PROJECT_ID,
            location=GCP_LOCATION,
            cluster_name=CLUSTER_NAME,
            task_id=f'pod_task_no_{i}',
            cmds=["/bin/sh", "-c"],
            name=f'pod_no_{i}',
            deferrable=True,
            arguments=["echo 'Starting job "+str(i)+"'; sleep $(($RANDOM % 360)); echo 'Finished job "+str(i)+"'"],
            poll_interval=60,
            namespace="default",
            image="alpine:latest",
            do_xcom_push=False,
            get_logs=True,
            max_active_tis_per_dag=50,
            weight_rule="absolute",
            is_delete_operator_pod=True,
            retries=2,
            retry_delay=timedelta(seconds=30),
            image_pull_policy="IfNotPresent",
            annotations={"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"},
        )
        for i in range(1, 20)
    ]

    delete_cluster = GKEDeleteClusterOperator(
        task_id="delete_cluster",
        name=CLUSTER_NAME,
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        deferrable=True,
    )

    create_cluster >> pods_tasks >> delete_cluster
```

### Operating System

All

### Versions of Apache Airflow Providers

_No response_

### Deployment

Google Cloud Composer

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org