GitHub user MrHenryD created a discussion: Scheduler restarting from intermittent errors (Airflow 3.0.4)

### Apache Airflow version
3.0.2 (Python 3.11)

### Deployment
* AWS EKS
* Kubernetes (via Helm Chart v1.17.0)
* KubernetesExecutor

### What happened
The Airflow scheduler fails and restarts after an error that occurs intermittently. The error appears to be related to fetching details about a task instance.
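The first traceback in the log ends in a `DetachedInstanceError`. Pydantic reads `dag_run.consumed_asset_events` from a `DagRun` that is no longer bound to a SQLAlchemy session, so the lazy load of that attribute cannot run. The following is a rough stdlib-only analogy and not SQLAlchemy's actual mechanism. The hypothetical `LazyRun` class and `asset_event` table defer a query until the attribute is first read. That read succeeds while the connection is open and fails once the connection has been closed.

```python
import sqlite3


class LazyRun:
    """Hypothetical stand-in for an ORM object with a lazily loaded attribute."""

    def __init__(self, conn: sqlite3.Connection, run_id: str) -> None:
        self._conn = conn
        self.run_id = run_id

    @property
    def consumed_asset_events(self) -> list:
        # The query is only issued when the attribute is read,
        # so it needs the connection to still be open at that moment.
        rows = self._conn.execute(
            "SELECT asset FROM asset_event WHERE run_id = ?", (self.run_id,)
        ).fetchall()
        return [asset for (asset,) in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE asset_event (run_id TEXT, asset TEXT)")
conn.execute("INSERT INTO asset_event VALUES ('r1', 's3://bucket/a')")

run = LazyRun(conn, "r1")
print(run.consumed_asset_events)  # ['s3://bucket/a'] while the connection is open

conn.close()  # analogous to the session ending before the attribute is read
try:
    _ = run.consumed_asset_events
except sqlite3.ProgrammingError as exc:
    print(f"lazy load failed: {exc}")
```

In SQLAlchemy terms, there are two general remedies. The attribute can be loaded eagerly while the session is still open, or the object can stay attached to a session until the attribute has been read. Which one fits Airflow's code path is not established by this report.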

```
[2025-08-09T21:08:03.201+0000] {kubernetes_executor.py:739} ERROR - Unknown error while flushing task queue and result queue.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1022, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1316, in _run_scheduler_loop
    num_finished_events += self._process_executor_events(
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 784, in _process_executor_events
    return SchedulerJobRunner.process_executor_events(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 949, in process_executor_events
    context_from_server=TIRunContext(
                        ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pydantic/main.py", line 253, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 1 validation error for TIRunContext
dag_run.consumed_asset_events
  Error extracting attribute: DetachedInstanceError: Parent instance <DagRun at 0x7fd31ebee550> is not bound to a Session; lazy load operation of attribute 'consumed_asset_events' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3) [type=get_attribute_error, input_value=<DagRun us_bear_relay_dif...00. run_type: scheduled>, input_type=DagRun]
    For further information visit https://errors.pydantic.dev/2.11/v/get_attribute_error

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 737, in end
    self.kube_scheduler.terminate()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 533, in terminate
    kube_watcher.join()
  File "/usr/local/lib/python3.11/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/multiprocessing/popen_fork.py", line 43, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 277, in _exit_gracefully
    self._end_active_spans()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1088, in _end_active_spans
    dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == key)).one()
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1522, in one
    return self._only_one_row(
           ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 614, in _only_one_row
    raise exc.MultipleResultsFound(
sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when exactly one was required
[2025-08-09T21:08:03.213+0000] {scheduler_job_runner.py:1038} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 7, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 48, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 347, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 376, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1022, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1316, in _run_scheduler_loop
    num_finished_events += self._process_executor_events(
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 784, in _process_executor_events
    return SchedulerJobRunner.process_executor_events(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 949, in process_executor_events
    context_from_server=TIRunContext(
                        ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/pydantic/main.py", line 253, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 1 validation error for TIRunContext
dag_run.consumed_asset_events
  Error extracting attribute: DetachedInstanceError: Parent instance <DagRun at 0x7fd31ebee550> is not bound to a Session; lazy load operation of attribute 'consumed_asset_events' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3) [type=get_attribute_error, input_value=<DagRun us_bear_relay_dif...00. run_type: scheduled>, input_type=DagRun]
    For further information visit https://errors.pydantic.dev/2.11/v/get_attribute_error
[2025-08-09T21:08:10.168+0000] {kubernetes_executor_utils.py:277} INFO - Event: us-ghsg-relay-diff-sync-sql-restore-replicate-tables-hvj32csx Succeeded, annotations: <omitted>
[2025-08-09T21:08:10.169+0000] {kubernetes_executor_utils.py:98} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 91, in run
    self.resource_version = self._run(
                            ^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 171, in _run
    self.process_status(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 278, in process_status
    self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version))
  File "<string>", line 2, in put
  File "/usr/local/lib/python3.11/multiprocessing/managers.py", line 821, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 427, in _send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 384, in _send
    n = write(self._handle, buf)
        ^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
Process KubernetesJobWatcher-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 91, in run
    self.resource_version = self._run(
                            ^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 171, in _run
    self.process_status(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 278, in process_status
    self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version))
  File "<string>", line 2, in put
  File "/usr/local/lib/python3.11/multiprocessing/managers.py", line 821, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 427, in _send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 384, in _send
    n = write(self._handle, buf)
        ^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
```
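The second traceback is raised from `_end_active_spans`. According to the log, that method selects a `DagRun` with `DagRun.run_id == key` and calls `.one()`. In Airflow's `dag_run` table, `run_id` is unique only in combination with `dag_id`. If two DAGs hold runs with the same `run_id`, that query returns more than one row and `.one()` raises `MultipleResultsFound`. This report does not establish whether that is what happened here, so treat it as an assumption.

The following sketch reproduces the pattern with the standard-library `sqlite3` module. The table, the `fetch_one` helper and the run ID are hypothetical simplifications, not Airflow's schema or code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified dag_run table: run_id is unique only per dag_id.
conn.execute("CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, UNIQUE (dag_id, run_id))")

# Illustrative run ID shared by two different DAGs; the constraint above allows it.
run_id = "scheduled__2025-01-01T00:00:00+00:00"
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?)",
    [("dag_a", run_id), ("dag_b", run_id)],
)


def fetch_one(sql: str, params: tuple) -> tuple:
    """Return exactly one row, like SQLAlchemy's Result.one().

    Raises LookupError where SQLAlchemy would raise NoResultFound or
    MultipleResultsFound.
    """
    rows = conn.execute(sql, params).fetchall()
    if len(rows) != 1:
        raise LookupError(f"expected exactly one row, got {len(rows)}")
    return rows[0]


# Filtering on run_id alone matches the runs of both DAGs.
try:
    fetch_one("SELECT dag_id, run_id FROM dag_run WHERE run_id = ?", (run_id,))
except LookupError as exc:
    print(exc)  # expected exactly one row, got 2

# Filtering on the full unique key (dag_id, run_id) matches a single row.
row = fetch_one(
    "SELECT dag_id, run_id FROM dag_run WHERE dag_id = ? AND run_id = ?",
    ("dag_a", run_id),
)
print(row)
```

In this toy model, scoping the lookup to the full `(dag_id, run_id)` key is what removes the ambiguity.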


### How to reproduce
Not sure

### Anything else
No response


GitHub link: https://github.com/apache/airflow/discussions/54306
