poorbarcode commented on code in PR #20179:
URL: https://github.com/apache/pulsar/pull/20179#discussion_r1198087967


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -65,23 +67,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     private final KeySharedMode keySharedMode;
 
     /**
-     * When a consumer joins, it will be added to this map with the current read position.
-     * This means that, in order to preserve ordering, new consumers can only receive old
-     * messages, until the mark-delete position will move past this point.
+     * When a consumer joins, it will be added to this map with the current last sent position per the message key.
+     * This means that, in order to preserve ordering per the message key, new consumers can only receive old
+     * messages, until the mark-delete position will move past this point in the key. New consumers can receive
+     * any new messages with the message key that is not in the last sent position.
      */
-    private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
-
-    private final Set<Consumer> stuckConsumers;
-    private final Set<Consumer> nextStuckConsumers;
+    private final LinkedHashMap<Consumer, LastSentPositions> recentlyJoinedConsumers;
+    // The lastSentPosition is not thread-safe
+    private final LastSentPositions lastSentPositions;
 
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,

Review Comment:
   Hi @equanz 
   
   Sorry, I've been so busy lately.
   
   ### Menu
   - Explain the issue.
   - Suggest a simpler solution in place of the current one.
   
   --- 
   
   ### Explain the issue
   
   There are two scenarios that can cause out-of-order consumption, and this PR tries to fix both.
   
   #### Scenario-1
   
   | time | `process: add consumer` | `process: deliver messages to client` |
   | --- | --- | --- |
   | 1 |  | Reading entries `1:6 ~ 1:10` completes |
   | 2 | Add the new consumer to the selector |  |
   | 3 |  | Choose consumers via the selector |
   | 4 | Add the new consumer to the recently joined consumers |  |
   | 5 |  | Deliver entries `1:6 ~ 1:10` to all consumers (both the old consumers and the new consumer) |
   
   #### Scenario-2
   
   | time | `process: add consumer` | `process: deliver messages to client` |
   | --- | --- | --- |
   | 1 | The read position is `1:6` |  |
   | 2 | Add the new consumer to the selector |  |
   | 3 |  | Reading entries `1:6 ~ 1:10` completes |
   | 4 |  | Set the read position to `1:11` |
   | 5 | Add the new consumer to the recently joined consumers |  |
   | 6 | <strong>(Highlight)</strong> The max read position recorded for the new consumer is `1:11`, but the correct value is `1:6` |  |
   | 7 |  | Choose consumers via the selector |
   | 8 |  | Deliver entries `1:6 ~ 1:10` to all consumers (both the old consumers and the new consumer) |
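
To make Scenario-2 concrete, here is a minimal, self-contained sketch of why the recorded position matters. It is not the dispatcher's code: `Pos` is a hypothetical stand-in for `PositionImpl`, and `allowedEntries` assumes that a recently joined consumer may only receive entries strictly before its max read position.

```java
import java.util.ArrayList;
import java.util.List;

public class Scenario2Sketch {
    /** Hypothetical stand-in for PositionImpl: (ledgerId, entryId). */
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int c = Long.compare(ledgerId, other.ledgerId);
            return c != 0 ? c : Long.compare(entryId, other.entryId);
        }
    }

    /** Counts the leading entries that lie strictly before maxReadPosition (assumed rule). */
    static int allowedEntries(List<Pos> entries, Pos maxReadPosition) {
        int allowed = 0;
        for (Pos p : entries) {
            if (p.compareTo(maxReadPosition) >= 0) {
                break;
            }
            allowed++;
        }
        return allowed;
    }

    /** The batch from the tables above: entries 1:6 ~ 1:10. */
    static List<Pos> batch() {
        List<Pos> entries = new ArrayList<>();
        for (long entryId = 6; entryId <= 10; entryId++) {
            entries.add(new Pos(1, entryId));
        }
        return entries;
    }

    public static void main(String[] args) {
        // Recorded at time 5, after the read position already moved to 1:11.
        System.out.println(allowedEntries(batch(), new Pos(1, 11))); // prints 5
        // The value that should have been recorded (read position at time 1).
        System.out.println(allowedEntries(batch(), new Pos(1, 6)));  // prints 0
    }
}
```

With `1:11` recorded, the whole batch passes the check and reaches the new consumer; with `1:6`, none of it does.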
   
   --- 
   
   ### A simpler solution
   
   First of all, this is a great catch, and the current patch does solve the issues above.
   
   But this patch makes an already complicated mechanism even more complicated, so I'd like to suggest a simpler solution: make the value stored in `recentlyJoinedConsumers` richer. Here is a simple sketch:
   
   ```java
   /**
    * The value holds two positions:
    * The first position:  the read position of the cursor when the consumer joins.
    * The second position: the first entry of the first normal read after the consumer joined.
    */
   protected Map<Consumer, Pair<PositionImpl, PositionImpl>> recentlyJoinedConsumers;
   
   /**
    * Get the max read position of the consumer: the smaller of the two positions.
    * (Null handling is omitted in this sketch.)
    */
   protected PositionImpl getMaxReadPosition(Consumer consumer) {
       Pair<PositionImpl, PositionImpl> pair = recentlyJoinedConsumers.get(consumer);
       // PositionImpl is Comparable, so compare directly; Math.min only accepts numeric primitives.
       return pair.getKey().compareTo(pair.getValue()) <= 0 ? pair.getKey() : pair.getValue();
   }
   
   /**
    * Register consumer.
    */
   public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
       addConsumerIntoRecentJoinQueue();
       setReadPositionWhenConsumerJoined();
       // do other things.
   }
   
   /**
    * Calculate how many entries the consumer would consume.
    */
   private synchronized int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, ReadType readType) {
      setFirstReadPositionOfRecentJoinedConsumers(entries.get(0));
   
      PositionImpl maxReadPosition = getMaxReadPosition(consumer);
       ...
   }
   ```
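
For illustration only, the idea can be exercised in a self-contained sketch that replays Scenario-2. Everything here is hypothetical and not Pulsar's API: `Pos` stands in for `PositionImpl`, consumers are plain strings, `onNormalRead` plays the role of `setFirstReadPositionOfRecentJoinedConsumers`, and the sketch assumes the second position is set only once, on the first normal read after the join.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecentlyJoinedSketch {
    /** Hypothetical stand-in for PositionImpl: (ledgerId, entryId). */
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int c = Long.compare(ledgerId, other.ledgerId);
            return c != 0 ? c : Long.compare(entryId, other.entryId);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    /** The two positions tracked per recently joined consumer. */
    static final class JoinPositions {
        final Pos readPositionAtJoin;
        Pos firstReadAfterJoin; // null until the first normal read after the join

        JoinPositions(Pos readPositionAtJoin) {
            this.readPositionAtJoin = readPositionAtJoin;
        }
    }

    private final Map<String, JoinPositions> recentlyJoinedConsumers = new LinkedHashMap<>();

    /** Records the cursor's read position at the moment the consumer joins. */
    synchronized void addConsumer(String consumer, Pos currentReadPosition) {
        recentlyJoinedConsumers.put(consumer, new JoinPositions(currentReadPosition));
    }

    /** Records the first entry of a normal read for consumers that have not seen one yet. */
    synchronized void onNormalRead(Pos firstEntry) {
        for (JoinPositions positions : recentlyJoinedConsumers.values()) {
            if (positions.firstReadAfterJoin == null) {
                positions.firstReadAfterJoin = firstEntry;
            }
        }
    }

    /** Returns the smaller of the two positions, or null if the consumer is not restricted. */
    synchronized Pos getMaxReadPosition(String consumer) {
        JoinPositions positions = recentlyJoinedConsumers.get(consumer);
        if (positions == null) {
            return null;
        }
        if (positions.firstReadAfterJoin == null) {
            return positions.readPositionAtJoin;
        }
        return positions.readPositionAtJoin.compareTo(positions.firstReadAfterJoin) <= 0
                ? positions.readPositionAtJoin
                : positions.firstReadAfterJoin;
    }

    /** Replays Scenario-2: the join records 1:11, then the in-flight batch starting at 1:6 is dispatched. */
    static Pos scenario2() {
        RecentlyJoinedSketch dispatcher = new RecentlyJoinedSketch();
        dispatcher.addConsumer("new-consumer", new Pos(1, 11));
        dispatcher.onNormalRead(new Pos(1, 6));
        return dispatcher.getMaxReadPosition("new-consumer");
    }

    public static void main(String[] args) {
        System.out.println(scenario2()); // prints 1:6
    }
}
```

Under these assumptions the new consumer's max read position falls back to `1:6`, so the batch `1:6 ~ 1:10` is withheld from it even though the join recorded `1:11`.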
   
   What do you think of this solution?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
