poorbarcode commented on code in PR #20179:
URL: https://github.com/apache/pulsar/pull/20179#discussion_r1198087967

##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -65,23 +67,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     private final KeySharedMode keySharedMode;
 
     /**
-     * When a consumer joins, it will be added to this map with the current read position.
-     * This means that, in order to preserve ordering, new consumers can only receive old
-     * messages, until the mark-delete position will move past this point.
+     * When a consumer joins, it will be added to this map with the current last sent position per the message key.
+     * This means that, in order to preserve ordering per the message key, new consumers can only receive old
+     * messages, until the mark-delete position will move past this point in the key. New consumers can receive
+     * any new messages with the message key that is not in the last sent position.
      */
-    private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
-
-    private final Set<Consumer> stuckConsumers;
-    private final Set<Consumer> nextStuckConsumers;
+    private final LinkedHashMap<Consumer, LastSentPositions> recentlyJoinedConsumers;
+    // The lastSentPosition is not thread-safe
+    private final LastSentPositions lastSentPositions;
 
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,

Review Comment:

Hi @equanz

Sorry, I've been so busy lately.

### Menu
- Explain the issue.
- Suggest a simpler solution instead of the current one.

---

### Explain the issue
There are two scenarios that can cause out-of-order consumption, and this PR tries to fix both.

#### Scenario-1

| time | `process: add consumer` | `process: deliver messages to client` |
| --- | --- | --- |
| 1 | | Read entries `1:6 ~ 1:10` complete |
| 2 | Add new consumer into the selector | |
| 3 | | Choose consumer by the selector |
| 4 | Add the new consumer into recently joined consumers | |
| 5 | | Deliver entries `1:6 ~ 1:10` to all consumers (including old consumers and the new consumer) |

#### Scenario-2

| time | `process: add consumer` | `process: deliver messages to client` |
| --- | --- | --- |
| 1 | Read position is `1:6` | |
| 2 | Add new consumer into the selector | |
| 3 | | Read entries `1:6 ~ 1:10` complete |
| 4 | | Set read position to `1:11` |
| 5 | Add the new consumer into recently joined consumers | |
| 6 | <strong>(Highlight)</strong> The max read position of the new consumer is `1:11`, but the correct value is `1:6` | |
| 7 | | Choose consumer by the selector |
| 8 | | Deliver entries `1:6 ~ 1:10` to all consumers (including old consumers and the new consumer) |

---

### A simpler solution

First of all, this is a great catch, and the current patch can solve the issues above. But this patch makes an already complicated mechanism even more complicated, so I'd like to suggest a simpler solution: make the value stored in `recentlyJoinedConsumers` richer. Here is a simple sketch:

```java
/**
 * The value holds two positions:
 * The first position: the read position of the cursor when the consumer joined.
 * The second position: the first entry in normal reading after the consumer joined.
 */
protected Map<Consumer, Pair<PositionImpl, PositionImpl>> recentlyJoinedConsumers;

/**
 * Get the max read position of the consumer.
 */
protected PositionImpl getMaxReadPosition(Consumer consumer) {
    Pair<PositionImpl, PositionImpl> pair = recentlyJoinedConsumers.get(consumer);
    // Return the smaller of the two positions (PositionImpl is Comparable).
    return pair.getKey().compareTo(pair.getValue()) <= 0 ? pair.getKey() : pair.getValue();
}

/**
 * Register consumer.
 */
public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
    addConsumerIntoRecentJoinQueue();
    setReadPositionWhenConsumerJoined();
    // do other things.
}

/**
 * Calculate how many entries the consumer would consume.
 */
private synchronized int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries,
                                                            ReadType readType) {
    // Record the first entry of this read for recently joined consumers before computing the limit.
    setFirstReadPositionOfRecentJoinedConsumers(entries.get(0));
    PositionImpl maxReadPosition = getMaxReadPosition(consumer);
    ...
}
```

What do you think of this solution?
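
To make the idea above easier to check against Scenario-2, here is a minimal, self-contained sketch that replays it. Everything in it is a hypothetical stand-in, not Pulsar code: `Position` mimics a `ledgerId:entryId` position, `JoinedPositions` plays the role of the `Pair`, consumers are plain strings, and `onConsumerJoined` / `onEntriesRead` are assumed names for the two hook points in the proposal. It also assumes that a missing second position simply falls back to the first one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: all types here are hypothetical stand-ins, not Pulsar classes. */
final class MaxReadPositionSketch {

    /** Stand-in for a ledgerId:entryId position, ordered by ledger and then by entry. */
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    /** The two positions kept for each recently joined consumer. */
    static final class JoinedPositions {
        final Position readPositionAtJoin;
        Position firstEntryAfterJoin; // stays null until the first read after the join

        JoinedPositions(Position readPositionAtJoin) {
            this.readPositionAtJoin = readPositionAtJoin;
        }
    }

    private final Map<String, JoinedPositions> recentlyJoinedConsumers = new LinkedHashMap<>();

    /** Records the cursor read position at the moment the consumer joins. */
    synchronized void onConsumerJoined(String consumer, Position cursorReadPosition) {
        recentlyJoinedConsumers.put(consumer, new JoinedPositions(cursorReadPosition));
    }

    /** Records the first entry of a read for every consumer that has none yet. */
    synchronized void onEntriesRead(Position firstEntry) {
        for (JoinedPositions positions : recentlyJoinedConsumers.values()) {
            if (positions.firstEntryAfterJoin == null) {
                positions.firstEntryAfterJoin = firstEntry;
            }
        }
    }

    /** Returns the smaller of the two recorded positions, or null if the consumer is not tracked. */
    synchronized Position getMaxReadPosition(String consumer) {
        JoinedPositions positions = recentlyJoinedConsumers.get(consumer);
        if (positions == null) {
            return null;
        }
        Position atJoin = positions.readPositionAtJoin;
        Position firstRead = positions.firstEntryAfterJoin;
        if (firstRead == null) {
            return atJoin;
        }
        return atJoin.compareTo(firstRead) <= 0 ? atJoin : firstRead;
    }

    public static void main(String[] args) {
        MaxReadPositionSketch dispatcher = new MaxReadPositionSketch();
        // Scenario-2: the consumer is recorded only after the read position moved to 1:11 ...
        dispatcher.onConsumerJoined("new-consumer", new Position(1, 11));
        // ... but the batch 1:6 ~ 1:10 is still waiting to be dispatched.
        dispatcher.onEntriesRead(new Position(1, 6));
        System.out.println(dispatcher.getMaxReadPosition("new-consumer")); // prints 1:6
    }
}
```

In this replay the position recorded at join time is `1:11`, but taking the minimum with the first entry of the pending batch brings the limit back to `1:6`, which is the value the Scenario-2 table marks as correct.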