tirkarthi opened a new issue, #55182:
URL: https://github.com/apache/airflow/issues/55182
### Apache Airflow version
main (development)
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
I am not sure of a consistent way to reproduce this, but when a mapped task
instance in the queued state is marked as failed from the UI, the scheduler
crashes with the stacktrace below. I added the following print statement to
debug the `failure_context`:
```patch
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 27a46b366d..aad1f0bdff 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1688,6 +1688,7 @@ class TaskInstance(Base, LoggingMixin):
)
_log_state(task_instance=self)
+ print("failure_context ", failure_context)
if (
(failure_task := failure_context["task"])
and failure_context["email_for_state"](failure_task)
```
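For context, the crash reduces to `operator.attrgetter('email_on_failure')` being applied to a `MappedOperator`, which does not expose that attribute directly the way `BaseOperator` does. A minimal standalone sketch of the failure mode and a defensive `getattr` fallback (the class here is a stand-in, not Airflow's actual operator):

```python
import operator


class FakeMappedOperator:
    """Stand-in for an operator that lacks email_on_failure."""


# Mirrors the 'email_for_state' callable seen in failure_context above.
email_for_state = operator.attrgetter("email_on_failure")

try:
    email_for_state(FakeMappedOperator())
except AttributeError as exc:
    # Same class of error the scheduler hits in handle_failure.
    print(f"crash reproduced: {exc}")


def safe_email_for_state(task):
    # Defensive alternative: treat a missing attribute as "do not email".
    return getattr(task, "email_on_failure", False)


print(safe_email_for_state(FakeMappedOperator()))  # -> False
```

This only illustrates the attribute lookup; the real fix would need to decide where in `handle_failure` (or on `MappedOperator` itself) the attribute should be resolved.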
```
[2025-09-02T19:06:50.623+0530] {taskinstance.py:1579} ERROR - Executor
LocalExecutor(parallelism=32) reported that the task instance <TaskInstance:
mapped_task_email_on_failure.failure manual__2025-09-02T13:35:34.329442+00:00
map_index=42 [queued]> finished with state failed, but the task instance's
state attribute is queued. Learn more:
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
Task instance in failure state
Task instance's state was changed through the API.
Task operator:_PythonDecoratedOperator
Failure caused by Executor LocalExecutor(parallelism=32) reported that the
task instance <TaskInstance: mapped_task_email_on_failure.failure
manual__2025-09-02T13:35:34.329442+00:00 map_index=42 [queued]> finished with
state failed, but the task instance's state attribute is queued. Learn more:
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
[2025-09-02T19:06:50.647+0530] {taskinstance.py:1690} INFO - Marking task as
FAILED. dag_id=mapped_task_email_on_failure, task_id=failure,
run_id=manual__2025-09-02T13:35:34.329442+00:00, map_index=42,
logical_date=20250902T133532, start_date=, end_date=20250902T133650
failure_context {'ti': <TaskInstance: mapped_task_email_on_failure.failure
manual__2025-09-02T13:35:34.329442+00:00 map_index=42 [failed]>,
'email_for_state': operator.attrgetter('email_on_failure'), 'task':
<airflow.models.mappedoperator.MappedOperator object at 0x7fdecd4ceb10>,
'context': {'dag': <DAG: mapped_task_email_on_failure>, 'inlets': [],
'map_index_template': None, 'outlets': [], 'run_id':
'manual__2025-09-02T13:35:34.329442+00:00', 'task':
<airflow.models.mappedoperator.MappedOperator object at 0x7fdecd4ceb10>,
'task_instance': <TaskInstance: mapped_task_email_on_failure.failure
manual__2025-09-02T13:35:34.329442+00:00 map_index=42 [failed]>, 'ti':
<TaskInstance: mapped_task_email_on_failure.failure
manual__2025-09-02T13:35:34.329442+00:00 map_index=42 [failed]>,
'outlet_events': <airflow.utils.context.OutletEventAccessors object at
0x7fdeccc0c990>, 'inlet_events': InletEventsAccessors(_inlets=[], _assets={},
_asset_aliases={}), 'macros': <MacrosAccessor (dynamic acce
ss to macros)>, 'params': {}, 'var': {'json': <VariableAccessor (dynamic
access)>, 'value': <VariableAccessor (dynamic access)>}, 'conn':
<ConnectionAccessor (dynamic access)>, 'dag_run':
DagRun(dag_id='mapped_task_email_on_failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00',
logical_date=datetime.datetime(2025, 9, 2, 13, 35, 32, 961000,
tzinfo=Timezone('UTC')), data_interval_start=datetime.datetime(2025, 9, 2, 13,
35, 32, 961000, tzinfo=Timezone('UTC')),
data_interval_end=datetime.datetime(2025, 9, 2, 13, 35, 32, 961000,
tzinfo=Timezone('UTC')), run_after=datetime.datetime(2025, 9, 2, 13, 35, 32,
961000, tzinfo=Timezone('UTC')), start_date=datetime.datetime(2025, 9, 2, 13,
35, 42, 487888, tzinfo=Timezone('UTC')), end_date=None, clear_number=0,
run_type=<DagRunType.MANUAL: 'manual'>, state=<DagRunState.RUNNING: 'running'>,
conf={}, consumed_asset_events=[]), 'triggering_asset_events': <Proxy at
0x7fdeccc2df50 with factory <function
TaskInstance.get_template_context.<locals>.
get_triggering_events at 0x7fdeccd9f100>>, 'task_instance_key_str':
'mapped_task_email_on_failure__failure__20250902', 'task_reschedule_count': 0,
'prev_start_date_success': None, 'prev_end_date_success': None, 'logical_date':
DateTime(2025, 9, 2, 13, 35, 32, 961000, tzinfo=Timezone('UTC')), 'ds':
'2025-09-02', 'ds_nodash': '20250902', 'ts':
'2025-09-02T13:35:32.961000+00:00', 'ts_nodash': '20250902T133532',
'ts_nodash_with_tz': '20250902T133532.961000+0000', 'data_interval_end':
DateTime(2025, 9, 2, 13, 35, 32, 961000, tzinfo=Timezone('UTC')),
'data_interval_start': DateTime(2025, 9, 2, 13, 35, 32, 961000,
tzinfo=Timezone('UTC')), 'prev_data_interval_start_success': None,
'prev_data_interval_end_success': None, 'test_mode': False,
'expanded_ti_count': 200, 'exception': "Executor LocalExecutor(parallelism=32)
reported that the task instance <TaskInstance:
mapped_task_email_on_failure.failure manual__2025-09-02T13:35:34.329442+00:00
map_index=42 [queued]> finished with state failed,
but the task instance's state attribute is queued. Learn more:
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally"}}
[2025-09-02T19:06:50.650+0530] {scheduler_job_runner.py:998} ERROR -
Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
line 994, in _execute
self._run_scheduler_loop()
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
line 1297, in _run_scheduler_loop
num_finished_events += self._process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
line 741, in _process_executor_events
return SchedulerJobRunner.process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
line 932, in process_executor_events
ti.handle_failure(error=msg, session=session)
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py",
line 98, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/taskinstance.py",
line 1694, in handle_failure
and failure_context["email_for_state"](failure_task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<attrs generated getattr
airflow.models.mappedoperator.MappedOperator>", line 11, in __getattr__
return super().__getattribute__(item)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'MappedOperator' object has no attribute 'email_on_failure'
[2025-09-02T19:07:01.857+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=44)
[2025-09-02T19:07:01.857+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=44)
[2025-09-02T19:07:01.857+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=51)
[2025-09-02T19:07:01.858+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=51)
[2025-09-02T19:07:01.858+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=47)
[2025-09-02T19:07:01.858+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=47)
[2025-09-02T19:07:01.858+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=46)
[2025-09-02T19:07:01.858+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=46)
[2025-09-02T19:07:01.859+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=56)
[2025-09-02T19:07:01.859+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=56)
[2025-09-02T19:07:01.859+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=55)
[2025-09-02T19:07:01.860+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=55)
[2025-09-02T19:07:01.860+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=50)
[2025-09-02T19:07:01.860+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=50)
[2025-09-02T19:07:01.860+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=49)
[2025-09-02T19:07:01.860+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=49)
[2025-09-02T19:07:01.861+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=53)
[2025-09-02T19:07:01.861+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=53)
[2025-09-02T19:07:01.861+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=52)
[2025-09-02T19:07:01.862+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=52)
[2025-09-02T19:07:01.862+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=54)
[2025-09-02T19:07:01.862+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=54)
[2025-09-02T19:07:01.863+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=2, map_index=29)
[2025-09-02T19:07:01.863+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=2, map_index=29)
[2025-09-02T19:07:01.863+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=45)
[2025-09-02T19:07:01.863+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=45)
[2025-09-02T19:07:01.864+0530] {base_executor.py:382} DEBUG - Changing
state: TaskInstanceKey(dag_id='mapped_task_email_on_failure',
task_id='failure', run_id='manual__2025-09-02T13:35:34.329442+00:00',
try_number=1, map_index=48)
[2025-09-02T19:07:01.864+0530] {base_executor.py:387} DEBUG - Could not find
key: TaskInstanceKey(dag_id='mapped_task_email_on_failure', task_id='failure',
run_id='manual__2025-09-02T13:35:34.329442+00:00', try_number=1, map_index=48)
[2025-09-02T19:07:01.864+0530] {scheduler_job_runner.py:1010} INFO - Exited
execute loop
[2025-09-02T19:07:01.864+0530] {listener.py:37} DEBUG - Calling
'\x1b[1mbefore_stopping\x1b[22m' with "\x1b[1m{'component':
<airflow.jobs.job.Job object at 0x7fdecd804350>}\x1b[22m"
[2025-09-02T19:07:01.865+0530] {listener.py:38} DEBUG - Hook impls: []
[2025-09-02T19:07:01.865+0530] {listener.py:42} DEBUG - Result from
'\x1b[1mbefore_stopping\x1b[22m': []
[2025-09-02T19:07:01.874+0530] {cli_action_loggers.py:98} DEBUG - Calling
callbacks: []
Traceback (most recent call last):
File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10,
in <module>
sys.exit(main())
^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/__main__.py",
line 55, in main
args.func(args)
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/cli_config.py",
line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/cli.py",
line 115, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py",
line 54, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py",
line 52, in scheduler
run_command_with_daemon_option(
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py",
line 86, in run_command_with_daemon_option
callback()
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py",
line 55, in <lambda>
callback=lambda: _run_scheduler_job(args),
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py",
line 43, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py",
line 100, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py",
line 368, in run_job
return execute_job(job, execute_callable=execute_callable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py",
line 397, in execute_job
ret = execute_callable()
^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
line 994, in _execute
self._run_scheduler_loop()
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
line 1297, in _run_scheduler_loop
num_finished_events += self._process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
line 741, in _process_executor_events
return SchedulerJobRunner.process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py",
line 932, in process_executor_events
ti.handle_failure(error=msg, session=session)
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py",
line 98, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/taskinstance.py",
line 1694, in handle_failure
and failure_context["email_for_state"](failure_task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<attrs generated getattr
airflow.models.mappedoperator.MappedOperator>", line 11, in __getattr__
AttributeError: 'MappedOperator' object has no attribute 'email_on_failure'
INFO: Received SIGTERM, exiting.
INFO: Terminated child process [34635]
INFO: Terminated child process [34636]
INFO: Waiting for child process [34635]
INFO: Shutting down
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [34635]
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [34636]
INFO: Waiting for child process [34636]
INFO: Stopping parent process [34630]
[2025-09-02T19:07:02.491+0530] {settings.py:539} DEBUG - Disposing DB
connection pool (PID 34613)
```
### What you think should happen instead?
_No response_
### How to reproduce
1. Run the DAG below with LocalExecutor.
2. While many mapped task instances are still queued or running, mark the
DAG run as failed from the UI.
3. Check the scheduler logs.
```python
from __future__ import annotations

import datetime
import time

from airflow.sdk import DAG, task

with DAG(
    dag_id="mapped_task_email_on_failure",
    start_date=datetime.datetime(2022, 3, 4),
    catchup=False,
    default_args={"email_on_failure": True, "email": ["[email protected]"]},
) as dag:

    @task(task_id="get_names")
    def get_names(**kwargs) -> list[str]:
        # Produce 200 names so the mapped task expands into many instances.
        return ["foo", "bar"] * 100

    @task(task_id="failure")
    def process_name(name: str) -> str:
        1 / 0  # fail every mapped task instance on purpose
        time.sleep(100)
        return f"Hello {name}!"

    names = get_names()
    process_name.expand(name=names)
```
### Operating System
Ubuntu 20.04
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)