dinjazelena98 opened a new issue, #51868:
URL: https://github.com/apache/airflow/issues/51868

   ### Apache Airflow version
   
   3.0.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Hi, I have an Airflow operator that uses `self.defer()` to start a deferrable 
trigger. Inside that trigger we simply wait for an event to happen. Once the 
event happens, the trigger yields a `TriggerEvent` back to the worker, which 
then executes the `method_name` passed to `self.defer()`. In that method I want 
to trigger the next DAG, which needs the event, and then go back to deferring. 
The next DAG runs for much longer, and I want concurrent runs of it to be 
possible.
   
   But whenever the next DAG is triggered, my initial DAG goes to status 
"queued", and I absolutely can't figure out why.
   
   
   ```python
       def execute(self, context: dict[str, Any]) -> None:
           self.defer(
               trigger=DeferrableTriggerClass(**params),
               method_name="trigger",
           )
   
    def trigger(self, context: dict[str, Any], event: dict[str, Any]) -> None:
           TriggerDagRunOperator(
               task_id="__trigger",
               trigger_dag_id="next_dag",
            conf={"target": event["target"]},  # conf must be a dict, not a set literal
               wait_for_completion=False,
           ).execute(context)
           
           self.defer(
               trigger=DeferrableTriggerClass(**params),
               method_name="trigger",
           )
   ```
   
   First I tried something like the above. But it seems that after calling 
`TriggerDagRunOperator`, the task is considered done and nothing after it ever 
executes. My initial idea was to have a single DAG that is always either 
triggering the next DAG or waiting for the event to happen. But after 
`TriggerDagRunOperator` runs, nothing after it executes and the DAG is marked 
as success.
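
   The control flow I am after, independent of Airflow, can be sketched with 
plain asyncio. The queue and the helper names below are hypothetical stand-ins 
for the deferrable trigger and for `TriggerDagRunOperator`, not Airflow APIs:

   ```python
   import asyncio


   async def wait_for_event(queue: asyncio.Queue) -> dict:
       # Stand-in for the deferrable trigger: block until an event arrives,
       # then hand it back as an event payload.
       payload = await queue.get()
       return {"target": payload}


   async def event_loop(queue: asyncio.Queue, fired: list, rounds: int) -> None:
       # Intended loop: wait for an event, fire the downstream DAG for it,
       # then immediately go back to waiting -- never finishing in between.
       for _ in range(rounds):
           event = await wait_for_event(queue)
           fired.append(event)  # stand-in for TriggerDagRunOperator


   async def main() -> list:
       queue: asyncio.Queue = asyncio.Queue()
       fired: list = []
       for name in ("a", "b"):
           queue.put_nowait(name)
       await event_loop(queue, fired, rounds=2)
       return fired


   print(asyncio.run(main()))  # -> [{'target': 'a'}, {'target': 'b'}]
   ```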
   
   Then, to mitigate the problem above, I tried running this DAG with 
`schedule="@continuous"`: every time it gets an event, it triggers the next DAG 
with that event, and the schedule then creates a new run. But once the DAG that 
was waiting for the event triggers the next DAG via `TriggerDagRunOperator`, it 
is marked green and the schedule creates a new run of it, which then stays 
queued for the entire runtime of the DAG it triggered.
   
   I tested this without a deferrable task and it works well: the initial DAG 
always triggers the next one and has no dependency on it.
   
   
   Why does it get queued? And why is the DAG automatically considered finished 
after calling `TriggerDagRunOperator` inside a method of the operator?
   
   ### What you think should happen instead?
   
   Once the DAG with the deferrable task triggers the next DAG, it should be 
re-triggered because of the `@continuous` schedule. Instead, it stays queued 
for the runtime of the DAG it triggered.
   
   ### How to reproduce
   
   Create a DAG with a `self.defer()` call and have it trigger some other DAG 
inside the `method_name` callback that runs once control moves back from the 
triggerer to the worker.
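
   A minimal, Airflow-free sketch of the mechanic I believe is involved 
(assuming, as with Airflow's `BaseOperator.defer`, that deferral works by 
raising a `TaskDeferred`-style exception; the class and function names here 
are stand-ins, not Airflow imports):

   ```python
   class TaskDeferred(Exception):
       """Stand-in for airflow.exceptions.TaskDeferred."""


   def defer() -> None:
       # Deferral is implemented by raising: nothing after this call runs
       # in the current method; control returns to the executor.
       raise TaskDeferred()


   def method_after_event(log: list) -> None:
       # Stand-in for the method_name callback: trigger the next DAG,
       # then defer again.
       log.append("trigger next dag")  # stand-in for TriggerDagRunOperator.execute
       defer()
       log.append("unreachable")       # never runs


   log: list = []
   try:
       method_after_event(log)
   except TaskDeferred:
       pass
   print(log)  # -> ['trigger next dag']
   ```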
   
   ### Operating System
   
   Ubuntu
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

