This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f67f42a  Fix Producer.Send hang on reconnect. (#135)
f67f42a is described below

commit f67f42ada8787759c35bfb4f0efabec3f7208e56
Author: Kristian Andersen <[email protected]>
AuthorDate: Tue Jan 31 10:02:53 2023 +0100

    Fix Producer.Send hang on reconnect. (#135)
---
 CHANGELOG.md                                   | 4 ++++
 src/DotPulsar/Internal/AsyncQueueWithCursor.cs | 8 ++++++--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b945fe1..bca728f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 
 - If a consumer, reader, or producer is faulted all method calls will throw a ConsumerFaultedException, ProducerFaultedException, or ReaderFaultedException
 
+### Fixed
+
+- Fixed an issue introduced in `2.8.0`, where send operation would hang, after reestablishing the connection to the broker.
+
 ## [2.9.0] - 2023-01-26
 
 ### Added
diff --git a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
index 0dc13ba..f00f4ab 100644
--- a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
+++ b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
@@ -178,8 +178,7 @@ public sealed class AsyncQueueWithCursor<T> : 
IAsyncDisposable where T : IDispos
         }
         finally
         {
-            if (_cursorNextItemTcs is not null && 
_cursorNextItemTcs.Task.IsCanceled)
-                throw new TaskCanceledException("The task was cancelled");
+            bool shouldThrow = _cursorNextItemTcs is not null && 
_cursorNextItemTcs.Task.IsCanceled;
 
             lock (_queue)
             {
@@ -187,6 +186,11 @@ public sealed class AsyncQueueWithCursor<T> : 
IAsyncDisposable where T : IDispos
             }
 
             _cursorSemaphore.Release();
+
+            if (shouldThrow)
+            {
+                throw new TaskCanceledException("The task was cancelled");
+            }
         }
     }
 

Reply via email to