This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 091d1b2  Async lock leak (#142)
091d1b2 is described below

commit 091d1b279391ff1ae81d6e05a99e3833aaf175c0
Author: Kristian Andersen <[email protected]>
AuthorDate: Mon Mar 13 11:36:13 2023 +0100

    Async lock leak (#142)
    
    * Improve cleanup on producer reconnect
    
    * Fix memory leak in AsyncLock
    
    Token registration not properly disposed
    
    * Updated changelog
    
    * Properly use CancelableCompletionSource
    
    LockRequest was essentially the same functionality built outside the 
CancelableCompletionSource
---
 CHANGELOG.md                                       |  4 ++
 src/DotPulsar/Internal/AsyncLock.cs                |  2 +-
 .../Internal/CancelableCompletionSource.cs         | 18 +++++--
 src/DotPulsar/Internal/SubProducer.cs              | 55 ++++++++++++++--------
 4 files changed, 56 insertions(+), 23 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8c9f57f..2340ae6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,10 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/)
 
 - 'ReplicateSubscriptionState' can now be set when creating a consumer. The 
default is 'false'
 
+### Fixed
+
+- Fixed a memory leak related to internal locking mechanism
+
 ## [2.10.2] - 2023-02-17
 
 ### Fixed
diff --git a/src/DotPulsar/Internal/AsyncLock.cs 
b/src/DotPulsar/Internal/AsyncLock.cs
index 56a00e0..9846edd 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -55,7 +55,7 @@ public sealed class AsyncLock : IAsyncDisposable
             node = _pending.AddLast(ccs);
         }
 
-        cancellationToken.Register(() => Cancel(node));
+        node.Value.SetupCancellation(() => Cancel(node), cancellationToken);
 
         return node.Value.Task;
     }
diff --git a/src/DotPulsar/Internal/CancelableCompletionSource.cs 
b/src/DotPulsar/Internal/CancelableCompletionSource.cs
index f706427..e974718 100644
--- a/src/DotPulsar/Internal/CancelableCompletionSource.cs
+++ b/src/DotPulsar/Internal/CancelableCompletionSource.cs
@@ -22,12 +22,20 @@ public sealed class CancelableCompletionSource<T> : 
IDisposable
 {
     private readonly TaskCompletionSource<T> _source;
     private CancellationTokenRegistration? _registration;
+    private bool _isDisposed;
+    private readonly object _lock = new();
 
     public CancelableCompletionSource()
         => _source = new 
TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
 
     public void SetupCancellation(Action callback, CancellationToken token)
-        => _registration = token.Register(callback);
+    {
+        lock (_lock)
+        {
+            if (!_isDisposed)
+                _registration = token.Register(callback);
+        }
+    }
 
     public void SetResult(T result)
         => _ = _source.TrySetResult(result);
@@ -39,7 +47,11 @@ public sealed class CancelableCompletionSource<T> : 
IDisposable
 
     public void Dispose()
     {
-        _ = _source.TrySetCanceled();
-        _registration?.Dispose();
+        lock (_lock)
+        {
+            _isDisposed = true;
+            _ = _source.TrySetCanceled();
+            _registration?.Dispose();
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/SubProducer.cs 
b/src/DotPulsar/Internal/SubProducer.cs
index 8d26335..d61fb98 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -104,30 +104,47 @@ public sealed class SubProducer : IContainsChannel, 
IState<ProducerState>
         var responseQueue = new AsyncQueue<Task<BaseCommand>>();
         var responseProcessorTask = ResponseProcessor(responseQueue, 
cancellationToken);
 
-        while (!cancellationToken.IsCancellationRequested)
+        try
         {
-            var sendOp = await 
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
-
-            if (sendOp.CancellationToken.IsCancellationRequested)
+            while (!cancellationToken.IsCancellationRequested)
             {
-                _sendQueue.RemoveCurrentItem();
-                continue;
+                var sendOp = await 
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+
+                if (sendOp.CancellationToken.IsCancellationRequested)
+                {
+                    _sendQueue.RemoveCurrentItem();
+                    continue;
+                }
+
+                var tcs = new TaskCompletionSource<BaseCommand>();
+                _ = tcs.Task.ContinueWith(task =>
+                {
+                    try
+                    {
+                        responseQueue.Enqueue(task);
+                    }
+                    catch
+                    {
+                        // Ignore
+                    }
+                }, TaskContinuationOptions.NotOnCanceled | 
TaskContinuationOptions.ExecuteSynchronously);
+
+                // Use CancellationToken.None here because otherwise it will 
throw exceptions on all fault actions even retry.
+                var success = await _executor.TryExecuteOnce(() => 
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), 
CancellationToken.None).ConfigureAwait(false);
+
+                if (success)
+                    continue;
+
+                _eventRegister.Register(new 
ChannelDisconnected(_correlationId));
+                break;
             }
 
-            var tcs = new TaskCompletionSource<BaseCommand>();
-            _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task), 
TaskContinuationOptions.NotOnCanceled | 
TaskContinuationOptions.ExecuteSynchronously);
-
-            // Use CancellationToken.None here because otherwise it will throw 
exceptions on all fault actions even retry.
-            var success = await _executor.TryExecuteOnce(() => 
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), 
CancellationToken.None).ConfigureAwait(false);
-
-            if (success)
-                continue;
-
-            _eventRegister.Register(new ChannelDisconnected(_correlationId));
-            break;
+            await responseProcessorTask.ConfigureAwait(false);
+        }
+        finally
+        {
+            responseQueue.Dispose();
         }
-
-        await responseProcessorTask.ConfigureAwait(false);
     }
 
     private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>> 
responseQueue, CancellationToken cancellationToken)

Reply via email to