tirkarthi opened a new issue, #55182:
URL: https://github.com/apache/airflow/issues/55182

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I am not sure of a consistent way to reproduce this, but when a mapped task instance in the queued state is marked as failed from the UI, the scheduler crashes with the stacktrace below. I added the following print statement to debug the `failure_context`:
   
   ```patch
   diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
   index 27a46b366d..aad1f0bdff 100644
   --- a/airflow-core/src/airflow/models/taskinstance.py
   +++ b/airflow-core/src/airflow/models/taskinstance.py
   @@ -1688,6 +1688,7 @@ class TaskInstance(Base, LoggingMixin):
            )
    
            _log_state(task_instance=self)
   +        print("failure_context ", failure_context)
            if (
                (failure_task := failure_context["task"])
                and failure_context["email_for_state"](failure_task)
   ```
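   The crash mechanism can be seen outside Airflow: the printed `failure_context["email_for_state"]` is `operator.attrgetter('email_on_failure')`, and `attrgetter` raises `AttributeError` when the target object does not expose that attribute. A minimal, self-contained sketch (`PlainOperator` and `MappedLike` are hypothetical stand-ins; per the traceback below, `MappedOperator` does not expose `email_on_failure` through its attrs-generated `__getattr__`):

   ```python
   import operator

   # Same callable as in failure_context["email_for_state"]
   email_for_state = operator.attrgetter("email_on_failure")


   class PlainOperator:
       # A regular operator exposes email_on_failure as an attribute.
       email_on_failure = True


   class MappedLike:
       # Hypothetical stand-in for MappedOperator: no email_on_failure attribute.
       pass


   print(email_for_state(PlainOperator()))  # True

   try:
       email_for_state(MappedLike())
   except AttributeError as exc:
       # attrgetter propagates the AttributeError, which is what crashes
       # handle_failure inside the scheduler loop.
       print(f"AttributeError: {exc}")
   ```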
   
   ```
   [2025-09-02T19:06:50.623+0530] {taskinstance.py:1579} ERROR - Executor 
LocalExecutor(parallelism=32) reported that the task instance <TaskInstance: 
mapped_task_email_on_failure.failure manual__2025-09-02T13:35:34.329442+00:00 
map_index=42 [queued]> finished with state failed, but the task instance's 
state attribute is queued. Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
   
   Task instance in failure state
   Task instance's state was changed through the API.
   Task operator:_PythonDecoratedOperator
   Failure caused by Executor LocalExecutor(parallelism=32) reported that the 
task instance <TaskInstance: mapped_task_email_on_failure.failure 
manual__2025-09-02T13:35:34.329442+00:00 map_index=42 [queued]> finished with 
state failed, but the task instance's state attribute is queued. Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
   
   
   [2025-09-02T19:06:50.647+0530] {taskinstance.py:1690} INFO - Marking task as 
FAILED. dag_id=mapped_task_email_on_failure, task_id=failure, 
run_id=manual__2025-09-02T13:35:34.329442+00:00, map_index=42, 
logical_date=20250902T133532, start_date=, end_date=20250902T133650
   
   failure_context  {'ti': <TaskInstance: mapped_task_email_on_failure.failure 
manual__2025-09-02T13:35:34.329442+00:00 map_index=42 [failed]>, 
'email_for_state': operator.attrgetter('email_on_failure'), 'task': 
<airflow.models.mappedoperator.MappedOperator object at 0x7fdecd4ceb10>, 
'context': {'dag': <DAG: mapped_task_email_on_failure>, 'inlets': [], 
'map_index_template': None, 'outlets': [], 'run_id': 
'manual__2025-09-02T13:35:34.329442+00:00', 'task': 
<airflow.models.mappedoperator.MappedOperator object at 0x7fdecd4ceb10>, 
'task_instance': <TaskInstance: mapped_task_email_on_failure.failure 
manual__2025-09-02T13:35:34.329442+00:00 map_index=42 [failed]>, 'ti': 
<TaskInstance: mapped_task_email_on_failure.failure 
manual__2025-09-02T13:35:34.329442+00:00 map_index=42 [failed]>, 
'outlet_events': <airflow.utils.context.OutletEventAccessors object at 
0x7fdeccc0c990>, 'inlet_events': InletEventsAccessors(_inlets=[], _assets={}, 
_asset_aliases={}), 'macros': <MacrosAccessor (dynamic acce
 ss to macros)>, 'params': {}, 'var': {'json': <VariableAccessor (dynamic 
access)>, 'value': <VariableAccessor (dynamic access)>}, 'conn': 
<ConnectionAccessor (dynamic access)>, 'dag_run': 
DagRun(dag_id='mapped_task_email_on_failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', 
logical_date=datetime.datetime(2025, 9, 2, 13, 35, 32, 961000, 
tzinfo=Timezone('UTC')), data_interval_start=datetime.datetime(2025, 9, 2, 13, 
35, 32, 961000, tzinfo=Timezone('UTC')), 
data_interval_end=datetime.datetime(2025, 9, 2, 13, 35, 32, 961000, 
tzinfo=Timezone('UTC')), run_after=datetime.datetime(2025, 9, 2, 13, 35, 32, 
961000, tzinfo=Timezone('UTC')), start_date=datetime.datetime(2025, 9, 2, 13, 
35, 42, 487888, tzinfo=Timezone('UTC')), end_date=None, clear_number=0, 
run_type=<DagRunType.MANUAL: 'manual'>, state=<DagRunState.RUNNING: 'running'>, 
conf={}, consumed_asset_events=[]), 'triggering_asset_events': <Proxy at 
0x7fdeccc2df50 with factory <function 
TaskInstance.get_template_context.<locals>.
 get_triggering_events at 0x7fdeccd9f100>>, 'task_instance_key_str': 
'mapped_task_email_on_failure__failure__20250902', 'task_reschedule_count': 0, 
'prev_start_date_success': None, 'prev_end_date_success': None, 'logical_date': 
DateTime(2025, 9, 2, 13, 35, 32, 961000, tzinfo=Timezone('UTC')), 'ds': 
'2025-09-02', 'ds_nodash': '20250902', 'ts': 
'2025-09-02T13:35:32.961000+00:00', 'ts_nodash': '20250902T133532', 
'ts_nodash_with_tz': '20250902T133532.961000+0000', 'data_interval_end': 
DateTime(2025, 9, 2, 13, 35, 32, 961000, tzinfo=Timezone('UTC')), 
'data_interval_start': DateTime(2025, 9, 2, 13, 35, 32, 961000, 
tzinfo=Timezone('UTC')), 'prev_data_interval_start_success': None, 
'prev_data_interval_end_success': None, 'test_mode': False, 
'expanded_ti_count': 200, 'exception': "Executor LocalExecutor(parallelism=32) 
reported that the task instance <TaskInstance: 
mapped_task_email_on_failure.failure manual__2025-09-02T13:35:34.329442+00:00 
map_index=42 [queued]> finished with state failed, 
 but the task instance's state attribute is queued. Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally"}}
   [2025-09-02T19:06:50.650+0530] {scheduler_job_runner.py:998} ERROR - 
Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
 line 994, in _execute
       self._run_scheduler_loop()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
 line 1297, in _run_scheduler_loop
       num_finished_events += self._process_executor_events(
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
 line 741, in _process_executor_events
       return SchedulerJobRunner.process_executor_events(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
 line 932, in process_executor_events
       ti.handle_failure(error=msg, session=session)
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py",
 line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/taskinstance.py",
 line 1694, in handle_failure
       and failure_context["email_for_state"](failure_task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "<attrs generated getattr 
airflow.models.mappedoperator.MappedOperator>", line 11, in __getattr__
       return super().__getattribute__(item)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   AttributeError: 'MappedOperator' object has no attribute 'email_on_failure'
   
   [2025-09-02T19:07:01.857+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=44)
   [2025-09-02T19:07:01.857+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=44)
   [2025-09-02T19:07:01.857+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=51)
   [2025-09-02T19:07:01.858+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=51)
   [2025-09-02T19:07:01.858+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=47)
   [2025-09-02T19:07:01.858+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=47)
   [2025-09-02T19:07:01.858+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=46)
   [2025-09-02T19:07:01.858+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=46)
   [2025-09-02T19:07:01.859+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=56)
   [2025-09-02T19:07:01.859+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=56)
   [2025-09-02T19:07:01.859+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=55)
   [2025-09-02T19:07:01.860+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=55)
   [2025-09-02T19:07:01.860+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=50)
   [2025-09-02T19:07:01.860+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=50)
   [2025-09-02T19:07:01.860+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=49)
   [2025-09-02T19:07:01.860+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=49)
   [2025-09-02T19:07:01.861+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=53)
   [2025-09-02T19:07:01.861+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=53)
   [2025-09-02T19:07:01.861+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=52)
   [2025-09-02T19:07:01.862+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=52)
   [2025-09-02T19:07:01.862+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=54)
   [2025-09-02T19:07:01.862+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=54)
   [2025-09-02T19:07:01.863+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=2, map_index=29)
   [2025-09-02T19:07:01.863+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=2, map_index=29)
   [2025-09-02T19:07:01.863+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=45)
   [2025-09-02T19:07:01.863+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=45)
   [2025-09-02T19:07:01.864+0530] {base_executor.py:382} DEBUG - Changing 
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure', 
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00', 
try_number=1, map_index=48)
   [2025-09-02T19:07:01.864+0530] {base_executor.py:387} DEBUG - Could not find 
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure', 
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=48)
   [2025-09-02T19:07:01.864+0530] {scheduler_job_runner.py:1010} INFO - Exited 
execute loop
   [2025-09-02T19:07:01.864+0530] {listener.py:37} DEBUG - Calling 
'\x1b[1mbefore_stopping\x1b[22m' with "\x1b[1m{'component': 
<airflow.jobs.job.Job object at 0x7fdecd804350>}\x1b[22m"
   [2025-09-02T19:07:01.865+0530] {listener.py:38} DEBUG - Hook impls: []
   [2025-09-02T19:07:01.865+0530] {listener.py:42} DEBUG - Result from 
'\x1b[1mbefore_stopping\x1b[22m': []
   [2025-09-02T19:07:01.874+0530] {cli_action_loggers.py:98} DEBUG - Calling 
callbacks: []
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, 
in <module>
       sys.exit(main())
                ^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/__main__.py", 
line 55, in main
       args.func(args)
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/cli_config.py",
 line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/cli.py", 
line 115, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py",
 line 52, in scheduler
       run_command_with_daemon_option(
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py",
 line 55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py",
 line 43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py",
 line 100, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", 
line 368, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", 
line 397, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
 line 994, in _execute
       self._run_scheduler_loop()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
 line 1297, in _run_scheduler_loop
       num_finished_events += self._process_executor_events(
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
 line 741, in _process_executor_events
       return SchedulerJobRunner.process_executor_events(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
 line 932, in process_executor_events
       ti.handle_failure(error=msg, session=session)
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py",
 line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/taskinstance.py",
 line 1694, in handle_failure
       and failure_context["email_for_state"](failure_task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "<attrs generated getattr 
airflow.models.mappedoperator.MappedOperator>", line 11, in __getattr__
   AttributeError: 'MappedOperator' object has no attribute 'email_on_failure'
   INFO:     Received SIGTERM, exiting.
   INFO:     Terminated child process [34635]
   INFO:     Terminated child process [34636]
   INFO:     Waiting for child process [34635]
   INFO:     Shutting down
   INFO:     Shutting down
   INFO:     Waiting for application shutdown.
   INFO:     Application shutdown complete.
   INFO:     Finished server process [34635]
   INFO:     Waiting for application shutdown.
   INFO:     Application shutdown complete.
   INFO:     Finished server process [34636]
   INFO:     Waiting for child process [34636]
   INFO:     Stopping parent process [34630]
   [2025-09-02T19:07:02.491+0530] {settings.py:539} DEBUG - Disposing DB 
connection pool (PID 34613)
   
   
   ```
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Run the DAG below with LocalExecutor.
   2. While many mapped task instances are queued or running, mark the dag run as failed from the UI.
   3. Check the scheduler logs.
   
   ```python
   from __future__ import annotations

   import datetime
   import time

   from airflow.sdk import DAG, task

   with DAG(
       dag_id="mapped_task_email_on_failure",
       start_date=datetime.datetime(2022, 3, 4),
       catchup=False,
       default_args={"email_on_failure": True, "email": ["[email protected]"]},
   ) as dag:

       @task(task_id="get_names")
       def get_names(**kwargs) -> list[str]:
           return ["foo", "bar"] * 100

       @task(task_id="failure")
       def process_name(name: str) -> str:
           1 / 0  # fail immediately so the mapped task instances error out
           time.sleep(100)
           return f"Hello {name}!"

       names = get_names()
       process_name.expand(name=names)
   ```
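   For illustration, a defensive shape for the failing check might look like the sketch below. This is only a hypothetical guard, not a proposed patch: the `failure_context` keys match those printed in the log above, but the function name and the fallback-to-`False` behavior are assumptions.

   ```python
   def should_send_failure_email(failure_context: dict) -> bool:
       """Hypothetical guard: treat a missing email_on_failure attribute as
       False instead of letting AttributeError escape into the scheduler loop."""
       failure_task = failure_context.get("task")
       if failure_task is None:
           return False
       # getattr with a default swallows the AttributeError that attrgetter raises.
       return bool(getattr(failure_task, "email_on_failure", False))


   class NoEmailAttr:
       # Stand-in for an operator without the attribute (like MappedOperator here).
       pass


   class WithEmailAttr:
       email_on_failure = True


   print(should_send_failure_email({"task": NoEmailAttr()}))    # False
   print(should_send_failure_email({"task": WithEmailAttr()}))  # True
   ```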
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

