entvex commented on code in PR #162: URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1293213224
########## src/DotPulsar/Internal/Consumer.cs: ########## @@ -86,140 +143,270 @@ public async ValueTask DisposeAsync() if (Interlocked.Exchange(ref _isDisposed, 1) != 0) return; - _eventRegister.Register(new ConsumerDisposed(_correlationId)); - await DisposeChannel().ConfigureAwait(false); + foreach (var subConsumer in _subConsumers) + { + await subConsumer.DisposeAsync().ConfigureAwait(false); + } } - private async ValueTask DisposeChannel() + public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken) { - await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false); - await _channel.DisposeAsync().ConfigureAwait(false); - } + await Guard(cancellationToken).ConfigureAwait(false); - public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken) - => await _executor.Execute(() => InternalReceive(cancellationToken), cancellationToken).ConfigureAwait(false); + if (!_isPartitioned) + return await _subConsumers[_subConsumerIndex].Receive(cancellationToken).ConfigureAwait(false); + + var iterations = 0; + while (true) + { + iterations++; + _subConsumerIndex++; + if (_subConsumerIndex == _subConsumers.Length) + _subConsumerIndex = 0; + + var receiveTask = _receiveTaskQueueForSubConsumers[_subConsumerIndex]; + if (receiveTask == _emptyTaskCompletionSource.Task) + { + var receiveTaskValueTask = _subConsumers[_subConsumerIndex].Receive(cancellationToken); + if (receiveTaskValueTask.IsCompleted) + return receiveTaskValueTask.Result; + _receiveTaskQueueForSubConsumers[_subConsumerIndex] = receiveTaskValueTask.AsTask(); + } + else + { + if (receiveTask.IsCompleted) + { + _receiveTaskQueueForSubConsumers[_subConsumerIndex] = _emptyTaskCompletionSource.Task; + return receiveTask.Result; + } + } + if (iterations == _subConsumers.Length) + await Task.WhenAny(_receiveTaskQueueForSubConsumers).ConfigureAwait(false); + } + } public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken) - => await InternalAcknowledge(messageId, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false); + { + await Guard(cancellationToken).ConfigureAwait(false); + + if (!_isPartitioned) + await _subConsumers[_subConsumerIndex].Acknowledge(messageId, cancellationToken).ConfigureAwait(false); + else + await _subConsumers[messageId.Partition].Acknowledge(messageId, cancellationToken).ConfigureAwait(false); + } public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken) - => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false); + { + await Guard(cancellationToken).ConfigureAwait(false); + + if (!_isPartitioned) + await _subConsumers[_subConsumerIndex].AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false); + else + await _subConsumers[messageId.Partition].AcknowledgeCumulative(messageId, cancellationToken).ConfigureAwait(false); + } public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken) { - var command = new CommandRedeliverUnacknowledgedMessages(); - command.MessageIds.AddRange(messageIds.Select(messageId => messageId.ToMessageIdData())); - await _executor.Execute(() => InternalRedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false); + await Guard(cancellationToken).ConfigureAwait(false); + + if (!_isPartitioned) + await _subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false); + else + { + var messageIdSortedIntoTopics = new Dictionary<int, LinkedList<MessageId>>(_numberOfPartitions); + //sort messageIds into topics + foreach (var messageId in messageIds) + { + if (messageIdSortedIntoTopics.ContainsKey(messageId.Partition)) + { + messageIdSortedIntoTopics[messageId.Partition].AddLast(messageId); + } + else + { + var linkedList = new LinkedList<MessageId>(); + linkedList.AddLast(messageId); + messageIdSortedIntoTopics.Add(messageId.Partition, linkedList); + } + } + var redeliverUnacknowledgedMessagesTasks = new Task[messageIdSortedIntoTopics.Count]; + var iterations = -1; + //Collect tasks from _subConsumers RedeliverUnacknowledgedMessages without waiting + foreach (var messageIdSortedByPartition in messageIdSortedIntoTopics) + { + iterations++; + var task = _subConsumers[messageIdSortedByPartition.Key].RedeliverUnacknowledgedMessages(messageIdSortedByPartition.Value, cancellationToken).AsTask(); + redeliverUnacknowledgedMessagesTasks[iterations] = task; + } + //await all of the tasks. + await Task.WhenAll(redeliverUnacknowledgedMessagesTasks).ConfigureAwait(false); + } } public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken) - => await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false); + { + await Guard(cancellationToken).ConfigureAwait(false); + + if (!_isPartitioned) + await _subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false); + else + { + foreach (var subConsumer in _subConsumers) + { + await subConsumer.RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false); + } + } + } public async ValueTask Unsubscribe(CancellationToken cancellationToken) { - var unsubscribe = new CommandUnsubscribe(); - await _executor.Execute(() => InternalUnsubscribe(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false); + await Guard(cancellationToken).ConfigureAwait(false); + + if (!_isPartitioned) + await _subConsumers[_subConsumerIndex].Unsubscribe(cancellationToken).ConfigureAwait(false); + else + { + var unsubscribeTasks = new List<Task>(_numberOfPartitions); + foreach (var subConsumer in _subConsumers) + { + var getLastMessageIdTask = subConsumer.Unsubscribe(cancellationToken); + unsubscribeTasks.Add(getLastMessageIdTask.AsTask()); + } + await Task.WhenAll(unsubscribeTasks).ConfigureAwait(false); + } } public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken) { - var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() }; - await _executor.Execute(() => InternalSeek(seek, cancellationToken), cancellationToken).ConfigureAwait(false); + await Guard(cancellationToken).ConfigureAwait(false); + + if (!_isPartitioned) + await _subConsumers[_subConsumerIndex].Seek(messageId, cancellationToken).ConfigureAwait(false); + else + { + var seekTasks = new List<Task>(_numberOfPartitions); + foreach (var subConsumer in _subConsumers) + { + var getLastMessageIdTask = subConsumer.Seek(messageId, cancellationToken); + seekTasks.Add(getLastMessageIdTask.AsTask()); + } + await Task.WhenAll(seekTasks).ConfigureAwait(false); + } } public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken) { - var seek = new CommandSeek { MessagePublishTime = publishTime }; - await _executor.Execute(() => InternalSeek(seek, cancellationToken), cancellationToken).ConfigureAwait(false); + await Guard(cancellationToken).ConfigureAwait(false); + + if (!_isPartitioned) + await _subConsumers[_subConsumerIndex].Seek(publishTime, cancellationToken).ConfigureAwait(false); + else + { + var seekTasks = new List<Task>(_numberOfPartitions); + foreach (var subConsumer in _subConsumers) + { + var getLastMessageIdTask = subConsumer.Seek(publishTime, cancellationToken); + seekTasks.Add(getLastMessageIdTask.AsTask()); + } + await Task.WhenAll(seekTasks).ConfigureAwait(false); + } } + [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds instead.")] public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken) { - var getLastMessageId = new CommandGetLastMessageId(); - return await _executor.Execute(() => InternalGetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false); - } + await Guard(cancellationToken).ConfigureAwait(false); - private void Guard() - { - if (_isDisposed != 0) - throw new ConsumerDisposedException(GetType().FullName!); + if (!_isPartitioned) + return await _subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false); - if (_faultException is not null) - throw new ConsumerFaultedException(_faultException); + throw new NotSupportedException("GetLastMessageId can't be used on partitioned topics. Please use GetLastMessageIds"); } - public async Task EstablishNewChannel(CancellationToken cancellationToken) + public async ValueTask<IEnumerable<MessageId>> GetLastMessageIds(CancellationToken cancellationToken) { - var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false); + await Guard(cancellationToken).ConfigureAwait(false); - var oldChannel = _channel; - if (oldChannel is not null) - await oldChannel.DisposeAsync().ConfigureAwait(false); + if (!_isPartitioned) + return new[] { await _subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false) }; - _channel = channel; - } + var getLastMessageIdsTasks = new List<Task<MessageId>>(_numberOfPartitions); - public async ValueTask CloseChannel(CancellationToken cancellationToken) - => await _channel.ClosedByClient(cancellationToken).ConfigureAwait(false); + foreach (var subConsumer in _subConsumers) + { + var getLastMessageIdTask = subConsumer.GetLastMessageId(cancellationToken); + getLastMessageIdsTasks.Add(getLastMessageIdTask.AsTask()); + } - public async ValueTask ChannelFaulted(Exception exception) - { - _faultException = exception; - await DisposeChannel().ConfigureAwait(false); - } + //await all of the tasks. + await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false); - private async ValueTask InternalAcknowledge(CommandAck command, CancellationToken cancellationToken) - { - Guard(); - await _channel.Send(command, cancellationToken).ConfigureAwait(false); + //collect MessageIds + var messageIds = new List<MessageId>(); + for (var i = 0; i < _subConsumers.Length; i++) + { + messageIds.Add(getLastMessageIdsTasks[i].Result); + } + return messageIds; } - private async ValueTask InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken) + private SubConsumer<TMessage> CreateSubConsumer(string topic) { - Guard(); - await _channel.Send(command, cancellationToken).ConfigureAwait(false); - } + var correlationId = Guid.NewGuid(); + var consumerName = _consumerOptions.ConsumerName ?? $"Consumer-{correlationId:N}"; - private async ValueTask<MessageId> InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken) - { - Guard(); - return await _channel.Send(command, cancellationToken).ConfigureAwait(false); - } + var subscribe = new CommandSubscribe + { + ConsumerName = consumerName, + InitialPosition = (CommandSubscribe.InitialPositionType) _consumerOptions.InitialPosition, + PriorityLevel = _consumerOptions.PriorityLevel, + ReadCompacted = _consumerOptions.ReadCompacted, + ReplicateSubscriptionState = _consumerOptions.ReplicateSubscriptionState, + Subscription = _consumerOptions.SubscriptionName, + Topic = topic, + Type = (CommandSubscribe.SubType) _consumerOptions.SubscriptionType + }; + + foreach (var property in _consumerOptions.SubscriptionProperties) + { + var keyValue = new KeyValue { Key = property.Key, Value = property.Value }; + subscribe.SubscriptionProperties.Add(keyValue); + } - private async Task InternalSeek(CommandSeek command, CancellationToken cancellationToken) - { - Guard(); - await _channel.Send(command, cancellationToken).ConfigureAwait(false); + var messagePrefetchCount = _consumerOptions.MessagePrefetchCount; + var messageFactory = new MessageFactory<TMessage>(_consumerOptions.Schema); + var batchHandler = new BatchHandler<TMessage>(true, messageFactory); + var decompressorFactories = CompressionFactories.DecompressorFactories(); + var consumerChannelFactory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, + messagePrefetchCount, batchHandler, messageFactory, decompressorFactories, topic); + var stateManager = CreateStateManager(); + var initialChannel = new NotReadyChannel<TMessage>(); + var executor = new Executor(correlationId, _processManager, _exceptionHandler); + + var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, _consumerOptions.SubscriptionName, topic, + _processManager, initialChannel, executor, stateManager, consumerChannelFactory); + + if (_consumerOptions.StateChangedHandler is not null) + _ = StateMonitor.MonitorConsumer(subConsumer, _consumerOptions.StateChangedHandler); + + var process = new ConsumerProcess(correlationId, stateManager, subConsumer, _consumerOptions.SubscriptionType == SubscriptionType.Failover); + _processManager.Add(process); + process.Start(); + return subConsumer; } - - private async ValueTask<IMessage<TMessage>> InternalReceive(CancellationToken cancellationToken) + private string GetPartitionedTopicName(int partitionNumber) Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org