This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push: new f35e682 Make ready for release 3.1.1-rc.1 Fixed a bug where disposing a disconnected consumer, reader or producer would cause a hang. Updated NuGet packages new c328300 Merge branch 'master' of https://github.com/apache/pulsar-dotpulsar f35e682 is described below commit f35e6824f0790696bf4300dcd7a00967cf6c250e Author: Daniel Blankensteiner <d...@vmail.dk> AuthorDate: Wed Dec 6 11:23:29 2023 +0100 Make ready for release 3.1.1-rc.1 Fixed a bug where disposing a disconnected consumer, reader or producer would cause a hang. Updated NuGet packages --- CHANGELOG.md | 6 ++++ src/DotPulsar/Internal/Abstractions/Process.cs | 4 +-- src/DotPulsar/Internal/ConnectionPool.cs | 15 ++++----- src/DotPulsar/Internal/Connector.cs | 46 ++++++++++++++++++++++---- tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 4 +-- 5 files changed, 55 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60a4562..18faebf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [3.1.1-rc.1] - 2023-12-06 + +### Fixed + +- Fixed a bug where disposing a disconnected consumer, reader or producer would cause a hang + ## [3.1.0] - 2023-11-28 ### Added diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs index a2c5158..981653e 100644 --- a/src/DotPulsar/Internal/Abstractions/Process.cs +++ b/src/DotPulsar/Internal/Abstractions/Process.cs @@ -49,10 +49,8 @@ public abstract class Process : IProcess public virtual async ValueTask DisposeAsync() { _cancellationTokenSource.Cancel(); - if (_actionProcessorTask != null) - { + if (_actionProcessorTask is not null) await _actionProcessorTask.ConfigureAwait(false); - } } public void Handle(IEvent e) diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs index 1cd7e7c..bf164ff 100644 --- a/src/DotPulsar/Internal/ConnectionPool.cs +++ b/src/DotPulsar/Internal/ConnectionPool.cs @@ -21,7 +21,6 @@ using DotPulsar.Internal.Extensions; using DotPulsar.Internal.PulsarApi; using System; using System.Collections.Concurrent; -using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -64,9 +63,9 @@ public sealed class ConnectionPool : IConnectionPool { _cancellationTokenSource.Cancel(); - foreach (var serviceUrl in _connections.Keys.ToArray()) + foreach (var entry in _connections.ToArray()) { - await DisposeConnection(serviceUrl).ConfigureAwait(false); + await DisposeConnection(entry.Key, entry.Value).ConfigureAwait(false); } } @@ -149,14 +148,14 @@ public sealed class ConnectionPool : IConnectionPool private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken) { - var stream = await _connector.Connect(url.Physical).ConfigureAwait(false); + var stream = await _connector.Connect(url.Physical, cancellationToken).ConfigureAwait(false); var commandConnect = _commandConnect; if (url.ProxyThroughServiceUrl) commandConnect = WithProxyToBroker(commandConnect, url.Logical); var connection = Connection.Connect(new PulsarStream(stream), _authentication, _keepAliveInterval, _closeInactiveConnectionsInterval); - _ = connection.OnStateChangeFrom(ConnectionState.Connected).AsTask().ContinueWith(t => DisposeConnection(url)); + _ = connection.OnStateChangeFrom(ConnectionState.Connected, CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url, connection)); var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false); response.Expect(BaseCommand.Type.Connected); _connections[url] = connection; @@ -164,10 +163,10 @@ public sealed class ConnectionPool : IConnectionPool return connection; } - private async ValueTask DisposeConnection(PulsarUrl serviceUrl) + private async ValueTask DisposeConnection(PulsarUrl serviceUrl, Connection connection) { - if (_connections.TryRemove(serviceUrl, out var connection) && connection is not null) - await connection.DisposeAsync().ConfigureAwait(false); + _connections.TryRemove(serviceUrl, out var _); + await connection.DisposeAsync().ConfigureAwait(false); } private static CommandConnect WithProxyToBroker(CommandConnect commandConnect, Uri logicalUrl) diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs index 6af6bfb..b4f4bdd 100644 --- a/src/DotPulsar/Internal/Connector.cs +++ b/src/DotPulsar/Internal/Connector.cs @@ -21,6 +21,7 @@ using System.Net.Security; using System.Net.Sockets; using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; +using System.Threading; using System.Threading.Tasks; public sealed class Connector @@ -45,7 +46,7 @@ public sealed class Connector _checkCertificateRevocation = checkCertificateRevocation; } - public async Task<Stream> Connect(Uri serviceUrl) + public async Task<Stream> Connect(Uri serviceUrl, CancellationToken cancellationToken) { var scheme = serviceUrl.Scheme; var host = serviceUrl.Host; @@ -55,15 +56,15 @@ public sealed class Connector if (port == -1) port = encrypt ? Constants.DefaultPulsarSSLPort : Constants.DefaultPulsarPort; - var stream = await GetStream(host, port).ConfigureAwait(false); + var stream = await GetStream(host, port, cancellationToken).ConfigureAwait(false); if (encrypt) - stream = await EncryptStream(stream, host).ConfigureAwait(false); + stream = await EncryptStream(stream, host, cancellationToken).ConfigureAwait(false); return stream; } - private static async Task<Stream> GetStream(string host, int port) + private static async Task<Stream> GetStream(string host, int port, CancellationToken cancellationToken) { var tcpClient = new TcpClient(); @@ -71,10 +72,17 @@ public sealed class Connector { var type = Uri.CheckHostName(host); +#if NETSTANDARD2_0 || NETSTANDARD2_1 if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6) await tcpClient.ConnectAsync(IPAddress.Parse(host), port).ConfigureAwait(false); else await tcpClient.ConnectAsync(host, port).ConfigureAwait(false); +#else + if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6) + await tcpClient.ConnectAsync(IPAddress.Parse(host), port, cancellationToken).ConfigureAwait(false); + else + await tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false); +#endif return tcpClient.GetStream(); } @@ -85,7 +93,8 @@ public sealed class Connector } } - private async Task<Stream> EncryptStream(Stream stream, string host) +#if NETSTANDARD2_0 + private async Task<Stream> EncryptStream(Stream stream, string host, CancellationToken _) { SslStream? sslStream = null; @@ -97,20 +106,43 @@ public sealed class Connector } catch { -#if NETSTANDARD2_0 if (sslStream is null) stream.Dispose(); else sslStream.Dispose(); + + throw; + } + } #else + private async Task<Stream> EncryptStream(Stream stream, string host, CancellationToken cancellationToken) + { + SslStream? sslStream = null; + + try + { + sslStream = new SslStream(stream, false, ValidateServerCertificate, null); + var options = new SslClientAuthenticationOptions + { + TargetHost = host, + ClientCertificates = _clientCertificates, + EnabledSslProtocols = SslProtocols.None, + CertificateRevocationCheckMode = _checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck + }; + await sslStream.AuthenticateAsClientAsync(options, cancellationToken).ConfigureAwait(false); + return sslStream; + } + catch + { if (sslStream is null) await stream.DisposeAsync().ConfigureAwait(false); else await sslStream.DisposeAsync().ConfigureAwait(false); -#endif + throw; } } +#endif private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) { diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index 27d6c88..38717ad 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -7,8 +7,8 @@ </PropertyGroup> <ItemGroup> - <PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.0" /> - <PackageReference Include="AutoFixture.Xunit2" Version="4.18.0" /> + <PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.1" /> + <PackageReference Include="AutoFixture.Xunit2" Version="4.18.1" /> <PackageReference Include="DotNetZip" Version="1.16.0" /> <PackageReference Include="Ductus.FluentDocker" Version="2.10.59" /> <PackageReference Include="FluentAssertions" Version="6.12.0" />