blankensteiner commented on code in PR #109:
URL: https://github.com/apache/pulsar-dotpulsar/pull/109#discussion_r1009409230
##########
src/DotPulsar/Abstractions/IProducerBuilder.cs:
##########
@@ -54,6 +54,11 @@ public interface IProducerBuilder<TMessage>
/// </summary>
IProducerBuilder<TMessage> MessageRouter(IMessageRouter messageRouter);
+ /// <summary>
+ /// Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker. The default is 500.
Review Comment:
Would it make sense to set the default to 1000 (prefetch is also 1000)? What
does the java client set it to?
##########
src/DotPulsar/Internal/Producer.cs:
##########
@@ -40,17 +40,20 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
private readonly ICompressorFactory? _compressorFactory;
private readonly ProducerOptions<TMessage> _options;
private readonly ProcessManager _processManager;
- private readonly ConcurrentDictionary<int, SubProducer<TMessage>>
_producers;
+ private readonly ConcurrentDictionary<int, SubProducer> _producers;
private readonly IMessageRouter _messageRouter;
private readonly CancellationTokenSource _cts;
private readonly IExecute _executor;
private int _isDisposed;
private int _producerCount;
+ private ISendChannel<TMessage>? _sendChannel;
private Exception? _throw;
public Uri ServiceUrl { get; }
public string Topic { get; }
+ public ISendChannel<TMessage> SendChannel { get => _sendChannel ??= new
SendChannel<TMessage>(this); }
Review Comment:
I don't think this is thread-safe, so better just to create it in the
constructor.
##########
src/DotPulsar/Internal/SendChannel.cs:
##########
@@ -0,0 +1,35 @@
+namespace DotPulsar.Internal;
+
+using DotPulsar.Abstractions;
+using Exceptions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+public class SendChannel<TMessage> : ISendChannel<TMessage>
Review Comment:
Let's keep it sealed
##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent
eventRegister, IHandleExcepti
}
public async ValueTask Execute(Action action, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(action, cancellationToken)) { }
+ }
+
+ public async ValueTask Execute(Func<Task> func, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(func, cancellationToken)) { }
+ }
+
+ public async ValueTask Execute(Func<ValueTask> func, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(func, cancellationToken)) { }
+ }
+
+ public async ValueTask<TResult> Execute<TResult>(Func<TResult> func,
CancellationToken cancellationToken)
{
while (true)
{
- try
- {
- action();
- return;
- }
- catch (Exception ex)
- {
- if (await Handle(ex, cancellationToken).ConfigureAwait(false))
- throw;
- }
-
- cancellationToken.ThrowIfCancellationRequested();
+ var (success, result) = await TryExecuteOnce(func,
cancellationToken);
Review Comment:
.ConfigureAwait(false);
##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent
eventRegister, IHandleExcepti
}
public async ValueTask Execute(Action action, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(action, cancellationToken)) { }
+ }
+
+ public async ValueTask Execute(Func<Task> func, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(func, cancellationToken)) { }
+ }
+
+ public async ValueTask Execute(Func<ValueTask> func, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(func, cancellationToken)) { }
+ }
+
+ public async ValueTask<TResult> Execute<TResult>(Func<TResult> func,
CancellationToken cancellationToken)
{
while (true)
{
- try
- {
- action();
- return;
- }
- catch (Exception ex)
- {
- if (await Handle(ex, cancellationToken).ConfigureAwait(false))
- throw;
- }
-
- cancellationToken.ThrowIfCancellationRequested();
+ var (success, result) = await TryExecuteOnce(func,
cancellationToken);
+ if (success) return result!;
}
}
- public async ValueTask Execute(Func<Task> func, CancellationToken
cancellationToken)
+ public async ValueTask<TResult> Execute<TResult>(Func<Task<TResult>> func,
CancellationToken cancellationToken)
{
while (true)
{
- try
- {
- await func().ConfigureAwait(false);
- return;
- }
- catch (Exception ex)
- {
- if (await Handle(ex, cancellationToken).ConfigureAwait(false))
- throw;
- }
-
- cancellationToken.ThrowIfCancellationRequested();
+ var (success, result) = await TryExecuteOnce(func,
cancellationToken);
Review Comment:
.ConfigureAwait(false);
##########
src/DotPulsar/Internal/Abstractions/IProducerChannel.cs:
##########
@@ -22,6 +22,8 @@ namespace DotPulsar.Internal.Abstractions;
public interface IProducerChannel : IAsyncDisposable
{
- Task<CommandSendReceipt> Send(MessageMetadata metadata,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+ // TODO: Why does one return ValueTask and the other Task?
Review Comment:
Good question. I would think it should just return ValueTask
##########
src/DotPulsar/Internal/Abstractions/IExecute.cs:
##########
@@ -31,4 +31,16 @@ public interface IExecute
ValueTask<TResult> Execute<TResult>(Func<Task<TResult>> func,
CancellationToken cancellationToken = default);
ValueTask<TResult> Execute<TResult>(Func<ValueTask<TResult>> func,
CancellationToken cancellationToken = default);
+
+ ValueTask<bool> TryExecuteOnce(Action action, CancellationToken
cancellationToken = default);
+
+ ValueTask<bool> TryExecuteOnce(Func<Task> func, CancellationToken
cancellationToken = default);
+
+ ValueTask<bool> TryExecuteOnce(Func<ValueTask> func, CancellationToken
cancellationToken = default);
+
+ ValueTask<(bool success, TResult? result)>
TryExecuteOnce<TResult>(Func<TResult> func, CancellationToken cancellationToken
= default);
Review Comment:
Maybe "out TResult result" as a parameter instead? and then only returning
bool?
##########
src/DotPulsar/Internal/SendChannel.cs:
##########
@@ -0,0 +1,35 @@
+namespace DotPulsar.Internal;
+
+using DotPulsar.Abstractions;
+using Exceptions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+public class SendChannel<TMessage> : ISendChannel<TMessage>
+{
+ private readonly Producer<TMessage> _producer;
+ private int _isCompleted;
+
+ public SendChannel(Producer<TMessage> producer)
+ {
+ _producer = producer;
+ }
+
+ public async ValueTask Send(MessageMetadata metadata, TMessage message,
Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken
cancellationToken = default)
+ {
+ if (_isCompleted != 0) throw new SendChannelCompletedException();
+ await _producer.Enqueue(metadata, message, onMessageSent,
cancellationToken).ConfigureAwait(false);
+ }
+
+ public void Complete()
+ {
+ _isCompleted = 1;
+ //Interlocked.Exchange(ref _isCompleted, 1);
Review Comment:
Remove if not needed :)
##########
src/DotPulsar/Internal/AsyncQueueWithCursor.cs:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using Exceptions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable
+{
+ private readonly AsyncLock _pendingLock;
+ private readonly SemaphoreSlim _cursorSemaphore;
+ private readonly LinkedList<T> _queue;
+ private readonly IList<TaskCompletionSource<object>> _queueEmptyTcs;
+ private readonly uint _maxItems;
+ private IDisposable? _pendingLockGrant;
+ private LinkedListNode<T>? _currentNode;
+ private TaskCompletionSource<LinkedListNode<T>>? _cursorNextItemTcs;
+ private int _isDisposed;
+
+ public AsyncQueueWithCursor(uint maxItems)
+ {
+ _pendingLock = new AsyncLock();
+ _cursorSemaphore = new SemaphoreSlim(1, 1);
+ // TODO: Figure out if we can use
https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskcompletionsource?view=net-6.0
Review Comment:
Can we? :-)
##########
src/DotPulsar/Internal/SendOp.cs:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using PulsarApi;
+using System.Buffers;
+using System.Threading;
+using System.Threading.Tasks;
+
+public class SendOp
Review Comment:
sealed
##########
src/DotPulsar/Internal/AsyncQueueWithCursor.cs:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using Exceptions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable
+{
+ private readonly AsyncLock _pendingLock;
+ private readonly SemaphoreSlim _cursorSemaphore;
+ private readonly LinkedList<T> _queue;
+ private readonly IList<TaskCompletionSource<object>> _queueEmptyTcs;
+ private readonly uint _maxItems;
+ private IDisposable? _pendingLockGrant;
+ private LinkedListNode<T>? _currentNode;
+ private TaskCompletionSource<LinkedListNode<T>>? _cursorNextItemTcs;
+ private int _isDisposed;
+
+ public AsyncQueueWithCursor(uint maxItems)
+ {
+ _pendingLock = new AsyncLock();
+ _cursorSemaphore = new SemaphoreSlim(1, 1);
+ // TODO: Figure out if we can use
https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskcompletionsource?view=net-6.0
+ _queueEmptyTcs = new List<TaskCompletionSource<object>>();
+ _queue = new LinkedList<T>();
+ _maxItems = maxItems;
+ }
+
+ /// <summary>
+ /// Enqueue item
+ /// </summary>
+ public async ValueTask Enqueue(T item, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+ try
+ {
+ var grant = await
_pendingLock.Lock(cancellationToken).ConfigureAwait(false);
+ lock (_pendingLock)
+ {
+ _pendingLockGrant = grant;
+ }
+ }
+ catch (Exception)
+ {
+ ReleasePendingLockGrant();
+ throw;
+ }
+
+ lock (_queue)
+ {
Review Comment:
Since we check the queue count I guess it can be full? If yes, then we would
exit without adding the item to the queue.
##########
src/DotPulsar/Internal/SendChannel.cs:
##########
@@ -0,0 +1,35 @@
+namespace DotPulsar.Internal;
Review Comment:
Need apache header
##########
src/DotPulsar/Internal/Exceptions/ProducerSendReceiptOrderingException.cs:
##########
@@ -0,0 +1,11 @@
+namespace DotPulsar.Internal.Exceptions;
Review Comment:
Needs apache header
##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent
eventRegister, IHandleExcepti
}
public async ValueTask Execute(Action action, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(action, cancellationToken)) { }
Review Comment:
.ConfigureAwait(false);
##########
src/DotPulsar/Internal/Producer.cs:
##########
@@ -259,37 +283,74 @@ public async ValueTask<MessageId> Send(MessageMetadata
metadata, TMessage messag
try
{
var partition = await ChoosePartitions(metadata,
cancellationToken).ConfigureAwait(false);
- var producer = _producers[partition];
+ var subProducer = _producers[partition];
var data = _options.Schema.Encode(message);
- var messageId = await producer.Send(metadata.Metadata, data,
cancellationToken).ConfigureAwait(false);
-
- if (startTimestamp != 0)
- DotPulsarMeter.MessageSent(startTimestamp, _meterTags);
+ var tcs = new TaskCompletionSource<MessageId>();
+ await subProducer.Send(new SendOp(metadata.Metadata, data, tcs,
sendOpCancelable ? cancellationToken : CancellationToken.None),
cancellationToken).ConfigureAwait(false);
- if (activity is not null && activity.IsAllDataRequested)
+ _ = tcs.Task.ContinueWith(async task =>
{
- activity.SetMessageId(messageId);
- activity.SetPayloadSize(data.Length);
- activity.SetStatus(ActivityStatusCode.Ok);
- }
+ if (startTimestamp != 0)
+ DotPulsarMeter.MessageSent(startTimestamp, _meterTags);
+
+ if (task.IsFaulted || task.IsCanceled)
+ {
+ FailActivity(task.IsCanceled ? new
OperationCanceledException() : task.Exception!, activity);
+
+ if (autoAssignSequenceId)
+ metadata.SequenceId = 0;
+ }
- return messageId;
+ CompleteActivity(task.Result, data.Length, activity);
+ try
+ {
+ if (onMessageSent is not null)
+ await
onMessageSent.Invoke(task.Result).ConfigureAwait(false);
Review Comment:
Consider creating a task for executing the user's callback. They might
highjack the task.
##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent
eventRegister, IHandleExcepti
}
public async ValueTask Execute(Action action, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(action, cancellationToken)) { }
+ }
+
+ public async ValueTask Execute(Func<Task> func, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(func, cancellationToken)) { }
Review Comment:
.ConfigureAwait(false);
##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent
eventRegister, IHandleExcepti
}
public async ValueTask Execute(Action action, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(action, cancellationToken)) { }
+ }
+
+ public async ValueTask Execute(Func<Task> func, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(func, cancellationToken)) { }
+ }
+
+ public async ValueTask Execute(Func<ValueTask> func, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(func, cancellationToken)) { }
Review Comment:
.ConfigureAwait(false);
##########
src/DotPulsar/Internal/SubProducer.cs:
##########
@@ -80,30 +80,133 @@ public async ValueTask DisposeAsync()
return;
_eventRegister.Register(new ProducerDisposed(_correlationId));
+ _newChannelLock.Dispose();
+
+ try
+ {
+ _dispatcherCts?.Cancel();
+ _dispatcherCts?.Dispose();
+ await (_dispatcherTask ??
Task.CompletedTask).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignored
+ }
+ await _sendQueue.DisposeAsync().ConfigureAwait(false);
await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
- public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage
message, CancellationToken cancellationToken)
- => await _executor.Execute(() => InternalSend(metadata.Metadata,
_schema.Encode(message), cancellationToken),
cancellationToken).ConfigureAwait(false);
+ public async ValueTask Send(SendOp sendOp, CancellationToken
cancellationToken)
+ {
+ if (IsFinalState()) throw new ProducerClosedException(); // TODO: This
exception might be intended for other purposes.
+ await _sendQueue.Enqueue(sendOp,
cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask WaitForSendQueueEmpty(CancellationToken
cancellationToken)
+ {
+ await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
+ }
+
+ private async Task MessageDispatcher(IProducerChannel channel,
CancellationToken cancellationToken)
+ {
+ var responseQueue = new AsyncQueue<Task<BaseCommand>>();
+ var responseProcessorTask = ResponseProcessor(responseQueue,
cancellationToken);
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ SendOp sendOp = await
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+
+ if (sendOp.CancellationToken.IsCancellationRequested)
+ {
+ _sendQueue.RemoveCurrentItem();
+ continue;
+ }
+
+ var tcs = new TaskCompletionSource<BaseCommand>();
+ _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task),
+ TaskContinuationOptions.NotOnCanceled |
TaskContinuationOptions.ExecuteSynchronously);
- public async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata,
ReadOnlySequence<byte> data, CancellationToken cancellationToken)
- => await _executor.Execute(() => InternalSend(metadata, data,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ // Use CancellationToken.None here because otherwise it will throw
exceptions on all fault actions even retry.
+ bool success = await _executor.TryExecuteOnce(() =>
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken),
CancellationToken.None).ConfigureAwait(false);
- private async ValueTask<MessageId> InternalSend(PulsarApi.MessageMetadata
metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+ if (success) continue;
+ _eventRegister.Register(new ChannelDisconnected(_correlationId));
+ break;
+ }
+
+ await responseProcessorTask.ConfigureAwait(false);
+ }
+
+ private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>>
responseQueue, CancellationToken cancellationToken)
{
- var response = await _channel.Send(metadata, data,
cancellationToken).ConfigureAwait(false);
- return response.MessageId.ToMessageId();
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var responseTask = await
responseQueue.Dequeue(cancellationToken).ConfigureAwait(false);
+
+ bool success = await _executor.TryExecuteOnce(() =>
+ {
+ if (responseTask.IsFaulted) throw responseTask.Exception!;
+ responseTask.Result.Expect(BaseCommand.Type.SendReceipt);
+ ProcessReceipt(responseTask.Result.SendReceipt);
+ }, CancellationToken.None).ConfigureAwait(false); // Use
CancellationToken.None here because otherwise it will throw exceptions on all
fault actions even retry.
+
+ // TODO: Should we crate a new event instead of channel
disconnected?
+ if (success) continue;
+ _eventRegister.Register(new ChannelDisconnected(_correlationId));
+ break;
+ }
}
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ private void ProcessReceipt(CommandSendReceipt sendReceipt)
{
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+ ulong receiptSequenceId = sendReceipt.SequenceId;
+
+ if (!_sendQueue.TryPeek(out SendOp? sendOp) || sendOp is null)
+ throw new ProducerSendReceiptOrderingException($"Received
sequenceId {receiptSequenceId} but send queue is empty");
+
+ ulong expectedSequenceId = sendOp.Metadata.SequenceId;
- var oldChannel = _channel;
- if (oldChannel is not null)
+ if (receiptSequenceId != expectedSequenceId)
+ throw new ProducerSendReceiptOrderingException($"Received
sequenceId {receiptSequenceId}. Expected {expectedSequenceId}");
+
+ _sendQueue.Dequeue();
+ sendOp.ReceiptTcs.TrySetResult(sendReceipt.MessageId.ToMessageId());
+ }
+
+ public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ {
+ try
+ {
+ await
_newChannelLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+ if (_dispatcherCts is not null &&
!_dispatcherCts.IsCancellationRequested)
+ {
+ _dispatcherCts.Cancel();
+ _dispatcherCts.Dispose();
+ }
+
+ await _executor.TryExecuteOnce(() => _dispatcherTask ??
Task.CompletedTask, cancellationToken).ConfigureAwait(false);
+
+ var oldChannel = _channel;
+ // TODO: Not sure we need to actually close the channel?
+ await
oldChannel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+ // TODO: Why does IProducerChannel.DisposeAsync not do anything?
await oldChannel.DisposeAsync().ConfigureAwait(false);
- _channel = channel;
+ _channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+
+ _sendQueue.ResetCursor();
+ _dispatcherCts = new CancellationTokenSource();
+ _dispatcherTask = MessageDispatcher(_channel,
_dispatcherCts.Token);
+ }
+ catch (Exception)
+ {
+ // Ignored
Review Comment:
If something outside _executor.Execute fails, the channel with never be
established and the state of the producer will not change?
##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent
eventRegister, IHandleExcepti
}
public async ValueTask Execute(Action action, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(action, cancellationToken)) { }
+ }
+
+ public async ValueTask Execute(Func<Task> func, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(func, cancellationToken)) { }
+ }
+
+ public async ValueTask Execute(Func<ValueTask> func, CancellationToken
cancellationToken)
+ {
+ while (!await TryExecuteOnce(func, cancellationToken)) { }
+ }
+
+ public async ValueTask<TResult> Execute<TResult>(Func<TResult> func,
CancellationToken cancellationToken)
{
while (true)
{
- try
- {
- action();
- return;
- }
- catch (Exception ex)
- {
- if (await Handle(ex, cancellationToken).ConfigureAwait(false))
- throw;
- }
-
- cancellationToken.ThrowIfCancellationRequested();
+ var (success, result) = await TryExecuteOnce(func,
cancellationToken);
+ if (success) return result!;
}
}
- public async ValueTask Execute(Func<Task> func, CancellationToken
cancellationToken)
+ public async ValueTask<TResult> Execute<TResult>(Func<Task<TResult>> func,
CancellationToken cancellationToken)
{
while (true)
{
- try
- {
- await func().ConfigureAwait(false);
- return;
- }
- catch (Exception ex)
- {
- if (await Handle(ex, cancellationToken).ConfigureAwait(false))
- throw;
- }
-
- cancellationToken.ThrowIfCancellationRequested();
+ var (success, result) = await TryExecuteOnce(func,
cancellationToken);
+ if (success) return result!;
}
}
- public async ValueTask Execute(Func<ValueTask> func, CancellationToken
cancellationToken)
+ public async ValueTask<TResult> Execute<TResult>(Func<ValueTask<TResult>>
func, CancellationToken cancellationToken)
{
while (true)
{
- try
- {
- await func().ConfigureAwait(false);
- return;
- }
- catch (Exception ex)
- {
- if (await Handle(ex, cancellationToken).ConfigureAwait(false))
- throw;
- }
-
- cancellationToken.ThrowIfCancellationRequested();
+ var (success, result) = await TryExecuteOnce(func,
cancellationToken);
Review Comment:
.ConfigureAwait(false);
##########
src/DotPulsar/Internal/SubProducer.cs:
##########
@@ -80,30 +80,133 @@ public async ValueTask DisposeAsync()
return;
_eventRegister.Register(new ProducerDisposed(_correlationId));
+ _newChannelLock.Dispose();
+
+ try
+ {
+ _dispatcherCts?.Cancel();
+ _dispatcherCts?.Dispose();
+ await (_dispatcherTask ??
Task.CompletedTask).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignored
+ }
+ await _sendQueue.DisposeAsync().ConfigureAwait(false);
await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
- public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage
message, CancellationToken cancellationToken)
- => await _executor.Execute(() => InternalSend(metadata.Metadata,
_schema.Encode(message), cancellationToken),
cancellationToken).ConfigureAwait(false);
+ public async ValueTask Send(SendOp sendOp, CancellationToken
cancellationToken)
+ {
+ if (IsFinalState()) throw new ProducerClosedException(); // TODO: This
exception might be intended for other purposes.
+ await _sendQueue.Enqueue(sendOp,
cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask WaitForSendQueueEmpty(CancellationToken
cancellationToken)
+ {
+ await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
+ }
+
+ private async Task MessageDispatcher(IProducerChannel channel,
CancellationToken cancellationToken)
+ {
+ var responseQueue = new AsyncQueue<Task<BaseCommand>>();
+ var responseProcessorTask = ResponseProcessor(responseQueue,
cancellationToken);
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ SendOp sendOp = await
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+
+ if (sendOp.CancellationToken.IsCancellationRequested)
+ {
+ _sendQueue.RemoveCurrentItem();
+ continue;
+ }
+
+ var tcs = new TaskCompletionSource<BaseCommand>();
+ _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task),
+ TaskContinuationOptions.NotOnCanceled |
TaskContinuationOptions.ExecuteSynchronously);
- public async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata,
ReadOnlySequence<byte> data, CancellationToken cancellationToken)
- => await _executor.Execute(() => InternalSend(metadata, data,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ // Use CancellationToken.None here because otherwise it will throw
exceptions on all fault actions even retry.
+ bool success = await _executor.TryExecuteOnce(() =>
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken),
CancellationToken.None).ConfigureAwait(false);
- private async ValueTask<MessageId> InternalSend(PulsarApi.MessageMetadata
metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+ if (success) continue;
+ _eventRegister.Register(new ChannelDisconnected(_correlationId));
+ break;
+ }
+
+ await responseProcessorTask.ConfigureAwait(false);
+ }
+
+ private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>>
responseQueue, CancellationToken cancellationToken)
{
- var response = await _channel.Send(metadata, data,
cancellationToken).ConfigureAwait(false);
- return response.MessageId.ToMessageId();
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var responseTask = await
responseQueue.Dequeue(cancellationToken).ConfigureAwait(false);
+
+ bool success = await _executor.TryExecuteOnce(() =>
+ {
+ if (responseTask.IsFaulted) throw responseTask.Exception!;
+ responseTask.Result.Expect(BaseCommand.Type.SendReceipt);
+ ProcessReceipt(responseTask.Result.SendReceipt);
+ }, CancellationToken.None).ConfigureAwait(false); // Use
CancellationToken.None here because otherwise it will throw exceptions on all
fault actions even retry.
+
+ // TODO: Should we crate a new event instead of channel
disconnected?
+ if (success) continue;
+ _eventRegister.Register(new ChannelDisconnected(_correlationId));
+ break;
+ }
}
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ private void ProcessReceipt(CommandSendReceipt sendReceipt)
{
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+ ulong receiptSequenceId = sendReceipt.SequenceId;
+
+ if (!_sendQueue.TryPeek(out SendOp? sendOp) || sendOp is null)
+ throw new ProducerSendReceiptOrderingException($"Received
sequenceId {receiptSequenceId} but send queue is empty");
+
+ ulong expectedSequenceId = sendOp.Metadata.SequenceId;
- var oldChannel = _channel;
- if (oldChannel is not null)
+ if (receiptSequenceId != expectedSequenceId)
+ throw new ProducerSendReceiptOrderingException($"Received
sequenceId {receiptSequenceId}. Expected {expectedSequenceId}");
+
+ _sendQueue.Dequeue();
+ sendOp.ReceiptTcs.TrySetResult(sendReceipt.MessageId.ToMessageId());
+ }
+
+ public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ {
+ try
+ {
+ await
_newChannelLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+ if (_dispatcherCts is not null &&
!_dispatcherCts.IsCancellationRequested)
+ {
+ _dispatcherCts.Cancel();
+ _dispatcherCts.Dispose();
+ }
+
+ await _executor.TryExecuteOnce(() => _dispatcherTask ??
Task.CompletedTask, cancellationToken).ConfigureAwait(false);
+
+ var oldChannel = _channel;
+ // TODO: Not sure we need to actually close the channel?
Review Comment:
I see many TODO in the files. We need to talk about every one of them. Maybe
on Teams? :-)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]