This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 8cfeac6  feat: add shutdown support to async client (#290)
8cfeac6 is described below

commit 8cfeac654ab0ed29d82e5a8659d59fd76b1c6b8c
Author: Nikolas Achatz <[email protected]>
AuthorDate: Mon Feb 2 04:03:02 2026 -0800

    feat: add shutdown support to async client (#290)
---
 pulsar/asyncio.py     | 10 ++++++++++
 tests/asyncio_test.py | 11 +++++++++++
 2 files changed, 21 insertions(+)

diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index a429cf8..7fa7c3d 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -777,6 +777,16 @@ class Client:
         schema.attach_client(self._client)
         return Consumer(await future, schema)
     
+    def shutdown(self) -> None:
+        """
+        Shutdown the client and all the associated producers and consumers
+
+        Raises
+        ------
+        PulsarException
+        """
+        self._client.shutdown()
+
     async def get_topic_partitions(self, topic: str) -> List[str]:
         """
         Get the list of partitions for a given topic in asynchronous mode.
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 32371cb..b6df63a 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -227,6 +227,17 @@ class AsyncioTest(IsolatedAsyncioTestCase):
         await producer.close()
         self.assertFalse(producer.is_connected())
 
+    async def test_shutdown_client(self):
+        producer = await 
self._client.create_producer("persistent://public/default/partitioned_topic_name_test")
+        await producer.send(b"hello")
+        self._client.shutdown()
+
+        try:
+            await producer.send(b"hello")
+            self.fail("Expected AlreadyClosed exception after client shutdown")
+        except PulsarException as e:
+            self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
+
     async def _prepare_messages(self, producer: Producer) -> 
List[pulsar.MessageId]:
         msg_ids = []
         for i in range(5):

Reply via email to