This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b3e7be9447a24b46f765bb1d0b2debe9a7349d1a
Author: lipenghui <peng...@apache.org>
AuthorDate: Fri Sep 24 16:40:55 2021 +0800

    Return the last position of the compacted data when the original data has been
deleted. (#12161)
    
    Currently, for a get-last-message-ID request, the broker returns -1:-1 if
all the original data has been deleted.
    
    ```
    09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO 
org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:44443] Created 
subscription on topic xxx
    09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset 
cursor:ManagedCursorImpl{ledger=xxx, name=__compaction, ackPos=44946:0, 
readPos=44946:1} to 66425:-1 since ledger consumed completely
    09:51:12.156 [BookKeeperClientWorker-OrderedExecutor-3-0] INFO 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Ledger 44946 
contains the current last confirmed entry 44946:0, and it is going to be deleted
    09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] End 
TrimConsumedLedgers. ledgers=1 totalSize=0
    09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Removing ledger 
44946 - size: 3999
    ```
    
    After the rollover task, the topic's internal stats are:
    
    ```
    {
        "entriesAddedCounter": 0,
        "numberOfEntries": 0,
        "totalSize": 0,
        "currentLedgerEntries": 0,
        "currentLedgerSize": 0,
        "lastLedgerCreatedTimestamp": "2021-09-20T09:51:12.15Z",
        "waitingCursorsCount": 29,
        "pendingAddEntriesCount": 0,
        "lastConfirmedEntry": "44946:0",
        "state": "LedgerOpened",
        "ledgers": [
            {
                "ledgerId": 66425,
                "entries": 0,
                "size": 0,
                "offloaded": false,
                "underReplicated": false
            }
        ],
        "cursors": {
            "__compaction": {
                "markDeletePosition": "44946:0",
                "readPosition": "44946:1",
                "waitingReadOp": false,
                "pendingReadOps": 0,
                "messagesConsumedCounter": 0,
                "cursorLedger": -1,
                "cursorLedgerLastEntry": -1,
                "individuallyDeletedMessages": "[]",
                "lastLedgerSwitchTimestamp": "2021-09-20T09:51:12.154Z",
                "state": "NoLedger",
                "numberOfEntriesSinceFirstNotAckedMessage": 1,
                "totalNonContiguousDeletedMessagesRange": 0,
                "subscriptionHavePendingRead": false,
                "subscriptionHavePendingReplayRead": false,
                "properties": {
                    "CompactedTopicLedger": 64365
                }
            }
        },
        "schemaLedgers": [],
        "compactedLedger": {
            "ledgerId": 64365,
            "entries": 1,
            "size": 4024,
            "offloaded": false,
            "underReplicated": false
        }
    }
    ```
    
    At this point, when a reader calls hasMessageAvailable(), the client requests
the last message ID from the broker, and the broker throws a
NonRecoverableLedgerException because ledger 44946 has been deleted.
    
    ```
    12:41:40.937 [pulsar-io-4-5] INFO 
org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:53488] Created 
subscription on topic xxx / yyy
    12:41:41.131 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Error opening 
ledger for reading at position 44946:0 - 
org.apache.bookkeeper.mledger.ManagedLedgerException$NonRecoverableLedgerException:
 No such ledger exists on Metadata Server
    ```
    
    The problem is that we do not check whether the topic has compacted data.
If the topic has compacted data but runs into the above situation, we should
return the last message ID of the compacted ledger to the client.
    
    Added a test for the new changes.
    
    (cherry picked from commit 86e720fc90b8ca42486a3ce848235761b524e723)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 33 ++++++++++---
 .../apache/pulsar/compaction/CompactedTopic.java   |  2 +
 .../pulsar/compaction/CompactedTopicImpl.java      | 12 +++++
 .../pulsar/compaction/CompactedTopicTest.java      | 56 ++++++++++++++++++++++
 4 files changed, 97 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 340685b..a6d7f9c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1719,12 +1719,33 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         batchSizeFuture.whenComplete((batchSize, e) -> {
             if (e != null) {
                 if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException) {
-                    // in this case, the ledgers been removed except the 
current ledger
-                    // and current ledger without any data
-                    
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                            -1, -1, partitionIndex, -1,
-                            markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                            markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+                    persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> {
+                        if (entry != null) {
+                            // All original data was trimmed: report the last position of the compacted ledger.
+                            try {
+                                MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+                                int bs = metadata.getNumMessagesInBatch();
+                                int largestBatchIndex = bs > 0 ? bs - 1 : -1;
+                                ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                                        entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex,
+                                        markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+                                        markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                            } finally {
+                                entry.release();
+                            }
+                        } else {
+                            // No compacted entry, and every ledger except the empty current one was removed.
+                            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                                    -1, -1, partitionIndex, -1,
+                                    markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+                                    markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                        }
+                    }).exceptionally(ex -> {
+                        Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+                        ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError,
+                                "Failed to read last entry of the compacted Ledger " + cause.getMessage()));
+                        return null;
+                    });
                 } else {
                     ctx.writeAndFlush(Commands.newError(
                             requestId, ServerError.MetadataError,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 4922852..7c96937 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.compaction;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.Consumer;
@@ -31,4 +32,5 @@ public interface CompactedTopic {
                                 boolean isFirstRead,
                                 ReadEntriesCallback callback,
                                 Consumer consumer);
+    CompletableFuture<Entry> readLastEntryOfCompactedLedger();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index ee9fbbc..1313413 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -281,6 +281,18 @@ public class CompactedTopicImpl implements CompactedTopic {
         return compactedTopicContext == null ? Optional.empty() : 
Optional.of(compactedTopicContext.get());
     }
 
+    @Override
+    public CompletableFuture<Entry> readLastEntryOfCompactedLedger() {
+        if (compactionHorizon == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // An empty compacted ledger has LAC -1; reading entry -1 would fail, so complete with null instead.
+        return compactedTopicContext.thenCompose(context -> context.ledger.getLastAddConfirmed() < 0
+                ? CompletableFuture.<Entry>completedFuture(null)
+                : readEntries(context.ledger, context.ledger.getLastAddConfirmed(),
+                        context.ledger.getLastAddConfirmed()).thenApply(es -> es.isEmpty() ? null : es.get(0)));
+    }
+
     private static int comparePositionAndMessageId(PositionImpl p, 
MessageIdData m) {
         return ComparisonChain.start()
             .compare(p.getLedgerId(), m.getLedgerId())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 410d1e5..b70f495 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -40,10 +40,13 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
@@ -56,8 +59,10 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -360,4 +365,55 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(compactedMsgCount, 1);
         Assert.assertEquals(nonCompactedMsgCount, numMessages);
     }
+
+    @Test
+    public void testLastMessageIdForCompactedLedger() throws Exception {
+        String topic = 
"persistent://my-property/use/my-ns/testLastMessageIdForCompactedLedger-" + 
UUID.randomUUID();
+        final String key = "1";
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+        final int numMessages = 10;
+        final String msg = "test compaction msg";
+        for (int i = 0; i < numMessages; ++i) {
+            producer.newMessage().key(key).value(msg).send();
+        }
+        admin.topics().triggerCompaction(topic);
+        boolean succeed = retryStrategically((test) -> {
+            try {
+                return 
LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 200);
+
+        Assert.assertTrue(succeed);
+
+        PersistentTopicInternalStats stats0 = 
admin.topics().getInternalStats(topic);
+        admin.topics().unload(topic);
+        PersistentTopicInternalStats stats1 = 
admin.topics().getInternalStats(topic);
+        // Make sure the ledger rollover has triggered.
+        Assert.assertTrue(stats0.currentLedgerSize != 
stats1.currentLedgerSize);
+
+        Optional<Topic> topicRef = 
pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(topicRef.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();
+        ManagedLedgerImpl managedLedger = 
(ManagedLedgerImpl)persistentTopic.getManagedLedger();
+        managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(managedLedger.getCurrentLedgerEntries(), 0);
+            
Assert.assertTrue(managedLedger.getLastConfirmedEntry().getEntryId() != -1);
+            Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 
1);
+        });
+
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test")
+                .readCompacted(true)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        Assert.assertTrue(reader.hasMessageAvailable());
+        Assert.assertEquals(msg, reader.readNext().getValue());
+        Assert.assertFalse(reader.hasMessageAvailable());
+    }
 }

Reply via email to