This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 726de461130 [improve] [broker] Avoid repeated Read-and-discard when
using Key_Shared mode (#22245)
726de461130 is described below
commit 726de4611308c0cec4a5691d31bab4941525f00b
Author: fengyubiao <[email protected]>
AuthorDate: Fri Mar 29 12:06:26 2024 +0800
[improve] [broker] Avoid repeated Read-and-discard when using Key_Shared
mode (#22245)
(cherry picked from commit e34ea626a65da4c8e1578010f857aa961a7b5c55)
---
.../persistent/MessageRedeliveryController.java | 8 +
.../PersistentDispatcherMultipleConsumers.java | 47 +++-
...istentStickyKeyDispatcherMultipleConsumers.java | 104 +++++++-
.../client/api/KeySharedSubscriptionTest.java | 266 +++++++++++++++++++++
.../pulsar/client/api/ProducerConsumerBase.java | 66 +++++
5 files changed, 470 insertions(+), 21 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 5bf3f5506fa..63803177242 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -95,6 +95,14 @@ public class MessageRedeliveryController {
}
}
+ public Long getHash(long ledgerId, long entryId) {
+ LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
+ if (value == null) {
+ return null;
+ }
+ return value.first;
+ }
+
public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId)
{
if (!allowOutOfOrderDelivery) {
List<LongPair> keysToRemove = new ArrayList<>();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6e4b37456b6..f6f63787a86 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -314,24 +314,25 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
}
NavigableSet<PositionImpl> messagesToReplayNow =
getMessagesToReplayNow(messagesToRead);
-
- if (!messagesToReplayNow.isEmpty()) {
+ NavigableSet<PositionImpl> messagesToReplayFiltered =
filterOutEntriesWillBeDiscarded(messagesToReplayNow);
+ if (!messagesToReplayFiltered.isEmpty()) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Schedule replay of {} messages for {}
consumers", name, messagesToReplayNow.size(),
- consumerList.size());
+ log.debug("[{}] Schedule replay of {} messages for {}
consumers", name,
+ messagesToReplayFiltered.size(),
consumerList.size());
}
havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.first();
Set<? extends Position> deletedMessages =
topic.isDelayedDeliveryEnabled()
- ? asyncReplayEntriesInOrder(messagesToReplayNow) :
asyncReplayEntries(messagesToReplayNow);
+ ? asyncReplayEntriesInOrder(messagesToReplayFiltered)
+ : asyncReplayEntries(messagesToReplayFiltered);
// clear already acked positions from replay bucket
deletedMessages.forEach(position ->
redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
((PositionImpl) position).getEntryId()));
// if all the entries are acked-entries and cleared up from
redeliveryMessages, try to read
// next entries as readCompletedEntries-callback was never
called
- if ((messagesToReplayNow.size() - deletedMessages.size()) ==
0) {
+ if ((messagesToReplayFiltered.size() - deletedMessages.size())
== 0) {
havePendingReplayRead = false;
readMoreEntriesAsync();
}
@@ -340,7 +341,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
log.debug("[{}] Dispatcher read is blocked due to
unackMessages {} reached to max {}", name,
totalUnackedMessages,
topic.getMaxUnackedMessagesOnSubscription());
}
- } else if (!havePendingRead) {
+ } else if (!havePendingRead && hasConsumersNeededNormalRead()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {}
consumers", name, messagesToRead,
consumerList.size());
@@ -369,7 +370,16 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
topic.getMaxReadPosition());
}
} else {
- log.debug("[{}] Cannot schedule next read until previous one
is done", name);
+ if (log.isDebugEnabled()) {
+ if (!messagesToReplayNow.isEmpty()) {
+ log.debug("[{}] [{}] Skipping read for the topic:
because all entries in replay queue were"
+ + " filtered out due to the mechanism of
Key_Shared mode, and the left consumers have"
+ + " no permits now",
+ topic.getName(), getSubscriptionName());
+ } else {
+ log.debug("[{}] Cannot schedule next read until
previous one is done", name);
+ }
+ }
}
} else {
if (log.isDebugEnabled()) {
@@ -1100,6 +1110,27 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
}
}
+ /**
+     * This is a method designed for Key_Shared mode.
+ * Filter out the entries that will be discarded due to the order
guarantee mechanism of Key_Shared mode.
+ * This method is in order to avoid the scenario below:
+ * - Get positions from the Replay queue.
+ * - Read entries from BK.
+ * - The order guarantee mechanism of Key_Shared mode filtered out all the
entries.
+     * - Deliver no entries to the client, but we did a BK read.
+ */
+ protected NavigableSet<PositionImpl>
filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
+ return src;
+ }
+
+ /**
+     * This is a method designed for Key_Shared mode, to avoid the
subscription getting stuck unnecessarily.
+ * See detail {@link
PersistentStickyKeyDispatcherMultipleConsumers#hasConsumersNeededNormalRead}.
+ */
+ protected boolean hasConsumersNeededNormalRead() {
+ return true;
+ }
+
protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
return delayedDeliveryTracker.isPresent() &&
delayedDeliveryTracker.get().shouldPauseAllDeliveries();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 8f05530f58b..ee2ebd7ca86 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -30,13 +30,16 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
@@ -165,6 +168,14 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
};
+ private static final FastThreadLocal<Map<Consumer, List<PositionImpl>>>
localGroupedPositions =
+ new FastThreadLocal<Map<Consumer, List<PositionImpl>>>() {
+ @Override
+ protected Map<Consumer, List<PositionImpl>> initialValue()
throws Exception {
+ return new HashMap<>();
+ }
+ };
+
@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType
readType, List<Entry> entries) {
long totalMessagesSent = 0;
@@ -248,15 +259,9 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
assert consumer != null; // checked when added to groupedEntries
List<Entry> entriesWithSameKey = current.getValue();
int entriesWithSameKeyCount = entriesWithSameKey.size();
- int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
- if (consumer.getMaxUnackedMessages() > 0) {
- int remainUnAckedMessages =
- // Avoid negative number
- Math.max(consumer.getMaxUnackedMessages() -
consumer.getUnackedMessages(), 0);
- availablePermits = Math.min(availablePermits,
remainUnAckedMessages);
- }
- int maxMessagesForC = Math.min(entriesWithSameKeyCount,
availablePermits);
- int messagesForC = getRestrictedMaxEntriesForConsumer(consumer,
entriesWithSameKey, maxMessagesForC,
+ int availablePermits = getAvailablePermits(consumer);
+ int messagesForC = getRestrictedMaxEntriesForConsumer(consumer,
+
entriesWithSameKey.stream().map(Entry::getPosition).collect(Collectors.toList()),
availablePermits,
readType, consumerStickyKeyHashesMap.get(consumer));
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} with messages num {}, read
type is {}",
@@ -289,7 +294,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(messagesForC);
totalEntries += filterEntriesForConsumer(entriesWithSameKey,
batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay,
consumer);
-
consumer.sendMessages(entriesWithSameKey, batchSizes,
batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(),
@@ -332,8 +336,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
return false;
}
- private int getRestrictedMaxEntriesForConsumer(Consumer consumer,
List<Entry> entries, int maxMessages,
- ReadType readType, Set<Integer> stickyKeyHashes) {
+ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<?
extends Position> entries,
+ int availablePermits, ReadType readType, Set<Integer>
stickyKeyHashes) {
+ int maxMessages = Math.min(entries.size(), availablePermits);
if (maxMessages == 0) {
return 0;
}
@@ -378,7 +383,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
// Here, the consumer is one that has recently joined, so we can only
send messages that were
// published before it has joined.
for (int i = 0; i < maxMessages; i++) {
- if (((PositionImpl)
entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
+ if (((PositionImpl) entries.get(i)).compareTo(maxReadPosition) >=
0) {
// We have already crossed the divider line. All messages in
the list are now
// newer than what we can currently dispatch to this consumer
return i;
@@ -405,6 +410,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
}
private boolean removeConsumersFromRecentJoinedConsumers() {
+ if (MapUtils.isEmpty(recentlyJoinedConsumers)) {
+ return false;
+ }
Iterator<Map.Entry<Consumer, PositionImpl>> itr =
recentlyJoinedConsumers.entrySet().iterator();
boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
@@ -437,6 +445,76 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
+ private int getAvailablePermits(Consumer c) {
+ int availablePermits = Math.max(c.getAvailablePermits(), 0);
+ if (c.getMaxUnackedMessages() > 0) {
+ // Avoid negative number
+ int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() -
c.getUnackedMessages(), 0);
+ availablePermits = Math.min(availablePermits,
remainUnAckedMessages);
+ }
+ return availablePermits;
+ }
+
+ @Override
+ protected synchronized NavigableSet<PositionImpl>
filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
+ if (src.isEmpty()) {
+ return src;
+ }
+ NavigableSet<PositionImpl> res = new TreeSet<>();
+ // Group positions.
+ final Map<Consumer, List<PositionImpl>> groupedPositions =
localGroupedPositions.get();
+ groupedPositions.clear();
+ for (PositionImpl pos : src) {
+ Long stickyKeyHash = redeliveryMessages.getHash(pos.getLedgerId(),
pos.getEntryId());
+ if (stickyKeyHash == null) {
+ res.add(pos);
+ continue;
+ }
+ Consumer c = selector.select(stickyKeyHash.intValue());
+ if (c == null) {
+ // Maybe using HashRangeExclusiveStickyKeyConsumerSelector.
+ continue;
+ }
+ groupedPositions.computeIfAbsent(c, k -> new
ArrayList<>()).add(pos);
+ }
+ // Filter positions by the Recently Joined Position rule.
+ for (Map.Entry<Consumer, List<PositionImpl>> item :
groupedPositions.entrySet()) {
+ int availablePermits = getAvailablePermits(item.getKey());
+ if (availablePermits == 0) {
+ continue;
+ }
+ int posCountToRead =
getRestrictedMaxEntriesForConsumer(item.getKey(), item.getValue(),
availablePermits,
+ ReadType.Replay, null);
+ if (posCountToRead > 0) {
+ res.addAll(item.getValue().subList(0, posCountToRead));
+ }
+ }
+ return res;
+ }
+
+ /**
+ * In Key_Shared mode, the consumer will not receive any entries from a
normal reading if it is included in
+ * {@link #recentlyJoinedConsumers}, they can only receive entries from
replay reads.
+ * If all entries in {@link #redeliveryMessages} have been filtered out
due to the order guarantee mechanism,
+     * Broker needs a normal read to make sure the consumers not included in
{@link #recentlyJoinedConsumers} will not be
+ * stuck. See https://github.com/apache/pulsar/pull/7105.
+ */
+ @Override
+ protected boolean hasConsumersNeededNormalRead() {
+ for (Consumer consumer : consumerList) {
+ if (consumer == null || consumer.isBlocked()) {
+ continue;
+ }
+ if (recentlyJoinedConsumers.containsKey(consumer)) {
+ continue;
+ }
+ if (consumer.getAvailablePermits() > 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public SubType getType() {
return SubType.Key_Shared;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 18fb141be31..72195550508 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -48,12 +49,17 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Topic;
import
org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
import
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -61,6 +67,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -1630,4 +1637,263 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
log.info("Got {} other messages...", sum);
Assert.assertEquals(sum, delayedMessages + messages);
}
+
+ private AtomicInteger injectReplayReadCounter(String topicName, String
cursorName) throws Exception {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
managedLedger.openCursor(cursorName);
+ managedLedger.getCursors().removeCursor(cursor.getName());
+ managedLedger.getActiveCursors().removeCursor(cursor.getName());
+ ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+ managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
+ managedLedger.getActiveCursors().add(spyCursor, PositionImpl.EARLIEST);
+ AtomicInteger replyReadCounter = new AtomicInteger();
+ Mockito.doAnswer(invocation -> {
+ if
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+ replyReadCounter.incrementAndGet();
+ }
+ return invocation.callRealMethod();
+ }).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(),
Mockito.any());
+ Mockito.doAnswer(invocation -> {
+ if
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+ replyReadCounter.incrementAndGet();
+ }
+ return invocation.callRealMethod();
+ }).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(),
Mockito.any(), Mockito.anyBoolean());
+ admin.topics().createSubscription(topicName, cursorName,
MessageId.earliest);
+ return replyReadCounter;
+ }
+
+ @Test
+ public void testNoRepeatedReadAndDiscard() throws Exception {
+ int delayedMessages = 100;
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subName = "my-sub";
+ admin.topics().createNonPartitionedTopic(topic);
+ AtomicInteger replyReadCounter = injectReplayReadCounter(topic,
subName);
+
+ // Send messages.
+ @Cleanup
+ Producer<Integer> producer =
pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
+ for (int i = 0; i < delayedMessages; i++) {
+ MessageId messageId = producer.newMessage()
+ .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+ .value(100 + i)
+ .send();
+ log.info("Published message :{}", messageId);
+ }
+ producer.close();
+
+ // Make ack holes.
+ Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ List<Message> msgList1 = new ArrayList<>();
+ List<Message> msgList2 = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ Message msg1 = consumer1.receive(1, TimeUnit.SECONDS);
+ if (msg1 != null) {
+ msgList1.add(msg1);
+ }
+ Message msg2 = consumer2.receive(1, TimeUnit.SECONDS);
+ if (msg2 != null) {
+ msgList2.add(msg2);
+ }
+ }
+ Consumer<Integer> redeliverConsumer = null;
+ if (!msgList1.isEmpty()) {
+ msgList1.forEach(msg -> consumer1.acknowledgeAsync(msg));
+ redeliverConsumer = consumer2;
+ } else {
+ msgList2.forEach(msg -> consumer2.acknowledgeAsync(msg));
+ redeliverConsumer = consumer1;
+ }
+
+ // consumer3 will be added to the "recentJoinedConsumers".
+ Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .receiverQueueSize(1000)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ redeliverConsumer.close();
+
+ // Verify: no repeated Read-and-discard.
+ Thread.sleep(5 * 1000);
+ int maxReplayCount = delayedMessages * 2;
+ log.info("Reply read count: {}", replyReadCounter.get());
+ assertTrue(replyReadCounter.get() < maxReplayCount);
+
+ // cleanup.
+ consumer1.close();
+ consumer2.close();
+ consumer3.close();
+ admin.topics().delete(topic, false);
+ }
+
+ /**
+ * This test is in order to guarantee the feature added by
https://github.com/apache/pulsar/pull/7105.
+ * 1. Start 3 consumers:
+ * - consumer1 will be closed and trigger a messages redeliver.
+ * - consumer2 will not ack any messages to make the new consumer joined
late will be stuck due
+ * to the mechanism "recentlyJoinedConsumers".
+ * - consumer3 will always receive and ack messages.
+     * 2. Add consumer4 after consumer1 was closed, and consumer4 will be stuck
due to the mechanism
+ * "recentlyJoinedConsumers".
+ * 3. Verify:
+     * - (Main purpose) consumer3 can still receive messages until the
cursor.readerPosition is larger than LAC.
+ * - no repeated Read-and-discard.
+ * - at last, all messages will be received.
+ */
+ @Test(timeOut = 180 * 1000) // the test will be finished in 60s.
+ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws
Exception {
+ final int messagesSentPerTime = 100;
+ final Set<Integer> totalReceivedMessages = new TreeSet<>();
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subName = "my-sub";
+ admin.topics().createNonPartitionedTopic(topic);
+ AtomicInteger replyReadCounter = injectReplayReadCounter(topic,
subName);
+
+ // Send messages.
+ @Cleanup
+ Producer<Integer> producer =
pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
+ for (int i = 0; i < messagesSentPerTime; i++) {
+ MessageId messageId = producer.newMessage()
+ .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+ .value(100 + i)
+ .send();
+ log.info("Published message :{}", messageId);
+ }
+
+ // 1. Start 3 consumers and make ack holes.
+ // - one consumer will be closed and trigger a messages redeliver.
+ // - one consumer will not ack any messages to make the new consumer
joined late will be stuck due to the
+ // mechanism "recentlyJoinedConsumers".
+ // - one consumer will always receive and ack messages.
+ Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ List<Message> msgList1 = new ArrayList<>();
+ List<Message> msgList2 = new ArrayList<>();
+ List<Message> msgList3 = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ Message<Integer> msg1 = consumer1.receive(1, TimeUnit.SECONDS);
+ if (msg1 != null) {
+ totalReceivedMessages.add(msg1.getValue());
+ msgList1.add(msg1);
+ }
+ Message<Integer> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
+ if (msg2 != null) {
+ totalReceivedMessages.add(msg2.getValue());
+ msgList2.add(msg2);
+ }
+ Message<Integer> msg3 = consumer3.receive(1, TimeUnit.SECONDS);
+            if (msg3 != null) {
+ totalReceivedMessages.add(msg3.getValue());
+ msgList3.add(msg3);
+ }
+ }
+ Consumer<Integer> consumerWillBeClose = null;
+ Consumer<Integer> consumerAlwaysAck = null;
+ Consumer<Integer> consumerStuck = null;
+ if (!msgList1.isEmpty()) {
+ msgList1.forEach(msg -> consumer1.acknowledgeAsync(msg));
+ consumerAlwaysAck = consumer1;
+ consumerWillBeClose = consumer2;
+ consumerStuck = consumer3;
+ } else if (!msgList2.isEmpty()){
+ msgList2.forEach(msg -> consumer2.acknowledgeAsync(msg));
+ consumerAlwaysAck = consumer2;
+ consumerWillBeClose = consumer3;
+ consumerStuck = consumer1;
+ } else {
+ msgList3.forEach(msg -> consumer3.acknowledgeAsync(msg));
+ consumerAlwaysAck = consumer3;
+ consumerWillBeClose = consumer1;
+ consumerStuck = consumer2;
+ }
+
+        // 2. Add consumer4 after "consumerWillBeClose" was closed, and
consumer4 will be stuck due to the mechanism
+ // "recentlyJoinedConsumers".
+ Consumer<Integer> consumer4 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .receiverQueueSize(1000)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ consumerWillBeClose.close();
+
+ Thread.sleep(2000);
+
+ for (int i = messagesSentPerTime; i < messagesSentPerTime * 2; i++) {
+ MessageId messageId = producer.newMessage()
+ .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+ .value(100 + i)
+ .send();
+ log.info("Published message :{}", messageId);
+ }
+
+ // Send messages again.
+        // Verify: "consumerAlwaysAck" can receive messages until the
cursor.readerPosition is larger than LAC.
+ while (true) {
+ Message<Integer> msg = consumerAlwaysAck.receive(2,
TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ totalReceivedMessages.add(msg.getValue());
+ consumerAlwaysAck.acknowledge(msg);
+ }
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topic,
false).join().get();
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
managedLedger.openCursor(subName);
+ log.info("cursor_readPosition {}, LAC {}", cursor.getReadPosition(),
managedLedger.getLastConfirmedEntry());
+ assertTrue(((PositionImpl) cursor.getReadPosition())
+ .compareTo((PositionImpl)
managedLedger.getLastConfirmedEntry()) > 0);
+
+ // Make all consumers to start to read and acknowledge messages.
+ // Verify: no repeated Read-and-discard.
+ Thread.sleep(5 * 1000);
+ int maxReplayCount = messagesSentPerTime * 2;
+ log.info("Reply read count: {}", replyReadCounter.get());
+ assertTrue(replyReadCounter.get() < maxReplayCount);
+ // Verify: at last, all messages will be received.
+ ReceivedMessages<Integer> receivedMessages =
ackAllMessages(consumerAlwaysAck, consumerStuck, consumer4);
+
totalReceivedMessages.addAll(receivedMessages.messagesReceived.stream().map(p
-> p.getRight()).collect(
+ Collectors.toList()));
+ assertEquals(totalReceivedMessages.size(), messagesSentPerTime * 2);
+
+ // cleanup.
+ consumer1.close();
+ consumer2.close();
+ consumer3.close();
+ consumer4.close();
+ producer.close();
+ admin.topics().delete(topic, false);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index f58c1fa26af..ef070250ca1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -21,9 +21,14 @@ package org.apache.pulsar.client.api;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -69,4 +74,65 @@ public abstract class ProducerConsumerBase extends
MockedPulsarServiceBaseTest {
return "my-property/my-ns/topic-" +
Long.toHexString(random.nextLong());
}
+ protected <T> ReceivedMessages<T> receiveAndAckMessages(
+ BiFunction<MessageId, T, Boolean> ackPredicate,
+ Consumer<T>...consumers) throws Exception {
+ ReceivedMessages receivedMessages = new ReceivedMessages();
+ while (true) {
+ int receivedMsgCount = 0;
+ for (int i = 0; i < consumers.length; i++) {
+ Consumer<T> consumer = consumers[i];
+ while (true) {
+ Message<T> msg = consumer.receive(2, TimeUnit.SECONDS);
+ if (msg != null) {
+ receivedMsgCount++;
+ T v = msg.getValue();
+ MessageId messageId = msg.getMessageId();
+
receivedMessages.messagesReceived.add(Pair.of(msg.getMessageId(), v));
+ if (ackPredicate.apply(messageId, v)) {
+ consumer.acknowledge(msg);
+
receivedMessages.messagesAcked.add(Pair.of(msg.getMessageId(), v));
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ // Because of the possibility of consumers getting stuck with each
other, only jump out of the loop if all
+ // consumers could not receive messages.
+ if (receivedMsgCount == 0) {
+ break;
+ }
+ }
+ return receivedMessages;
+ }
+
+ protected <T> ReceivedMessages<T> ackAllMessages(Consumer<T>...consumers)
throws Exception {
+ return receiveAndAckMessages((msgId, msgV) -> true, consumers);
+ }
+
+ protected static class ReceivedMessages<T> {
+
+ List<Pair<MessageId, T>> messagesReceived = new ArrayList<>();
+
+ List<Pair<MessageId, T>> messagesAcked = new ArrayList<>();
+
+ public boolean hasReceivedMessage(T v) {
+ for (Pair<MessageId, T> pair : messagesReceived) {
+ if (pair.getRight().equals(v)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasAckedMessage(T v) {
+ for (Pair<MessageId, T> pair : messagesAcked) {
+ if (pair.getRight().equals(v)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
}