vghar-bh opened a new issue, #32484:
URL: https://github.com/apache/airflow/issues/32484

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   The worker appears to be stuck in a state where queued task instance messages are no longer consumed from Redis.
   
   Redis restarted while the worker kept running. The worker logged the loss of connection and reconnected after Redis came back online, but it never resumed consuming messages.
   
   **worker logs:** 
   
   ```
   [2023-07-09 01:07:58,950: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 336, in start
       blueprint.start(self)
     File "/home/airflow/.local/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
       step.start(parent)
     File "/home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 726, in start
       c.loop(*c.loop_args())
     File "/home/airflow/.local/lib/python3.8/site-packages/celery/worker/loops.py", line 97, in asynloop
       next(loop)
     File "/home/airflow/.local/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 373, in create_loop
       cb(*cbargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 1336, in on_readable
       self.cycle.on_readable(fileno)
     File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 566, in on_readable
       chan.handlers[type]()
     File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 910, in _receive
       ret.append(self._receive_one(c))
     File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 920, in _receive_one
       response = c.parse_response()
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1542, in parse_response
       response = self._execute(conn, try_read)
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1518, in _execute
       return conn.retry.call_with_retry(
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/retry.py", line 49, in call_with_retry
       fail(error)
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1520, in <lambda>
       lambda error: self._disconnect_raise_connect(conn, error),
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1507, in _disconnect_raise_connect
       raise error
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/retry.py", line 46, in call_with_retry
       return do()
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1519, in <lambda>
       lambda: command(*args, **kwargs),
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1540, in try_read
       return conn.read_response(disconnect_on_error=False)
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 874, in read_response
       response = self._parser.read_response(disable_decoding=disable_decoding)
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 347, in read_response
       result = self._read_response(disable_decoding=disable_decoding)
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 357, in _read_response
       raw = self._buffer.readline()
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 260, in readline
       self._read_from_socket()
     File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 213, in _read_from_socket
       raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
   redis.exceptions.ConnectionError: Connection closed by server.
   
   [2023-07-09 01:07:58,958: WARNING/MainProcess] /home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:386: CPendingDeprecationWarning:
   In Celery 5.1 we introduced an optional breaking change which
   on connection loss cancels all currently executed tasks with late acknowledgement enabled.
   These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
   back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
   setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
   
     warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
   
   [2023-07-09 01:07:58,978: WARNING/MainProcess] /home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:498: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
   whether broker connection retries are made during startup in Celery 6.0 and above.
   If you wish to retain the existing behavior for retrying connections on startup,
   you should set broker_connection_retry_on_startup to True.
     warnings.warn(
   
   [2023-07-09 01:07:58,985: ERROR/MainProcess] consumer: Cannot connect to redis://:**@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1: Error 111 connecting to test-airflow-redis-master.test-idp.svc.cluster.local:6379. Connection refused..
   Trying again in 2.00 seconds... (1/100)
   
   [2023-07-09 01:14:09,530: ERROR/MainProcess] consumer: Cannot connect to redis://:**@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1: Error 110 connecting to test-airflow-redis-master.test-idp.svc.cluster.local:6379. Connection timed out..
   Trying again in 32.00 seconds... (16/100)
   
   [2023-07-09 01:14:41,652: INFO/MainProcess] Connected to redis://:**@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1
   [2023-07-09 01:14:41,653: WARNING/MainProcess] /home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:498: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
   whether broker connection retries are made during startup in Celery 6.0 and above.
   If you wish to retain the existing behavior for retrying connections on startup,
   you should set broker_connection_retry_on_startup to True.
     warnings.warn(
   
   [2023-07-09 01:14:41,666: INFO/MainProcess] mingle: searching for neighbors
   [2023-07-09 01:14:42,691: INFO/MainProcess] mingle: all alone
   ```
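For context, the retry lines above ("Trying again in 2.00 seconds... (1/100)", "32.00 seconds... (16/100)") are consistent with a roughly linear backoff of about 2 seconds per attempt. The sketch below is an illustration inferred from the log, not Celery's actual implementation; the step size and cap are assumptions:

```python
def retry_intervals(max_retries=100, step=2.0, interval_max=32.0):
    """Illustrative broker-reconnect schedule inferred from the worker log:
    attempt 1 waits 2 s, attempt 16 waits 32 s, later attempts stay capped."""
    return [min(step * attempt, interval_max) for attempt in range(1, max_retries + 1)]

schedule = retry_intervals()
```

The "(n/100)" counters in the log correspond to the 100-attempt limit before the worker gives up on the broker connection.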
   
   ### What you think should happen instead
   
   After Redis comes back online and the worker reconnects, the worker should resume consuming messages and execute the queued task instances.
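The deprecation warnings in the log point at two Celery settings that govern behavior on connection loss. A minimal sketch of opting in to both (the module name `my_celery_config.py` is hypothetical, and this is not a confirmed fix for the bug; in Airflow you would point `celery_config_options` under `[celery]` in airflow.cfg at such a dict, merged over the default config rather than replacing it):

```python
# Hypothetical my_celery_config.py -- both setting names are taken verbatim
# from the Celery deprecation warnings in the worker log above.
CELERY_CONFIG = {
    # Keep retrying the broker connection at worker startup (Celery 6.0 behavior).
    "broker_connection_retry_on_startup": True,
    # On connection loss, cancel tasks with late acknowledgement so they are
    # redelivered to the queue (will default to True in Celery 6.0).
    "worker_cancel_long_running_tasks_on_connection_loss": True,
}
```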
   
   ### How to reproduce
   
   - Delete the existing Redis pod; the worker becomes unable to connect to Redis
   - Redis restarts and the worker reconnects as expected
   - The worker does not consume new messages (queued task instances)
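The steps above can be sketched as kubectl commands. The namespace is taken from the hostnames in the log; the label selectors are assumptions and must be adjusted to your deployment:

```shell
#!/bin/sh
# Hypothetical namespace/selectors -- adjust to your cluster.
NAMESPACE="test-idp"
REDIS_SELECTOR="app.kubernetes.io/name=redis"
WORKER_SELECTOR="component=worker"

if command -v kubectl >/dev/null 2>&1; then
  # 1. Delete the redis pod; the worker loses its broker connection.
  kubectl -n "$NAMESPACE" delete pod -l "$REDIS_SELECTOR"
  # 2. Wait for the replacement pod; the worker should log "Connected to redis://...".
  kubectl -n "$NAMESPACE" wait --for=condition=Ready pod -l "$REDIS_SELECTOR" --timeout=300s
  # 3. Queue a task and tail the worker logs; in this report the queued
  #    task instance messages are never consumed.
  kubectl -n "$NAMESPACE" logs -l "$WORKER_SELECTOR" --tail=50
else
  echo "kubectl not found; run this from a machine with cluster access"
fi
```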
   
   ### Operating System
   
   N/A
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon           5.1.0
   apache-airflow-providers-apache-livy      2.2.3
   apache-airflow-providers-celery           3.0.0
   apache-airflow-providers-cncf-kubernetes  4.4.0
   apache-airflow-providers-common-sql       1.2.0
   apache-airflow-providers-docker           3.2.0
   apache-airflow-providers-elasticsearch    4.2.1
   apache-airflow-providers-ftp              3.1.0
   apache-airflow-providers-google           8.3.0
   apache-airflow-providers-grpc             3.0.0
   apache-airflow-providers-hashicorp        3.1.0
   apache-airflow-providers-http             4.0.0
   apache-airflow-providers-imap             3.0.0
   apache-airflow-providers-microsoft-azure  4.3.0
   apache-airflow-providers-mysql            3.2.1
   apache-airflow-providers-odbc             3.1.2
   apache-airflow-providers-postgres         5.2.2
   apache-airflow-providers-redis            3.0.0
   apache-airflow-providers-sendgrid         3.0.0
   apache-airflow-providers-sftp             4.1.0
   apache-airflow-providers-slack            5.1.0
   apache-airflow-providers-sqlite           3.2.1
   apache-airflow-providers-ssh              3.2.0
   ```
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Airflow version 2.4.1
   
   Currently working with the Airflow Helm Chart (User Community):
   https://artifacthub.io/packages/helm/airflow-helm/airflow/8.6.1
   
   Using Argo CD as the GitOps CD tool for Kubernetes.
   
   
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
