vincbeck commented on code in PR #35527:
URL: https://github.com/apache/airflow/pull/35527#discussion_r1388063159


##########
airflow/models/taskinstance.py:
##########
@@ -2107,47 +2147,80 @@ def check_and_change_state_before_execution(
                 ignore_ti_state=ignore_ti_state,
                 description="requeueable deps",
             )
-            if not self.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
-                self.state = None
-                self.log.warning(
+            if not ti.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
+                ti.state = None
+                ti.log.warning(

Review Comment:
   Same here for log



##########
airflow/models/taskinstance.py:
##########
@@ -2107,47 +2147,80 @@ def check_and_change_state_before_execution(
                 ignore_ti_state=ignore_ti_state,
                 description="requeueable deps",
             )
-            if not self.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
-                self.state = None
-                self.log.warning(
+            if not ti.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
+                ti.state = None
+                ti.log.warning(
                     "Rescheduling due to concurrency limits reached "
                     "at task runtime. Attempt %s of "
                     "%s. State set to NONE.",
-                    self.try_number,
-                    self.max_tries + 1,
+                    ti.try_number,
+                    ti.max_tries + 1,
                 )
-                self.queued_dttm = timezone.utcnow()
-                session.merge(self)
+                ti.queued_dttm = timezone.utcnow()
+                session.merge(ti)
                 session.commit()
                 return False
 
-        if self.next_kwargs is not None:
-            self.log.info("Resuming after deferral")
+        if ti.next_kwargs is not None:
+            ti.log.info("Resuming after deferral")
         else:
-            self.log.info("Starting attempt %s of %s", self.try_number, self.max_tries + 1)
-        self._try_number += 1
+            ti.log.info("Starting attempt %s of %s", ti.try_number, ti.max_tries + 1)

Review Comment:
   Same



##########
airflow/models/taskinstance.py:
##########
@@ -2107,47 +2147,80 @@ def check_and_change_state_before_execution(
                 ignore_ti_state=ignore_ti_state,
                 description="requeueable deps",
             )
-            if not self.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
-                self.state = None
-                self.log.warning(
+            if not ti.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
+                ti.state = None
+                ti.log.warning(
                     "Rescheduling due to concurrency limits reached "
                     "at task runtime. Attempt %s of "
                     "%s. State set to NONE.",
-                    self.try_number,
-                    self.max_tries + 1,
+                    ti.try_number,
+                    ti.max_tries + 1,
                 )
-                self.queued_dttm = timezone.utcnow()
-                session.merge(self)
+                ti.queued_dttm = timezone.utcnow()
+                session.merge(ti)
                 session.commit()
                 return False
 
-        if self.next_kwargs is not None:
-            self.log.info("Resuming after deferral")
+        if ti.next_kwargs is not None:
+            ti.log.info("Resuming after deferral")

Review Comment:
   Same



##########
airflow/models/taskinstance.py:
##########
@@ -2107,47 +2147,80 @@ def check_and_change_state_before_execution(
                 ignore_ti_state=ignore_ti_state,
                 description="requeueable deps",
             )
-            if not self.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
-                self.state = None
-                self.log.warning(
+            if not ti.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
+                ti.state = None
+                ti.log.warning(
                     "Rescheduling due to concurrency limits reached "
                     "at task runtime. Attempt %s of "
                     "%s. State set to NONE.",
-                    self.try_number,
-                    self.max_tries + 1,
+                    ti.try_number,
+                    ti.max_tries + 1,
                 )
-                self.queued_dttm = timezone.utcnow()
-                session.merge(self)
+                ti.queued_dttm = timezone.utcnow()
+                session.merge(ti)
                 session.commit()
                 return False
 
-        if self.next_kwargs is not None:
-            self.log.info("Resuming after deferral")
+        if ti.next_kwargs is not None:
+            ti.log.info("Resuming after deferral")
         else:
-            self.log.info("Starting attempt %s of %s", self.try_number, self.max_tries + 1)
-        self._try_number += 1
+            ti.log.info("Starting attempt %s of %s", ti.try_number, ti.max_tries + 1)
+        ti._try_number += 1
 
         if not test_mode:
-            session.add(Log(TaskInstanceState.RUNNING.value, self))
+            session.add(Log(TaskInstanceState.RUNNING.value, ti))
 
-        self.state = TaskInstanceState.RUNNING
-        self.emit_state_change_metric(TaskInstanceState.RUNNING)
-        self.external_executor_id = external_executor_id
-        self.end_date = None
+        ti.state = TaskInstanceState.RUNNING
+        ti.emit_state_change_metric(TaskInstanceState.RUNNING)
+        ti.external_executor_id = external_executor_id
+        ti.end_date = None
         if not test_mode:
-            session.merge(self).task = task
+            session.merge(ti).task = task
         session.commit()
 
         # Closing all pooled connections to prevent
         # "max number of connections reached"
         settings.engine.dispose()  # type: ignore
         if verbose:
             if mark_success:
-                self.log.info("Marking success for %s on %s", self.task, self.execution_date)
+                ti.log.info("Marking success for %s on %s", ti.task, ti.execution_date)

Review Comment:
   Same



##########
airflow/models/taskinstance.py:
##########
@@ -2107,47 +2147,80 @@ def check_and_change_state_before_execution(
                 ignore_ti_state=ignore_ti_state,
                 description="requeueable deps",
             )
-            if not self.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
-                self.state = None
-                self.log.warning(
+            if not ti.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
+                ti.state = None
+                ti.log.warning(
                     "Rescheduling due to concurrency limits reached "
                     "at task runtime. Attempt %s of "
                     "%s. State set to NONE.",
-                    self.try_number,
-                    self.max_tries + 1,
+                    ti.try_number,
+                    ti.max_tries + 1,
                 )
-                self.queued_dttm = timezone.utcnow()
-                session.merge(self)
+                ti.queued_dttm = timezone.utcnow()
+                session.merge(ti)
                 session.commit()
                 return False
 
-        if self.next_kwargs is not None:
-            self.log.info("Resuming after deferral")
+        if ti.next_kwargs is not None:
+            ti.log.info("Resuming after deferral")
         else:
-            self.log.info("Starting attempt %s of %s", self.try_number, self.max_tries + 1)
-        self._try_number += 1
+            ti.log.info("Starting attempt %s of %s", ti.try_number, ti.max_tries + 1)
+        ti._try_number += 1
 
         if not test_mode:
-            session.add(Log(TaskInstanceState.RUNNING.value, self))
+            session.add(Log(TaskInstanceState.RUNNING.value, ti))
 
-        self.state = TaskInstanceState.RUNNING
-        self.emit_state_change_metric(TaskInstanceState.RUNNING)
-        self.external_executor_id = external_executor_id
-        self.end_date = None
+        ti.state = TaskInstanceState.RUNNING
+        ti.emit_state_change_metric(TaskInstanceState.RUNNING)
+        ti.external_executor_id = external_executor_id
+        ti.end_date = None
         if not test_mode:
-            session.merge(self).task = task
+            session.merge(ti).task = task
         session.commit()
 
         # Closing all pooled connections to prevent
         # "max number of connections reached"
         settings.engine.dispose()  # type: ignore
         if verbose:
             if mark_success:
-                self.log.info("Marking success for %s on %s", self.task, self.execution_date)
+                ti.log.info("Marking success for %s on %s", ti.task, ti.execution_date)
             else:
-                self.log.info("Executing %s on %s", self.task, self.execution_date)
+                ti.log.info("Executing %s on %s", ti.task, ti.execution_date)

Review Comment:
   Same



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to