vghar-bh opened a new issue, #32484: URL: https://github.com/apache/airflow/issues/32484
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

The worker appears to be stuck in a state where queued task instance messages are not consumed from Redis. Redis was restarted while the worker stayed up; the worker logged the loss of connection and reconnected once Redis came back online, yet it still does not consume the queued messages.

**Worker logs:**

```
[2023-07-09 01:07:58,950: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 336, in start
    blueprint.start(self)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 726, in start
    c.loop(*c.loop_args())
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 373, in create_loop
    cb(*cbargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 1336, in on_readable
    self.cycle.on_readable(fileno)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 566, in on_readable
    chan.handlers[type]()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 910, in _receive
    ret.append(self._receive_one(c))
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 920, in _receive_one
    response = c.parse_response()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1542, in parse_response
    response = self._execute(conn, try_read)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1518, in _execute
    return conn.retry.call_with_retry(
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/retry.py", line 49, in call_with_retry
    fail(error)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1520, in <lambda>
    lambda error: self._disconnect_raise_connect(conn, error),
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1507, in _disconnect_raise_connect
    raise error
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1519, in <lambda>
    lambda: command(*args, **kwargs),
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1540, in try_read
    return conn.read_response(disconnect_on_error=False)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 874, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 347, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 357, in _read_response
    raw = self._buffer.readline()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 260, in readline
    self._read_from_socket()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 213, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2023-07-09 01:07:58,958: WARNING/MainProcess] /home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:386: CPendingDeprecationWarning: In Celery 5.1 we introduced an optional breaking change which on connection loss cancels all currently executed tasks with late acknowledgement enabled. These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
  warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
[2023-07-09 01:07:58,978: WARNING/MainProcess] /home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:498: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine whether broker connection retries are made during startup in Celery 6.0 and above. If you wish to retain the existing behavior for retrying connections on startup, you should set broker_connection_retry_on_startup to True.
  warnings.warn(
[2023-07-09 01:07:58,985: ERROR/MainProcess] consumer: Cannot connect to redis://:**@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1: Error 111 connecting to test-airflow-redis-master.test-idp.svc.cluster.local:6379. Connection refused.. Trying again in 2.00 seconds... (1/100)
[2023-07-09 01:14:09,530: ERROR/MainProcess] consumer: Cannot connect to redis://:**@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1: Error 110 connecting to test-airflow-redis-master.test-idp.svc.cluster.local:6379. Connection timed out.. Trying again in 32.00 seconds... (16/100)
[2023-07-09 01:14:41,652: INFO/MainProcess] Connected to redis://:**@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1
[2023-07-09 01:14:41,653: WARNING/MainProcess] /home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:498: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine whether broker connection retries are made during startup in Celery 6.0 and above. If you wish to retain the existing behavior for retrying connections on startup, you should set broker_connection_retry_on_startup to True.
  warnings.warn(
[2023-07-09 01:14:41,666: INFO/MainProcess] mingle: searching for neighbors
[2023-07-09 01:14:42,691: INFO/MainProcess] mingle: all alone
```

### What you think should happen instead

After Redis comes back online and the worker has reconnected, the worker should consume the queued messages and execute the queued task instances.
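One experiment I have not verified, but that the two `CPendingDeprecationWarning` messages above point at, is enabling `worker_cancel_long_running_tasks_on_connection_loss` and `broker_connection_retry_on_startup` through a custom Celery config module referenced from `[celery] celery_config_options` in `airflow.cfg`. The module name `my_celery_config` below is hypothetical; this is only a sketch of how those settings could be overridden, not a confirmed fix for the stuck consumer.

```python
# my_celery_config.py -- hypothetical module, referenced from airflow.cfg via:
#   [celery]
#   celery_config_options = my_celery_config.CELERY_CONFIG
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    # On connection loss, cancel tasks with late acknowledgement so they are
    # redelivered instead of staying unacknowledged (first warning above).
    "worker_cancel_long_running_tasks_on_connection_loss": True,
    # Keep retrying broker connections on startup under the Celery >= 6.0
    # semantics (second warning above).
    "broker_connection_retry_on_startup": True,
}
```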
### How to reproduce

- Delete the existing Redis pod; the worker becomes unable to connect to Redis.
- Redis restarts and the worker reconnects as expected.
- The worker does not consume new messages (queued task instances); a sketch for confirming this from outside the worker is included at the end of this report.

### Operating System

N/A

### Versions of Apache Airflow Providers

```
apache-airflow-providers-amazon 5.1.0
apache-airflow-providers-apache-livy 2.2.3
apache-airflow-providers-celery 3.0.0
apache-airflow-providers-cncf-kubernetes 4.4.0
apache-airflow-providers-common-sql 1.2.0
apache-airflow-providers-docker 3.2.0
apache-airflow-providers-elasticsearch 4.2.1
apache-airflow-providers-ftp 3.1.0
apache-airflow-providers-google 8.3.0
apache-airflow-providers-grpc 3.0.0
apache-airflow-providers-hashicorp 3.1.0
apache-airflow-providers-http 4.0.0
apache-airflow-providers-imap 3.0.0
apache-airflow-providers-microsoft-azure 4.3.0
apache-airflow-providers-mysql 3.2.1
apache-airflow-providers-odbc 3.1.2
apache-airflow-providers-postgres 5.2.2
apache-airflow-providers-redis 3.0.0
apache-airflow-providers-sendgrid 3.0.0
apache-airflow-providers-sftp 4.1.0
apache-airflow-providers-slack 5.1.0
apache-airflow-providers-sqlite 3.2.1
apache-airflow-providers-ssh 3.2.0
```

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

Airflow version 2.4.1, deployed with the Airflow Helm Chart (User Community) 8.6.1 (https://artifacthub.io/packages/helm/airflow-helm/airflow/8.6.1), using Argo CD as the GitOps CD tool for Kubernetes.

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
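For completeness, here is a hypothetical snippet showing how the symptom could be confirmed from outside the worker: messages pile up in the broker queue while the worker reports nothing active or reserved. The broker URL/password and the queue name (`default`, Airflow's default Celery queue) are placeholders and depend on the deployment.

```python
# Diagnostic sketch (assumptions: broker URL/password, queue name "default").
import redis
from celery import Celery

# Broker URL from the worker logs, with the masked password filled in.
broker_url = "redis://:CHANGE_ME@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1"

# Are task messages still sitting in the broker?
r = redis.Redis.from_url(broker_url)
print("messages in 'default' queue:", r.llen("default"))

# What does the worker itself claim to be doing?
app = Celery(broker=broker_url)
inspect = app.control.inspect()
print("active  :", inspect.active())
print("reserved:", inspect.reserved())
```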