hterik opened a new issue, #29833:
URL: https://github.com/apache/airflow/issues/29833
### Apache Airflow version
2.5.1
### What happened
1. Task was started and sent to celery by scheduler
2. Celery worker picked up the task
3. Celery worker lost database connection
4. Celery worker crashes with error below (`RuntimeError: Set changed size during iteration`)
5. Task is stuck in queued state for over 14 hours.
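Step 5 has the largest operational impact. The sketch below shows one way to flag task instances that have stayed in the queued state past a threshold. It is a hypothetical illustration, not Airflow's API: `QueuedTask`, `queued_at` and `find_stuck_queued` are made-up names, and a real check would read task state from the metadata database.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List


@dataclass
class QueuedTask:
    # Hypothetical record for illustration; not Airflow's TaskInstance model.
    task_id: str
    state: str
    queued_at: datetime


def find_stuck_queued(
    tasks: Iterable[QueuedTask], now: datetime, max_queued: timedelta
) -> List[str]:
    """Return ids of tasks still in 'queued' for longer than max_queued."""
    return [
        t.task_id
        for t in tasks
        if t.state == "queued" and now - t.queued_at > max_queued
    ]


if __name__ == "__main__":
    now = datetime(2023, 3, 1, 23, 55, tzinfo=timezone.utc)
    tasks = [
        QueuedTask("stuck", "queued", datetime(2023, 3, 1, 9, 55, tzinfo=timezone.utc)),
        QueuedTask("fresh", "queued", datetime(2023, 3, 1, 23, 50, tzinfo=timezone.utc)),
        QueuedTask("done", "success", datetime(2023, 3, 1, 9, 0, tzinfo=timezone.utc)),
    ]
    print(find_stuck_queued(tasks, now, timedelta(hours=1)))  # ['stuck']
```

Only tasks whose state is still `queued` are considered, so finished tasks with old timestamps are ignored.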
The following log and screenshot show a more recent occurrence of the situation above, in which step 5 has not yet reached 14 hours. We have observed several such occurrences recently.
```
[2023-03-01 09:55:27,163: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[6e43c201-b325-4538-ae94-3cf9a583f138] received
[2023-03-01 09:55:27,364: INFO/ForkPoolWorker-1] [6e43c201-b325-4538-ae94-3cf9a583f138] Executing command in Celery: ['airflow', 'tasks', 'run', XXXXX
[2023-03-01 09:55:28,263: INFO/ForkPoolWorker-1] Filling up the DagBag from /XXXXX
[2023-03-01 09:55:30,265: INFO/ForkPoolWorker-1] Running <TaskInstance: XXXXX [queued]> on host worker4
[2023-03-01 10:00:23,487: ERROR/ForkPoolWorker-1] [6e43c201-b325-4538-ae94-3cf9a583f138] Failed to execute task (psycopg2.OperationalError) connection to server at "XXXXXXXXX failed: Connection timed out
Is the server running on that host and accepting TCP/IP connections?
(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
  File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "sqlalchemy/pool/base.py", line 325, in connect
    return _ConnectionFairy._checkout(self)
  File "sqlalchemy/pool/base.py", line 888, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "sqlalchemy/pool/base.py", line 491, in checkout
    rec = pool._do_get()
  File "sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "sqlalchemy/pool/base.py", line 271, in _create_connection
    return _ConnectionRecord(self)
  File "sqlalchemy/pool/base.py", line 386, in __init__
    self.__connect()
  File "sqlalchemy/pool/base.py", line 684, in __connect
    with util.safe_reraise():
  File "sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "sqlalchemy/pool/base.py", line 680, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: connection to server at "xxxxxx port 5432 failed: Connection timed out
Is the server running on that host and accepting TCP/IP connections?

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "airflow/executors/celery_executor.py", line 130, in _execute_in_fork
    args.func(args)
  File "airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "airflow/utils/cli.py", line 108, in wrapper
    return f(*args, **kwargs)
  File "airflow/cli/commands/task_command.py", line 396, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "airflow/cli/commands/task_command.py", line 193, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "airflow/cli/commands/task_command.py", line 252, in _run_task_by_local_task_job
    run_job.run()
  File "airflow/jobs/base_job.py", line 259, in run
    session.merge(self)
  File "sqlalchemy/orm/session.py", line 3051, in merge
    return self._merge(
  File "sqlalchemy/orm/session.py", line 3131, in _merge
    merged = self.get(
  File "sqlalchemy/orm/session.py", line 2848, in get
    return self._get_impl(
  File "sqlalchemy/orm/session.py", line 2970, in _get_impl
    return db_load_fn(
  File "sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "sqlalchemy/orm/session.py", line 1713, in execute
    conn = self._connection_for_bind(bind)
  File "sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "sqlalchemy/orm/session.py", line 747, in _connection_for_bind
    conn = bind.connect()
  File "sqlalchemy/engine/base.py", line 3315, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
  File "sqlalchemy/engine/base.py", line 3394, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "sqlalchemy/engine/base.py", line 3364, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "sqlalchemy/engine/base.py", line 2198, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "sqlalchemy/pool/base.py", line 325, in connect
    return _ConnectionFairy._checkout(self)
  File "sqlalchemy/pool/base.py", line 888, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "sqlalchemy/pool/base.py", line 491, in checkout
    rec = pool._do_get()
  File "sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "sqlalchemy/pool/base.py", line 271, in _create_connection
    return _ConnectionRecord(self)
  File "sqlalchemy/pool/base.py", line 386, in __init__
    self.__connect()
  File "sqlalchemy/pool/base.py", line 684, in __connect
    with util.safe_reraise():
  File "sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "sqlalchemy/pool/base.py", line 680, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "XXXX, port 5432 failed: Connection timed out
Is the server running on that host and accepting TCP/IP connections?
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2023-03-01 10:00:23,705: ERROR/ForkPoolWorker-1] Task airflow.executors.celery_executor.execute_command[6e43c201-b325-4538-ae94-3cf9a583f138] raised unexpected: AirflowException('Celery command failed on host: worker4 with celery_task_id 6e43c201-b325-4538-ae94-3cf9a583f138')
Traceback (most recent call last):
  File "celery/app/trace.py", line 451, in trace_task
    R = retval = fun(*args, **kwargs)
  File "celery/app/trace.py", line 734, in __protected_call__
    return self.run(*args, **kwargs)
  File "airflow/executors/celery_executor.py", line 96, in execute_command
    _execute_in_fork(command_to_exec, celery_task_id)
  File "airflow/executors/celery_executor.py", line 111, in _execute_in_fork
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Celery command failed on host: worker4 with celery_task_id 6e43c201-b325-4538-ae94-3cf9a583f138
[2023-03-01 10:00:24,109: CRITICAL/MainProcess] Unrecoverable error: RuntimeError('Set changed size during iteration')
Traceback (most recent call last):
  File "celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "celery/worker/consumer/consumer.py", line 628, in start
    c.loop(*c.loop_args())
  File "celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "kombu/asynchronous/hub.py", line 294, in create_loop
    for tick_callback in on_tick:
RuntimeError: Set changed size during iteration
[2023-03-01 10:00:25 +0000] [14] [INFO] Handling signal: term
[2023-03-01 10:00:25 +0000] [15] [INFO] Worker exiting (pid: 15)
[2023-03-01 10:00:25 +0000] [16] [INFO] Worker exiting (pid: 16)
[2023-03-01 10:00:25 +0000] [14] [INFO] Shutting down: Master
```
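The final crash comes from kombu's event loop iterating over the `on_tick` set (`for tick_callback in on_tick:`) while that set changed size. The snippet below reproduces the same Python error in isolation and shows the usual remedy of iterating over a snapshot. That a tick callback was added or removed mid-loop in the worker is an assumption here, and `make_hub`, `tick_unsafe` and `tick_snapshot` are hypothetical names, not kombu code.

```python
from typing import Callable, List, Set, Tuple

Callback = Callable[[], None]


def make_hub() -> Tuple[Set[Callback], List[str]]:
    """Build a set of tick callbacks where one callback registers another."""
    on_tick: Set[Callback] = set()
    calls: List[str] = []

    def late_callback() -> None:
        calls.append("late")

    def registering_callback() -> None:
        calls.append("registering")
        on_tick.add(late_callback)  # mutates the set being iterated

    on_tick.add(registering_callback)
    return on_tick, calls


def tick_unsafe(on_tick: Set[Callback]) -> None:
    # Iterates the live set: raises RuntimeError if a callback changes its size.
    for tick_callback in on_tick:
        tick_callback()


def tick_snapshot(on_tick: Set[Callback]) -> None:
    # Iterates a copy: callbacks added during this tick run on the next tick.
    for tick_callback in list(on_tick):
        tick_callback()


if __name__ == "__main__":
    hub, _ = make_hub()
    try:
        tick_unsafe(hub)
    except RuntimeError as exc:
        print(exc)  # Set changed size during iteration
    hub, calls = make_hub()
    tick_snapshot(hub)
    print(calls, len(hub))  # ['registering'] 2
```

Iterating over a copy trades a small allocation per tick for tolerance of callbacks that register or unregister other callbacks.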

### What you think should happen instead
A. The Celery worker should reconnect to the database after intermittent
network errors.
B. In case of unrecoverable errors, the scheduler should eventually retry or
fail the task.
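Expectation A amounts to retrying transient connection failures instead of failing the task on the first timeout. Below is a minimal sketch of retry with capped exponential backoff, assuming a hypothetical `call_with_retry` helper. In a real worker `retry_on` would be something like `sqlalchemy.exc.OperationalError`; here a local `TransientError` stands in so the example runs without a database.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Stand-in for a transient DB error such as a connection timeout."""


def call_with_retry(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (TransientError,),
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on retry_on with capped exponential backoff.

    Re-raises the last error once max_attempts calls have failed.
    """
    attempt = 1
    while True:
        try:
            return fn()
        except retry_on:
            if attempt >= max_attempts:
                raise
            # Delays: base_delay, 2*base_delay, 4*base_delay, ... capped at max_delay.
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
            attempt += 1


if __name__ == "__main__":
    failures = [TransientError("timeout"), TransientError("timeout")]
    delays = []

    def flaky_connect() -> str:
        if failures:
            raise failures.pop(0)
        return "connected"

    print(call_with_retry(flaky_connect, sleep=delays.append), delays)
    # connected [0.5, 1.0]
```

The `sleep` parameter is injectable so the backoff schedule can be tested without waiting. Adding random jitter to each delay is a common refinement when many workers reconnect at the same time.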
### How to reproduce
Difficult; it happens intermittently.
### Operating System
Ubuntu 22.04
### Versions of Apache Airflow Providers
apache-airflow==2.5.0
apache-airflow-providers-celery==3.1.0
redis==3.5.3
celery==5.2.7
kombu==5.2.4
### Deployment
Other Docker-based deployment
### Deployment details
airflow.cfg, celery options
```
[celery]
worker_concurrency = 1
worker_prefetch_multiplier = 1
worker_autoscale = 1,1
celery_config_options = ...see below
[celery_broker_transport_options]
socket_connect = 240
socket_keepalive = True
socket_connect_timeout = 240
retry_on_timeout = True
```
```python
import airflow.config_templates.default_celery

# Extends Airflow's default Celery config with retry and keepalive settings.
celery_config_options = {
    **airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG,
    "result_backend_always_retry": True,
    "result_backend_max_retries": 20,
    "redis_socket_keepalive": True,
    "redis_retry_on_timeout": True,
    "redis_socket_connect_timeout": 240,
    "worker_deduplicate_successful_tasks": True,
}
```
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)