poorbarcode commented on code in PR #20179:
URL: https://github.com/apache/pulsar/pull/20179#discussion_r1199308166


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -65,23 +67,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
     private final KeySharedMode keySharedMode;
 
     /**
-     * When a consumer joins, it will be added to this map with the current 
read position.
-     * This means that, in order to preserve ordering, new consumers can only 
receive old
-     * messages, until the mark-delete position will move past this point.
+     * When a consumer joins, it will be added to this map with the current 
last sent position per the message key.
+     * This means that, in order to preserve ordering per the message key, new 
consumers can only receive old
+     * messages, until the mark-delete position will move past this point in 
the key. New consumers can receive
+     * any new messages with the message key that is not in the last sent 
position.
      */
-    private final LinkedHashMap<Consumer, PositionImpl> 
recentlyJoinedConsumers;
-
-    private final Set<Consumer> stuckConsumers;
-    private final Set<Consumer> nextStuckConsumers;
+    private final LinkedHashMap<Consumer, LastSentPositions> 
recentlyJoinedConsumers;
+    // The lastSentPosition is not thread-safe
+    private final LastSentPositions lastSentPositions;
 
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,

Review Comment:
   @equanz 
   
   > Could you explain this position in more detail, please? When and how to 
get (or calculate?) the position?
   
   It will be set in method `getRestrictedMaxEntriesForConsumer`, let me show a 
simpler code
   
   ```java
   /**
    * Calculate how many entries the consumer would consume.
    * @param theFirstEntryInCurrentReading This is not the first message the 
consumer will receive, but the first message read in this batch.
    */
   private synchronized int getRestrictedMaxEntriesForConsumer(Consumer 
consumer, List<Entry> entries, ReadType readType, PositionImpl 
theFirstEntryInCurrentReading){
   
      if (readType.equals(Normal) ){
          recentlyJoinedConsumers.get(consumer).getRight() == null;
          recentlyJoinedConsumers.get(consumer).getRight() = 
theFirstEntryInCurrentReading;
      }
   
      PositionImpl maxReadPosition = getMaxReadPosition(consumer);
       ...
   }
   ```
   
   The progress runs like this:
   
   | time | `process: add consumer` | `process: in-flight reading` |
   | --- | --- | --- |
   | 1 | read position is `1:6` |
   | 2 | | an in-flight reading which tries to read `1:6~1:10` |
   | 3 | Add the new consumer into recently joined consumers | 
   | 4 | Add new consumer into the selector |
   | 5 |  | Read entries `1:6 ~ 1:10` complete |
   | 6 |  | Set read position `1:11` |
   | 7 | Set `recentlyJoinedConsumers.consumer.first-position` to `1:11` |
   | 8 | | Try to send messages to consumers |
   | 9 | | Set `recentlyJoinedConsumers.consumer.second-position` to `1:6`|
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to