vatsrahul1001 opened a new issue, #55211:
URL: https://github.com/apache/airflow/issues/55211

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   The scheduler crashed with the error below while marking a mapped task instance as failed:
   
   ```
   scheduler [2025-09-03T09:35:11.810+0000] {taskinstance.py:1692} INFO - Marking task as FAILED. dag_id=datetime_mapped, task_id=expanded_timedelta, run_id=manual__2025-09-03T09:35:09.924713+00:00, map_index=0, logical_date=20250903T093508, start_date=, end_date=20250903T093511
   scheduler [2025-09-03T09:35:11.812+0000] {scheduler_job_runner.py:998} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   scheduler Traceback (most recent call last):
   scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 994, in _execute
   scheduler     self._run_scheduler_loop()
   scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1297, in _run_scheduler_loop
   scheduler     num_finished_events += self._process_executor_events(
   scheduler                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 741, in _process_executor_events
   scheduler     return SchedulerJobRunner.process_executor_events(
   scheduler            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 932, in process_executor_events
   scheduler     ti.handle_failure(error=msg, session=session)
   scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 98, in wrapper
   scheduler     return func(*args, **kwargs)
   scheduler            ^^^^^^^^^^^^^^^^^^^^^
   scheduler   File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 1695, in handle_failure
   scheduler     and failure_context["email_for_state"](failure_task)
   scheduler         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   scheduler   File "<attrs generated getattr airflow.models.mappedoperator.MappedOperator>", line 11, in __getattr__
   scheduler     return super().__getattribute__(item)
   scheduler            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   scheduler AttributeError: 'MappedOperator' object has no attribute 'email_on_failure'
   scheduler [2025-09-03T09:35:11.818+0000] {kubernetes_executor.py:617} INFO - Shutting down Kubernetes executor
   scheduler [2025-09-03T09:35:12.062+0000] {kubernetes_executor_utils.py:265} INFO - Event: datetime-mapped-static-datetime-sexsqeeu is Running, annotations: <omitted>
   scheduler [2025-09-03T09:35:12.082+0000] {kubernetes_executor_utils.py:265} INFO - Event: datetime-mapped-static-timedelta-uqmyfzln is Running, annotations: <omitted>
   scheduler [2025-09-03T09:35:24.629+0000] {kubernetes_executor_utils.py:265} INFO - Event: datetime-mapped-get-delays-zyibzufw is Running, annotations: <omitted>
   scheduler [2025-09-03T09:35:25.829+0000] {kubernetes_executor_utils.py:252} INFO - Event: datetime-mapped-get-delays-zyibzufw Succeeded, annotations: <omitted>
   scheduler [2025-09-03T09:35:26.122+0000] {scheduler_job_runner.py:252} INFO - Exiting gracefully upon receiving signal 15
   scheduler [2025-09-03T09:35:35.457+0000] {kubernetes_executor_utils.py:197} INFO - Event: pod datetime-mapped-op-sleep-90-ou6yuu37 adopted, annotations: <omitted>
   scheduler [2025-09-03T09:35:35.458+0000] {kubernetes_executor_utils.py:93} ERROR - Unknown error in KubernetesJobWatcher. Failing
   scheduler Traceback (most recent call last):
   scheduler   File "/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 86, in run
   scheduler     self.resource_version = self._run(
   scheduler                             ^^^^^^^^^^
   scheduler   File "/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 158, in _run
   scheduler     self.process_status(
   scheduler   File "/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 198, in process_status
   scheduler     self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version))
   scheduler   File "<string>", line 2, in put
   scheduler   File "/usr/local/lib/python3.12/multiprocessing/managers.py", line 827, in _callmethod
   scheduler     conn.send((self._id, methodname, args, kwds))
   scheduler   File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 206, in send
   scheduler     self._send_bytes(_ForkingPickler.dumps(obj))
   scheduler   File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 427, in _send_bytes
   scheduler     self._send(header + buf)
   scheduler   File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 384, in _send
   scheduler     n = write(self._handle, buf)
   scheduler         ^^^^^^^^^^^^^^^^^^^^^^^^
   scheduler BrokenPipeError: [Errno 32] Broken pipe
   scheduler Process KubernetesJobWatcher-3:
   scheduler Traceback (most recent call last):
   scheduler   File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
   scheduler     self.run()
   scheduler   File "/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 86, in run
   scheduler     self.resource_version = self._run(
   scheduler                             ^^^^^^^^^^
   scheduler   File "/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 158, in _run
   scheduler     self.process_status(
   scheduler   File "/usr/local/lib/python3.12/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 198, in process_status
   scheduler     self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version))
   scheduler   File "<string>", line 2, in put
   scheduler   File "/usr/local/lib/python3.12/multiprocessing/managers.py", line 827, in _callmethod
   scheduler     conn.send((self._id, methodname, args, kwds))
   scheduler   File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 206, in send
   scheduler     self._send_bytes(_ForkingPickler.dumps(obj))
   scheduler   File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 427, in _send_bytes
   scheduler     self._send(header + buf)
   scheduler   File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 384, in _send
   scheduler     n = write(self._handle, buf)
   scheduler         ^^^^^^^^^^^^^^^^^^^^^^^^
   scheduler BrokenPipeError: [Errno 32] Broken pipe
   stream closed EOF for optical-convection-8310/optical-convection-8310-scheduler-7b5857c597-5ggmj (scheduler-gc)
   stream closed EOF for optical-convection-8310/optical-convection-8310-scheduler-7b5857c597-5ggmj (scheduler)
   ```
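
The first traceback ends in an `AttributeError` raised when `handle_failure` reads `email_on_failure` from a `MappedOperator`. The snippet below is a minimal sketch of that failure mode, not Airflow code. `PlainOperator`, `MappedLikeOperator`, `safe_email_for_state`, and `direct_lookup_fails` are hypothetical names made up for illustration, and the shape of `email_for_state` (a plain attribute getter) is an assumption inferred from the traceback rather than confirmed from the source.

```python
import operator
from dataclasses import dataclass, field


@dataclass
class PlainOperator:
    """Hypothetical stand-in for an unmapped operator that exposes the flag."""

    email_on_failure: bool = True


@dataclass
class MappedLikeOperator:
    """Hypothetical stand-in for a mapped operator.

    Settings are kept in a dict instead of being set as instance attributes.
    """

    partial_kwargs: dict = field(default_factory=dict)


# Assumed shape of the lookup in the traceback: a callable that reads the
# flag directly off whatever task object it is given.
email_for_state = operator.attrgetter("email_on_failure")


def direct_lookup_fails(task) -> bool:
    """Return True if the direct attribute lookup raises AttributeError."""
    try:
        email_for_state(task)
    except AttributeError:
        return True
    return False


def safe_email_for_state(task, default: bool = False) -> bool:
    """Defensive variant: try the attribute, then partial_kwargs, then default."""
    if hasattr(task, "email_on_failure"):
        return task.email_on_failure
    return getattr(task, "partial_kwargs", {}).get("email_on_failure", default)
```

With these stand-ins, the direct lookup succeeds on `PlainOperator()` and raises on `MappedLikeOperator()`, while `safe_email_for_state` returns a value for both. Whether the real fix belongs in `MappedOperator` or in `handle_failure` is not something this sketch decides.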
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Run the DAG below with the Kubernetes executor on `main`:
   
   ```python
   from datetime import datetime, timedelta
   from time import sleep
   
   from airflow.sdk import DAG
   from airflow.decorators import task
   from airflow.models.taskinstance import TaskInstance
   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.providers.standard.sensors.date_time import DateTimeSensor, DateTimeSensorAsync
   from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
   
   delays = [30, 60, 90]
   
   
   @task
   def get_delays():
       return delays
   
   
   @task
   def get_wakes(delay, **context):
       "Wake {delay} seconds after the task starts"
       ti: TaskInstance = context["ti"]
       return (ti.start_date + timedelta(seconds=delay)).isoformat()
   
   
   with DAG(
       dag_id="datetime_mapped",
       start_date=datetime(1970, 1, 1),
       schedule=None,
       tags=["taskmap"] 
   ) as dag:
   
       wake_times = get_wakes.expand(delay=get_delays())
   
       
       DateTimeSensor.partial(task_id="expanded_datetime").expand(target_time=wake_times)
       TimeDeltaSensor.partial(task_id="expanded_timedelta").expand(
           delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
       )
   
       DateTimeSensorAsync.partial(task_id="expanded_datetime_async").expand(
           target_time=wake_times
       )
       TimeDeltaSensorAsync.partial(task_id="expanded_timedelta_async").expand(
           delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
       )
   
       TimeDeltaSensor(task_id="static_timedelta", delta=timedelta(seconds=90))
       DateTimeSensor(
           task_id="static_datetime",
           target_time="{{macros.datetime.now() + macros.timedelta(seconds=90)}}",
       )
   
       PythonOperator(task_id="op_sleep_90", python_callable=lambda: sleep(90))
   
   ```
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

