This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3c9894d53c0198b5738597ddc2c9db2356c950c9 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Thu Nov 11 17:46:55 2021 +0800 [Managed Ledger] Fix the incorrect total size when BrokerEntryMetadata is enabled (#12714) ### Motivation When the BrokerEntryMetadata is enabled, the total size in `ManagedLedgerImpl` is inaccurate. This is because, when the total size is updated in `OpAddEntry#safeRun`, `dataLength` is still the initial size of `data` from when the `OpAddEntry` was constructed, even though `data` could have been changed via the `setData` method. The inaccurate total size could affect the retention size validation: in `ManagedLedgerImpl#internalTrimLedgers`, the total size is reduced by the size of the `LedgerInfo`, which is assigned from `LedgerHandle#getLength()`. Therefore, the total size will become 0 or less before all ledgers are removed. ### Modifications - Update the `dataLength` field in the `setData` method. - Add a `testManagedLedgerTotalSize` test to `BrokerEntryMetadataE2ETest`. It produces 10 messages and triggers the rollover manually so that the first `LedgerInfo` of the managed ledger contains the correct total bytes. It then compares the `totalSize` field with that value to verify that this fix works. 
(cherry picked from commit 5dbb7d25849f3a037aa522b5d0767801aa0a5096) --- .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 1 + .../broker/service/BrokerEntryMetadataE2ETest.java | 45 ++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 9106b4f..ecae17e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -314,6 +314,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba } public void setData(ByteBuf data) { + this.dataLength = data.readableBytes(); this.data = data; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index 52c8375..92784bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -18,17 +18,25 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; + +import java.time.Duration; import java.util.List; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import 
org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.assertj.core.util.Sets; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -214,4 +222,41 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase { .subscribe(); consumer.getLastMessageId(); } + + @Test + public void testManagedLedgerTotalSize() throws Exception { + final String topic = newTopicName(); + final int messages = 10; + + admin.topics().createNonPartitionedTopic(topic); + admin.lookups().lookupTopic(topic); + final ManagedLedgerImpl managedLedger = pulsar.getBrokerService().getTopicIfExists(topic).get() + .map(topicObject -> (ManagedLedgerImpl) ((PersistentTopic) topicObject).getManagedLedger()) + .orElse(null); + Assert.assertNotNull(managedLedger); + final ManagedCursor cursor = managedLedger.openCursor("cursor"); // prevent ledgers being removed + + @Cleanup + final Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + for (int i = 0; i < messages; i++) { + producer.send("msg-" + i); + } + + Assert.assertTrue(managedLedger.getTotalSize() > 0); + + managedLedger.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS); + managedLedger.getConfig().setMaxEntriesPerLedger(1); + managedLedger.rollCurrentLedgerIfFull(); + + Awaitility.await().atMost(Duration.ofSeconds(3)) + .until(() -> managedLedger.getLedgersInfo().size() > 1); + + final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList(); + Assert.assertEquals(ledgerInfoList.size(), 2); + Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize()); + + cursor.close(); + } }