vatsrahul1001 opened a new issue, #55211:
URL: https://github.com/apache/airflow/issues/55211
### Apache Airflow version
main (development)
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
The scheduler crashed with the error below.
```
scheduler [2025-09-03T09:35:11.810+0000] {taskinstance.py:1692} INFO -
Marking task as FAILED. dag_id=datetime_mapped, task_id=expanded_timedelta,
run_id=manual__2025-09-03T09:35:09.924713+00:00, map_index=0,
logical_date=20250903T093508, start_date=, end_date=20250903T093511
scheduler [2025-09-03T09:35:11.812+0000] {scheduler_job_runner.py:998} ERROR
- Exception when executing SchedulerJob._run_scheduler_loop
scheduler Traceback (most recent call last):
scheduler File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
line 994, in _execute
scheduler self._run_scheduler_loop()
scheduler File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1297, in _run_scheduler_loop
scheduler num_finished_events += self._process_executor_events(
scheduler ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
line 741, in _process_executor_events
scheduler return SchedulerJobRunner.process_executor_events(
scheduler ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
line 932, in process_executor_events
scheduler ti.handle_failure(error=msg, session=session)
scheduler File
"/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 98, in
wrapper
scheduler return func(*args, **kwargs)
scheduler ^^^^^^^^^^^^^^^^^^^^^
scheduler File
"/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line
1695, in handle_failure
scheduler and failure_context["email_for_state"](failure_task)
scheduler ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler File "<attrs generated getattr
airflow.models.mappedoperator.MappedOperator>", line 11, in __getattr__
scheduler return super().__getattribute__(item)
scheduler ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
scheduler AttributeError: 'MappedOperator' object has no attribute
'email_on_failure'
scheduler [2025-09-03T09:35:11.818+0000] {kubernetes_executor.py:617} INFO -
Shutting down Kubernetes executor
scheduler [2025-09-03T09:35:12.062+0000] {kubernetes_executor_utils.py:265}
INFO - Event: datetime-mapped-static-datetime-sexsqeeu is Running, annotations:
<omitted>
scheduler [2025-09-03T09:35:12.082+0000] {kubernetes_executor_utils.py:265}
INFO - Event: datetime-mapped-static-timedelta-uqmyfzln is Running,
annotations: <omitted>
scheduler [2025-09-03T09:35:24.629+0000] {kubernetes_executor_utils.py:265}
INFO - Event: datetime-mapped-get-delays-zyibzufw is Running, annotations:
<omitted>
scheduler [2025-09-03T09:35:25.829+0000] {kubernetes_executor_utils.py:252}
INFO - Event: datetime-mapped-get-delays-zyibzufw Succeeded, annotations:
<omitted>
scheduler [2025-09-03T09:35:26.122+0000] {scheduler_job_runner.py:252} INFO
- Exiting gracefully upon receiving signal 15
scheduler [2025-09-03T09:35:35.457+0000] {kubernetes_executor_utils.py:197}
INFO - Event: pod datetime-mapped-op-sleep-90-ou6yuu37 adopted, annotations:
<omitted>
scheduler [2025-09-03T09:35:35.458+0000] {kubernetes_executor_utils.py:93}
ERROR - Unknown error in KubernetesJobWatcher. Failing
scheduler Traceback (most recent call last):
scheduler File
"/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py",
line 86, in run
scheduler self.resource_version = self._run(
scheduler ^^^^^^^^^^
scheduler File
"/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py",
line 158, in _run
scheduler self.process_status(
scheduler File
"/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py",
line 198, in process_status
scheduler self.watcher_queue.put((pod_name, namespace, ADOPTED,
annotations, resource_version))
scheduler File "<string>", line 2, in put
scheduler File "/usr/local/lib/python3.12/multiprocessing/managers.py",
line 827, in _callmethod
scheduler conn.send((self._id, methodname, args, kwds))
scheduler File "/usr/local/lib/python3.12/multiprocessing/connection.py",
line 206, in send
scheduler self._send_bytes(_ForkingPickler.dumps(obj))
scheduler File "/usr/local/lib/python3.12/multiprocessing/connection.py",
line 427, in _send_bytes
scheduler self._send(header + buf)
scheduler File "/usr/local/lib/python3.12/multiprocessing/connection.py",
line 384, in _send
scheduler n = write(self._handle, buf)
scheduler ^^^^^^^^^^^^^^^^^^^^^^^^
scheduler BrokenPipeError: [Errno 32] Broken pipe
scheduler Process KubernetesJobWatcher-3:
scheduler Traceback (most recent call last):
scheduler File "/usr/local/lib/python3.12/multiprocessing/process.py",
line 314, in _bootstrap
scheduler self.run()
scheduler File
"/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py",
line 86, in run
scheduler self.resource_version = self._run(
scheduler ^^^^^^^^^^
scheduler File
"/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py",
line 158, in _run
scheduler self.process_status(
scheduler File
"/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py",
line 198, in process_status
scheduler self.watcher_queue.put((pod_name, namespace, ADOPTED,
annotations, resource_version))
scheduler File "<string>", line 2, in put
scheduler File "/usr/local/lib/python3.12/multiprocessing/managers.py",
line 827, in _callmethod
scheduler conn.send((self._id, methodname, args, kwds))
scheduler File "/usr/local/lib/python3.12/multiprocessing/connection.py",
line 206, in send
scheduler self._send_bytes(_ForkingPickler.dumps(obj))
scheduler File "/usr/local/lib/python3.12/multiprocessing/connection.py",
line 427, in _send_bytes
scheduler self._send(header + buf)
scheduler File "/usr/local/lib/python3.12/multiprocessing/connection.py",
line 384, in _send
scheduler n = write(self._handle, buf)
scheduler ^^^^^^^^^^^^^^^^^^^^^^^^
scheduler BrokenPipeError: [Errno 32] Broken pipe
stream closed EOF for
optical-convection-8310/optical-convection-8310-scheduler-7b5857c597-5ggmj
(scheduler-gc)
stream closed EOF for
optical-convection-8310/optical-convection-8310-scheduler-7b5857c597-5ggmj
(scheduler)
```
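For illustration only: the first traceback ends in an attribute lookup for `email_on_failure` on a `MappedOperator`, which raises `AttributeError` inside `handle_failure` and takes the scheduler loop down with it. The sketch below mimics that failure mode with hypothetical stand-ins (`FakeMappedOperator`, `email_for_state`, `safe_email_for_state`); none of these are Airflow's actual classes or functions, and the defensive `getattr` is an assumption about one possible way to avoid the crash, not the project's fix.

```python
class FakeMappedOperator:
    """Hypothetical stand-in for an operator object that lacks `email_on_failure`."""

    task_id = "expanded_timedelta"


def email_for_state(task):
    # Direct attribute access: raises AttributeError when the attribute is missing,
    # which is the kind of lookup the traceback ends in.
    return task.email_on_failure


def safe_email_for_state(task):
    # Defensive variant (an assumption, not Airflow's implementation):
    # treat a missing attribute as "do not send an email".
    return getattr(task, "email_on_failure", False)


task = FakeMappedOperator()

try:
    email_for_state(task)
    raised = False
except AttributeError:
    raised = True

print("direct access raised AttributeError:", raised)
print("defensive access returned:", safe_email_for_state(task))
```

Whether a missing attribute should be treated as a default or whether the mapped operator should expose it is a design question for the maintainers; the sketch only shows why an unguarded lookup is enough to abort the caller.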
### What you think should happen instead?
_No response_
### How to reproduce
Run the DAG below with the Kubernetes executor on `main`.
```
from datetime import datetime, timedelta
from time import sleep

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.date_time import (
    DateTimeSensor,
    DateTimeSensorAsync,
)
from airflow.providers.standard.sensors.time_delta import (
    TimeDeltaSensor,
    TimeDeltaSensorAsync,
)

delays = [30, 60, 90]


@task
def get_delays():
    return delays


@task
def get_wakes(delay, **context):
    "Wake {delay} seconds after the task starts"
    ti: TaskInstance = context["ti"]
    return (ti.start_date + timedelta(seconds=delay)).isoformat()


with DAG(
    dag_id="datetime_mapped",
    start_date=datetime(1970, 1, 1),
    schedule=None,
    tags=["taskmap"],
) as dag:
    wake_times = get_wakes.expand(delay=get_delays())

    # Mapped (dynamically expanded) sensors, sync and async variants
    DateTimeSensor.partial(task_id="expanded_datetime").expand(target_time=wake_times)
    TimeDeltaSensor.partial(task_id="expanded_timedelta").expand(
        delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
    )
    DateTimeSensorAsync.partial(task_id="expanded_datetime_async").expand(
        target_time=wake_times
    )
    TimeDeltaSensorAsync.partial(task_id="expanded_timedelta_async").expand(
        delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
    )

    # Unmapped tasks for comparison
    TimeDeltaSensor(task_id="static_timedelta", delta=timedelta(seconds=90))
    DateTimeSensor(
        task_id="static_datetime",
        target_time="{{macros.datetime.now() + macros.timedelta(seconds=90)}}",
    )
    PythonOperator(task_id="op_sleep_90", python_callable=lambda: sleep(90))
```
### Operating System
Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)