github-actions[bot] opened a new pull request, #51199:
URL: https://github.com/apache/airflow/pull/51199

   When using `dag.test()` with deferred tasks, tasks that complete their 
trigger execution were incorrectly being set to `SUCCESS` state instead of 
`SCHEDULED` state. This prevented task resumption!
   
   Dag used to test:
   
   ```python
   from datetime import datetime, timedelta, timezone
   from typing import Any
   
   import pendulum
   
   from airflow.providers.standard.triggers.temporal import DateTimeTrigger
   from airflow.sdk import Context, task, BaseOperator, DAG
   
   class DummyOperator(BaseOperator):
   
       def execute(self, context: Context):
           self.defer(
               trigger=DateTimeTrigger(
                   moment=datetime.now(timezone.utc) + timedelta(seconds=2),
               ),
               method_name="execute_complet",
           )
   
       def execute_complet(self, context: Context, event: Any = None):
           assert event is not None
           return "test"
   
   @task
   def dummy_task(param):
       print("DEBUG")
       assert param == "test", "Parameter should be 'test'"
   
   with DAG(
       dag_id="example_debug",
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
   ) as dag:
       task1 = DummyOperator(task_id="task1")
       task2 = dummy_task(task1.output)
       task1 >> task2
   
   if __name__ == "__main__":
       dag.test()
   ```
   (cherry picked from commit 1b83f7135d4b5167972ec32752642924e8e0a55a)
   
   Co-authored-by: Kaxil Naik <kaxiln...@gmail.com>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to