This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 75ce0220450715a83f50cfa90acb539db71d8b0a
Author: JiangHaiting <jianghait...@apache.org>
AuthorDate: Tue Nov 30 20:34:45 2021 +0800

    Fix Issue #12885, Unordered consuming case in Key_Shared subscription 
(#12890)
    
    (cherry picked from commit 73ef1621ab0bbecfcb2325453a4d93a406fcba3c)
---
 .../PersistentDispatcherMultipleConsumers.java     |  3 +++
 ...istentStickyKeyDispatcherMultipleConsumers.java | 31 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 907d180..7ec6f4d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -82,6 +82,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
     protected volatile boolean havePendingRead = false;
     protected volatile boolean havePendingReplayRead = false;
+    protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
 
@@ -243,6 +244,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 }
 
                 havePendingReplayRead = true;
+                minReplayedPosition = 
messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
                 Set<? extends Position> deletedMessages = 
topic.isDelayedDeliveryEnabled()
                         ? asyncReplayEntriesInOrder(messagesToReplayNow) : 
asyncReplayEntries(messagesToReplayNow);
                 // clear already acked positions from replay bucket
@@ -266,6 +268,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                             consumerList.size());
                 }
                 havePendingRead = true;
+                minReplayedPosition = 
getMessagesToReplayNow(1).stream().findFirst().orElse(null);
                 cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, 
this,
                         ReadType.Normal, topic.getMaxReadPosition());
             } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index f3bcbf2..420795c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -169,6 +169,37 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             return;
         }
 
+        // A corner case that we have to retry a readMoreEntries in order to 
preserver order delivery.
+        // This may happen when consumer closed. See issue #12885 for details.
+        if (!allowOutOfOrderDelivery) {
+            Set<PositionImpl> messagesToReplayNow = 
this.getMessagesToReplayNow(1);
+            if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() 
&& this.minReplayedPosition != null) {
+                PositionImpl relayPosition = 
messagesToReplayNow.stream().findFirst().get();
+                // If relayPosition is a new entry wither smaller position is 
inserted for redelivery during this async
+                // read, it is possible that this relayPosition should 
dispatch to consumer first. So in order to
+                // preserver order delivery, we need to discard this read 
result, and try to trigger a replay read,
+                // that containing "relayPosition", by calling readMoreEntries.
+                if (relayPosition.compareTo(minReplayedPosition) < 0) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Position {} (<{}) is inserted for 
relay during current {} read, discard this "
+                                + "read and retry with readMoreEntries.",
+                                name, relayPosition, minReplayedPosition, 
readType);
+                    }
+                    if (readType == ReadType.Normal) {
+                        entries.forEach(entry -> {
+                            long stickyKeyHash = getStickyKeyHash(entry);
+                            addMessageToReplay(entry.getLedgerId(), 
entry.getEntryId(), stickyKeyHash);
+                            entry.release();
+                        });
+                    } else if (readType == ReadType.Replay) {
+                        entries.forEach(Entry::release);
+                    }
+                    readMoreEntries();
+                    return;
+                }
+            }
+        }
+
         nextStuckConsumers.clear();
 
         final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();

Reply via email to