This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new c3c17dee756 [fix][broker] avoid offload system topic (#22497)
c3c17dee756 is described below

commit c3c17dee7567d0a182affb1991e1e35098689d9b
Author: Qiang Zhao <mattisonc...@apache.org>
AuthorDate: Wed May 8 13:10:49 2024 +0800

    [fix][broker] avoid offload system topic (#22497)
    
    Co-authored-by: 道君 <dao...@apache.org>
---
 .../pulsar/broker/service/BrokerService.java       |  8 +-
 .../pulsar/broker/service/BrokerServiceTest.java   | 94 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 032d4dd9369..60d56c0d908 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1906,7 +1906,13 @@ public class BrokerService implements Closeable {
                     topicLevelOffloadPolicies,
                     
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
                     getPulsar().getConfig().getProperties());
-            if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+            if (NamespaceService.isSystemServiceNamespace(namespace.toString())
+                || SystemTopicNames.isSystemTopic(topicName)) {
+                /*
+                 Avoid setting broker internal system topics using off-loader because some of them are the
+                 preconditions of other topics. The slow replying log speed will cause a delay in all the topic
+                 loading.(timeout)
+                 */
                 
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
             } else  {
                 if (topicLevelOffloadPolicies != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index fcf11fad708..ab0b8f813ea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -67,12 +67,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -111,6 +114,9 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
@@ -1772,4 +1778,92 @@ public class BrokerServiceTest extends BrokerTestBase {
             fail("Unsubscribe failed");
         }
     }
+
+
+    @Test
+    public void testOffloadConfShouldNotAppliedForSystemTopic() throws 
PulsarAdminException {
+        final String driver = "aws-s3";
+        final String region = "test-region";
+        final String bucket = "test-bucket";
+        final String role = "test-role";
+        final String roleSessionName = "test-role-session-name";
+        final String credentialId = "test-credential-id";
+        final String credentialSecret = "test-credential-secret";
+        final String endPoint = "test-endpoint";
+        final Integer maxBlockSizeInBytes = 5;
+        final Integer readBufferSizeInBytes = 2;
+        final Long offloadThresholdInBytes = 10L;
+        final Long offloadThresholdInSeconds = 1000L;
+        final Long offloadDeletionLagInMillis = 5L;
+
+        final OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(
+                driver,
+                region,
+                bucket,
+                endPoint,
+                role,
+                roleSessionName,
+                credentialId,
+                credentialSecret,
+                maxBlockSizeInBytes,
+                readBufferSizeInBytes,
+                offloadThresholdInBytes,
+                offloadThresholdInSeconds,
+                offloadDeletionLagInMillis,
+                OffloadedReadPriority.TIERED_STORAGE_FIRST
+        );
+
+        var fakeOffloader = new LedgerOffloader() {
+            @Override
+            public String getOffloadDriverName() {
+                return driver;
+            }
+
+            @Override
+            public CompletableFuture<Void> offload(ReadHandle ledger, UUID 
uid, Map<String, String> extraMetadata) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, 
UUID uid, Map<String, String> offloadDriverMetadata) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uid, Map<String, String> offloadDriverMetadata) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public OffloadPolicies getOffloadPolicies() {
+                return offloadPolicies;
+            }
+
+            @Override
+            public void close() {
+            }
+        };
+
+        final BrokerService brokerService = pulsar.getBrokerService();
+        final String namespace = "prop/" + UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
+
+        // Inject the cache to avoid real load off-loader jar
+        final Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = 
pulsar.getLedgerOffloaderMap();
+        ledgerOffloaderMap.put(NamespaceName.get(namespace), fakeOffloader);
+
+        // (1) test normal topic
+        final String normalTopic = "persistent://" + namespace + "/" + 
UUID.randomUUID();
+        var managedLedgerConfig = 
brokerService.getManagedLedgerConfig(TopicName.get(normalTopic)).join();
+
+        Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), 
fakeOffloader);
+
+        // (2) test system topic
+        for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) {
+            managedLedgerConfig = 
brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
+            Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), 
NullLedgerOffloader.INSTANCE);
+        }
+    }
 }
+

Reply via email to