This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 8cfeac6 feat: add shutdown support to async client (#290)
8cfeac6 is described below
commit 8cfeac654ab0ed29d82e5a8659d59fd76b1c6b8c
Author: Nikolas Achatz <[email protected]>
AuthorDate: Mon Feb 2 04:03:02 2026 -0800
feat: add shutdown support to async client (#290)
---
pulsar/asyncio.py | 10 ++++++++++
tests/asyncio_test.py | 11 +++++++++++
2 files changed, 21 insertions(+)
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index a429cf8..7fa7c3d 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -777,6 +777,16 @@ class Client:
schema.attach_client(self._client)
return Consumer(await future, schema)
+ def shutdown(self) -> None:
+ """
+ Shutdown the client and all the associated producers and consumers
+
+ Raises
+ ------
+ PulsarException
+ """
+ self._client.shutdown()
+
async def get_topic_partitions(self, topic: str) -> List[str]:
"""
Get the list of partitions for a given topic in asynchronous mode.
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 32371cb..b6df63a 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -227,6 +227,17 @@ class AsyncioTest(IsolatedAsyncioTestCase):
await producer.close()
self.assertFalse(producer.is_connected())
+ async def test_shutdown_client(self):
+ producer = await
self._client.create_producer("persistent://public/default/partitioned_topic_name_test")
+ await producer.send(b"hello")
+ self._client.shutdown()
+
+ try:
+ await producer.send(b"hello")
+ self.fail("Expected AlreadyClosed exception after client shutdown")
+ except PulsarException as e:
+ self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
+
async def _prepare_messages(self, producer: Producer) ->
List[pulsar.MessageId]:
msg_ids = []
for i in range(5):