This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 726de461130 [improve] [broker] Avoid repeated Read-and-discard when 
using Key_Shared mode (#22245)
726de461130 is described below

commit 726de4611308c0cec4a5691d31bab4941525f00b
Author: fengyubiao <[email protected]>
AuthorDate: Fri Mar 29 12:06:26 2024 +0800

    [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared 
mode (#22245)
    
    (cherry picked from commit e34ea626a65da4c8e1578010f857aa961a7b5c55)
---
 .../persistent/MessageRedeliveryController.java    |   8 +
 .../PersistentDispatcherMultipleConsumers.java     |  47 +++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 104 +++++++-
 .../client/api/KeySharedSubscriptionTest.java      | 266 +++++++++++++++++++++
 .../pulsar/client/api/ProducerConsumerBase.java    |  66 +++++
 5 files changed, 470 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 5bf3f5506fa..63803177242 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -95,6 +95,14 @@ public class MessageRedeliveryController {
         }
     }
 
+    public Long getHash(long ledgerId, long entryId) {
+        LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
+        if (value == null) {
+            return null;
+        }
+        return value.first;
+    }
+
     public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) 
{
         if (!allowOutOfOrderDelivery) {
             List<LongPair> keysToRemove = new ArrayList<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6e4b37456b6..f6f63787a86 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -314,24 +314,25 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
             }
 
             NavigableSet<PositionImpl> messagesToReplayNow = 
getMessagesToReplayNow(messagesToRead);
-
-            if (!messagesToReplayNow.isEmpty()) {
+            NavigableSet<PositionImpl> messagesToReplayFiltered = 
filterOutEntriesWillBeDiscarded(messagesToReplayNow);
+            if (!messagesToReplayFiltered.isEmpty()) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name, messagesToReplayNow.size(),
-                            consumerList.size());
+                    log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name,
+                            messagesToReplayFiltered.size(), 
consumerList.size());
                 }
 
                 havePendingReplayRead = true;
                 minReplayedPosition = messagesToReplayNow.first();
                 Set<? extends Position> deletedMessages = 
topic.isDelayedDeliveryEnabled()
-                        ? asyncReplayEntriesInOrder(messagesToReplayNow) : 
asyncReplayEntries(messagesToReplayNow);
+                        ? asyncReplayEntriesInOrder(messagesToReplayFiltered)
+                        : asyncReplayEntries(messagesToReplayFiltered);
                 // clear already acked positions from replay bucket
 
                 deletedMessages.forEach(position -> 
redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
                         ((PositionImpl) position).getEntryId()));
                 // if all the entries are acked-entries and cleared up from 
redeliveryMessages, try to read
                 // next entries as readCompletedEntries-callback was never 
called
-                if ((messagesToReplayNow.size() - deletedMessages.size()) == 
0) {
+                if ((messagesToReplayFiltered.size() - deletedMessages.size()) 
== 0) {
                     havePendingReplayRead = false;
                     readMoreEntriesAsync();
                 }
@@ -340,7 +341,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                     log.debug("[{}] Dispatcher read is blocked due to 
unackMessages {} reached to max {}", name,
                             totalUnackedMessages, 
topic.getMaxUnackedMessagesOnSubscription());
                 }
-            } else if (!havePendingRead) {
+            } else if (!havePendingRead && hasConsumersNeededNormalRead()) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Schedule read of {} messages for {} 
consumers", name, messagesToRead,
                             consumerList.size());
@@ -369,7 +370,16 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                             topic.getMaxReadPosition());
                 }
             } else {
-                log.debug("[{}] Cannot schedule next read until previous one 
is done", name);
+                if (log.isDebugEnabled()) {
+                    if (!messagesToReplayNow.isEmpty()) {
+                        log.debug("[{}] [{}] Skipping read for the topic: 
because all entries in replay queue were"
+                                + " filtered out due to the mechanism of 
Key_Shared mode, and the left consumers have"
+                                + " no permits now",
+                                topic.getName(), getSubscriptionName());
+                    } else {
+                        log.debug("[{}] Cannot schedule next read until 
previous one is done", name);
+                    }
+                }
             }
         } else {
             if (log.isDebugEnabled()) {
@@ -1100,6 +1110,27 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         }
     }
 
+    /**
+     * This method is designed for Key_Shared mode.
+     * Filter out the entries that will be discarded due to the order 
guarantee mechanism of Key_Shared mode.
+     * This method exists to avoid the scenario below:
+     * - Get positions from the Replay queue.
+     * - Read entries from BK.
+     * - The order guarantee mechanism of Key_Shared mode filtered out all the 
entries.
+     * - Deliver no entries to the client, even though we did a BK read.
+     */
+    protected NavigableSet<PositionImpl> 
filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
+        return src;
+    }
+
+    /**
+     * This method is designed for Key_Shared mode, to avoid consumers 
getting stuck unnecessarily.
+     * For details, see {@link 
PersistentStickyKeyDispatcherMultipleConsumers#hasConsumersNeededNormalRead}.
+     */
+    protected boolean hasConsumersNeededNormalRead() {
+        return true;
+    }
+
     protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
         return delayedDeliveryTracker.isPresent() && 
delayedDeliveryTracker.get().shouldPauseAllDeliveries();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 8f05530f58b..ee2ebd7ca86 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -30,13 +30,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
@@ -165,6 +168,14 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 }
             };
 
+    private static final FastThreadLocal<Map<Consumer, List<PositionImpl>>> 
localGroupedPositions =
+            new FastThreadLocal<Map<Consumer, List<PositionImpl>>>() {
+                @Override
+                protected Map<Consumer, List<PositionImpl>> initialValue() 
throws Exception {
+                    return new HashMap<>();
+                }
+            };
+
     @Override
     protected synchronized boolean trySendMessagesToConsumers(ReadType 
readType, List<Entry> entries) {
         long totalMessagesSent = 0;
@@ -248,15 +259,9 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             assert consumer != null; // checked when added to groupedEntries
             List<Entry> entriesWithSameKey = current.getValue();
             int entriesWithSameKeyCount = entriesWithSameKey.size();
-            int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
-            if (consumer.getMaxUnackedMessages() > 0) {
-                int remainUnAckedMessages =
-                        // Avoid negative number
-                        Math.max(consumer.getMaxUnackedMessages() - 
consumer.getUnackedMessages(), 0);
-                availablePermits = Math.min(availablePermits, 
remainUnAckedMessages);
-            }
-            int maxMessagesForC = Math.min(entriesWithSameKeyCount, 
availablePermits);
-            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, 
entriesWithSameKey, maxMessagesForC,
+            int availablePermits = getAvailablePermits(consumer);
+            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer,
+                    
entriesWithSameKey.stream().map(Entry::getPosition).collect(Collectors.toList()),
 availablePermits,
                     readType, consumerStickyKeyHashesMap.get(consumer));
             if (log.isDebugEnabled()) {
                 log.debug("[{}] select consumer {} with messages num {}, read 
type is {}",
@@ -289,7 +294,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(messagesForC);
                 totalEntries += filterEntriesForConsumer(entriesWithSameKey, 
batchSizes, sendMessageInfo,
                         batchIndexesAcks, cursor, readType == ReadType.Replay, 
consumer);
-
                 consumer.sendMessages(entriesWithSameKey, batchSizes, 
batchIndexesAcks,
                         sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(),
@@ -332,8 +336,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         return false;
     }
 
-    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, 
List<Entry> entries, int maxMessages,
-            ReadType readType, Set<Integer> stickyKeyHashes) {
+    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? 
extends Position> entries,
+           int availablePermits, ReadType readType, Set<Integer> 
stickyKeyHashes) {
+        int maxMessages = Math.min(entries.size(), availablePermits);
         if (maxMessages == 0) {
             return 0;
         }
@@ -378,7 +383,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         // Here, the consumer is one that has recently joined, so we can only 
send messages that were
         // published before it has joined.
         for (int i = 0; i < maxMessages; i++) {
-            if (((PositionImpl) 
entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
+            if (((PositionImpl) entries.get(i)).compareTo(maxReadPosition) >= 
0) {
                 // We have already crossed the divider line. All messages in 
the list are now
                 // newer than what we can currently dispatch to this consumer
                 return i;
@@ -405,6 +410,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
     }
 
     private boolean removeConsumersFromRecentJoinedConsumers() {
+        if (MapUtils.isEmpty(recentlyJoinedConsumers)) {
+            return false;
+        }
         Iterator<Map.Entry<Consumer, PositionImpl>> itr = 
recentlyJoinedConsumers.entrySet().iterator();
         boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
         PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
@@ -437,6 +445,76 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         }
     }
 
+    private int getAvailablePermits(Consumer c) {
+        int availablePermits = Math.max(c.getAvailablePermits(), 0);
+        if (c.getMaxUnackedMessages() > 0) {
+            // Avoid negative number
+            int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - 
c.getUnackedMessages(), 0);
+            availablePermits = Math.min(availablePermits, 
remainUnAckedMessages);
+        }
+        return availablePermits;
+    }
+
+    @Override
+    protected synchronized NavigableSet<PositionImpl> 
filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
+        if (src.isEmpty()) {
+            return src;
+        }
+        NavigableSet<PositionImpl> res = new TreeSet<>();
+        // Group positions.
+        final Map<Consumer, List<PositionImpl>> groupedPositions = 
localGroupedPositions.get();
+        groupedPositions.clear();
+        for (PositionImpl pos : src) {
+            Long stickyKeyHash = redeliveryMessages.getHash(pos.getLedgerId(), 
pos.getEntryId());
+            if (stickyKeyHash == null) {
+                res.add(pos);
+                continue;
+            }
+            Consumer c = selector.select(stickyKeyHash.intValue());
+            if (c == null) {
+                // Maybe using HashRangeExclusiveStickyKeyConsumerSelector.
+                continue;
+            }
+            groupedPositions.computeIfAbsent(c, k -> new 
ArrayList<>()).add(pos);
+        }
+        // Filter positions by the Recently Joined Position rule.
+        for (Map.Entry<Consumer, List<PositionImpl>> item : 
groupedPositions.entrySet()) {
+            int availablePermits = getAvailablePermits(item.getKey());
+            if (availablePermits == 0) {
+                continue;
+            }
+            int posCountToRead = 
getRestrictedMaxEntriesForConsumer(item.getKey(), item.getValue(), 
availablePermits,
+                    ReadType.Replay, null);
+            if (posCountToRead > 0) {
+                res.addAll(item.getValue().subList(0, posCountToRead));
+            }
+        }
+        return res;
+    }
+
+    /**
+     * In Key_Shared mode, a consumer will not receive any entries from a 
normal read if it is included in
+     * {@link #recentlyJoinedConsumers}; it can only receive entries from 
replay reads.
+     * If all entries in {@link #redeliveryMessages} have been filtered out 
due to the order guarantee mechanism,
+     * the broker needs a normal read so that the consumers not included in {@link 
#recentlyJoinedConsumers} will not be
+     * stuck. See https://github.com/apache/pulsar/pull/7105.
+     */
+    @Override
+    protected boolean hasConsumersNeededNormalRead() {
+        for (Consumer consumer : consumerList) {
+            if (consumer == null || consumer.isBlocked()) {
+                continue;
+            }
+            if (recentlyJoinedConsumers.containsKey(consumer)) {
+                continue;
+            }
+            if (consumer.getAvailablePermits() > 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override
     public SubType getType() {
         return SubType.Key_Shared;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 18fb141be31..72195550508 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -48,12 +49,17 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -61,6 +67,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1630,4 +1637,263 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         log.info("Got {} other messages...", sum);
         Assert.assertEquals(sum, delayedMessages + messages);
     }
+
+    private AtomicInteger injectReplayReadCounter(String topicName, String 
cursorName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
managedLedger.openCursor(cursorName);
+        managedLedger.getCursors().removeCursor(cursor.getName());
+        managedLedger.getActiveCursors().removeCursor(cursor.getName());
+        ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+        managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
+        managedLedger.getActiveCursors().add(spyCursor, PositionImpl.EARLIEST);
+        AtomicInteger replyReadCounter = new AtomicInteger();
+        Mockito.doAnswer(invocation -> {
+            if 
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+                replyReadCounter.incrementAndGet();
+            }
+            return invocation.callRealMethod();
+        }).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), 
Mockito.any());
+        Mockito.doAnswer(invocation -> {
+            if 
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+                replyReadCounter.incrementAndGet();
+            }
+            return invocation.callRealMethod();
+        }).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), 
Mockito.any(), Mockito.anyBoolean());
+        admin.topics().createSubscription(topicName, cursorName, 
MessageId.earliest);
+        return replyReadCounter;
+    }
+
+    @Test
+    public void testNoRepeatedReadAndDiscard() throws Exception {
+        int delayedMessages = 100;
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subName = "my-sub";
+        admin.topics().createNonPartitionedTopic(topic);
+        AtomicInteger replyReadCounter = injectReplayReadCounter(topic, 
subName);
+
+        // Send messages.
+        @Cleanup
+        Producer<Integer> producer = 
pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
+        for (int i = 0; i < delayedMessages; i++) {
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(100 + i)
+                    .send();
+            log.info("Published message :{}", messageId);
+        }
+        producer.close();
+
+        // Make ack holes.
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        List<Message> msgList1 = new ArrayList<>();
+        List<Message> msgList2 = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            Message msg1 = consumer1.receive(1, TimeUnit.SECONDS);
+            if (msg1 != null) {
+                msgList1.add(msg1);
+            }
+            Message msg2 = consumer2.receive(1, TimeUnit.SECONDS);
+            if (msg2 != null) {
+                msgList2.add(msg2);
+            }
+        }
+        Consumer<Integer> redeliverConsumer = null;
+        if (!msgList1.isEmpty()) {
+            msgList1.forEach(msg -> consumer1.acknowledgeAsync(msg));
+            redeliverConsumer = consumer2;
+        } else {
+            msgList2.forEach(msg -> consumer2.acknowledgeAsync(msg));
+            redeliverConsumer = consumer1;
+        }
+
+        // consumer3 will be added to the "recentlyJoinedConsumers".
+        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(1000)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        redeliverConsumer.close();
+
+        // Verify: no repeated Read-and-discard.
+        Thread.sleep(5 * 1000);
+        int maxReplayCount = delayedMessages * 2;
+        log.info("Reply read count: {}", replyReadCounter.get());
+        assertTrue(replyReadCounter.get() < maxReplayCount);
+
+        // cleanup.
+        consumer1.close();
+        consumer2.close();
+        consumer3.close();
+        admin.topics().delete(topic, false);
+    }
+
+    /**
+     * This test guards the feature added by 
https://github.com/apache/pulsar/pull/7105.
+     * 1. Start 3 consumers:
+     *   - consumer1 will be closed and trigger a message redelivery.
+     *   - consumer2 will not ack any messages, so that a consumer that joins 
later will be stuck due
+     *     to the mechanism "recentlyJoinedConsumers".
+     *   - consumer3 will always receive and ack messages.
+     * 2. Add consumer4 after consumer1 was closed, and consumer4 will be stuck 
due to the mechanism
+     *    "recentlyJoinedConsumers".
+     * 3. Verify:
+     *   - (Main purpose) consumer3 can still receive messages until the 
cursor.readerPosition is larger than LAC.
+     *   - no repeated Read-and-discard.
+     *   - in the end, all messages will be received.
+     */
+    @Test(timeOut = 180 * 1000) // the test will be finished in 60s.
+    public void testRecentJoinedPosWillNotStuckOtherConsumer() throws 
Exception {
+        final int messagesSentPerTime = 100;
+        final Set<Integer> totalReceivedMessages = new TreeSet<>();
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subName = "my-sub";
+        admin.topics().createNonPartitionedTopic(topic);
+        AtomicInteger replyReadCounter = injectReplayReadCounter(topic, 
subName);
+
+        // Send messages.
+        @Cleanup
+        Producer<Integer> producer = 
pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
+        for (int i = 0; i < messagesSentPerTime; i++) {
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(100 + i)
+                    .send();
+            log.info("Published message :{}", messageId);
+        }
+
+        // 1. Start 3 consumers and make ack holes.
+        //   - one consumer will be closed and trigger a message redelivery.
+        //   - one consumer will not ack any messages, so that a consumer that 
joins later will be stuck due to the
+        //     mechanism "recentlyJoinedConsumers".
+        //   - one consumer will always receive and ack messages.
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        List<Message> msgList1 = new ArrayList<>();
+        List<Message> msgList2 = new ArrayList<>();
+        List<Message> msgList3 = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            Message<Integer> msg1 = consumer1.receive(1, TimeUnit.SECONDS);
+            if (msg1 != null) {
+                totalReceivedMessages.add(msg1.getValue());
+                msgList1.add(msg1);
+            }
+            Message<Integer> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
+            if (msg2 != null) {
+                totalReceivedMessages.add(msg2.getValue());
+                msgList2.add(msg2);
+            }
+            Message<Integer> msg3 = consumer3.receive(1, TimeUnit.SECONDS);
+            if (msg2 != null) {
+                totalReceivedMessages.add(msg3.getValue());
+                msgList3.add(msg3);
+            }
+        }
+        Consumer<Integer> consumerWillBeClose = null;
+        Consumer<Integer> consumerAlwaysAck = null;
+        Consumer<Integer> consumerStuck = null;
+        if (!msgList1.isEmpty()) {
+            msgList1.forEach(msg -> consumer1.acknowledgeAsync(msg));
+            consumerAlwaysAck = consumer1;
+            consumerWillBeClose = consumer2;
+            consumerStuck = consumer3;
+        } else if (!msgList2.isEmpty()){
+            msgList2.forEach(msg -> consumer2.acknowledgeAsync(msg));
+            consumerAlwaysAck = consumer2;
+            consumerWillBeClose = consumer3;
+            consumerStuck = consumer1;
+        } else {
+            msgList3.forEach(msg -> consumer3.acknowledgeAsync(msg));
+            consumerAlwaysAck = consumer3;
+            consumerWillBeClose = consumer1;
+            consumerStuck = consumer2;
+        }
+
+        // 2. Add consumer4 after "consumerWillBeClose" was closed, and 
consumer4 will be stuck due to the mechanism
+        //    "recentlyJoinedConsumers".
+        Consumer<Integer> consumer4 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(1000)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        consumerWillBeClose.close();
+
+        Thread.sleep(2000);
+
+        for (int i = messagesSentPerTime; i < messagesSentPerTime * 2; i++) {
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(100 + i)
+                    .send();
+            log.info("Published message :{}", messageId);
+        }
+
+        // Send messages again.
+        // Verify: "consumerAlwaysAck" can receive messages until the 
cursor.readerPosition is larger than LAC.
+        while (true) {
+            Message<Integer> msg = consumerAlwaysAck.receive(2, 
TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            totalReceivedMessages.add(msg.getValue());
+            consumerAlwaysAck.acknowledge(msg);
+        }
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
managedLedger.openCursor(subName);
+        log.info("cursor_readPosition {}, LAC {}", cursor.getReadPosition(), 
managedLedger.getLastConfirmedEntry());
+        assertTrue(((PositionImpl) cursor.getReadPosition())
+                .compareTo((PositionImpl) 
managedLedger.getLastConfirmedEntry())  > 0);
+
+        // Make all consumers to start to read and acknowledge messages.
+        // Verify: no repeated Read-and-discard.
+        Thread.sleep(5 * 1000);
+        int maxReplayCount = messagesSentPerTime * 2;
+        log.info("Reply read count: {}", replyReadCounter.get());
+        assertTrue(replyReadCounter.get() < maxReplayCount);
+        // Verify: at last, all messages will be received.
+        ReceivedMessages<Integer> receivedMessages = 
ackAllMessages(consumerAlwaysAck, consumerStuck, consumer4);
+        
totalReceivedMessages.addAll(receivedMessages.messagesReceived.stream().map(p 
-> p.getRight()).collect(
+                Collectors.toList()));
+        assertEquals(totalReceivedMessages.size(), messagesSentPerTime * 2);
+
+        // cleanup.
+        consumer1.close();
+        consumer2.close();
+        consumer3.close();
+        consumer4.close();
+        producer.close();
+        admin.topics().delete(topic, false);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index f58c1fa26af..ef070250ca1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -21,9 +21,14 @@ package org.apache.pulsar.client.api;
 import com.google.common.collect.Sets;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -69,4 +74,65 @@ public abstract class ProducerConsumerBase extends 
MockedPulsarServiceBaseTest {
         return "my-property/my-ns/topic-" + 
Long.toHexString(random.nextLong());
     }
 
+    protected <T> ReceivedMessages<T> receiveAndAckMessages(
+            BiFunction<MessageId, T, Boolean> ackPredicate,
+            Consumer<T>...consumers) throws Exception {
+        ReceivedMessages receivedMessages = new ReceivedMessages();
+        while (true) {
+            int receivedMsgCount = 0;
+            for (int i = 0; i < consumers.length; i++) {
+                Consumer<T> consumer = consumers[i];
+                while (true) {
+                    Message<T> msg = consumer.receive(2, TimeUnit.SECONDS);
+                    if (msg != null) {
+                        receivedMsgCount++;
+                        T v = msg.getValue();
+                        MessageId messageId = msg.getMessageId();
+                        
receivedMessages.messagesReceived.add(Pair.of(msg.getMessageId(), v));
+                        if (ackPredicate.apply(messageId, v)) {
+                            consumer.acknowledge(msg);
+                            
receivedMessages.messagesAcked.add(Pair.of(msg.getMessageId(), v));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+            }
+            // Because of the possibility of consumers getting stuck with each 
other, only jump out of the loop if all
+            // consumers could not receive messages.
+            if (receivedMsgCount == 0) {
+                break;
+            }
+        }
+        return receivedMessages;
+    }
+
+    protected <T> ReceivedMessages<T> ackAllMessages(Consumer<T>...consumers) 
throws Exception {
+        return receiveAndAckMessages((msgId, msgV) -> true, consumers);
+    }
+
+    protected static class ReceivedMessages<T> {
+
+        List<Pair<MessageId, T>> messagesReceived = new ArrayList<>();
+
+        List<Pair<MessageId, T>> messagesAcked = new ArrayList<>();
+
+        public boolean hasReceivedMessage(T v) {
+            for (Pair<MessageId, T> pair : messagesReceived) {
+                if (pair.getRight().equals(v)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        public boolean hasAckedMessage(T v) {
+            for (Pair<MessageId, T> pair : messagesAcked) {
+                if (pair.getRight().equals(v)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
 }


Reply via email to