This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c9894d53c0198b5738597ddc2c9db2356c950c9
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Thu Nov 11 17:46:55 2021 +0800

    [Managed Ledger] Fix the incorrect total size when BrokerEntryMetadata is 
enabled (#12714)
    
    ### Motivation
    
    When BrokerEntryMetadata is enabled, the total size in 
`ManagedLedgerImpl` is inaccurate. This is because, when the total size is 
updated in `OpAddEntry#safeRun`, `dataLength` is still the initial size of 
`data` from when `OpAddEntry` was constructed, even though `data` could have 
been changed via the `setData` method.
    
    The inaccurate total size could affect the retention size validation: 
in `ManagedLedgerImpl#internalTrimLedgers`, the total size is reduced by 
the size of the `LedgerInfo`, which is assigned from 
`LedgerHandle#getLength()`. Therefore, the total size will become 0 or less 
before all ledgers are removed.
    
    ### Modifications
    
    - Update `dataLength` field in `setData` method.
    - Add a `testManagedLedgerTotalSize` test to `BrokerEntryMetadataE2ETest`. 
It produces 10 messages and triggers the rollover manually so that the first 
`LedgerInfo` of the managed ledger contains the correct total bytes. It then 
compares the `totalSize` field with it to verify that this fix works.
    
    (cherry picked from commit 5dbb7d25849f3a037aa522b5d0767801aa0a5096)
---
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  1 +
 .../broker/service/BrokerEntryMetadataE2ETest.java | 45 ++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 9106b4f..ecae17e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -314,6 +314,7 @@ public class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallba
     }
 
     public void setData(ByteBuf data) {
+        this.dataLength = data.readableBytes();
         this.data = data;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 52c8375..92784bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -18,17 +18,25 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+
+import java.time.Duration;
 import java.util.List;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.assertj.core.util.Sets;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -214,4 +222,41 @@ public class BrokerEntryMetadataE2ETest extends 
BrokerTestBase {
                 .subscribe();
         consumer.getLastMessageId();
     }
+
+    @Test
+    public void testManagedLedgerTotalSize() throws Exception {
+        final String topic = newTopicName();
+        final int messages = 10;
+
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.lookups().lookupTopic(topic);
+        final ManagedLedgerImpl managedLedger = 
pulsar.getBrokerService().getTopicIfExists(topic).get()
+                .map(topicObject -> (ManagedLedgerImpl) ((PersistentTopic) 
topicObject).getManagedLedger())
+                .orElse(null);
+        Assert.assertNotNull(managedLedger);
+        final ManagedCursor cursor = managedLedger.openCursor("cursor"); // 
prevent ledgers being removed
+
+        @Cleanup
+        final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < messages; i++) {
+            producer.send("msg-" + i);
+        }
+
+        Assert.assertTrue(managedLedger.getTotalSize() > 0);
+
+        managedLedger.getConfig().setMinimumRolloverTime(0, 
TimeUnit.MILLISECONDS);
+        managedLedger.getConfig().setMaxEntriesPerLedger(1);
+        managedLedger.rollCurrentLedgerIfFull();
+
+        Awaitility.await().atMost(Duration.ofSeconds(3))
+                .until(() -> managedLedger.getLedgersInfo().size() > 1);
+
+        final List<LedgerInfo> ledgerInfoList = 
managedLedger.getLedgersInfoAsList();
+        Assert.assertEquals(ledgerInfoList.size(), 2);
+        Assert.assertEquals(ledgerInfoList.get(0).getSize(), 
managedLedger.getTotalSize());
+
+        cursor.close();
+    }
 }

Reply via email to