This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f35e682  Make ready for release 3.1.1-rc.1. Fixed a bug where disposing a disconnected consumer, reader or producer would cause a hang. Updated NuGet packages.
     new c328300  Merge branch 'master' of https://github.com/apache/pulsar-dotpulsar
f35e682 is described below

commit f35e6824f0790696bf4300dcd7a00967cf6c250e
Author: Daniel Blankensteiner <d...@vmail.dk>
AuthorDate: Wed Dec 6 11:23:29 2023 +0100

    Make ready for release 3.1.1-rc.1
    Fixed a bug where disposing a disconnected consumer, reader or producer would cause a hang.
    Updated NuGet packages
---
 CHANGELOG.md                                   |  6 ++++
 src/DotPulsar/Internal/Abstractions/Process.cs |  4 +--
 src/DotPulsar/Internal/ConnectionPool.cs       | 15 ++++-----
 src/DotPulsar/Internal/Connector.cs            | 46 ++++++++++++++++++++++----
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj   |  4 +--
 5 files changed, 55 insertions(+), 20 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 60a4562..18faebf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [3.1.1-rc.1] - 2023-12-06
+
+### Fixed
+
+- Fixed a bug where disposing a disconnected consumer, reader or producer would cause a hang
+
 ## [3.1.0] - 2023-11-28
 
 ### Added
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs
index a2c5158..981653e 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -49,10 +49,8 @@ public abstract class Process : IProcess
     public virtual async ValueTask DisposeAsync()
     {
         _cancellationTokenSource.Cancel();
-        if (_actionProcessorTask != null)
-        {
+        if (_actionProcessorTask is not null)
             await _actionProcessorTask.ConfigureAwait(false);
-        }
     }
 
     public void Handle(IEvent e)
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 1cd7e7c..bf164ff 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -21,7 +21,6 @@ using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Collections.Concurrent;
-using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -64,9 +63,9 @@ public sealed class ConnectionPool : IConnectionPool
     {
         _cancellationTokenSource.Cancel();
 
-        foreach (var serviceUrl in _connections.Keys.ToArray())
+        foreach (var entry in _connections.ToArray())
         {
-            await DisposeConnection(serviceUrl).ConfigureAwait(false);
+            await DisposeConnection(entry.Key, entry.Value).ConfigureAwait(false);
         }
     }
 
@@ -149,14 +148,14 @@ public sealed class ConnectionPool : IConnectionPool
 
     private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
     {
-        var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
+        var stream = await _connector.Connect(url.Physical, cancellationToken).ConfigureAwait(false);
 
         var commandConnect = _commandConnect;
         if (url.ProxyThroughServiceUrl)
             commandConnect = WithProxyToBroker(commandConnect, url.Logical);
 
         var connection = Connection.Connect(new PulsarStream(stream), _authentication, _keepAliveInterval, _closeInactiveConnectionsInterval);
-        _ = connection.OnStateChangeFrom(ConnectionState.Connected).AsTask().ContinueWith(t => DisposeConnection(url));
+        _ = connection.OnStateChangeFrom(ConnectionState.Connected, CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url, connection));
         var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false);
         response.Expect(BaseCommand.Type.Connected);
         _connections[url] = connection;
@@ -164,10 +163,10 @@ public sealed class ConnectionPool : IConnectionPool
         return connection;
     }
 
-    private async ValueTask DisposeConnection(PulsarUrl serviceUrl)
+    private async ValueTask DisposeConnection(PulsarUrl serviceUrl, Connection connection)
     {
-        if (_connections.TryRemove(serviceUrl, out var connection) && connection is not null)
-            await connection.DisposeAsync().ConfigureAwait(false);
+        _connections.TryRemove(serviceUrl, out var _);
+        await connection.DisposeAsync().ConfigureAwait(false);
     }
 
     private static CommandConnect WithProxyToBroker(CommandConnect commandConnect, Uri logicalUrl)
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index 6af6bfb..b4f4bdd 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -21,6 +21,7 @@ using System.Net.Security;
 using System.Net.Sockets;
 using System.Security.Authentication;
 using System.Security.Cryptography.X509Certificates;
+using System.Threading;
 using System.Threading.Tasks;
 
 public sealed class Connector
@@ -45,7 +46,7 @@ public sealed class Connector
         _checkCertificateRevocation = checkCertificateRevocation;
     }
 
-    public async Task<Stream> Connect(Uri serviceUrl)
+    public async Task<Stream> Connect(Uri serviceUrl, CancellationToken cancellationToken)
     {
         var scheme = serviceUrl.Scheme;
         var host = serviceUrl.Host;
@@ -55,15 +56,15 @@ public sealed class Connector
         if (port == -1)
             port = encrypt ? Constants.DefaultPulsarSSLPort : Constants.DefaultPulsarPort;
 
-        var stream = await GetStream(host, port).ConfigureAwait(false);
+        var stream = await GetStream(host, port, cancellationToken).ConfigureAwait(false);
 
         if (encrypt)
-            stream = await EncryptStream(stream, host).ConfigureAwait(false);
+            stream = await EncryptStream(stream, host, cancellationToken).ConfigureAwait(false);
 
         return stream;
     }
 
-    private static async Task<Stream> GetStream(string host, int port)
+    private static async Task<Stream> GetStream(string host, int port, CancellationToken cancellationToken)
     {
         var tcpClient = new TcpClient();
 
@@ -71,10 +72,17 @@ public sealed class Connector
         {
             var type = Uri.CheckHostName(host);
 
+#if NETSTANDARD2_0 || NETSTANDARD2_1
             if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6)
                 await tcpClient.ConnectAsync(IPAddress.Parse(host), port).ConfigureAwait(false);
             else
                 await tcpClient.ConnectAsync(host, port).ConfigureAwait(false);
+#else
+            if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6)
+                await tcpClient.ConnectAsync(IPAddress.Parse(host), port, cancellationToken).ConfigureAwait(false);
+            else
+                await tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false);
+#endif
 
             return tcpClient.GetStream();
         }
@@ -85,7 +93,8 @@ public sealed class Connector
         }
     }
 
-    private async Task<Stream> EncryptStream(Stream stream, string host)
+#if NETSTANDARD2_0
+    private async Task<Stream> EncryptStream(Stream stream, string host, CancellationToken _)
     {
         SslStream? sslStream = null;
 
@@ -97,20 +106,43 @@ public sealed class Connector
         }
         catch
         {
-#if NETSTANDARD2_0
             if (sslStream is null)
                 stream.Dispose();
             else
                 sslStream.Dispose();
+
+            throw;
+        }
+    }
 #else
+    private async Task<Stream> EncryptStream(Stream stream, string host, CancellationToken cancellationToken)
+    {
+        SslStream? sslStream = null;
+
+        try
+        {
+            sslStream = new SslStream(stream, false, ValidateServerCertificate, null);
+            var options = new SslClientAuthenticationOptions
+            {
+                TargetHost = host,
+                ClientCertificates = _clientCertificates,
+                EnabledSslProtocols = SslProtocols.None,
+                CertificateRevocationCheckMode = _checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck
+            };
+            await sslStream.AuthenticateAsClientAsync(options, cancellationToken).ConfigureAwait(false);
+            return sslStream;
+        }
+        catch
+        {
             if (sslStream is null)
                 await stream.DisposeAsync().ConfigureAwait(false);
             else
                 await sslStream.DisposeAsync().ConfigureAwait(false);
-#endif
+
             throw;
         }
     }
+#endif
 
     private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors)
     {
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 27d6c88..38717ad 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -7,8 +7,8 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.0" />
-    <PackageReference Include="AutoFixture.Xunit2" Version="4.18.0" />
+    <PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.1" />
+    <PackageReference Include="AutoFixture.Xunit2" Version="4.18.1" />
     <PackageReference Include="DotNetZip" Version="1.16.0" />
     <PackageReference Include="Ductus.FluentDocker" Version="2.10.59" />
     <PackageReference Include="FluentAssertions" Version="6.12.0" />

Reply via email to