entvex commented on code in PR #162: URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1288271769
########## src/DotPulsar/Internal/Reader.cs: ########## @@ -72,86 +163,158 @@ public bool IsFinalState() public bool IsFinalState(ReaderState state) => _state.IsFinalState(state); + [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds instead.")] public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken) { - var getLastMessageId = new CommandGetLastMessageId(); - return await _executor.Execute(() => InternalGetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false); + await Guard(cancellationToken).ConfigureAwait(false); + + if (_isPartitioned) + { + throw new NotSupportedException("GetLastMessageId can't be used on partitioned topics. Please use GetLastMessageIds"); + } + else + { + return await _subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false); + } } - private async ValueTask<MessageId> InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken) + public async ValueTask<IEnumerable<MessageId>> GetLastMessageIds(CancellationToken cancellationToken) { - Guard(); - return await _channel.Send(command, cancellationToken).ConfigureAwait(false); + await Guard(cancellationToken).ConfigureAwait(false); + + if (_isPartitioned) + { + Task<MessageId>[] getLastMessageIdsTasks = new Task<MessageId>[_numberOfPartitions]; + + for (var i = 0; i < _subReaders.Count; i++) + { + var getLastMessageIdTask = _subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken); + getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask(); + } + + //await all of the tasks. + await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false); + + //collect MessageIds + List<MessageId> messageIds = new List<MessageId>(); + for (var i = 0; i < _subReaders.Count; i++) + { + messageIds.Add(getLastMessageIdsTasks[i].Result); + } + return messageIds; + } + else + { + MessageId[] messageIds = new MessageId[1]; + messageIds[0] = await _subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false); + return messageIds; + } } public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken) - => await _executor.Execute(() => InternalReceive(cancellationToken), cancellationToken).ConfigureAwait(false); - - private async ValueTask<IMessage<TMessage>> InternalReceive(CancellationToken cancellationToken) { - Guard(); - return await _channel.Receive(cancellationToken).ConfigureAwait(false); + await Guard(cancellationToken).ConfigureAwait(false); + + if (_isPartitioned) + { + var iterations = 0; + while (true) + { + _subReaderIndex++; + iterations++; + if (_subReaderIndex == _subReaders.Count) + _subReaderIndex = 0; + + var receiveTask = _receiveTaskQueueForSubReaders[_subReaderIndex]; + if (receiveTask == _emptyTaskCompletionSource.Task) + { + var receiveTaskValueTask = _subReaders.ElementAt(_subReaderIndex).Value.Receive(cancellationToken); + + if (receiveTaskValueTask.IsCompleted) + return receiveTaskValueTask.Result; + + _receiveTaskQueueForSubReaders[_subReaderIndex] = receiveTaskValueTask.AsTask(); + } + else + { + if (receiveTask.IsCompleted) + { + _receiveTaskQueueForSubReaders[_subReaderIndex] = _emptyTaskCompletionSource.Task; + return receiveTask.Result; + } + } + if (iterations == _subReaders.Count) + await Task.WhenAny(_receiveTaskQueueForSubReaders).ConfigureAwait(false); + } + } + else + { + return await _subReaders[Topic].Receive(cancellationToken).ConfigureAwait(false); + } } public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken) { - var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() }; - await _executor.Execute(() => InternalSeek(seek, cancellationToken), cancellationToken).ConfigureAwait(false); + await Guard(cancellationToken).ConfigureAwait(false); + + if (_isPartitioned) + { + var seekTasks = new Task[_numberOfPartitions]; + for (var i = 0; i < _subReaders.Values.Count; i++) Review Comment: _subReaders.Value does not return an int. But we could use _subReaders.Values.Count, do you like that better ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org