potiuk commented on code in PR #53435:
URL: https://github.com/apache/airflow/pull/53435#discussion_r2219251295


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2016,7 +2017,7 @@ def _maybe_requeue_stuck_ti(self, *, ti, session):
                     extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
                 )
             )
-            ti.set_state(TaskInstanceState.FAILED, session=session)
+            executor.fail(ti.key)

Review Comment:
    `BaseExecutor.fail` (note the last line, which calls `change_state`):
   
   ```python
      def fail(self, key: TaskInstanceKey, info=None) -> None:
           """
           Set fail state for the event.
   
           :param info: Executor information for the task instance
           :param key: Unique key for the task instance
           """
           trace_id = Trace.get_current_span().get_span_context().trace_id
           if trace_id != NO_TRACE_ID:
               span_id = int(gen_span_id_from_ti_key(key, as_int=True))
               with DebugTrace.start_span(
                   span_name="fail",
                   component="BaseExecutor",
                   parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
               ) as span:
                   span.set_attributes(
                       {
                           "dag_id": key.dag_id,
                           "run_id": key.run_id,
                           "task_id": key.task_id,
                           "try_number": key.try_number,
                           "error": True,
                       }
                   )
   
           self.change_state(key, TaskInstanceState.FAILED, info)
   
   ```



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2016,7 +2017,7 @@ def _maybe_requeue_stuck_ti(self, *, ti, session):
                     extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
                 )
             )
-            ti.set_state(TaskInstanceState.FAILED, session=session)
+            executor.fail(ti.key)

Review Comment:
    `BaseExecutor.fail` (note the last line, which calls `change_state`):
   
   ```python
      def fail(self, key: TaskInstanceKey, info=None) -> None:
           """
           Set fail state for the event.
   
           :param info: Executor information for the task instance
           :param key: Unique key for the task instance
           """
           trace_id = Trace.get_current_span().get_span_context().trace_id
           if trace_id != NO_TRACE_ID:
               span_id = int(gen_span_id_from_ti_key(key, as_int=True))
               with DebugTrace.start_span(
                   span_name="fail",
                   component="BaseExecutor",
                   parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
               ) as span:
                   span.set_attributes(
                       {
                           "dag_id": key.dag_id,
                           "run_id": key.run_id,
                           "task_id": key.task_id,
                           "try_number": key.try_number,
                           "error": True,
                       }
                   )
   
           self.change_state(key, TaskInstanceState.FAILED, info)
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to