Package: release.debian.org
Severity: normal
Tags: bookworm
User: release.debian....@packages.debian.org
Usertags: pu
X-Debbugs-Cc: python-channels-re...@packages.debian.org, Michael Fladischer <fl...@debian.org>
Control: affects -1 + src:python-channels-redis

[ Reason ]
The version of python-channels-redis in bookworm suffers from
https://bugs.debian.org/1027387 /
https://github.com/django/channels_redis/issues/332, which was
introduced in 4.0.0 and is a regression from bullseye.  I ran into this
while working on debusine.

[ Impact ]
I believe that any application that (a) runs Celery, (b) uses
channels-redis, and (c) sends to a channel via async_to_sync in a Celery
task without explicitly closing redis pools will encounter RuntimeErrors
as per the upstream bug report above.

I was able to work around this in my application
(https://salsa.debian.org/freexian-team/debusine/-/merge_requests/594),
but I lost a few hours figuring this out, and it would be good if
others didn't have to.

[ Tests ]
It's covered by the upstream test suite, which is run via autopkgtest.

I also confirmed that this change fixes the problems I ran into in
debusine, without applying any workarounds.

[ Risks ]
The alternative is letting applications work around this individually,
which I think is worse on balance. I initially tried switching to
RedisPubSubChannelLayer instead, which more or less worked but required
various other rather strange workarounds.

[ Checklist ]
  [x] *all* changes are documented in the d/changelog
  [x] I reviewed all changes and I approve them
  [x] attach debdiff against the package in (old)stable
  [x] the issue is verified as fixed in unstable

[ Changes ]
This is a straight cherry-pick of an upstream change to close redis
connection pools when closing the asyncio loop.
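The heart of the change is the `_wrap_close` helper (moved into `channels_redis/utils.py` in the patch below), which monkeypatches `loop.close` so that cached per-loop pools are flushed just before the loop goes away. A standalone sketch of the same pattern, with illustrative names rather than the upstream API:

```python
import asyncio
import types

def wrap_close(loop, flush):
    # Same idea as upstream's _wrap_close: replace loop.close with a wrapper
    # that runs the cleanup coroutine, restores the original close, and then
    # really closes the loop.
    original_close = loop.close

    def _wrapper(self, *args, **kwargs):
        self.run_until_complete(flush())
        self.close = original_close
        return self.close(*args, **kwargs)

    loop.close = types.MethodType(_wrapper, loop)

flushed = []

async def flush_pools():
    # Stand-in for closing cached redis connection pools.
    flushed.append("pools closed")

loop = asyncio.new_event_loop()
wrap_close(loop, flush_pools)
loop.close()                        # cleanup now runs automatically first
print(flushed, loop.is_closed())    # ['pools closed'] True
```

Wrapping `close` rather than relying on callers means applications that never explicitly flush the channel layer still get their pools released before the loop disappears.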

-- 
Colin Watson (he/him)                              [cjwat...@debian.org]
diff -Nru python-channels-redis-4.0.0/debian/changelog python-channels-redis-4.0.0/debian/changelog
--- python-channels-redis-4.0.0/debian/changelog        2022-10-10 21:13:47.000000000 +0100
+++ python-channels-redis-4.0.0/debian/changelog        2024-02-19 12:04:40.000000000 +0000
@@ -1,3 +1,11 @@
+python-channels-redis (4.0.0-1+deb12u1) UNRELEASED; urgency=medium
+
+  * Team upload.
+  * Cherry-pick from upstream:
+    - Ensure pools are closed on loop close in core (closes: #1027387).
+
+ -- Colin Watson <cjwat...@debian.org>  Mon, 19 Feb 2024 12:04:40 +0000
+
 python-channels-redis (4.0.0-1) unstable; urgency=medium
 
   * New upstream release.
diff -Nru python-channels-redis-4.0.0/debian/patches/loop-close-pools-in-core.patch python-channels-redis-4.0.0/debian/patches/loop-close-pools-in-core.patch
--- python-channels-redis-4.0.0/debian/patches/loop-close-pools-in-core.patch   1970-01-01 01:00:00.000000000 +0100
+++ python-channels-redis-4.0.0/debian/patches/loop-close-pools-in-core.patch   2024-02-19 12:04:34.000000000 +0000
@@ -0,0 +1,227 @@
+From: Devid <setti.david...@gmail.com>
+Date: Tue, 28 Mar 2023 11:30:35 +0200
+Subject: Assured pools are closed on loop close in core (#347)
+
+Origin: upstream, https://github.com/django/channels_redis/pull/347
+Bug: https://github.com/django/channels_redis/issues/332
+Bug-Debian: https://bugs.debian.org/1027387
+Last-Update: 2024-02-19
+---
+ channels_redis/core.py   | 64 ++++++++++++++++++++++++++++--------------------
+ channels_redis/pubsub.py | 18 +-------------
+ channels_redis/utils.py  | 16 ++++++++++++
+ 3 files changed, 54 insertions(+), 44 deletions(-)
+
+diff --git a/channels_redis/core.py b/channels_redis/core.py
+index 7c04ecd..c3eb3b3 100644
+--- a/channels_redis/core.py
++++ b/channels_redis/core.py
+@@ -15,7 +15,7 @@ from redis import asyncio as aioredis
+ from channels.exceptions import ChannelFull
+ from channels.layers import BaseChannelLayer
+ 
+-from .utils import _consistent_hash
++from .utils import _consistent_hash, _wrap_close
+ 
+ logger = logging.getLogger(__name__)
+ 
+@@ -69,6 +69,26 @@ class BoundedQueue(asyncio.Queue):
+         return super(BoundedQueue, self).put_nowait(item)
+ 
+ 
++class RedisLoopLayer:
++    def __init__(self, channel_layer):
++        self._lock = asyncio.Lock()
++        self.channel_layer = channel_layer
++        self._connections = {}
++
++    def get_connection(self, index):
++        if index not in self._connections:
++            pool = self.channel_layer.create_pool(index)
++            self._connections[index] = aioredis.Redis(connection_pool=pool)
++
++        return self._connections[index]
++
++    async def flush(self):
++        async with self._lock:
++            for index in list(self._connections):
++                connection = self._connections.pop(index)
++                await connection.close(close_connection_pool=True)
++
++
+ class RedisChannelLayer(BaseChannelLayer):
+     """
+     Redis channel layer.
+@@ -101,8 +121,7 @@ class RedisChannelLayer(BaseChannelLayer):
+         self.hosts = self.decode_hosts(hosts)
+         self.ring_size = len(self.hosts)
+         # Cached redis connection pools and the event loop they are from
+-        self.pools = {}
+-        self.pools_loop = None
++        self._layers = {}
+         # Normal channels choose a host index by cycling through the available hosts
+         self._receive_index_generator = itertools.cycle(range(len(self.hosts)))
+         self._send_index_generator = itertools.cycle(range(len(self.hosts)))
+@@ -138,7 +157,7 @@ class RedisChannelLayer(BaseChannelLayer):
+             return aioredis.sentinel.SentinelConnectionPool(
+                 master_name,
+                 aioredis.sentinel.Sentinel(sentinels, sentinel_kwargs=sentinel_kwargs),
+-                **host
++                **host,
+             )
+         else:
+             return aioredis.ConnectionPool(**host)
+@@ -331,7 +350,7 @@ class RedisChannelLayer(BaseChannelLayer):
+ 
+                         raise
+ 
+-                    message, token, exception = None, None, None
++                    message = token = exception = None
+                     for task in done:
+                         try:
+                             result = task.result()
+@@ -367,7 +386,7 @@ class RedisChannelLayer(BaseChannelLayer):
+                             message_channel, message = await self.receive_single(
+                                 real_channel
+                             )
+-                            if type(message_channel) is list:
++                            if isinstance(message_channel, list):
+                                 for chan in message_channel:
+                                     self.receive_buffer[chan].put_nowait(message)
+                             else:
+@@ -459,11 +478,7 @@ class RedisChannelLayer(BaseChannelLayer):
+         Returns a new channel name that can be used by something in our
+         process as a specific channel.
+         """
+-        return "%s.%s!%s" % (
+-            prefix,
+-            self.client_prefix,
+-            uuid.uuid4().hex,
+-        )
++        return f"{prefix}.{self.client_prefix}!{uuid.uuid4().hex}"
+ 
+     ### Flush extension ###
+ 
+@@ -496,9 +511,8 @@ class RedisChannelLayer(BaseChannelLayer):
+         # Flush all cleaners, in case somebody just wanted to close the
+         # pools without flushing first.
+         await self.wait_received()
+-
+-        for index in self.pools:
+-            await self.pools[index].disconnect()
++        for layer in self._layers.values():
++            await layer.flush()
+ 
+     async def wait_received(self):
+         """
+@@ -667,7 +681,7 @@ class RedisChannelLayer(BaseChannelLayer):
+         """
+         Common function to make the storage key for the group.
+         """
+-        return ("%s:group:%s" % (self.prefix, group)).encode("utf8")
++        return f"{self.prefix}:group:{group}".encode("utf8")
+ 
+     ### Serialization ###
+ 
+@@ -711,7 +725,7 @@ class RedisChannelLayer(BaseChannelLayer):
+         return Fernet(formatted_key)
+ 
+     def __str__(self):
+-        return "%s(hosts=%s)" % (self.__class__.__name__, self.hosts)
++        return f"{self.__class__.__name__}(hosts={self.hosts})"
+ 
+     ### Connection handling ###
+ 
+@@ -723,18 +737,14 @@ class RedisChannelLayer(BaseChannelLayer):
+         # Catch bad indexes
+         if not 0 <= index < self.ring_size:
+             raise ValueError(
+-                "There are only %s hosts - you asked for %s!" % (self.ring_size, index)
++                f"There are only {self.ring_size} hosts - you asked for {index}!"
+             )
+ 
++        loop = asyncio.get_running_loop()
+         try:
+-            loop = asyncio.get_running_loop()
+-            if self.pools_loop != loop:
+-                self.pools = {}
+-                self.pools_loop = loop
+-        except RuntimeError:
+-            pass
+-
+-        if index not in self.pools:
+-            self.pools[index] = self.create_pool(index)
++            layer = self._layers[loop]
++        except KeyError:
++            _wrap_close(self, loop)
++            layer = self._layers[loop] = RedisLoopLayer(self)
+ 
+-        return aioredis.Redis(connection_pool=self.pools[index])
++        return layer.get_connection(index)
+diff --git a/channels_redis/pubsub.py b/channels_redis/pubsub.py
+index 3c10378..550325b 100644
+--- a/channels_redis/pubsub.py
++++ b/channels_redis/pubsub.py
+@@ -1,32 +1,16 @@
+ import asyncio
+ import functools
+ import logging
+-import types
+ import uuid
+ 
+ import msgpack
+ from redis import asyncio as aioredis
+ 
+-from .utils import _consistent_hash
++from .utils import _consistent_hash, _wrap_close
+ 
+ logger = logging.getLogger(__name__)
+ 
+ 
+-def _wrap_close(proxy, loop):
+-    original_impl = loop.close
+-
+-    def _wrapper(self, *args, **kwargs):
+-        if loop in proxy._layers:
+-            layer = proxy._layers[loop]
+-            del proxy._layers[loop]
+-            loop.run_until_complete(layer.flush())
+-
+-        self.close = original_impl
+-        return self.close(*args, **kwargs)
+-
+-    loop.close = types.MethodType(_wrapper, loop)
+-
+-
+ async def _async_proxy(obj, name, *args, **kwargs):
+     # Must be defined as a function and not a method due to
+     # https://bugs.python.org/issue38364
+diff --git a/channels_redis/utils.py b/channels_redis/utils.py
+index 7b30fdc..d2405bb 100644
+--- a/channels_redis/utils.py
++++ b/channels_redis/utils.py
+@@ -1,4 +1,5 @@
+ import binascii
++import types
+ 
+ 
+ def _consistent_hash(value, ring_size):
+@@ -15,3 +16,18 @@ def _consistent_hash(value, ring_size):
+     bigval = binascii.crc32(value) & 0xFFF
+     ring_divisor = 4096 / float(ring_size)
+     return int(bigval / ring_divisor)
++
++
++def _wrap_close(proxy, loop):
++    original_impl = loop.close
++
++    def _wrapper(self, *args, **kwargs):
++        if loop in proxy._layers:
++            layer = proxy._layers[loop]
++            del proxy._layers[loop]
++            loop.run_until_complete(layer.flush())
++
++        self.close = original_impl
++        return self.close(*args, **kwargs)
++
++    loop.close = types.MethodType(_wrapper, loop)
diff -Nru python-channels-redis-4.0.0/debian/patches/series python-channels-redis-4.0.0/debian/patches/series
--- python-channels-redis-4.0.0/debian/patches/series   1970-01-01 01:00:00.000000000 +0100
+++ python-channels-redis-4.0.0/debian/patches/series   2024-02-19 12:04:34.000000000 +0000
@@ -0,0 +1 @@
+loop-close-pools-in-core.patch
