dinjazelena98 opened a new issue, #51868: URL: https://github.com/apache/airflow/issues/51868
### Apache Airflow version 3.0.2 ### If "Other Airflow 2 version" selected, which one? _No response_ ### What happened? Hi, i have an Airflow Operator which uses self.defer() to call an Deferrable Trigger. Inside that deferrable trigger we are just waiting for event to happen. Once event happens it yields TriggerEvent back to the worker and executing "method_name" from self.defer() method. Here i want to trigger next DAG which needs that event, and go back to deferring. Now next DAG lasts for much longer, and i want to have possible concurrent runs. But when ever next DAG is triggered, my initial DAG goes to status "queued". I absolutely cant figure out why. ```python def execute(self, context: dict[str, Any]) -> None: self.defer( trigger=DeferrableTriggerClass(**params), method_name="trigger", ) def trigger(self, context: dict[str, Any], event: dict[str, Any]) -> None: TriggerDagRunOperator( task_id="__trigger", trigger_dag_id="next_dag", conf={event["target"]}, wait_for_completion=False, ).execute(context) self.defer( trigger=DeferrableTriggerClass(**params), method_name="trigger", ) ``` First i tried something like above. But it seems that after calling TriggerDagRunOperator, actual task gets done and anything after it never gets executed. So initially my idea was to have an single DAG that is always either triggering the DAG or waiting for event to happen. But after using TriggerDagRunOperator, nothing gets executed after it, and DAG is considered success. Then to mitigate problem above, i tried to just make this DAG run as schedule="@continuous", so after every time it gets event, trigger the DAG with that event, and then based on schedule to create a new run. But then once the the DAG that was waiting for event, triggers the next DAG with TriggerDagRunOperator, DAG is marked green, and by schedule creates a new instance of the DAG, but it gets to queued for the runtime of the DAG that triggered. I tested this without having deferrable task, and it works good, i have initial DAG that always triggers next one and does not have any dependency on it. Why does it get queued? Or why is DAG automatically finished after calling TriggerDagRun inside method of Operator. ### What you think should happen instead? Once the DAG that has deferrable triggers the next DAG, it should be retriggered because of @continuous schedule. But it goes to queued for the runtime of the DAG that it has triggered. ### How to reproduce Have a DAG with self.defer() trigger and make it trigger some other DAG in method_name that is triggered once we move back to worker from triggerer ### Operating System Ubuntu ### Versions of Apache Airflow Providers _No response_ ### Deployment Astronomer ### Deployment details _No response_ ### Anything else? _No response_ ### Are you willing to submit PR? - [x] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org