kandersen82 commented on code in PR #109:
URL: https://github.com/apache/pulsar-dotpulsar/pull/109#discussion_r1018349593


##########
src/DotPulsar/Internal/SubProducer.cs:
##########
@@ -80,30 +80,133 @@ public async ValueTask DisposeAsync()
             return;
 
         _eventRegister.Register(new ProducerDisposed(_correlationId));
+        _newChannelLock.Dispose();
+
+        try
+        {
+            _dispatcherCts?.Cancel();
+            _dispatcherCts?.Dispose();
+            await (_dispatcherTask ?? 
Task.CompletedTask).ConfigureAwait(false);
+        }
+        catch
+        {
+            // Ignored
+        }
+        await _sendQueue.DisposeAsync().ConfigureAwait(false);
         await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
         await _channel.DisposeAsync().ConfigureAwait(false);
     }
 
-    public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage 
message, CancellationToken cancellationToken)
-        => await _executor.Execute(() => InternalSend(metadata.Metadata, 
_schema.Encode(message), cancellationToken), 
cancellationToken).ConfigureAwait(false);
+    public async ValueTask Send(SendOp sendOp, CancellationToken 
cancellationToken)
+    {
+        if (IsFinalState()) throw new ProducerClosedException(); // TODO: This 
exception might be intended for other purposes.
+        await _sendQueue.Enqueue(sendOp, 
cancellationToken).ConfigureAwait(false);
+    }
+
+    public async ValueTask WaitForSendQueueEmpty(CancellationToken 
cancellationToken)
+    {
+        await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
+    }
+
+    private async Task MessageDispatcher(IProducerChannel channel, 
CancellationToken cancellationToken)
+    {
+        var responseQueue = new AsyncQueue<Task<BaseCommand>>();
+        var responseProcessorTask = ResponseProcessor(responseQueue, 
cancellationToken);
+
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            SendOp sendOp = await 
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+
+            if (sendOp.CancellationToken.IsCancellationRequested)
+            {
+                _sendQueue.RemoveCurrentItem();
+                continue;
+            }
+
+            var tcs = new TaskCompletionSource<BaseCommand>();
+            _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task),
+                TaskContinuationOptions.NotOnCanceled | 
TaskContinuationOptions.ExecuteSynchronously);
 
-    public async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, 
ReadOnlySequence<byte> data, CancellationToken cancellationToken)
-        => await _executor.Execute(() => InternalSend(metadata, data, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+            // Use CancellationToken.None here because otherwise it will throw 
exceptions on all fault actions even retry.
+            bool success = await _executor.TryExecuteOnce(() => 
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), 
CancellationToken.None).ConfigureAwait(false);
 
-    private async ValueTask<MessageId> InternalSend(PulsarApi.MessageMetadata 
metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+            if (success) continue;
+            _eventRegister.Register(new ChannelDisconnected(_correlationId));
+            break;
+        }
+
+        await responseProcessorTask.ConfigureAwait(false);
+    }
+
+    private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>> 
responseQueue, CancellationToken cancellationToken)
     {
-        var response = await _channel.Send(metadata, data, 
cancellationToken).ConfigureAwait(false);
-        return response.MessageId.ToMessageId();
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            var responseTask = await 
responseQueue.Dequeue(cancellationToken).ConfigureAwait(false);
+
+            bool success = await _executor.TryExecuteOnce(() =>
+            {
+                if (responseTask.IsFaulted) throw responseTask.Exception!;
+                responseTask.Result.Expect(BaseCommand.Type.SendReceipt);
+                ProcessReceipt(responseTask.Result.SendReceipt);
+            }, CancellationToken.None).ConfigureAwait(false); // Use 
CancellationToken.None here because otherwise it will throw exceptions on all 
fault actions even retry.
+
+            // TODO: Should we crate a new event instead of channel 
disconnected?
+            if (success) continue;
+            _eventRegister.Register(new ChannelDisconnected(_correlationId));
+            break;
+        }
     }
 
-    public async Task EstablishNewChannel(CancellationToken cancellationToken)
+    private void ProcessReceipt(CommandSendReceipt sendReceipt)
     {
-        var channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+        ulong receiptSequenceId = sendReceipt.SequenceId;
+
+        if (!_sendQueue.TryPeek(out SendOp? sendOp) || sendOp is null)
+            throw new ProducerSendReceiptOrderingException($"Received 
sequenceId {receiptSequenceId} but send queue is empty");
+
+        ulong expectedSequenceId = sendOp.Metadata.SequenceId;
 
-        var oldChannel = _channel;
-        if (oldChannel is not null)
+        if (receiptSequenceId != expectedSequenceId)
+            throw new ProducerSendReceiptOrderingException($"Received 
sequenceId {receiptSequenceId}. Expected {expectedSequenceId}");
+
+        _sendQueue.Dequeue();
+        sendOp.ReceiptTcs.TrySetResult(sendReceipt.MessageId.ToMessageId());
+    }
+
+    public async Task EstablishNewChannel(CancellationToken cancellationToken)
+    {
+        try
+        {
+            await 
_newChannelLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+            if (_dispatcherCts is not null && 
!_dispatcherCts.IsCancellationRequested)
+            {
+                _dispatcherCts.Cancel();
+                _dispatcherCts.Dispose();
+            }
+
+            await _executor.TryExecuteOnce(() => _dispatcherTask ?? 
Task.CompletedTask, cancellationToken).ConfigureAwait(false);
+
+            var oldChannel = _channel;
+            // TODO: Not sure we need to actually close the channel?
+            await 
oldChannel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+            // TODO: Why does IProducerChannel.DisposeAsync not do anything?
             await oldChannel.DisposeAsync().ConfigureAwait(false);
 
-        _channel = channel;
+            _channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+
+            _sendQueue.ResetCursor();
+            _dispatcherCts = new CancellationTokenSource();
+            _dispatcherTask = MessageDispatcher(_channel, 
_dispatcherCts.Token);
+        }
+        catch (Exception)
+        {
+            // Ignored

Review Comment:
   All the potential exceptions that can occur are related to disposal or 
stopping scenarios, so it should work as is. I do think that it could make 
sense to maybe split into multiple try catch statements to make it more 
explicit what is going on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to