MaksYermak opened a new issue, #37180:
URL: https://github.com/apache/airflow/issues/37180

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.6.3
   
   ### What happened?
   
   When running multiple instances of `GKEStartPodOperator` in deferrable 
mode with 2 or more triggerers, the same trigger is started on multiple 
triggerers at the same time. As a result we have a race condition. I think this 
issue is similar to 
https://github.com/apache/airflow/issues/27057
   **Logs:**
   
![image](https://github.com/apache/airflow/assets/12415200/d5a0c8a2-b168-41a3-82a1-afc475031558)
   
![image](https://github.com/apache/airflow/assets/12415200/a6d5e84e-80a3-4ce7-bdb3-9ef361257cfa)
   
   
   ### What you think should happen instead?
   
   Airflow should run each trigger in exactly one triggerer process, even 
when the user has multiple triggerers.
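
To make the expected behaviour concrete, here is a minimal sketch of the invariant: every trigger ID should be owned by at most one triggerer. This is not Airflow code. The function name `find_duplicated_triggers` and the `assignments` mapping (triggerer name to the set of trigger IDs it is running) are hypothetical and only illustrate the check.

```python
from __future__ import annotations

from collections import defaultdict


def find_duplicated_triggers(assignments: dict[str, set[int]]) -> dict[int, list[str]]:
    """Return trigger IDs that are running on more than one triggerer.

    `assignments` maps a hypothetical triggerer name to the set of trigger
    IDs it is currently running. Illustration only, not Airflow internals.
    """
    owners: dict[int, list[str]] = defaultdict(list)
    for triggerer, trigger_ids in assignments.items():
        for trigger_id in trigger_ids:
            owners[trigger_id].append(triggerer)
    # Keep only triggers claimed by two or more triggerers.
    return {tid: sorted(names) for tid, names in owners.items() if len(names) > 1}
```

With the behaviour described in this issue, such a check would presumably return a non-empty result; with the expected behaviour it would always return an empty dict.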
   
   ### How to reproduce
   
   Run this DAG in an environment with several triggerers. I used Composer 
with Airflow 2.6.3 and 2 triggerers.
   ```python
   from __future__ import annotations
   
   import os
   from datetime import datetime, timedelta
   
   from airflow.models.dag import DAG
   from airflow.providers.google.cloud.operators.kubernetes_engine import (
       GKECreateClusterOperator,
       GKEDeleteClusterOperator,
       GKEStartPodOperator,
   )
   
   ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
   DAG_ID = "kubernetes_engine_async_loop"
   GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
   
   GCP_LOCATION = "europe-west1"
   CLUSTER_NAME = f"example-cluster-defer-{ENV_ID}".replace("_", "-")
   
   CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
   
   with DAG(
       DAG_ID,
       schedule="@once",  # Override to match your needs
       start_date=datetime(2021, 1, 1),
       catchup=False,
       tags=["example"],
   ) as dag:
   
       create_cluster = GKECreateClusterOperator(
           task_id="create_cluster",
           project_id=GCP_PROJECT_ID,
           location=GCP_LOCATION,
           body=CLUSTER,
           deferrable=True,
       )
   
       pods_tasks = [
           GKEStartPodOperator(
               project_id=GCP_PROJECT_ID,
               location=GCP_LOCATION,
               cluster_name=CLUSTER_NAME,
               task_id=f'pod_task_no_{i}', 
               cmds=["/bin/sh","-c"], 
               name=f'pod_no_{i}',
               deferrable=True,
               arguments=[
                   f"echo 'Starting job {i}'; sleep $(($RANDOM % 360)); echo 'Finished job {i}'"
               ],
               poll_interval=60,
               namespace="default",
               image="alpine:latest",
               do_xcom_push=False,
               get_logs=True,
               max_active_tis_per_dag=50,
               weight_rule="absolute",
               is_delete_operator_pod=True,
               retries=2,
               retry_delay=timedelta(seconds=30),
               image_pull_policy="IfNotPresent",
               annotations={"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"},
           ) 
           for i in range(1, 20)
       ]
      
       delete_cluster = GKEDeleteClusterOperator(
           task_id="delete_cluster",
           name=CLUSTER_NAME,
           project_id=GCP_PROJECT_ID,
           location=GCP_LOCATION,
           deferrable=True,
       )
       
       create_cluster >> pods_tasks >> delete_cluster
   ```
   
   ### Operating System
   
   All
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


