entvex commented on code in PR #162: URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1288239847
########## src/DotPulsar/Internal/Reader.cs: ########## @@ -15,49 +15,140 @@ namespace DotPulsar.Internal; using DotPulsar.Abstractions; -using DotPulsar.Exceptions; using DotPulsar.Internal.Abstractions; -using DotPulsar.Internal.Events; +using DotPulsar.Internal.Compression; using DotPulsar.Internal.PulsarApi; using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; -public sealed class Reader<TMessage> : IContainsChannel, IReader<TMessage> +public sealed class Reader<TMessage> : IReader<TMessage> { - private readonly Guid _correlationId; - private readonly IRegisterEvent _eventRegister; - private IConsumerChannel<TMessage> _channel; + private readonly TaskCompletionSource<IMessage<TMessage>> _emptyTaskCompletionSource; + private readonly ReaderOptions<TMessage> _readerOptions; + private readonly ConcurrentDictionary<string, SubReader<TMessage>> _subReaders; + private readonly IHandleException _exceptionHandler; + private readonly IConnectionPool _connectionPool; + private readonly ProcessManager _processManager; + private readonly CancellationTokenSource _cts; private readonly IExecute _executor; - private readonly IStateChanged<ReaderState> _state; - private readonly IConsumerChannelFactory<TMessage> _factory; + private readonly StateManager<ReaderState> _state; + private readonly SemaphoreSlim _semaphoreSlim; + private bool _allSubReadersIsReady; + private Task<IMessage<TMessage>>[] _receiveTaskQueueForSubReaders; + private int _subReaderIndex; + private bool _isPartitioned; + private uint _numberOfPartitions; private int _isDisposed; private Exception? _faultException; public Uri ServiceUrl { get; } public string Topic { get; } public Reader( - Guid correlationId, Uri serviceUrl, - string topic, - IRegisterEvent eventRegister, - IConsumerChannel<TMessage> initialChannel, - IExecute executor, - IStateChanged<ReaderState> state, - IConsumerChannelFactory<TMessage> factory) + ReaderOptions<TMessage> readerOptions, + ProcessManager processManager, + IHandleException exceptionHandler, + IConnectionPool connectionPool) { - _correlationId = correlationId; ServiceUrl = serviceUrl; - Topic = topic; - _eventRegister = eventRegister; - _channel = initialChannel; - _executor = executor; - _state = state; - _factory = factory; + Topic = readerOptions.Topic; + _readerOptions = readerOptions; + _connectionPool = connectionPool; + _processManager = processManager; + _exceptionHandler = exceptionHandler; + _semaphoreSlim = new SemaphoreSlim(1); + _state = CreateStateManager(); + _receiveTaskQueueForSubReaders = Array.Empty<Task<IMessage<TMessage>>>(); + _subReaders = new ConcurrentDictionary<string, SubReader<TMessage>>(); + _cts = new CancellationTokenSource(); + _executor = new Executor(Guid.Empty, _processManager, _exceptionHandler); _isDisposed = 0; - _eventRegister.Register(new ReaderCreated(_correlationId)); + _emptyTaskCompletionSource = new TaskCompletionSource<IMessage<TMessage>>(); + + _ = Setup(); + } + + private async Task Setup() + { + try + { + await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false); + } + catch (Exception exception) + { + if (_cts.IsCancellationRequested) + return; + + _faultException = exception; + _state.SetState(ReaderState.Faulted); + } + } + + private async Task Monitor() + { + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + _numberOfPartitions = await _connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false); + _isPartitioned = _numberOfPartitions != 0; + + _receiveTaskQueueForSubReaders = new Task<IMessage<TMessage>>[_numberOfPartitions]; + for (var i = 0; i < _receiveTaskQueueForSubReaders.Length; i++) + { + _receiveTaskQueueForSubReaders[i] = _emptyTaskCompletionSource.Task; + } + + if (_isPartitioned) + { + for (var partition = 0; partition < _numberOfPartitions; partition++) + { + var partitionedTopicName = GetPartitionedTopicName(partition); + + var subReader = CreateSubReader(partitionedTopicName); + _ = _subReaders.TryAdd(partitionedTopicName, subReader); + } + } + else + { + var subReader = CreateSubReader(Topic); + _ = _subReaders.TryAdd(Topic, subReader); + } + _allSubReadersIsReady = true; Review Comment: Fixed in both Reader and Consumer -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org