This is an automated email from the ASF dual-hosted git repository. dimberman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
     new e82cf0d  Dagrun object doesn't exist in the TriggerDagRunOperator (#12819)
e82cf0d is described below

commit e82cf0d01d6c1e1ec65d8e1b70d65158947fccd2
Author: Daniel Imberman <daniel.imber...@gmail.com>
AuthorDate: Fri Dec 4 20:10:45 2020 -0800

    Dagrun object doesn't exist in the TriggerDagRunOperator (#12819)

    * Dagrun object doesn't exist in the TriggerDagRunOperator

    fixes https://github.com/apache/airflow/issues/12587

    Fixes issue where dag_run object is not populated if the dag_run already
    exists and is reset

    * change to get_last_dag_run

    * Update airflow/operators/dagrun_operator.py

    Co-authored-by: Tomek Urbaszek <turbas...@gmail.com>

    Co-authored-by: Kaxil Naik <kaxiln...@gmail.com>
    Co-authored-by: Tomek Urbaszek <turbas...@gmail.com>
---
 airflow/operators/dagrun_operator.py    |  2 ++
 tests/operators/test_dagrun_operator.py | 30 ++++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 61bcac7..63d3361 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -144,6 +144,8 @@ class TriggerDagRunOperator(BaseOperator):
                     dag = dag_bag.get_dag(self.trigger_dag_id)
 
                     dag.clear(start_date=self.execution_date, end_date=self.execution_date)
+
+                    dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
                 else:
                     raise e
 
diff --git a/tests/operators/test_dagrun_operator.py b/tests/operators/test_dagrun_operator.py
index fc5b2e8..bb85979 100644
--- a/tests/operators/test_dagrun_operator.py
+++ b/tests/operators/test_dagrun_operator.py
@@ -101,6 +101,36 @@ class TestDagRunOperator(TestCase):
             self.assertTrue(dagruns[0].external_trigger)
             self.assertEqual(dagruns[0].execution_date, utc_now)
 
+    def test_trigger_dagrun_twice(self):
+        """Test TriggerDagRunOperator with custom execution_date."""
+        utc_now = timezone.utcnow()
+        task = TriggerDagRunOperator(
+            task_id="test_trigger_dagrun_with_execution_date",
+            trigger_dag_id=TRIGGERED_DAG_ID,
+            execution_date=utc_now,
+            dag=self.dag,
+            poke_interval=1,
+            reset_dag_run=True,
+            wait_for_completion=True,
+        )
+        run_id = f"manual__{utc_now.isoformat()}"
+        with create_session() as session:
+            dag_run = DagRun(
+                dag_id=TRIGGERED_DAG_ID,
+                execution_date=utc_now,
+                state=State.SUCCESS,
+                run_type="manual",
+                run_id=run_id,
+            )
+            session.add(dag_run)
+            session.commit()
+            task.execute(None)
+
+            dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
+            self.assertEqual(len(dagruns), 1)
+            self.assertTrue(dagruns[0].external_trigger)
+            self.assertEqual(dagruns[0].execution_date, utc_now)
+
     def test_trigger_dagrun_with_templated_execution_date(self):
         """Test TriggerDagRunOperator with templated execution_date."""
         task = TriggerDagRunOperator(