equanz commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1530064053
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -122,15 +136,18 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
})
).thenRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
- PositionImpl readPositionWhenJoining = (PositionImpl) cursor.getReadPosition();
- consumer.setReadPositionWhenJoining(readPositionWhenJoining);
- // If this was the 1st consumer, or if all the messages are already acked, then we
- // don't need to do anything special
- if (!allowOutOfOrderDelivery
- && recentlyJoinedConsumers != null
- && consumerList.size() > 1
- && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
- recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
+ if (!allowOutOfOrderDelivery) {
+ final PositionImpl lastSentPositionWhenJoining = updateIfNeededAndGetLastSentPosition();
+ if (lastSentPositionWhenJoining != null) {
+ consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
+ // If this was the 1st consumer, or if all the messages are already acked, then we
+ // don't need to do anything special
+ if (recentlyJoinedConsumers != null
+ && consumerList.size() > 1
+ && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+ recentlyJoinedConsumers.put(consumer, lastSentPositionWhenJoining);
Review Comment:
Hi @poorbarcode.
Here are my thoughts on your suggestion.
### Proposed Approach
1. Calculate either `cursor.individualDeletedMessages + consumer.pendingAcks +
...(other consumers' pendingAcks)... + dispatcher.redeliveryMessages` or
`cursor.individualDeletedMessages + consumer.pendingAcks + ...(other consumers'
pendingAcks)... - dispatcher.redeliveryMessages`.
- Note: which of the two are you suggesting?
- Roughly,
- `+` means adding ranges.
- `-` means removing ranges.
2. If the lower bound of the first range is less than the markDeletePosition,
return that range's upper bound as the joined position. Otherwise, return the
markDeletePosition.

Is my understanding above correct?
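
To make the two candidate formulas concrete, here is a minimal sketch of the range arithmetic and of step 2 exactly as worded above. It is not Pulsar code: positions are simplified to plain `long` values, each range is a `long[]{lower, upper}` standing for the half-open interval `(lower, upper]`, and the class and method names (`JoinedPositionSketch`, `add`, `remove`, `joinedPosition`) are hypothetical. The input values are arbitrary and only chosen to exercise both branches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: positions are plain longs and each range is a
// long[]{lower, upper} meaning the half-open interval (lower, upper].
class JoinedPositionSketch {

    // "+": union of two range lists, merging ranges that overlap or touch.
    static List<long[]> add(List<long[]> a, List<long[]> b) {
        List<long[]> all = new ArrayList<>(a);
        all.addAll(b);
        all.sort(Comparator.comparingLong((long[] r) -> r[0]));
        List<long[]> merged = new ArrayList<>();
        for (long[] r : all) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && r[0] <= last[1]) {
                last[1] = Math.max(last[1], r[1]); // extend the previous range
            } else {
                merged.add(new long[] {r[0], r[1]}); // copy so inputs are not mutated
            }
        }
        return merged;
    }

    // "-": remove every range in cut from the (sorted) ranges in from.
    static List<long[]> remove(List<long[]> from, List<long[]> cut) {
        List<long[]> result = new ArrayList<>(from);
        for (long[] c : cut) {
            List<long[]> next = new ArrayList<>();
            for (long[] r : result) {
                if (c[1] <= r[0] || c[0] >= r[1]) { // no overlap, keep as-is
                    next.add(r);
                    continue;
                }
                if (c[0] > r[0]) next.add(new long[] {r[0], c[0]}); // left part survives
                if (c[1] < r[1]) next.add(new long[] {c[1], r[1]}); // right part survives
            }
            result = next;
        }
        return result;
    }

    // Step 2 as worded in the comment; expects ranges sorted by lower bound.
    static long joinedPosition(List<long[]> ranges, long markDeletePosition) {
        if (!ranges.isEmpty() && ranges.get(0)[0] < markDeletePosition) {
            return ranges.get(0)[1];
        }
        return markDeletePosition;
    }

    public static void main(String[] args) {
        List<long[]> individuallyDeleted = Collections.singletonList(new long[] {3, 8});
        List<long[]> pendingAcks = Arrays.asList(new long[] {8, 10}, new long[] {12, 13});
        List<long[]> redelivery = Collections.singletonList(new long[] {9, 10});

        List<long[]> base = add(individuallyDeleted, pendingAcks); // (3,10], (12,13]
        List<long[]> plus = add(base, redelivery);                 // (3,10], (12,13]
        List<long[]> minus = remove(base, redelivery);             // (3,9], (12,13]

        System.out.println(joinedPosition(plus, 5));  // 10
        System.out.println(joinedPosition(minus, 5)); // 9
        System.out.println(joinedPosition(minus, 2)); // 2
    }
}
```

With these inputs the `+` and `-` variants produce different joined positions (10 versus 9), which is why the choice between them matters.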
### Discussion of Proposed Approach
#### Ordering guarantees
`dispatcher.redeliveryMessages` holds two types of messages: messages to be
redelivered and **skipped messages**. When the dispatcher skips delivery, it
adds the skipped messages to `dispatcher.redeliveryMessages`, as in the code
linked below. Therefore, we can't treat `dispatcher.redeliveryMessages` as a
set containing only redelivery messages.
https://github.com/apache/pulsar/blob/8dc9a9b1b4cf960bb868fbea72317c20373d0d72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L272
https://github.com/apache/pulsar/blob/8dc9a9b1b4cf960bb868fbea72317c20373d0d72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L1202
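
As a toy illustration of why the set is ambiguous (this is not the Pulsar implementation; the class and method names below are hypothetical), both code paths write into the same collection, so a reader of the set cannot tell afterwards which entries were never sent:

```java
import java.util.TreeSet;

// Toy model, not the Pulsar classes: all names here are hypothetical.
class RedeliverySetSketch {
    final TreeSet<Long> redeliveryMessages = new TreeSet<>();

    // A message was sent before and now has to be sent again.
    void onRedeliveryRequested(long position) {
        redeliveryMessages.add(position);
    }

    // The dispatcher read the entry but decided not to send it yet.
    void onDeliverySkipped(long position) {
        redeliveryMessages.add(position);
    }

    public static void main(String[] args) {
        RedeliverySetSketch dispatcher = new RedeliverySetSketch();
        dispatcher.onRedeliveryRequested(7); // was sent once already
        dispatcher.onDeliverySkipped(42);    // was never sent
        // Both positions look identical from the outside.
        System.out.println(dispatcher.redeliveryMessages); // [7, 42]
    }
}
```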
So how about calculating `cursor.individualDeletedMessages +
consumer.pendingAcks + ...(other consumers' pendingAcks)...` instead?
I think the ordering guarantees are mostly the same as in the existing
approach. One difference is that the dispatcher could end up waiting for the
markDeletePosition to advance without delivering messages that are awaiting
redelivery (which were in pendingAcks). This means the ordering is also
partially preserved for redelivery messages. (I don't think this is necessary,
but it's OK.)
#### Calculation cost and effect
The calculation runs synchronously in addConsumer. Both addConsumer and
trySendMessagesToConsumers are synchronized methods, so the calculation will
delay message delivery. Each time a consumer is added, the dispatcher must
collect all consumers' pendingAcks and compute the result in real time.
On the other hand, the existing approach also affects message delivery.
My concern is having to collect all consumers' pendingAcks each time.
(However, the two approaches are not simple to compare, because where the
calculation happens and how many variables it involves are different.)
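
A rough sketch of the concern, under the assumption that the join-time calculation has to visit every outstanding pendingAck while holding the dispatcher's monitor. The class below is a toy model with hypothetical names and signatures, not the real dispatcher; it only shows that the work done per join grows with the total number of unacknowledged messages, and that sends are blocked for that duration because both methods are synchronized.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Toy model with hypothetical names; not the Pulsar dispatcher.
class SynchronizedJoinCostSketch {
    private final Map<String, TreeSet<Long>> pendingAcks = new HashMap<>();

    // Shares the monitor with addConsumer, so it waits while a join is computed.
    synchronized void trySendMessagesToConsumers(String consumer, long position) {
        pendingAcks.computeIfAbsent(consumer, k -> new TreeSet<>()).add(position);
    }

    // Collects every consumer's pendingAcks; returns how many entries were gathered.
    synchronized int addConsumer(String consumer) {
        TreeSet<Long> all = new TreeSet<>();
        for (TreeSet<Long> acks : pendingAcks.values()) {
            all.addAll(acks);
        }
        pendingAcks.putIfAbsent(consumer, new TreeSet<>());
        return all.size();
    }

    public static void main(String[] args) {
        SynchronizedJoinCostSketch dispatcher = new SynchronizedJoinCostSketch();
        System.out.println(dispatcher.addConsumer("c1")); // 0
        for (long p = 0; p < 1000; p++) {
            dispatcher.trySendMessagesToConsumers("c1", p);
        }
        System.out.println(dispatcher.addConsumer("c2")); // 1000
        for (long p = 1000; p < 1500; p++) {
            dispatcher.trySendMessagesToConsumers("c2", p);
        }
        System.out.println(dispatcher.addConsumer("c3")); // 1500
    }
}
```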
#### Code complexity
> Then the new variable individuallySentPositions can be removed, and we do not need to update it in-time, which makes the logic simpler

I agree with your comment. We can remove most of the production and test
code from the changes.
### Conclusion
* Ordering guarantees: Mostly the same as the existing approach.
* Calculation cost and effect: I have a concern. (However, the two approaches
are not simple to compare.)
* Code complexity: It will be simpler than the existing approach.