This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 73bf388  Fixed issue with hanging Receive
73bf388 is described below

commit 73bf3885e1482b9f6e53e1649103aa36395e0eab
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon Feb 3 11:08:09 2025 +0100

    Fixed issue with hanging Receive
---
 src/DotPulsar/Internal/Consumer.cs | 42 ++++++++++++++++----------------------
 1 file changed, 18 insertions(+), 24 deletions(-)

diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 5b71281..142d63b 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -149,7 +149,6 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
             _singleSubConsumer = _subConsumers.First().Value;
 
         _receiveEnumerator = _subConsumers.GetEnumerator();
-        _receiveEnumerator.MoveNext();
         _allSubConsumersAreReady = true;
         _semaphoreSlim.Release();
 
@@ -216,8 +215,6 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
 
         using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
         {
-            var startTopic = string.Empty;
-
             while (true)
             {
                 var receiveTaskNode = _receiveTasks.First;
@@ -231,32 +228,29 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
                     receiveTaskNode = receiveTaskNode.Next;
                 }
 
-                if (_receiveEnumerator.Current.Key is not null)
-                    startTopic = _receiveEnumerator.Current.Key;
-
-                if (!_receiveEnumerator.MoveNext())
+                for (var i = 0; i < _numberOfSubConsumers; ++i)
                 {
-                    _receiveEnumerator = _subConsumers.GetEnumerator();
-                    _receiveEnumerator.MoveNext();
-                }
-
-                var subConsumer = _receiveEnumerator.Current;
+                    if (!_receiveEnumerator.MoveNext())
+                    {
+                        _receiveEnumerator = _subConsumers.GetEnumerator();
+                        _receiveEnumerator.MoveNext();
+                    }
 
-                var receiveTask = subConsumer.Value.Receive(_cts.Token);
-                if (receiveTask.IsCompleted)
-                    return receiveTask.Result;
+                    var subConsumer = _receiveEnumerator.Current.Value;
 
-                _receiveTasks.AddLast(receiveTask.AsTask());
+                    var receiveTask = subConsumer.Receive(_cts.Token);
+                    if (receiveTask.IsCompleted)
+                        return receiveTask.Result;
 
-                if (startTopic == subConsumer.Key)
-                {
-                    var tcs = new TaskCompletionSource<IMessage<TMessage>>();
-                    using var registration = cancellationToken.Register(() => tcs.TrySetCanceled());
-                    _receiveTasks.AddLast(tcs.Task);
-                    await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
-                    _receiveTasks.RemoveLast();
-                    cancellationToken.ThrowIfCancellationRequested();
+                    _receiveTasks.AddLast(receiveTask.AsTask());
                 }
+
+                var tcs = new TaskCompletionSource<IMessage<TMessage>>();
+                using var registration = cancellationToken.Register(() => tcs.TrySetCanceled());
+                _receiveTasks.AddLast(tcs.Task);
+                await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
+                _receiveTasks.RemoveLast();
+                cancellationToken.ThrowIfCancellationRequested();
             }
         }
     }

Reply via email to