equanz commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1530064053


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -122,15 +136,18 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
                 })
         ).thenRun(() -> {
             synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
-                PositionImpl readPositionWhenJoining = (PositionImpl) cursor.getReadPosition();
-                consumer.setReadPositionWhenJoining(readPositionWhenJoining);
-                // If this was the 1st consumer, or if all the messages are already acked, then we
-                // don't need to do anything special
-                if (!allowOutOfOrderDelivery
-                        && recentlyJoinedConsumers != null
-                        && consumerList.size() > 1
-                        && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
-                    recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
+                if (!allowOutOfOrderDelivery) {
+                    final PositionImpl lastSentPositionWhenJoining = updateIfNeededAndGetLastSentPosition();
+                    if (lastSentPositionWhenJoining != null) {
+                        consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
+                        // If this was the 1st consumer, or if all the messages are already acked, then we
+                        // don't need to do anything special
+                        if (recentlyJoinedConsumers != null
+                                && consumerList.size() > 1
+                                && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+                            recentlyJoinedConsumers.put(consumer, lastSentPositionWhenJoining);

Review Comment:
   Hi @poorbarcode.
   Here are my thoughts on your suggestion.
   
   ### Proposed Approach
   1. Calculate `cursor.individualDeletedMessages + consumer.pendingAcks + ...(other consumers' pendingAcks)... + dispatcher.redeliveryMessages` or `cursor.individualDeletedMessages + consumer.pendingAcks + ...(other consumers' pendingAcks)... - dispatcher.redeliveryMessages`.
       - Note: which of these two did you suggest?
       - Roughly,
           - `+` means adding ranges.
           - `-` means removing ranges.
   2. If the lower bound of the first range is less than the markDeletePosition, return that range's upper bound as the joining position. Otherwise, return the markDeletePosition.
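
   A minimal sketch of the two steps above, assuming positions are modeled as plain `long` values (real Pulsar positions are `ledgerId:entryId` pairs) and every range set as a list of closed intervals. `PosRange` and `JoiningPositionSketch` are hypothetical names, only the `+` (union) variant is shown, and treating a range that starts at `markDeletePosition + 1` as contiguous is one reading of step 2.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model: a position is a plain long (real Pulsar positions are ledgerId:entryId).
// A PosRange is the closed interval [lower, upper] of positions.
record PosRange(long lower, long upper) {}

final class JoiningPositionSketch {

    // Step 1, "+" variant only: concatenate all range sets, sort by lower bound,
    // and merge ranges that overlap or are adjacent.
    static List<PosRange> union(List<List<PosRange>> rangeSets) {
        List<PosRange> all = new ArrayList<>();
        rangeSets.forEach(all::addAll);
        all.sort(Comparator.comparingLong(PosRange::lower));
        List<PosRange> merged = new ArrayList<>();
        for (PosRange r : all) {
            int lastIndex = merged.size() - 1;
            if (lastIndex >= 0 && r.lower() <= merged.get(lastIndex).upper() + 1) {
                PosRange last = merged.get(lastIndex);
                merged.set(lastIndex, new PosRange(last.lower(), Math.max(last.upper(), r.upper())));
            } else {
                merged.add(r);
            }
        }
        return merged;
    }

    // Step 2: start at markDeletePosition and absorb merged ranges while they are contiguous.
    // When every range lies after markDeletePosition, only the first range can qualify,
    // so this returns its upper bound, or markDeletePosition if there is a gap.
    static long joiningPosition(long markDeletePosition, List<PosRange> merged) {
        long pos = markDeletePosition;
        for (PosRange r : merged) {
            if (r.lower() > pos + 1) {
                break; // gap: position pos + 1 has been neither sent nor acked
            }
            pos = Math.max(pos, r.upper());
        }
        return pos;
    }
}
```

   For example, with markDeletePosition 10, individualDeletedMessages [12, 13] and pendingAcks [11, 11] and [20, 21], the merged ranges are [11, 13] and [20, 21], so the joining position is 13.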
   
   Is my understanding above correct?
   
   ### Discussion of Proposed Approach
   #### Ordering guarantees
   `dispatcher.redeliveryMessages` contains two types of messages: redelivery messages and **skipped messages**. When the dispatcher skips delivery, it adds the skipped messages to `dispatcher.redeliveryMessages`, as in the code below.
   Therefore, we can't treat `dispatcher.redeliveryMessages` as a set that contains only redelivery messages.
   
https://github.com/apache/pulsar/blob/8dc9a9b1b4cf960bb868fbea72317c20373d0d72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L272
   
https://github.com/apache/pulsar/blob/8dc9a9b1b4cf960bb868fbea72317c20373d0d72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L1202
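
   A small entry-level sketch of what this means for the `+` variant, assuming positions are plain `long`s; `RedeliveryPitfallSketch` is a hypothetical helper, not Pulsar code. If a skipped entry that was never sent is unioned in as though it had been delivered, the computed position can move past it.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical entry-level model: positions are plain longs rather than Pulsar's ledgerId:entryId.
final class RedeliveryPitfallSketch {

    // Walk forward from markDeletePosition while the next position is treated as already sent or acked.
    static long lastContiguous(long markDeletePosition, NavigableSet<Long> sentOrAcked) {
        long pos = markDeletePosition;
        while (sentOrAcked.contains(pos + 1)) {
            pos++;
        }
        return pos;
    }

    // The "+" variant: union of individually deleted, pending-ack and (optionally) redelivery positions.
    @SafeVarargs
    static NavigableSet<Long> union(NavigableSet<Long>... sets) {
        NavigableSet<Long> result = new TreeSet<>();
        for (NavigableSet<Long> s : sets) {
            result.addAll(s);
        }
        return result;
    }
}
```

   With markDeletePosition 10, entry 11 individually deleted, entry 12 in pendingAcks, and entry 13 only in redeliveryMessages because it was skipped, the union that includes redeliveryMessages gives 13 even though entry 13 was never sent; leaving redeliveryMessages out gives 12.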
   
   So I thought: how about calculating `cursor.individualDeletedMessages + consumer.pendingAcks + ...(other consumers' pendingAcks)...` instead?
   I think most of the ordering guarantees would be similar to those of the existing approach. One difference is that the dispatcher could wait for the markDeletePosition to advance without delivering redelivered messages (i.e., messages that were previously in pendingAcks). This means the ordering of redelivered messages is partially preserved. (I don't think this is necessary, but it's OK.)
   
   #### Calculation cost and effect
   The joining position would be calculated synchronously in addConsumer. Since addConsumer and trySendMessagesToConsumers are both synchronized methods, the calculation delays message delivery while it runs. Each time a consumer is added, the dispatcher must collect the pendingAcks of all consumers and compute the result on the spot.
   On the other hand, the existing approach also affects message delivery.
   
   My concern is the cost of collecting all consumers' pendingAcks every time. (However, the two approaches are not simple to compare, because they differ in where the calculation happens and in how many variables it involves.)
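
   To make the cost concern concrete, here is a hypothetical sketch, not Pulsar's dispatcher API: `DispatcherCostSketch` and `positionsScannedOnLastJoin` are illustrative names, positions are plain `long`s, and `cursor.individualDeletedMessages` is left out for brevity. Because `addConsumer` and `trySendMessagesToConsumers` lock the same monitor, delivery cannot proceed while `addConsumer` walks every consumer's pending acks, and the work per join grows with the total number of pending acks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-in for the dispatcher, not Pulsar's API. Positions are plain longs and
// cursor.individualDeletedMessages is left out for brevity.
final class DispatcherCostSketch {
    private final List<NavigableSet<Long>> pendingAcksPerConsumer = new ArrayList<>();
    private final long markDeletePosition;
    private long scannedOnLastJoin;

    DispatcherCostSketch(long markDeletePosition) {
        this.markDeletePosition = markDeletePosition;
    }

    // Synchronized on the same monitor as trySendMessagesToConsumers(), so delivery
    // waits until this method returns.
    synchronized long addConsumer(NavigableSet<Long> pendingAcks) {
        pendingAcksPerConsumer.add(pendingAcks);
        NavigableSet<Long> sentOrAcked = new TreeSet<>();
        long scanned = 0;
        // Every join walks the pending acks of every consumer.
        for (NavigableSet<Long> acks : pendingAcksPerConsumer) {
            sentOrAcked.addAll(acks);
            scanned += acks.size();
        }
        scannedOnLastJoin = scanned;
        long pos = markDeletePosition;
        while (sentOrAcked.contains(pos + 1)) {
            pos++;
        }
        return pos;
    }

    synchronized void trySendMessagesToConsumers() {
        // Delivery would happen here; it cannot run while addConsumer() holds the monitor.
    }

    synchronized long positionsScannedOnLastJoin() {
        return scannedOnLastJoin;
    }
}
```

   For example, with markDeletePosition 10, consumer A holding pending acks {11, 12} and consumer B holding {13}, adding a third consumer scans 3 positions and yields 13 as the joining position.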
   
   #### Code complexity
   > Then the new variable individuallySentPositions can be removed, and we do not need to update it in-time, which makes the logic simpler
   
   I agree with your comment. We could drop most of the production and test code changes from this PR.
   
   ### Conclusion
   * Ordering guarantees: Mostly the same as the existing approach.
   * Calculation cost and effect: I have a concern (although a direct comparison is not simple).
   * Code complexity: Simpler than the existing approach.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
