This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ca779827581 [fix] [ml] fix wrong msg backlog of non-durable cursor 
after trim ledgers (#21250)
ca779827581 is described below

commit ca77982758170993aa52c0f7f45bbf9ad72e368a
Author: fengyubiao <[email protected]>
AuthorDate: Sun Oct 8 22:19:12 2023 +0800

    [fix] [ml] fix wrong msg backlog of non-durable cursor after trim ledgers 
(#21250)
    
    ### Background
    - After trimming ledgers, `ml.lastConfirmedPosition` still points to a deleted ledger when the current ledger of the ML is empty.
    - The cursor refuses to set `markDeletedPosition` to a value larger than `ml.lastConfirmedPosition`, even though there are no entries left to read<sup>[1]</sup>.
    - The code comment in the method `advanceCursors` says: do not make `cursor.markDeletedPosition` larger than `ml.lastConfirmedPosition`<sup>[2]</sup>.
    
    ### Issue
    If there is no durable cursor, the `markDeletedPosition` might be set to `{current_ledger, -1}`, and the async mark-delete will then be rejected by `rule-2` above (the second Background point). As a result, the `backlog`, `readPosition`, and `markDeletedPosition` of the cursor end up in incorrect positions after trimming ledgers. You can reproduce it with the test `testTrimLedgerIfNoDurableCursor`.
    
    ### Modifications
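    Do not make `cursor.markDeletedPosition` larger than `ml.lastConfirmedPosition` when advancing non-durable cursors. Instead of moving them to `{firstNonDeletedLedger, -1}`, move them to the last existing entry of the ledgers being trimmed, and skip the advance (with a warning) if all of those ledgers are empty.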
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  28 +++--
 .../client/api/NonDurableSubscriptionTest.java     | 115 ++++++++++++++++++++-
 2 files changed, 133 insertions(+), 10 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d51b48bdda5..e011bf3e6d7 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2850,15 +2850,14 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
             return;
         }
 
-        // need to move mark delete for non-durable cursors to the first 
ledger NOT marked for deletion
-        // calling getNumberOfEntries latter for a ledger that is already 
deleted will be problematic and return
-        // incorrect results
-        Long firstNonDeletedLedger = 
ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 
1).getLedgerId());
-        if (firstNonDeletedLedger == null) {
-            throw new LedgerNotExistException("First non deleted Ledger is not 
found");
+        // Just ack messages like a consumer. Normally, consumers will not 
confirm a position that does not exist, so
+        // find the latest existing position to ack.
+        PositionImpl highestPositionToDelete = 
calculateLastEntryInLedgerList(ledgersToDelete);
+        if (highestPositionToDelete == null) {
+            log.warn("[{}] The ledgers to be trim are all empty, skip to 
advance non-durable cursors: {}",
+                    name, ledgersToDelete);
+            return;
         }
-        PositionImpl highestPositionToDelete = new 
PositionImpl(firstNonDeletedLedger, -1);
-
         cursors.forEach(cursor -> {
             // move the mark delete position to the highestPositionToDelete 
only if it is smaller than the add confirmed
             // to prevent the edge case where the cursor is caught up to the 
latest and highestPositionToDelete may be
@@ -2882,6 +2881,19 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         });
     }
 
+    /**
+     * @return null if all ledgers is empty.
+     */
+    private PositionImpl calculateLastEntryInLedgerList(List<LedgerInfo> 
ledgersToDelete) {
+        for (int i = ledgersToDelete.size() - 1; i >= 0; i--) {
+            LedgerInfo ledgerInfo = ledgersToDelete.get(i);
+            if (ledgerInfo != null && ledgerInfo.hasEntries() && 
ledgerInfo.getEntries() > 0) {
+                return PositionImpl.get(ledgerInfo.getLedgerId(), 
ledgerInfo.getEntries() - 1);
+            }
+        }
+        return null;
+    }
+
     /**
      * Delete this ManagedLedger completely from the system.
      *
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index b5d00ac012a..20407295ccb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -33,6 +33,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
@@ -55,7 +56,7 @@ import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
 @Slf4j
-public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
+public class NonDurableSubscriptionTest extends ProducerConsumerBase {
 
     private final AtomicInteger numFlow = new AtomicInteger(0);
 
@@ -316,7 +317,7 @@ public class NonDurableSubscriptionTest  extends 
ProducerConsumerBase {
     }
 
     @Test
-    public void testTrimLedgerIfNoDurableCursor() throws Exception {
+    public void testHasMessageAvailableIfIncomingQueueNotEmpty() throws 
Exception {
         final String nonDurableCursor = "non-durable-cursor";
         final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
         Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
@@ -557,4 +558,114 @@ public class NonDurableSubscriptionTest  extends 
ProducerConsumerBase {
         producer.close();
         admin.topics().delete(topicName, false);
     }
+
+    @Test
+    public void testTrimLedgerIfNoDurableCursor() throws Exception {
+        final String nonDurableCursor = "non-durable-cursor";
+        final String durableCursor = "durable-cursor";
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
+                
.subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName).receiverQueueSize(1)
+                .subscriptionName(durableCursor).subscribe();
+        consumer.close();
+
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        producer.send("1");
+        producer.send("2");
+        producer.send("3");
+        producer.send("4");
+        MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) 
producer.send("5");
+
+        Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg1.getValue(), "1");
+        Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg2.getValue(), "2");
+        Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg3.getValue(), "3");
+
+        // Unsubscribe durable cursor.
+        // Trigger a trim ledgers task, and verify trim ledgers successful.
+        admin.topics().unload(topicName);
+        Thread.sleep(3 * 1000);
+        admin.topics().deleteSubscription(topicName, durableCursor);
+        // Trim ledgers after release durable cursor.
+        trimLedgers(topicName);
+        List<ManagedLedgerInternalStats.LedgerInfo> ledgers = 
admin.topics().getInternalStats(topicName).ledgers;
+        assertEquals(ledgers.size(), 1);
+        assertNotEquals(ledgers.get(0).ledgerId, 
msgIdInDeletedLedger5.getLedgerId());
+
+        // Verify backlog and markDeletePosition is correct.
+        Awaitility.await().untilAsserted(() -> {
+            SubscriptionStats subscriptionStats = 
admin.topics().getStats(topicName, true, true, true)
+                    .getSubscriptions().get(nonDurableCursor);
+            log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
+            assertEquals(subscriptionStats.getMsgBacklog(), 0);
+            ManagedLedgerInternalStats.CursorStats cursorStats =
+                    
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+            String[] ledgerIdAndEntryId = 
cursorStats.markDeletePosition.split(":");
+            PositionImpl actMarkDeletedPos =
+                    PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), 
Long.valueOf(ledgerIdAndEntryId[1]));
+            PositionImpl expectedMarkDeletedPos =
+                    PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), 
msgIdInDeletedLedger5.getEntryId());
+            log.info("Expected mark deleted position: {}", 
expectedMarkDeletedPos);
+            log.info("Actual mark deleted position: {}", 
cursorStats.markDeletePosition);
+            
Assert.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
+        });
+
+        // Clear the incoming queue of the reader for next test.
+        while (true) {
+            Message<String> msg = reader.readNext(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            log.info("clear msg: {}", msg.getValue());
+        }
+
+        // The following tests are designed to verify the api 
"getNumberOfEntries" and "consumedEntries" still work
+        // after changes.See the code-description added with the PR 
https://github.com/apache/pulsar/pull/10667.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get(nonDurableCursor);
+
+        // Verify "getNumberOfEntries" if there is no entries to consume.
+        assertEquals(0, cursor.getNumberOfEntries());
+        assertEquals(0, ml.getNumberOfEntries());
+
+        // Verify "getNumberOfEntries" if there is 1 entry to consume.
+        producer.send("6");
+        producer.send("7");
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(2, ml.getNumberOfEntries());
+            // Since there is one message has been pulled into the incoming 
queue of reader. There is only one messages
+            // waiting to cursor read.
+            assertEquals(1, cursor.getNumberOfEntries());
+        });
+
+        // Verify "consumedEntries" is correct.
+        ManagedLedgerInternalStats.CursorStats cursorStats =
+                
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+        // "messagesConsumedCounter" should be 0 after unload the topic.
+        // Note: "topic_internal_stat.cursor.messagesConsumedCounter" means 
how many messages were acked on this
+        //   cursor. The similar one "topic_stats.lastConsumedTimestamp" means 
the last time of sending messages to
+        //   the consumer.
+        assertEquals(0, cursorStats.messagesConsumedCounter);
+        Message<String> msg6 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg6.getValue(), "6");
+        Message<String> msg7 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg7.getValue(), "7");
+        Awaitility.await().untilAsserted(() -> {
+            // "messagesConsumedCounter" should be 2 after consumed 2 message.
+            ManagedLedgerInternalStats.CursorStats cStat =
+                    
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+            assertEquals(2, cStat.messagesConsumedCounter);
+        });
+
+        // cleanup.
+        reader.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
 }

Reply via email to