GitHub user MrHenryD edited a discussion: Scheduler restarting from
intermittent errors (Airflow 3.0.4)
### Apache Airflow version
3.0.4 (Python 3.11)
### Deployment
* AWS EKS
* Kubernetes (via Helm Chart v1.17.0)
* KubernetesExecutor
### What happened
The Airflow scheduler fails and restarts after an error that occurs randomly. It
seems to be related to fetching details about a task instance.
```
[2025-08-09T21:08:03.201+0000] {kubernetes_executor.py:739} ERROR - Unknown
error while flushing task queue and result queue.
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1022, in _execute
self._run_scheduler_loop()
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1316, in _run_scheduler_loop
num_finished_events += self._process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 784, in _process_executor_events
return SchedulerJobRunner.process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 949, in process_executor_events
context_from_server=TIRunContext(
^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pydantic/main.py",
line 253, in __init__
validated_self = self.__pydantic_validator__.validate_python(data,
self_instance=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 1 validation error for
TIRunContext
dag_run.consumed_asset_events
Error extracting attribute: DetachedInstanceError: Parent instance <DagRun at
0x7fd31ebee550> is not bound to a Session; lazy load operation of attribute
'consumed_asset_events' cannot proceed (Background on this error at:
https://sqlalche.me/e/14/bhk3) [type=get_attribute_error, input_value=<DagRun
us_bear_relay_dif...00. run_type: scheduled>, input_type=DagRun]
For further information visit
https://errors.pydantic.dev/2.11/v/get_attribute_error
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 737, in end
self.kube_scheduler.terminate()
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 533, in terminate
kube_watcher.join()
File "/usr/local/lib/python3.11/multiprocessing/process.py", line 149, in join
res = self._popen.wait(timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/multiprocessing/popen_fork.py", line 43, in
wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/multiprocessing/popen_fork.py", line 27, in
poll
pid, sts = os.waitpid(self.pid, flag)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 277, in _exit_gracefully
self._end_active_spans()
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py",
line 101, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1088, in _end_active_spans
dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id ==
key)).one()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py",
line 1522, in one
return self._only_one_row(
^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py",
line 614, in _only_one_row
raise exc.MultipleResultsFound(
sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when exactly one
was required
[2025-08-09T21:08:03.213+0000] {scheduler_job_runner.py:1038} INFO - Exited
execute loop
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 7, in <module>
sys.exit(main())
^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py",
line 55, in main
args.func(args)
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py",
line 48, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line
112, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py",
line 55, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py",
line 52, in scheduler
run_command_with_daemon_option(
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/daemon_utils.py",
line 86, in run_command_with_daemon_option
callback()
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py",
line 55, in <lambda>
callback=lambda: _run_scheduler_job(args),
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py",
line 43, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py",
line 101, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py",
line 347, in run_job
return execute_job(job, execute_callable=execute_callable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py",
line 376, in execute_job
ret = execute_callable()
^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1022, in _execute
self._run_scheduler_loop()
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1316, in _run_scheduler_loop
num_finished_events += self._process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 784, in _process_executor_events
return SchedulerJobRunner.process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 949, in process_executor_events
context_from_server=TIRunContext(
^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pydantic/main.py",
line 253, in __init__
validated_self = self.__pydantic_validator__.validate_python(data,
self_instance=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 1 validation error for
TIRunContext
dag_run.consumed_asset_events
Error extracting attribute: DetachedInstanceError: Parent instance <DagRun at
0x7fd31ebee550> is not bound to a Session; lazy load operation of attribute
'consumed_asset_events' cannot proceed (Background on this error at:
https://sqlalche.me/e/14/bhk3) [type=get_attribute_error, input_value=<DagRun
us_bear_relay_dif...00. run_type: scheduled>, input_type=DagRun]
For further information visit
https://errors.pydantic.dev/2.11/v/get_attribute_error
[2025-08-09T21:08:10.168+0000] {kubernetes_executor_utils.py:277} INFO - Event:
us-ghsg-relay-diff-sync-sql-restore-replicate-tables-hvj32csx Succeeded,
annotations: <omitted>
[2025-08-09T21:08:10.169+0000] {kubernetes_executor_utils.py:98} ERROR -
Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 91, in run
self.resource_version = self._run(
^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 171, in _run
self.process_status(
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 278, in process_status
self.watcher_queue.put((pod_name, namespace, None, annotations,
resource_version))
File "<string>", line 2, in put
File "/usr/local/lib/python3.11/multiprocessing/managers.py", line 821, in
_callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 206, in
send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 427, in
_send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 384, in
_send
n = write(self._handle, buf)
^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
Process KubernetesJobWatcher-3:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/multiprocessing/process.py", line 314, in
_bootstrap
self.run()
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 91, in run
self.resource_version = self._run(
^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 171, in _run
self.process_status(
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 278, in process_status
self.watcher_queue.put((pod_name, namespace, None, annotations,
resource_version))
File "<string>", line 2, in put
File "/usr/local/lib/python3.11/multiprocessing/managers.py", line 821, in
_callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 206, in
send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 427, in
_send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 384, in
_send
n = write(self._handle, buf)
^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
```
### How to reproduce
Not sure
### Anything else
No response
GitHub link: https://github.com/apache/airflow/discussions/54306
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]