This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d520b3e  Running the tests multiple times revealed some race
                  conditions causing a hang when disposing the client and its
                  producers, consumers, and readers after the current reworking
                  of state management. This seems to be fixed by this commit,
                  and we can continue adding more tests.
d520b3e is described below

commit d520b3e1d145d9bf0821dbd3d467663e6d289b64
Author: Daniel Blankensteiner <d...@vmail.dk>
AuthorDate: Fri Oct 20 10:50:21 2023 +0200

    Running the tests multiple times revealed some race conditions causing a
    hang when disposing the client and its producers, consumers, and readers
    after the current reworking of state management. This seems to be fixed by
    this commit, and we can continue adding more tests.
---
 .../Internal/CancelableCompletionSource.cs         | 24 +++++------
 src/DotPulsar/Internal/Connection.cs               | 29 +++----------
 src/DotPulsar/Internal/ConnectionPool.cs           | 13 +++---
 src/DotPulsar/Internal/Consumer.cs                 | 14 +++---
 src/DotPulsar/Internal/ConsumerProcess.cs          | 11 +++--
 src/DotPulsar/Internal/ProcessManager.cs           |  8 +---
 src/DotPulsar/Internal/Producer.cs                 | 13 +++---
 src/DotPulsar/Internal/ProducerProcess.cs          | 15 +++----
 src/DotPulsar/Internal/PulsarClientBuilder.cs      |  3 +-
 src/DotPulsar/Internal/PulsarClientFactory.cs      |  6 +--
 src/DotPulsar/Internal/Reader.cs                   | 16 +++----
 src/DotPulsar/Internal/ReaderProcess.cs            | 11 +++--
 src/DotPulsar/Internal/RequestResponseHandler.cs   |  4 +-
 src/DotPulsar/PulsarClient.cs                      | 50 ++++++++++++++++++----
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj       |  4 +-
 tests/DotPulsar.Tests/Internal/ProducerTests.cs    |  7 +--
 16 files changed, 115 insertions(+), 113 deletions(-)

diff --git a/src/DotPulsar/Internal/CancelableCompletionSource.cs 
b/src/DotPulsar/Internal/CancelableCompletionSource.cs
index e974718..fe0fbf6 100644
--- a/src/DotPulsar/Internal/CancelableCompletionSource.cs
+++ b/src/DotPulsar/Internal/CancelableCompletionSource.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -22,19 +22,16 @@ public sealed class CancelableCompletionSource<T> : 
IDisposable
 {
     private readonly TaskCompletionSource<T> _source;
     private CancellationTokenRegistration? _registration;
-    private bool _isDisposed;
-    private readonly object _lock = new();
+    private int _isDisposed;
 
     public CancelableCompletionSource()
         => _source = new 
TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
 
     public void SetupCancellation(Action callback, CancellationToken token)
     {
-        lock (_lock)
-        {
-            if (!_isDisposed)
-                _registration = token.Register(callback);
-        }
+        _registration = token.Register(callback);
+        if (_isDisposed == 1)
+            _registration?.Dispose();
     }
 
     public void SetResult(T result)
@@ -47,11 +44,10 @@ public sealed class CancelableCompletionSource<T> : 
IDisposable
 
     public void Dispose()
     {
-        lock (_lock)
-        {
-            _isDisposed = true;
-            _ = _source.TrySetCanceled();
-            _registration?.Dispose();
-        }
+        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+            return;
+
+        _ = _source.TrySetCanceled();
+        _registration?.Dispose();
     }
 }
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index 1b72be8..5ffdb4e 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -50,34 +50,19 @@ public sealed class Connection : IConnection
         _ = Task.Factory.StartNew(() => Setup(_cancellationTokenSource.Token));
     }
 
-    public static async Task<Connection> Connect(
+    public static Connection Connect(
         IPulsarStream stream,
         IAuthentication? authentication,
-        CommandConnect commandConnect, 
+        CommandConnect commandConnect,
         TimeSpan keepAliveInterval,
-        TimeSpan closeOnInactiveInterval,
-        CancellationToken cancellationToken)
+        TimeSpan closeOnInactiveInterval)
     {
-        Connection? connection = null;
-
-        try
-        {
-            connection = new Connection(stream, authentication, 
keepAliveInterval, closeOnInactiveInterval);
-            var response = await connection.Send(commandConnect, 
cancellationToken).ConfigureAwait(false);
-            response.Expect(BaseCommand.Type.Connected);
-            connection.MaxMessageSize = response.Connected.MaxMessageSize;
-            DotPulsarMeter.ConnectionCreated();
-            return connection;
-        }
-        catch
-        {
-            if (connection is not null)
-                await connection.DisposeAsync().ConfigureAwait(false);
-            throw;
-        }
+        var connection = new Connection(stream, authentication, 
keepAliveInterval, closeOnInactiveInterval);
+        DotPulsarMeter.ConnectionCreated();
+        return connection;
     }
 
-    public int MaxMessageSize { get; private set; }
+    public int MaxMessageSize { get; set; }
 
     public async Task<ProducerResponse> Send(CommandProducer command, IChannel 
channel, CancellationToken cancellationToken)
     {
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index 124b50d..48dac09 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -162,9 +162,12 @@ public sealed class ConnectionPool : IConnectionPool
         if (url.ProxyThroughServiceUrl)
             commandConnect = WithProxyToBroker(commandConnect, url.Logical);
 
-        var connection = await Connection.Connect(new PulsarStream(stream), 
_authentication, commandConnect, _keepAliveInterval, 
_closeInactiveConnectionsInterval, cancellationToken).ConfigureAwait(false);
+        var connection = Connection.Connect(new PulsarStream(stream), 
_authentication, commandConnect, _keepAliveInterval, 
_closeInactiveConnectionsInterval);
         _connections[url] = connection;
         _ = 
connection.OnStateChangeFrom(ConnectionState.Connected).AsTask().ContinueWith(t 
=> DisposeConnection(url));
+        var response = await connection.Send(commandConnect, 
cancellationToken).ConfigureAwait(false);
+        response.Expect(BaseCommand.Type.Connected);
+        connection.MaxMessageSize = response.Connected.MaxMessageSize;
         return connection;
     }
 
@@ -227,15 +230,15 @@ public sealed class ConnectionPool : IConnectionPool
             => $"{nameof(Physical)}: {Physical}, {nameof(Logical)}: {Logical}, 
{nameof(ProxyThroughServiceUrl)}: {ProxyThroughServiceUrl}";
     }
 
-    public async ValueTask<uint> GetNumberOfPartitions(String topic, 
CancellationToken cancellationToken = default)
+    public async ValueTask<uint> GetNumberOfPartitions(string topic, 
CancellationToken cancellationToken = default)
     {
         var connection = await FindConnectionForTopic(topic, 
cancellationToken).ConfigureAwait(false);
-        var commandPartitionedMetadata = new 
PulsarApi.CommandPartitionedTopicMetadata { Topic = topic };
+        var commandPartitionedMetadata = new CommandPartitionedTopicMetadata { 
Topic = topic };
         var response = await connection.Send(commandPartitionedMetadata, 
cancellationToken).ConfigureAwait(false);
 
-        
response.Expect(PulsarApi.BaseCommand.Type.PartitionedMetadataResponse);
+        response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
 
-        if (response.PartitionMetadataResponse.Response == 
PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+        if (response.PartitionMetadataResponse.Response == 
CommandPartitionedTopicMetadataResponse.LookupType.Failed)
             response.PartitionMetadataResponse.Throw();
 
         return response.PartitionMetadataResponse.Partitions;
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index 8211a5d..20c8af7 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -51,7 +51,6 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
 
     public Consumer(
         Uri serviceUrl,
-        ProcessManager processManager,
         ConsumerOptions<TMessage> consumerOptions,
         IConnectionPool connectionPool,
         IHandleException exceptionHandler)
@@ -64,7 +63,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         _cts = new CancellationTokenSource();
         _exceptionHandler = exceptionHandler;
         _semaphoreSlim = new SemaphoreSlim(1);
-        _processManager = processManager;
+        _processManager = new ProcessManager();
         _executor = new Executor(Guid.Empty, _processManager, 
_exceptionHandler);
         _consumerOptions = consumerOptions;
         _connectionPool = connectionPool;
@@ -72,7 +71,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         _isPartitionedTopic = false;
         _allSubConsumersAreReady = false;
         _isDisposed = 0;
-        _subConsumers = null!;
+        _subConsumers = Array.Empty<SubConsumer<TMessage>>();
 
         _emptyTaskCompletionSource = new 
TaskCompletionSource<IMessage<TMessage>>();
 
@@ -173,15 +172,12 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
         _cts.Cancel();
         _cts.Dispose();
 
-        _state.SetState(ConsumerState.Closed);
-
-        if (_subConsumers is null)
-            return;
+        await _processManager.DisposeAsync().ConfigureAwait(false);
 
         foreach (var subConsumer in _subConsumers)
-        {
             await subConsumer.DisposeAsync().ConfigureAwait(false);
-        }
+
+        _state.SetState(ConsumerState.Closed);
     }
 
     public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs 
b/src/DotPulsar/Internal/ConsumerProcess.cs
index 3cc0bee..e690283 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -21,7 +21,7 @@ using System.Threading.Tasks;
 public sealed class ConsumerProcess : Process
 {
     private readonly IStateManager<ConsumerState> _stateManager;
-    private readonly IContainsChannel _consumer;
+    private readonly IContainsChannel _subConsumer;
     private readonly bool _isFailoverSubscription;
 
     public ConsumerProcess(
@@ -31,7 +31,7 @@ public sealed class ConsumerProcess : Process
         bool isFailoverSubscription) : base(correlationId)
     {
         _stateManager = stateManager;
-        _consumer = consumer;
+        _subConsumer = consumer;
         _isFailoverSubscription = isFailoverSubscription;
     }
 
@@ -39,7 +39,6 @@ public sealed class ConsumerProcess : Process
     {
         await base.DisposeAsync().ConfigureAwait(false);
         _stateManager.SetState(ConsumerState.Closed);
-        await _consumer.DisposeAsync().ConfigureAwait(false);
     }
 
     protected override void CalculateState()
@@ -51,7 +50,7 @@ public sealed class ConsumerProcess : Process
         {
             var formerState = _stateManager.SetState(ConsumerState.Faulted);
             if (formerState != ConsumerState.Faulted)
-                ActionQueue.Enqueue(async _ => await 
_consumer.ChannelFaulted(Exception!).ConfigureAwait(false));
+                ActionQueue.Enqueue(async _ => await 
_subConsumer.ChannelFaulted(Exception!).ConfigureAwait(false));
             return;
         }
 
@@ -68,8 +67,8 @@ public sealed class ConsumerProcess : Process
                 _stateManager.SetState(ConsumerState.Disconnected);
                 ActionQueue.Enqueue(async x =>
                 {
-                    await _consumer.CloseChannel(x).ConfigureAwait(false);
-                    await 
_consumer.EstablishNewChannel(x).ConfigureAwait(false);
+                    await _subConsumer.CloseChannel(x).ConfigureAwait(false);
+                    await 
_subConsumer.EstablishNewChannel(x).ConfigureAwait(false);
                 });
                 return;
             case ChannelState.Connected:
diff --git a/src/DotPulsar/Internal/ProcessManager.cs 
b/src/DotPulsar/Internal/ProcessManager.cs
index 608b02e..9b6619d 100644
--- a/src/DotPulsar/Internal/ProcessManager.cs
+++ b/src/DotPulsar/Internal/ProcessManager.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -24,20 +24,16 @@ using System.Threading.Tasks;
 public sealed class ProcessManager : IRegisterEvent, IAsyncDisposable
 {
     private readonly ConcurrentDictionary<Guid, IProcess> _processes;
-    private readonly IConnectionPool _connectionPool;
 
-    public ProcessManager(IConnectionPool connectionPool)
+    public ProcessManager()
     {
         _processes = new ConcurrentDictionary<Guid, IProcess>();
-        _connectionPool = connectionPool;
     }
 
     public async ValueTask DisposeAsync()
     {
         foreach (var proc in _processes.Values.ToArray())
             await proc.DisposeAsync().ConfigureAwait(false);
-
-        await _connectionPool.DisposeAsync().ConfigureAwait(false);
     }
 
     public void Add(IProcess process)
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index 3d6b1f5..0015d08 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -55,7 +55,6 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, 
IRegisterEvent
     public Producer(
         Uri serviceUrl,
         ProducerOptions<TMessage> options,
-        ProcessManager processManager,
         IHandleException exceptionHandler,
         IConnectionPool connectionPool,
         ICompressorFactory? compressorFactory)
@@ -82,7 +81,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, 
IRegisterEvent
         _exceptionHandler = exceptionHandler;
         _connectionPool = connectionPool;
         _compressorFactory = compressorFactory;
-        _processManager = processManager;
+        _processManager = new ProcessManager();
         _messageRouter = options.MessageRouter;
         _cts = new CancellationTokenSource();
         _executor = new Executor(Guid.Empty, this, _exceptionHandler);
@@ -195,12 +194,12 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
         _cts.Cancel();
         _cts.Dispose();
 
-        _state.SetState(ProducerState.Closed);
+        await _processManager.DisposeAsync().ConfigureAwait(false);
 
-        foreach (var producer in _producers.Values)
-        {
-            await producer.DisposeAsync().ConfigureAwait(false);
-        }
+        foreach (var subProducer in _producers.Values)
+            await subProducer.DisposeAsync().ConfigureAwait(false);
+
+        _state.SetState(ProducerState.Closed);
     }
 
     private async ValueTask<int> ChoosePartitions(MessageMetadata metadata, 
CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs 
b/src/DotPulsar/Internal/ProducerProcess.cs
index c61e331..6104f73 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -22,7 +22,7 @@ using System.Threading.Tasks;
 public sealed class ProducerProcess : Process
 {
     private readonly IStateManager<ProducerState> _stateManager;
-    private readonly IContainsProducerChannel _producer;
+    private readonly IContainsProducerChannel _subProducer;
 
     public ProducerProcess(
         Guid correlationId,
@@ -30,14 +30,13 @@ public sealed class ProducerProcess : Process
         IContainsProducerChannel producer) : base(correlationId)
     {
         _stateManager = stateManager;
-        _producer = producer;
+        _subProducer = producer;
     }
 
     public override async ValueTask DisposeAsync()
     {
         await base.DisposeAsync().ConfigureAwait(false);
         _stateManager.SetState(ProducerState.Closed);
-        await _producer.DisposeAsync().ConfigureAwait(false);
     }
 
     protected override void CalculateState()
@@ -47,10 +46,10 @@ public sealed class ProducerProcess : Process
 
         if (ExecutorState == ExecutorState.Faulted)
         {
-            var newState = Exception! is ProducerFencedException ? 
ProducerState.Fenced : ProducerState.Faulted;
+            var newState = Exception is ProducerFencedException ? 
ProducerState.Fenced : ProducerState.Faulted;
             var formerState = _stateManager.SetState(newState);
             if (formerState != ProducerState.Faulted && formerState != 
ProducerState.Fenced)
-                ActionQueue.Enqueue(async _ => await 
_producer.ChannelFaulted(Exception!).ConfigureAwait(false));
+                ActionQueue.Enqueue(async _ => await 
_subProducer.ChannelFaulted(Exception!).ConfigureAwait(false));
             return;
         }
 
@@ -61,14 +60,14 @@ public sealed class ProducerProcess : Process
                 _stateManager.SetState(ProducerState.Disconnected);
                 ActionQueue.Enqueue(async x =>
                 {
-                    await _producer.CloseChannel(x).ConfigureAwait(false);
-                    await 
_producer.EstablishNewChannel(x).ConfigureAwait(false);
+                    await _subProducer.CloseChannel(x).ConfigureAwait(false);
+                    await 
_subProducer.EstablishNewChannel(x).ConfigureAwait(false);
                 });
                 return;
             case ChannelState.Connected:
                 ActionQueue.Enqueue(async x =>
                 {
-                    await _producer.ActivateChannel(TopicEpoch, 
x).ConfigureAwait(false);
+                    await _subProducer.ActivateChannel(TopicEpoch, 
x).ConfigureAwait(false);
                     _stateManager.SetState(ProducerState.Connected);
                 });
                 return;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs 
b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 0aaee42..3ea8a45 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -174,8 +174,7 @@ public sealed class PulsarClientBuilder : 
IPulsarClientBuilder
         var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) 
{ new DefaultExceptionHandler() };
         var exceptionHandlerPipeline = new 
ExceptionHandlerPipeline(_retryInterval, exceptionHandlers);
         var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, 
connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, 
_listenerName, _keepAliveInterval, _authentication);
-        var processManager = new ProcessManager(connectionPool);
 
-        return new PulsarClient(connectionPool, processManager, 
exceptionHandlerPipeline, _serviceUrl);
+        return new PulsarClient(connectionPool, exceptionHandlerPipeline, 
_serviceUrl);
     }
 }
diff --git a/src/DotPulsar/Internal/PulsarClientFactory.cs 
b/src/DotPulsar/Internal/PulsarClientFactory.cs
index 6b6e375..6de1408 100644
--- a/src/DotPulsar/Internal/PulsarClientFactory.cs
+++ b/src/DotPulsar/Internal/PulsarClientFactory.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -20,6 +20,6 @@ using System;
 
 public sealed class PulsarClientFactory
 {
-    public static PulsarClient CreatePulsarClient(IConnectionPool 
connectionPool, ProcessManager processManager, IHandleException 
exceptionHandler, Uri serviceUrl)
-        => new(connectionPool, processManager, exceptionHandler, serviceUrl);
+    public static PulsarClient CreatePulsarClient(IConnectionPool 
connectionPool, IHandleException exceptionHandler, Uri serviceUrl)
+        => new(connectionPool, exceptionHandler, serviceUrl);
 }
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index ebc62ec..e10c964 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -51,7 +51,6 @@ public sealed class Reader<TMessage> : IReader<TMessage>
     public Reader(
         Uri serviceUrl,
         ReaderOptions<TMessage> readerOptions,
-        ProcessManager processManager,
         IHandleException exceptionHandler,
         IConnectionPool connectionPool)
     {
@@ -59,7 +58,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
         Topic = readerOptions.Topic;
         _readerOptions = readerOptions;
         _connectionPool = connectionPool;
-        _processManager = processManager;
+        _processManager = new ProcessManager();
         _exceptionHandler = exceptionHandler;
         _semaphoreSlim = new SemaphoreSlim(1);
         _state = CreateStateManager();
@@ -67,7 +66,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
         _cts = new CancellationTokenSource();
         _executor = new Executor(Guid.Empty, _processManager, 
_exceptionHandler);
         _isDisposed = 0;
-        _subReaders = null!;
+        _subReaders = Array.Empty<SubReader<TMessage>>();
 
         _emptyTaskCompletionSource = new 
TaskCompletionSource<IMessage<TMessage>>();
 
@@ -275,15 +274,12 @@ public sealed class Reader<TMessage> : IReader<TMessage>
         _cts.Cancel();
         _cts.Dispose();
 
-        _state.SetState(ReaderState.Closed);
+        await _processManager.DisposeAsync().ConfigureAwait(false);
 
-        if (_subReaders is null)
-            return;
+        foreach (var subReader in _subReaders)
+            await subReader.DisposeAsync().ConfigureAwait(false);
 
-        foreach (var subConsumer in _subReaders)
-        {
-            await subConsumer.DisposeAsync().ConfigureAwait(false);
-        }
+        _state.SetState(ReaderState.Closed);
     }
 
     private static StateManager<ReaderState> CreateStateManager()
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs 
b/src/DotPulsar/Internal/ReaderProcess.cs
index 9ab1cf3..9827a7b 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -21,7 +21,7 @@ using System.Threading.Tasks;
 public sealed class ReaderProcess : Process
 {
     private readonly IStateManager<ReaderState> _stateManager;
-    private readonly IContainsChannel _reader;
+    private readonly IContainsChannel _subReader;
 
     public ReaderProcess(
         Guid correlationId,
@@ -29,14 +29,13 @@ public sealed class ReaderProcess : Process
         IContainsChannel reader) : base(correlationId)
     {
         _stateManager = stateManager;
-        _reader = reader;
+        _subReader = reader;
     }
 
     public override async ValueTask DisposeAsync()
     {
         await base.DisposeAsync().ConfigureAwait(false);
         _stateManager.SetState(ReaderState.Closed);
-        await _reader.DisposeAsync().ConfigureAwait(false);
     }
 
     protected override void CalculateState()
@@ -48,7 +47,7 @@ public sealed class ReaderProcess : Process
         {
             var formerState = _stateManager.SetState(ReaderState.Faulted);
             if (formerState != ReaderState.Faulted)
-                ActionQueue.Enqueue(async _ => await 
_reader.ChannelFaulted(Exception!).ConfigureAwait(false));
+                ActionQueue.Enqueue(async _ => await 
_subReader.ChannelFaulted(Exception!).ConfigureAwait(false));
             return;
         }
 
@@ -59,8 +58,8 @@ public sealed class ReaderProcess : Process
                 _stateManager.SetState(ReaderState.Disconnected);
                 ActionQueue.Enqueue(async x =>
                 {
-                    await _reader.CloseChannel(x).ConfigureAwait(false);
-                    await _reader.EstablishNewChannel(x).ConfigureAwait(false);
+                    await _subReader.CloseChannel(x).ConfigureAwait(false);
+                    await 
_subReader.EstablishNewChannel(x).ConfigureAwait(false);
                 });
                 return;
             case ChannelState.Connected:
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs 
b/src/DotPulsar/Internal/RequestResponseHandler.cs
index e6f8dce..3ce8795 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -103,7 +103,7 @@ public sealed class RequestResponseHandler : IDisposable
         return _requests.CreateTask(request);
     }
 
-    public Task<BaseCommand> Outgoing(CommandConnect _1)
+    public Task<BaseCommand> Outgoing(CommandConnect _)
         => _requests.CreateTask(new ConnectRequest());
 
     public Task<BaseCommand> Outgoing(CommandLookupTopic command)
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 17d6919..2fb7fa9 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -16,10 +16,12 @@ namespace DotPulsar;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
+using DotPulsar.Extensions;
 using DotPulsar.Internal;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Compression;
 using System;
+using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
@@ -29,8 +31,9 @@ using System.Threading.Tasks;
 /// </summary>
 public sealed class PulsarClient : IPulsarClient
 {
+    private readonly object _lock = new();
+    private readonly HashSet<IAsyncDisposable> _disposables;
     private readonly IConnectionPool _connectionPool;
-    private readonly ProcessManager _processManager;
     private readonly IHandleException _exceptionHandler;
     private int _isDisposed;
 
@@ -38,12 +41,11 @@ public sealed class PulsarClient : IPulsarClient
 
     internal PulsarClient(
         IConnectionPool connectionPool,
-        ProcessManager processManager,
         IHandleException exceptionHandler,
         Uri serviceUrl)
     {
+        _disposables = new HashSet<IAsyncDisposable>();
         _connectionPool = connectionPool;
-        _processManager = processManager;
         _exceptionHandler = exceptionHandler;
         ServiceUrl = serviceUrl;
         _isDisposed = 0;
@@ -73,11 +75,13 @@ public sealed class PulsarClient : IPulsarClient
                 throw new CompressionException($"Support for {compressionType} 
compression was not found");
         }
 
-        var producer = new Producer<TMessage>(ServiceUrl, options, 
_processManager, _exceptionHandler, _connectionPool, compressorFactory);
+        var producer = new Producer<TMessage>(ServiceUrl, options, 
_exceptionHandler, _connectionPool, compressorFactory);
 
         if (options.StateChangedHandler is not null)
             _ = StateMonitor.MonitorProducer(producer, 
options.StateChangedHandler);
 
+        AddDisposable(producer);
+        producer.StateChangedTo(ProducerState.Closed).AsTask().ContinueWith(_ 
=> RemoveDisposable(producer));
         return producer;
     }
 
@@ -88,10 +92,12 @@ public sealed class PulsarClient : IPulsarClient
     {
         ThrowIfDisposed();
 
-        var consumer = new Consumer<TMessage>(ServiceUrl, _processManager, 
options, _connectionPool, _exceptionHandler);
+        var consumer = new Consumer<TMessage>(ServiceUrl, options, 
_connectionPool, _exceptionHandler);
         if (options.StateChangedHandler is not null)
             _ = StateMonitor.MonitorConsumer(consumer, 
options.StateChangedHandler);
 
+        AddDisposable(consumer);
+        consumer.StateChangedTo(ConsumerState.Closed).AsTask().ContinueWith(_ 
=> RemoveDisposable(consumer));
         return consumer;
     }
 
@@ -102,10 +108,12 @@ public sealed class PulsarClient : IPulsarClient
     {
         ThrowIfDisposed();
 
-        var reader = new Reader<TMessage>(ServiceUrl, options, 
_processManager, _exceptionHandler, _connectionPool);
+        var reader = new Reader<TMessage>(ServiceUrl, options, 
_exceptionHandler, _connectionPool);
         if (options.StateChangedHandler is not null)
             _ = StateMonitor.MonitorReader(reader, 
options.StateChangedHandler);
 
+        AddDisposable(reader);
+        reader.StateChangedTo(ReaderState.Closed).AsTask().ContinueWith(_ => 
RemoveDisposable(reader));
         return reader;
     }
 
@@ -117,12 +125,38 @@ public sealed class PulsarClient : IPulsarClient
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
-        if (_processManager is IAsyncDisposable disposable)
-            await disposable.DisposeAsync().ConfigureAwait(false);
+        IEnumerable<IAsyncDisposable> disposables;
+
+        lock (_lock)
+        {
+            disposables = _disposables.ToArray();
+            _disposables.Clear();
+        }
+
+        foreach (var item in disposables)
+            await item.DisposeAsync().ConfigureAwait(false);
+
+        await _connectionPool.DisposeAsync().ConfigureAwait(false);
 
         DotPulsarMeter.ClientDisposed();
     }
 
+    private void AddDisposable(IAsyncDisposable disposable)
+    {
+        lock (_lock)
+        {
+            _disposables.Add(disposable);
+        }
+    }
+
+    internal void RemoveDisposable(IAsyncDisposable disposable)
+    {
+        lock (_lock)
+        {
+            _disposables.Remove(disposable);
+        }
+    }
+
     private void ThrowIfDisposed()
     {
         if (_isDisposed != 0)
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj 
b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index e9925b3..44ff123 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -16,7 +16,7 @@
     <PackageReference Include="K4os.Compression.LZ4" Version="1.3.6" />
     <PackageReference Include="NSubstitute" Version="5.1.0" />
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.7.2" />
-    <PackageReference Include="xunit" Version="2.5.2" />
+    <PackageReference Include="xunit" Version="2.5.3" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.5.3">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; 
buildtransitive</IncludeAssets>
@@ -26,7 +26,7 @@
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; 
buildtransitive</IncludeAssets>
     </PackageReference>
     <PackageReference Include="ZstdNet" Version="1.4.5" />
-    <PackageReference Include="ZstdSharp.Port" Version="0.7.2" />
+    <PackageReference Include="ZstdSharp.Port" Version="0.7.3" />
   </ItemGroup>
 
   <ItemGroup>
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs 
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index dd88b17..1aa7010 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -65,11 +65,11 @@ public class ProducerTests
         await using var client = CreateClient();
         await using var producer = CreateProducer(client, topicName);
         await using var consumer = CreateConsumer(client, topicName);
-        var messageId = MessageId.Earliest;
+        var idTask = new 
TaskCompletionSource<MessageId>(TaskCreationOptions.RunContinuationsAsynchronously);
 
         ValueTask SetMessageId(MessageId id)
         {
-            messageId = id;
+            idTask.SetResult(id);
             return ValueTask.CompletedTask;
         }
 
@@ -78,9 +78,10 @@ public class ProducerTests
         producer.SendChannel.Complete();
         await producer.SendChannel.Completion();
         var message = await consumer.Receive();
+        var expected = await idTask.Task;
 
         //Assert
-        message.MessageId.Should().Be(messageId);
+        message.MessageId.Should().Be(expected);
         message.Value().Should().Be(content);
     }
 

Reply via email to