This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
commit 53c5b1bc17bc868858cdde3b5f337c3eb301831b Author: Daniel Blankensteiner <[email protected]> AuthorDate: Thu Oct 10 12:52:34 2024 +0200 Added error details on exceptions from the connector and fixed issue with deadlocked DisposeAsync on consumers, readers and producers --- CHANGELOG.md | 9 +++++-- benchmarks/Compression/Compression.csproj | 2 +- samples/Processing/Processing.csproj | 2 +- src/DotPulsar/DotPulsar.csproj | 2 +- src/DotPulsar/Internal/AsyncLock.cs | 45 ++++++++++++++++++++----------- src/DotPulsar/Internal/AsyncQueue.cs | 28 ++++++++++++++----- src/DotPulsar/Internal/Connector.cs | 14 ++++++++-- 7 files changed, 74 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a98ab70..328db4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,11 +9,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Added - Multiple messages can now be acknowledged with Acknowledge(IEnumerable\<MessageId> messageIds, CancellationToken cancellationToken) -- ProcessingOptions has a new ShutdownGracePeriod property for doing a graceful shutdown by allowing active tasks to finish +- ProcessingOptions has a new ShutdownGracePeriod property for doing a graceful shutdown by allowing active tasks to finish ### Changed -- Updated the Microsoft.Extensions.ObjectPool dependency from version 8.0.7 to 8.0.8 +- Updated the Microsoft.Extensions.ObjectPool dependency from version 8.0.7 to 8.0.10 +- 'SslPolicyErrors' are added to the 'Data' property of the exception thrown when failing to connect + +- ### Fixed + +- When disposing producers, consumers, or readers 'DisposeAsync' would sometimes hang ## [3.3.2] - 2024-08-07 diff --git a/benchmarks/Compression/Compression.csproj b/benchmarks/Compression/Compression.csproj index 41f8afb..36a5f6b 100644 --- a/benchmarks/Compression/Compression.csproj +++ b/benchmarks/Compression/Compression.csproj @@ -11,7 +11,7 @@ <PackageReference Include="BenchmarkDotNet" Version="0.14.0" /> <PackageReference Include="DotNetZip" Version="1.16.0" /> <PackageReference Include="Google.Protobuf" Version="3.28.2" /> - <PackageReference Include="Grpc.Tools" Version="2.66.0"> + <PackageReference Include="Grpc.Tools" Version="2.67.0"> <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> </PackageReference> diff --git a/samples/Processing/Processing.csproj b/samples/Processing/Processing.csproj index fbb25bd..f2d243d 100644 --- a/samples/Processing/Processing.csproj +++ b/samples/Processing/Processing.csproj @@ -8,7 +8,7 @@ </PropertyGroup> <ItemGroup> - <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" /> + <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" /> </ItemGroup> <ItemGroup> diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj index ab79788..e072ab2 100644 --- a/src/DotPulsar/DotPulsar.csproj +++ b/src/DotPulsar/DotPulsar.csproj @@ -24,7 +24,7 @@ <ItemGroup> <PackageReference Include="HashDepot" Version="2.0.3" /> - <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.8" /> + <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.10" /> <PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" IncludeAssets="runtime; build; native; contentfiles; analyzers; buildtransitive" /> <PackageReference Include="protobuf-net" Version="3.2.30" /> <PackageReference Include="System.IO.Pipelines" Version="8.0.0" /> diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs index 2a488ac..1134d7f 100644 --- a/src/DotPulsar/Internal/AsyncLock.cs +++ b/src/DotPulsar/Internal/AsyncLock.cs @@ -58,17 +58,22 @@ public sealed class AsyncLock : IAsyncDisposable public async ValueTask DisposeAsync() { - lock (_pending) - { - if (Interlocked.Exchange(ref _isDisposed, 1) != 0) - return; + if (Interlocked.Exchange(ref _isDisposed, 1) != 0) + return; - foreach (var pending in _pending) - pending.Dispose(); + IEnumerable<CancelableCompletionSource<IDisposable>> pending; + lock (_pending) + { + pending = _pending.ToArray(); _pending.Clear(); } + foreach (var ccs in pending) + { + ccs.Dispose(); + } + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); //Wait for possible lock-holder to finish _semaphoreSlim.Release(); @@ -82,31 +87,41 @@ public sealed class AsyncLock : IAsyncDisposable try { _pending.Remove(node); - node.Value.Dispose(); } catch { // Ignore } } + + try + { + node.Value.Dispose(); + } + catch + { + // Ignore + } } private void Release() { + LinkedListNode<CancelableCompletionSource<IDisposable>>? node; + lock (_pending) { - var node = _pending.First; + node = _pending.First; if (node is not null) - { - node.Value.SetResult(_releaser); - node.Value.Dispose(); _pending.RemoveFirst(); - return; - } - - if (_semaphoreSlim.CurrentCount == 0) + else if (_semaphoreSlim.CurrentCount == 0) _semaphoreSlim.Release(); } + + if (node is not null) + { + node.Value.SetResult(_releaser); + node.Value.Dispose(); + } } private void ThrowIfDisposed() diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs index e0560c0..0675542 100644 --- a/src/DotPulsar/Internal/AsyncQueue.cs +++ b/src/DotPulsar/Internal/AsyncQueue.cs @@ -33,20 +33,23 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable public void Enqueue(T item) { + LinkedListNode<CancelableCompletionSource<T>>? node; + lock (_lock) { ThrowIfDisposed(); - var node = _pendingDequeues.First; + node = _pendingDequeues.First; if (node is not null) { node.Value.SetResult(item); - node.Value.Dispose(); _pendingDequeues.RemoveFirst(); } else _queue.Enqueue(item); } + + node?.Value.Dispose(); } public ValueTask<T> Dequeue(CancellationToken cancellationToken = default) @@ -72,14 +75,19 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable if (Interlocked.Exchange(ref _isDisposed, 1) != 0) return; + IEnumerable<CancelableCompletionSource<T>> pendingDequeues; + lock (_lock) { - foreach (var pendingDequeue in _pendingDequeues) - pendingDequeue.Dispose(); - + pendingDequeues = _pendingDequeues.ToArray(); _pendingDequeues.Clear(); _queue.Clear(); } + + foreach (var ccs in pendingDequeues) + { + ccs.Dispose(); + } } private void Cancel(LinkedListNode<CancelableCompletionSource<T>> node) @@ -88,7 +96,6 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable { try { - node.Value.Dispose(); _pendingDequeues.Remove(node); } catch @@ -96,6 +103,15 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, IDisposable // ignored } } + + try + { + node.Value.Dispose(); + } + catch + { + // ignored + } } private void ThrowIfDisposed() diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs index 6c8964b..f68aeb8 100644 --- a/src/DotPulsar/Internal/Connector.cs +++ b/src/DotPulsar/Internal/Connector.cs @@ -118,10 +118,17 @@ public sealed class Connector private async Task<Stream> EncryptStream(Stream stream, string host, CancellationToken cancellationToken) { SslStream? sslStream = null; + var policyErrors = SslPolicyErrors.None; + + bool Validate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) + { + policyErrors = sslPolicyErrors; + return ValidateServerCertificate(sender, certificate, chain, sslPolicyErrors); + } try { - sslStream = new SslStream(stream, false, ValidateServerCertificate, null); + sslStream = new SslStream(stream, false, Validate, null); var options = new SslClientAuthenticationOptions { TargetHost = host, @@ -132,13 +139,16 @@ public sealed class Connector await sslStream.AuthenticateAsClientAsync(options, cancellationToken).ConfigureAwait(false); return sslStream; } - catch + catch (Exception exception) { if (sslStream is null) await stream.DisposeAsync().ConfigureAwait(false); else await sslStream.DisposeAsync().ConfigureAwait(false); + if (policyErrors != SslPolicyErrors.None) + exception.Data.Add("SslPolicyErrors", policyErrors); + throw; } }
