This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 73bf388 Fixed issue with hanging Receive
73bf388 is described below
commit 73bf3885e1482b9f6e53e1649103aa36395e0eab
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon Feb 3 11:08:09 2025 +0100
Fixed issue with hanging Receive
---
src/DotPulsar/Internal/Consumer.cs | 42 ++++++++++++++++----------------------
1 file changed, 18 insertions(+), 24 deletions(-)
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 5b71281..142d63b 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -149,7 +149,6 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
_singleSubConsumer = _subConsumers.First().Value;
_receiveEnumerator = _subConsumers.GetEnumerator();
- _receiveEnumerator.MoveNext();
_allSubConsumersAreReady = true;
_semaphoreSlim.Release();
@@ -216,8 +215,6 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var startTopic = string.Empty;
-
while (true)
{
var receiveTaskNode = _receiveTasks.First;
@@ -231,32 +228,29 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
receiveTaskNode = receiveTaskNode.Next;
}
- if (_receiveEnumerator.Current.Key is not null)
- startTopic = _receiveEnumerator.Current.Key;
-
- if (!_receiveEnumerator.MoveNext())
+ for (var i = 0; i < _numberOfSubConsumers; ++i)
{
- _receiveEnumerator = _subConsumers.GetEnumerator();
- _receiveEnumerator.MoveNext();
- }
-
- var subConsumer = _receiveEnumerator.Current;
+ if (!_receiveEnumerator.MoveNext())
+ {
+ _receiveEnumerator = _subConsumers.GetEnumerator();
+ _receiveEnumerator.MoveNext();
+ }
- var receiveTask = subConsumer.Value.Receive(_cts.Token);
- if (receiveTask.IsCompleted)
- return receiveTask.Result;
+ var subConsumer = _receiveEnumerator.Current.Value;
- _receiveTasks.AddLast(receiveTask.AsTask());
+ var receiveTask = subConsumer.Receive(_cts.Token);
+ if (receiveTask.IsCompleted)
+ return receiveTask.Result;
- if (startTopic == subConsumer.Key)
- {
- var tcs = new TaskCompletionSource<IMessage<TMessage>>();
- using var registration = cancellationToken.Register(() =>
tcs.TrySetCanceled());
- _receiveTasks.AddLast(tcs.Task);
- await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
- _receiveTasks.RemoveLast();
- cancellationToken.ThrowIfCancellationRequested();
+ _receiveTasks.AddLast(receiveTask.AsTask());
}
+
+ var tcs = new TaskCompletionSource<IMessage<TMessage>>();
+ using var registration = cancellationToken.Register(() =>
tcs.TrySetCanceled());
+ _receiveTasks.AddLast(tcs.Task);
+ await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
+ _receiveTasks.RemoveLast();
+ cancellationToken.ThrowIfCancellationRequested();
}
}
}