entvex commented on code in PR #162:
URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1288239847


##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -15,49 +15,140 @@
 namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
 using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
+using DotPulsar.Internal.Compression;
 using DotPulsar.Internal.PulsarApi;
 using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 
-public sealed class Reader<TMessage> : IContainsChannel, IReader<TMessage>
+public sealed class Reader<TMessage> : IReader<TMessage>
 {
-    private readonly Guid _correlationId;
-    private readonly IRegisterEvent _eventRegister;
-    private IConsumerChannel<TMessage> _channel;
+    private readonly TaskCompletionSource<IMessage<TMessage>> 
_emptyTaskCompletionSource;
+    private readonly ReaderOptions<TMessage> _readerOptions;
+    private readonly ConcurrentDictionary<string, SubReader<TMessage>> 
_subReaders;
+    private readonly IHandleException _exceptionHandler;
+    private readonly IConnectionPool _connectionPool;
+    private readonly ProcessManager _processManager;
+    private readonly CancellationTokenSource _cts;
     private readonly IExecute _executor;
-    private readonly IStateChanged<ReaderState> _state;
-    private readonly IConsumerChannelFactory<TMessage> _factory;
+    private readonly StateManager<ReaderState> _state;
+    private readonly SemaphoreSlim _semaphoreSlim;
+    private bool _allSubReadersIsReady;
+    private Task<IMessage<TMessage>>[] _receiveTaskQueueForSubReaders;
+    private int _subReaderIndex;
+    private bool _isPartitioned;
+    private uint _numberOfPartitions;
     private int _isDisposed;
     private Exception? _faultException;
 
     public Uri ServiceUrl { get; }
     public string Topic { get; }
 
     public Reader(
-        Guid correlationId,
         Uri serviceUrl,
-        string topic,
-        IRegisterEvent eventRegister,
-        IConsumerChannel<TMessage> initialChannel,
-        IExecute executor,
-        IStateChanged<ReaderState> state,
-        IConsumerChannelFactory<TMessage> factory)
+        ReaderOptions<TMessage> readerOptions,
+        ProcessManager processManager,
+        IHandleException exceptionHandler,
+        IConnectionPool connectionPool)
     {
-        _correlationId = correlationId;
         ServiceUrl = serviceUrl;
-        Topic = topic;
-        _eventRegister = eventRegister;
-        _channel = initialChannel;
-        _executor = executor;
-        _state = state;
-        _factory = factory;
+        Topic = readerOptions.Topic;
+        _readerOptions = readerOptions;
+        _connectionPool = connectionPool;
+        _processManager = processManager;
+        _exceptionHandler = exceptionHandler;
+        _semaphoreSlim = new SemaphoreSlim(1);
+        _state = CreateStateManager();
+        _receiveTaskQueueForSubReaders = 
Array.Empty<Task<IMessage<TMessage>>>();
+        _subReaders = new ConcurrentDictionary<string, SubReader<TMessage>>();
+        _cts = new CancellationTokenSource();
+        _executor = new Executor(Guid.Empty, _processManager, 
_exceptionHandler);
         _isDisposed = 0;
 
-        _eventRegister.Register(new ReaderCreated(_correlationId));
+        _emptyTaskCompletionSource = new 
TaskCompletionSource<IMessage<TMessage>>();
+
+        _ = Setup();
+    }
+
+    private async Task Setup()
+    {
+        try
+        {
+            await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
+        }
+        catch (Exception exception)
+        {
+            if (_cts.IsCancellationRequested)
+                return;
+
+            _faultException = exception;
+            _state.SetState(ReaderState.Faulted);
+        }
+    }
+
+    private async Task Monitor()
+    {
+        await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
+        _numberOfPartitions = await 
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
+        _isPartitioned = _numberOfPartitions != 0;
+
+        _receiveTaskQueueForSubReaders = new 
Task<IMessage<TMessage>>[_numberOfPartitions];
+        for (var i = 0; i < _receiveTaskQueueForSubReaders.Length; i++)
+        {
+            _receiveTaskQueueForSubReaders[i] = 
_emptyTaskCompletionSource.Task;
+        }
+
+        if (_isPartitioned)
+        {
+            for (var partition = 0; partition < _numberOfPartitions; 
partition++)
+            {
+                var partitionedTopicName = GetPartitionedTopicName(partition);
+
+                var subReader = CreateSubReader(partitionedTopicName);
+                _ = _subReaders.TryAdd(partitionedTopicName, subReader);
+            }
+        }
+        else
+        {
+            var subReader = CreateSubReader(Topic);
+            _ = _subReaders.TryAdd(Topic, subReader);
+        }
+        _allSubReadersIsReady = true;

Review Comment:
   Fixed in both Reader and Consumer



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to