This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 091d1b2 Async lock leak (#142)
091d1b2 is described below
commit 091d1b279391ff1ae81d6e05a99e3833aaf175c0
Author: Kristian Andersen <[email protected]>
AuthorDate: Mon Mar 13 11:36:13 2023 +0100
Async lock leak (#142)
* Improve cleanup on producer reconnect
* Fix memory leak in AsyncLock
Token registration not properly disposed
* Updated changelog
* Properly use CancelableCompletionSource
LockRequest was essentially the same functionality built outside the
CancelableCompletionSource
---
CHANGELOG.md | 4 ++
src/DotPulsar/Internal/AsyncLock.cs | 2 +-
.../Internal/CancelableCompletionSource.cs | 18 +++++--
src/DotPulsar/Internal/SubProducer.cs | 55 ++++++++++++++--------
4 files changed, 56 insertions(+), 23 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8c9f57f..2340ae6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,10 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/)
- 'ReplicateSubscriptionState' can now be set when creating a consumer. The
default is 'false'
+### Fixed
+
+- Fixed a memory leak related to internal locking mechanism
+
## [2.10.2] - 2023-02-17
### Fixed
diff --git a/src/DotPulsar/Internal/AsyncLock.cs
b/src/DotPulsar/Internal/AsyncLock.cs
index 56a00e0..9846edd 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -55,7 +55,7 @@ public sealed class AsyncLock : IAsyncDisposable
node = _pending.AddLast(ccs);
}
- cancellationToken.Register(() => Cancel(node));
+ node.Value.SetupCancellation(() => Cancel(node), cancellationToken);
return node.Value.Task;
}
diff --git a/src/DotPulsar/Internal/CancelableCompletionSource.cs
b/src/DotPulsar/Internal/CancelableCompletionSource.cs
index f706427..e974718 100644
--- a/src/DotPulsar/Internal/CancelableCompletionSource.cs
+++ b/src/DotPulsar/Internal/CancelableCompletionSource.cs
@@ -22,12 +22,20 @@ public sealed class CancelableCompletionSource<T> :
IDisposable
{
private readonly TaskCompletionSource<T> _source;
private CancellationTokenRegistration? _registration;
+ private bool _isDisposed;
+ private readonly object _lock = new();
public CancelableCompletionSource()
=> _source = new
TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
public void SetupCancellation(Action callback, CancellationToken token)
- => _registration = token.Register(callback);
+ {
+ lock (_lock)
+ {
+ if (!_isDisposed)
+ _registration = token.Register(callback);
+ }
+ }
public void SetResult(T result)
=> _ = _source.TrySetResult(result);
@@ -39,7 +47,11 @@ public sealed class CancelableCompletionSource<T> :
IDisposable
public void Dispose()
{
- _ = _source.TrySetCanceled();
- _registration?.Dispose();
+ lock (_lock)
+ {
+ _isDisposed = true;
+ _ = _source.TrySetCanceled();
+ _registration?.Dispose();
+ }
}
}
diff --git a/src/DotPulsar/Internal/SubProducer.cs
b/src/DotPulsar/Internal/SubProducer.cs
index 8d26335..d61fb98 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -104,30 +104,47 @@ public sealed class SubProducer : IContainsChannel,
IState<ProducerState>
var responseQueue = new AsyncQueue<Task<BaseCommand>>();
var responseProcessorTask = ResponseProcessor(responseQueue,
cancellationToken);
- while (!cancellationToken.IsCancellationRequested)
+ try
{
- var sendOp = await
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
-
- if (sendOp.CancellationToken.IsCancellationRequested)
+ while (!cancellationToken.IsCancellationRequested)
{
- _sendQueue.RemoveCurrentItem();
- continue;
+ var sendOp = await
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+
+ if (sendOp.CancellationToken.IsCancellationRequested)
+ {
+ _sendQueue.RemoveCurrentItem();
+ continue;
+ }
+
+ var tcs = new TaskCompletionSource<BaseCommand>();
+ _ = tcs.Task.ContinueWith(task =>
+ {
+ try
+ {
+ responseQueue.Enqueue(task);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }, TaskContinuationOptions.NotOnCanceled |
TaskContinuationOptions.ExecuteSynchronously);
+
+ // Use CancellationToken.None here because otherwise it will
throw exceptions on all fault actions even retry.
+ var success = await _executor.TryExecuteOnce(() =>
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken),
CancellationToken.None).ConfigureAwait(false);
+
+ if (success)
+ continue;
+
+ _eventRegister.Register(new
ChannelDisconnected(_correlationId));
+ break;
}
- var tcs = new TaskCompletionSource<BaseCommand>();
- _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task),
TaskContinuationOptions.NotOnCanceled |
TaskContinuationOptions.ExecuteSynchronously);
-
- // Use CancellationToken.None here because otherwise it will throw
exceptions on all fault actions even retry.
- var success = await _executor.TryExecuteOnce(() =>
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken),
CancellationToken.None).ConfigureAwait(false);
-
- if (success)
- continue;
-
- _eventRegister.Register(new ChannelDisconnected(_correlationId));
- break;
+ await responseProcessorTask.ConfigureAwait(false);
+ }
+ finally
+ {
+ responseQueue.Dispose();
}
-
- await responseProcessorTask.ConfigureAwait(false);
}
private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>>
responseQueue, CancellationToken cancellationToken)