This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new dbeec89 Refactor: `SKIPPED` shouldn't be logged again as `SUCCESS`. (#14822) dbeec89 is described below commit dbeec896fd752f266d1fd9950ba9220d415231b9 Author: suiting-young <80445042+suiting-yo...@users.noreply.github.com> AuthorDate: Fri Jun 11 05:48:51 2021 +0800 Refactor: `SKIPPED` shouldn't be logged again as `SUCCESS`. (#14822) * `SKIPPED` shouldn't be logged again as `SUCCESS`. * `_safe_date` duplicates with `_date_or_empty`. * Borrowed advantage from `_safe_date`. --- airflow/models/taskinstance.py | 65 +++++++++++++----------------------------- 1 file changed, 20 insertions(+), 45 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 4fd72e7..b4e644e 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1087,12 +1087,23 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904 self.log.info("Executing %s on %s", self.task, self.execution_date) return True - def _date_or_empty(self, attr): - if hasattr(self, attr): - date = getattr(self, attr) - if date: - return date.strftime('%Y%m%dT%H%M%S') - return '' + def _date_or_empty(self, attr: str): + result = getattr(self, attr, None) # type: datetime + return result.strftime('%Y%m%dT%H%M%S') if result else '' + + def _log_state(self, lead_msg: str = ''): + self.log.info( + '%sMarking task as %s.' + + ' dag_id=%s, task_id=%s,' + + ' execution_date=%s, start_date=%s, end_date=%s', + lead_msg, + self.state.upper(), + self.dag_id, + self.task_id, + self._date_or_empty('execution_date'), + self._date_or_empty('start_date'), + self._date_or_empty('end_date'), + ) @provide_session @Sentry.enrich_errors @@ -1147,15 +1158,6 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904 self.log.info(e) self.refresh_from_db(lock_for_update=True) self.state = State.SKIPPED - self.log.info( - 'Marking task as SKIPPED. ' - 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s', - self.dag_id, - self.task_id, - self._date_or_empty('execution_date'), - self._date_or_empty('start_date'), - self._date_or_empty('end_date'), - ) except AirflowRescheduleException as reschedule_exception: self.refresh_from_db() self._handle_reschedule(actual_start_date, reschedule_exception, test_mode) @@ -1181,17 +1183,9 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904 finally: Stats.incr(f'ti.finish.{task.dag_id}.{task.task_id}.{self.state}') - # Recording SUCCESS + # Recording SKIPPED or SUCCESS self.end_date = timezone.utcnow() - self.log.info( - 'Marking task as SUCCESS. ' - 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s', - self.dag_id, - self.task_id, - self._date_or_empty('execution_date'), - self._date_or_empty('start_date'), - self._date_or_empty('end_date'), - ) + self._log_state() self.set_duration() if not test_mode: session.add(Log(self.state, self)) @@ -1458,25 +1452,12 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904 if force_fail or not self.is_eligible_to_retry(): self.state = State.FAILED - if force_fail: - log_message = "Immediate failure requested. Marking task as FAILED." - else: - log_message = "Marking task as FAILED." email_for_state = task.email_on_failure else: self.state = State.UP_FOR_RETRY - log_message = "Marking task as UP_FOR_RETRY." email_for_state = task.email_on_retry - self.log.info( - '%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s', - log_message, - self.dag_id, - self.task_id, - self._safe_date('execution_date', '%Y%m%dT%H%M%S'), - self._safe_date('start_date', '%Y%m%dT%H%M%S'), - self._safe_date('end_date', '%Y%m%dT%H%M%S'), - ) + self._log_state('Immediate failure requested. ' if force_fail else '') if email_for_state and task.email: try: self.email_alert(error) @@ -1502,12 +1483,6 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904 """Is task instance is eligible for retry""" return self.task.retries and self.try_number <= self.max_tries - def _safe_date(self, date_attr, fmt): - result = getattr(self, date_attr, None) - if result is not None: - return result.strftime(fmt) - return '' - @provide_session def get_template_context(self, session=None) -> Context: # pylint: disable=too-many-locals """Return TI Context"""