mhenc commented on code in PR #28900:
URL: https://github.com/apache/airflow/pull/28900#discussion_r1323545895
##########
airflow/models/dagrun.py:
##########
@@ -516,27 +564,46 @@ def get_dag(self) -> DAG:
return self.dag
+ @staticmethod
+ @internal_api_call
@provide_session
def get_previous_dagrun(
- self, state: DagRunState | None = None, session: Session = NEW_SESSION
+ dag_run: DagRun | DagRunPydantic, state: DagRunState | None = None,
session: Session = NEW_SESSION
) -> DagRun | None:
- """Return the previous DagRun, if there is one."""
+ """
+ Return the previous DagRun, if there is one.
+
+ :param dag_run: the dag run
+ :param session: SQLAlchemy ORM Session
+ :param state: the dag run state
+ """
filters = [
- DagRun.dag_id == self.dag_id,
- DagRun.execution_date < self.execution_date,
+ DagRun.dag_id == dag_run.dag_id,
+ DagRun.execution_date < dag_run.execution_date,
]
if state is not None:
filters.append(DagRun.state == state)
return
session.scalar(select(DagRun).where(*filters).order_by(DagRun.execution_date.desc()).limit(1))
+ @staticmethod
+ @internal_api_call
@provide_session
- def get_previous_scheduled_dagrun(self, session: Session = NEW_SESSION) ->
DagRun | None:
- """Return the previous SCHEDULED DagRun, if there is one."""
+ def get_previous_scheduled_dagrun(
+ dag_run_id: int,
+ session: Session = NEW_SESSION,
+ ) -> DagRun | None:
Review Comment:
| DagRunPydantic
##########
airflow/models/taskinstance.py:
##########
@@ -1583,14 +2334,22 @@ def _run_raw_task(
# Recording SKIPPED or SUCCESS
self.clear_next_method_args()
- self.end_date = timezone.utcnow()
- self._log_state()
- self.set_duration()
+ _log_state(task_instance=self)
+ TaskInstance.set_end_date(
+ dag_id=self.dag_id,
+ run_id=self.run_id,
+ task_id=self.task_id,
+ map_index=self.map_index,
+ end_date=timezone.utcnow(),
+ session=session,
+ )
+
+ self.refresh_from_db(session=session)
Review Comment:
I think this is the problem.
In line 2276 we set
state = SUCCESS
but we don't commit it, and refresh_from_db simply overrides it to RUNNING.
Do we need to call TaskInstance.set_end_date there? The previous
implementation simply set end_date and duration, while you commit the changes
to (and reload from) the DB.
Maybe we should revert TaskInstance.set_end_date to the previous implementation
(this PR is big enough already).
Migrating this (_run_raw_task) is a task on my list, so I will think about
possible solutions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]