This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 75ce0220450715a83f50cfa90acb539db71d8b0a Author: JiangHaiting <jianghait...@apache.org> AuthorDate: Tue Nov 30 20:34:45 2021 +0800 Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890) (cherry picked from commit 73ef1621ab0bbecfcb2325453a4d93a406fcba3c) --- .../PersistentDispatcherMultipleConsumers.java | 3 +++ ...istentStickyKeyDispatcherMultipleConsumers.java | 31 ++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 907d180..7ec6f4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -82,6 +82,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile boolean havePendingRead = false; protected volatile boolean havePendingReplayRead = false; + protected volatile PositionImpl minReplayedPosition = null; protected boolean shouldRewindBeforeReadingOrReplaying = false; protected final String name; @@ -243,6 +244,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } havePendingReplayRead = true; + minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null); Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled() ? 
asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket @@ -266,6 +268,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul consumerList.size()); } havePendingRead = true; + minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null); cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal, topic.getMaxReadPosition()); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index f3bcbf2..420795c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -169,6 +169,37 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi return; } + // A corner case in which we have to retry readMoreEntries in order to preserve ordered delivery. + // This may happen when a consumer is closed. See issue #12885 for details. + if (!allowOutOfOrderDelivery) { + Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1); + if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) { + PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get(); + // If relayPosition is a new entry with a smaller position that was inserted for redelivery during this async + // read, it is possible that this relayPosition should be dispatched to the consumer first. So in order to + // preserve ordered delivery, we need to discard this read result and try to trigger a replay read + // that contains "relayPosition", by calling readMoreEntries. 
+ if (relayPosition.compareTo(minReplayedPosition) < 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this " + + "read and retry with readMoreEntries.", + name, relayPosition, minReplayedPosition, readType); + } + if (readType == ReadType.Normal) { + entries.forEach(entry -> { + long stickyKeyHash = getStickyKeyHash(entry); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); + entry.release(); + }); + } else if (readType == ReadType.Replay) { + entries.forEach(Entry::release); + } + readMoreEntries(); + return; + } + } + } + nextStuckConsumers.clear(); final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();