Chickenzilla commented on a change in pull request #23:
URL: https://github.com/apache/pulsar-dotpulsar/pull/23#discussion_r449695256
##########
File path: src/DotPulsar/Internal/ProducerChannel.cs
##########
@@ -90,18 +90,16 @@ public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<
         if (autoAssignSequenceId)
         {
-            sendPackage.Command.SequenceId = _sequenceId.Current;
-            sendPackage.Metadata.SequenceId = _sequenceId.Current;
+            var newSequenceId = _sequenceId.FetchNext();
+            sendPackage.Command.SequenceId = newSequenceId;
+            sendPackage.Metadata.SequenceId = newSequenceId;
         }
         else
             sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;

         var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
         response.Expect(BaseCommand.Type.SendReceipt);
-        if (autoAssignSequenceId)
-            _sequenceId.Increment();
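The change above swaps a read of `_sequenceId.Current` plus a later `Increment()` for a single `FetchNext()`. A Java analogue of that fetch-and-increment (the names here are illustrative, not DotPulsar's actual implementation) would be:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative fetch-and-increment counter. Reading a Current value and
// incrementing it later (as the removed lines did) leaves a window where
// two concurrent senders can observe the same id; getAndIncrement closes
// that window by doing both steps atomically.
final class SequenceIdCounter {
    private final AtomicLong current;

    SequenceIdCounter(long initial) {
        this.current = new AtomicLong(initial);
    }

    // Returns the current id and advances in one atomic step.
    long fetchNext() {
        return current.getAndIncrement();
    }
}
```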
Review comment:
Having read a bit more of the Java producer, I believe I understand the
disconnect here: when we retry messages, they come from the Executor that sits
above this code, and so they look to the channel like entirely new messages.
If more than one was outstanding, ordering (and deduplication) is no longer
guaranteed, because the retransmits were all submitted asynchronously. When
the Java client resends messages, it has a list of outstanding messages (with
their original SequenceIds!) ready to retransmit.
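To make that concrete, here is a rough sketch (in Java, since it mirrors the behaviour described for the Java client; the names and shapes are my own invention, not the real client's internals) of an outstanding-message buffer that keeps original sequence ids for retransmission:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of a resend buffer: outstanding messages retain
// their ORIGINAL sequence ids, so a reconnect can replay them in order
// and broker-side deduplication still works.
final class PendingMessages {
    record Pending(long sequenceId, byte[] payload) {}

    private final Queue<Pending> outstanding = new ArrayDeque<>();
    private long nextSequenceId;

    // Assign the id once, at enqueue time, and remember the message
    // until the broker acknowledges it.
    long enqueue(byte[] payload) {
        var p = new Pending(nextSequenceId++, payload);
        outstanding.add(p);
        return p.sequenceId();
    }

    // Receipts arrive in order; drop everything up to and including
    // the acknowledged id.
    void acknowledge(long sequenceId) {
        while (!outstanding.isEmpty()
                && outstanding.peek().sequenceId() <= sequenceId) {
            outstanding.poll();
        }
    }

    // On reconnect, replay whatever is still unacknowledged, in the
    // original order and with the original ids.
    Iterable<Pending> toRetransmit() {
        return outstanding;
    }
}
```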
I'd also like to point out that my concerns are performance-related, not
merely threading-related. It's easy to dismiss multithreaded production as
something that can't care about ordering, but allow me to describe a toy
use case that definitely would.
Let's say I want to send a set of telemetry over Pulsar, so that multiple
consumers can see an accurate, fast, real-time view of it. This data is
sampled perhaps 100 times per second, and there are many fields, so sending
the entire telemetry set on every update is more than my device's network
connection can handle. Therefore, I split the updates into two types:
full and differential. Full updates are easy enough: they go to a separate
(compacted?) topic, and every update is timestamped so a new consumer knows
what's latest and where to start applying updates from.
Differential updates are interesting because they come in very fast from a
single source, are small, and need to be maintained in the order they were
produced. My device has enough bandwidth to send these smaller updates, but
its latency to Pulsar is perhaps not the best (on the order of 100ms RTT). In
the current paradigm, I send one diff to Pulsar, and then I twiddle my thumbs
for 200ms until I get the acknowledgement back from the broker. In that time
I've completely missed 19 updates, and even if I collapse the missed diffs
locally, my consumers now only get updates 5 times a second instead of 100.
In the correctly working design, I can continue to send updates while
awaiting the ACKs from the broker. If I lose my connection, the client should
be able to resend those messages, in their original order, to the broker (as
long as I can buffer them in memory, at least). Ordering stays strict because
at no point did I race to produce messages concurrently: they all happen
exactly 10ms apart, one at a time.
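The pipelined pattern I'm describing can be sketched like this (again in Java; `Transport.sendAsync` is a hypothetical stand-in for whatever resolves when the broker acks, not a real client API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the pipelined producer loop: keep sending while earlier
// acknowledgements are still in flight, instead of awaiting each
// receipt before the next send.
final class PipelinedSender {
    interface Transport {
        CompletableFuture<Long> sendAsync(long sequenceId, byte[] payload);
    }

    private final Transport transport;
    private long nextSequenceId;

    PipelinedSender(Transport transport) {
        this.transport = transport;
    }

    // Fire off every update immediately; ordering is fixed by the
    // sequence id assigned at send time, not by when acks come back.
    List<CompletableFuture<Long>> sendAll(List<byte[]> updates) {
        var receipts = new ArrayList<CompletableFuture<Long>>();
        for (byte[] update : updates) {
            receipts.add(transport.sendAsync(nextSequenceId++, update));
        }
        return receipts;
    }
}
```

With this shape, a 100ms RTT costs latency on each individual receipt but no longer caps throughput, which is the whole point of the toy telemetry example.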
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]