This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b3e7be9447a24b46f765bb1d0b2debe9a7349d1a Author: lipenghui <peng...@apache.org> AuthorDate: Fri Sep 24 16:40:55 2021 +0800 Return the last position of the compacted data while the original data been deleted. (#12161) Currently, for the get last message ID request, the broker returns -1:-1 if all the original data has been deleted. ``` 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:44443] Created subscription on topic xxx 09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset cursor:ManagedCursorImpl{ledger=xxx, name=__compaction, ackPos=44946:0, readPos=44946:1} to 66425:-1 since ledger consumed completely 09:51:12.156 [BookKeeperClientWorker-OrderedExecutor-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Ledger 44946 contains the current last confirmed entry 44946:0, and it is going to be deleted 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] End TrimConsumedLedgers. 
ledgers=1 totalSize=0 09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Removing ledger 44946 - size: 3999 ``` After the rollover task, the topic internal stats will be: ``` { "entriesAddedCounter": 0, "numberOfEntries": 0, "totalSize": 0, "currentLedgerEntries": 0, "currentLedgerSize": 0, "lastLedgerCreatedTimestamp": "2021-09-20T09:51:12.15Z", "waitingCursorsCount": 29, "pendingAddEntriesCount": 0, "lastConfirmedEntry": "44946:0", "state": "LedgerOpened", "ledgers": [ { "ledgerId": 66425, "entries": 0, "size": 0, "offloaded": false, "underReplicated": false } ], "cursors": { "__compaction": { "markDeletePosition": "44946:0", "readPosition": "44946:1", "waitingReadOp": false, "pendingReadOps": 0, "messagesConsumedCounter": 0, "cursorLedger": -1, "cursorLedgerLastEntry": -1, "individuallyDeletedMessages": "[]", "lastLedgerSwitchTimestamp": "2021-09-20T09:51:12.154Z", "state": "NoLedger", "numberOfEntriesSinceFirstNotAckedMessage": 1, "totalNonContiguousDeletedMessagesRange": 0, "subscriptionHavePendingRead": false, "subscriptionHavePendingReplayRead": false, "properties": { "CompactedTopicLedger": 64365 } } }, "schemaLedgers": [], "compactedLedger": { "ledgerId": 64365, "entries": 1, "size": 4024, "offloaded": false, "underReplicated": false } } ``` At this time, when a reader calls hasMessageAvailable(), the client requests the last message ID from the broker, and a NonRecoverableLedgerException is thrown on the broker side because ledger 44946 has been deleted. 
``` 12:41:40.937 [pulsar-io-4-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:53488] Created subscription on topic xxx / yyy 12:41:41.131 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Error opening ledger for reading at position 44946:0 - org.apache.bookkeeper.mledger.ManagedLedgerException$NonRecoverableLedgerException: No such ledger exists on Metadata Server ``` The problem is we are not checking if there is compacted data for the topic. If the topic has compacted data but encounter the above situation, we should return the last message ID of the compacted Ledger to the client. Added the test for the new changes. (cherry picked from commit 86e720fc90b8ca42486a3ce848235761b524e723) --- .../apache/pulsar/broker/service/ServerCnx.java | 33 ++++++++++--- .../apache/pulsar/compaction/CompactedTopic.java | 2 + .../pulsar/compaction/CompactedTopicImpl.java | 12 +++++ .../pulsar/compaction/CompactedTopicTest.java | 56 ++++++++++++++++++++++ 4 files changed, 97 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 340685b..a6d7f9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1719,12 +1719,33 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { batchSizeFuture.whenComplete((batchSize, e) -> { if (e != null) { if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) { - // in this case, the ledgers been removed except the current ledger - // and current ledger without any data - ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, - -1, -1, partitionIndex, -1, - markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> { + if (entry != null) { + // in this case, all the data has been compacted, so return the last position + // in the compacted ledger to the client + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + int bs = metadata.getNumMessagesInBatch(); + int largestBatchIndex = bs > 0 ? bs - 1 : -1; + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, + entry.getLedgerId(), entry.getLedgerId(), partitionIndex, largestBatchIndex, + markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, + markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + entry.release(); + } else { + // in this case, the ledgers been removed except the current ledger + // and current ledger without any data + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, + -1, -1, partitionIndex, -1, + markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, + markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1)); + } + }).exceptionally(ex -> { + ctx.writeAndFlush(Commands.newError( + requestId, ServerError.MetadataError, + "Failed to read last entry of the compacted Ledger " + + ex.getCause().getMessage())); + return null; + }); } else { ctx.writeAndFlush(Commands.newError( requestId, ServerError.MetadataError, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index 4922852..7c96937 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -20,6 +20,7 @@ package org.apache.pulsar.compaction; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.Consumer; @@ -31,4 +32,5 @@ public interface CompactedTopic { boolean isFirstRead, ReadEntriesCallback callback, Consumer consumer); + CompletableFuture<Entry> readLastEntryOfCompactedLedger(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index ee9fbbc..1313413 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -281,6 +281,18 @@ public class CompactedTopicImpl implements CompactedTopic { return compactedTopicContext == null ? 
Optional.empty() : Optional.of(compactedTopicContext.get()); } + @Override + public CompletableFuture<Entry> readLastEntryOfCompactedLedger() { + if (compactionHorizon == null) { + return CompletableFuture.completedFuture(null); + } + return compactedTopicContext.thenCompose(context -> + readEntries(context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed()) + .thenCompose(entries -> entries.size() > 0 + ? CompletableFuture.completedFuture(entries.get(0)) + : CompletableFuture.completedFuture(null))); + } + private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) { return ComparisonChain.start() .compare(p.getLedgerId(), m.getLedgerId()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index 410d1e5..b70f495 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -40,10 +40,13 @@ import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; @@ -56,8 +59,10 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.RawMessageImpl; import 
org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -360,4 +365,55 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(compactedMsgCount, 1); Assert.assertEquals(nonCompactedMsgCount, numMessages); } + + @Test + public void testLastMessageIdForCompactedLedger() throws Exception { + String topic = "persistent://my-property/use/my-ns/testLastMessageIdForCompactedLedger-" + UUID.randomUUID(); + final String key = "1"; + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + final int numMessages = 10; + final String msg = "test compaction msg"; + for (int i = 0; i < numMessages; ++i) { + producer.newMessage().key(key).value(msg).send(); + } + admin.topics().triggerCompaction(topic); + boolean succeed = retryStrategically((test) -> { + try { + return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status); + } catch (PulsarAdminException e) { + return false; + } + }, 10, 200); + + Assert.assertTrue(succeed); + + PersistentTopicInternalStats stats0 = admin.topics().getInternalStats(topic); + admin.topics().unload(topic); + PersistentTopicInternalStats stats1 = admin.topics().getInternalStats(topic); + // Make sure the ledger rollover has triggered. 
+ Assert.assertTrue(stats0.currentLedgerSize != stats1.currentLedgerSize); + + Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get(); + Assert.assertTrue(topicRef.isPresent()); + PersistentTopic persistentTopic = (PersistentTopic) topicRef.get(); + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)persistentTopic.getManagedLedger(); + managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); + + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(managedLedger.getCurrentLedgerEntries(), 0); + Assert.assertTrue(managedLedger.getLastConfirmedEntry().getEntryId() != -1); + Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1); + }); + + Reader<String> reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .subscriptionName("test") + .readCompacted(true) + .startMessageId(MessageId.earliest) + .create(); + + Assert.assertTrue(reader.hasMessageAvailable()); + Assert.assertEquals(msg, reader.readNext().getValue()); + Assert.assertFalse(reader.hasMessageAvailable()); + } }