This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cb638b  Fixed memory-leak when consuming from a partitioned topic
9cb638b is described below

commit 9cb638bf0de91e60f02c7b43613b01402cd6cb18
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon May 12 21:16:40 2025 +0200

    Fixed memory-leak when consuming from a partitioned topic
---
 src/DotPulsar/Internal/Consumer.cs | 44 +++++++++++++++++++++-----------------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 54b1d4c..70a1ee7 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -33,8 +33,10 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     private readonly SemaphoreSlim _semaphoreSlim;
     private readonly AsyncLock _lock;
     private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;
-    private readonly LinkedList<Task<IMessage<TMessage>>> _receiveTasks;
-    private Dictionary<string, SubConsumer<TMessage>>.Enumerator _receiveEnumerator;
+    private readonly TaskCompletionSource<IMessage<TMessage>> _neverEndingTask;
+    private SubConsumer<TMessage>[] _receivers;
+    private Task<IMessage<TMessage>>[] _receiveTasks;
+    private int _receiveIndex;
     private SubConsumer<TMessage>? _singleSubConsumer;
     private bool _allSubConsumersAreReady;
     private int _isDisposed;
@@ -64,6 +66,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
             Topic = consumerOptions.TopicsPattern.ToString();
         else
             Topic = string.Join(",", consumerOptions.Topics);
+        _neverEndingTask = new TaskCompletionSource<IMessage<TMessage>>();
+        _receivers = [];
         _receiveTasks = [];
         _cts = new CancellationTokenSource();
         _exceptionHandler = exceptionHandler;
@@ -76,7 +80,6 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         _allSubConsumersAreReady = false;
         _isDisposed = 0;
         _subConsumers = [];
-        _receiveEnumerator = _subConsumers.GetEnumerator();
         _singleSubConsumer = null;
 
         _ = Setup();
@@ -134,6 +137,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         }
 
         _numberOfSubConsumers = topics.Count;
+        _receiveTasks = Enumerable.Repeat(_neverEndingTask.Task, _numberOfSubConsumers + 1).ToArray();
         var monitoringTasks = new Task<ConsumerStateChanged>[_numberOfSubConsumers];
         var states = new ConsumerState[_numberOfSubConsumers];
 
@@ -148,7 +152,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         if (_numberOfSubConsumers == 1)
             _singleSubConsumer = _subConsumers.First().Value;
 
-        _receiveEnumerator = _subConsumers.GetEnumerator();
+        _receivers = _subConsumers.Values.ToArray();
         _allSubConsumersAreReady = true;
         _semaphoreSlim.Release();
 
@@ -217,39 +221,39 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         {
             while (true)
             {
-                var receiveTaskNode = _receiveTasks.First;
-                while (receiveTaskNode is not null)
+                for (var i = 0; i < _numberOfSubConsumers; ++i)
                 {
-                    if (receiveTaskNode.Value.IsCompleted)
+                    var task = _receiveTasks[i];
+                    if (task.IsCompleted)
                     {
-                        _receiveTasks.Remove(receiveTaskNode);
-                        return receiveTaskNode.Value.Result;
+                        _receiveTasks[i] = _neverEndingTask.Task;
+                        return task.Result;
                     }
-                    receiveTaskNode = receiveTaskNode.Next;
                 }
 
-                for (var i = 0; i < _numberOfSubConsumers; ++i)
+                for (var i = 0; i < _numberOfSubConsumers; ++i, ++_receiveIndex)
                 {
-                    if (!_receiveEnumerator.MoveNext())
-                    {
-                        _receiveEnumerator = _subConsumers.GetEnumerator();
-                        _receiveEnumerator.MoveNext();
-                    }
+                    if (_receiveIndex == _numberOfSubConsumers)
+                        _receiveIndex = 0;
+
+                    var task = _receiveTasks[_receiveIndex];
+                    if (task != _neverEndingTask.Task)
+                        continue;
 
-                    var subConsumer = _receiveEnumerator.Current.Value;
+                    var subConsumer = _receivers[_receiveIndex];
 
                     var receiveTask = subConsumer.Receive(_cts.Token);
                     if (receiveTask.IsCompleted)
                         return receiveTask.Result;
 
-                    _receiveTasks.AddLast(receiveTask.AsTask());
+                    _receiveTasks[_receiveIndex] = receiveTask.AsTask();
                 }
 
                 var tcs = new TaskCompletionSource<IMessage<TMessage>>();
                 using var registration = cancellationToken.Register(() => tcs.TrySetCanceled());
-                _receiveTasks.AddLast(tcs.Task);
+                _receiveTasks[_numberOfSubConsumers] = tcs.Task;
                 await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
-                _receiveTasks.RemoveLast();
+                _receiveTasks[_numberOfSubConsumers] = _neverEndingTask.Task;
                 cancellationToken.ThrowIfCancellationRequested();
             }
         }

Reply via email to