This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 46c2b74f4c9 [fix][broker]Global topic policies do not affect after 
unloading topic and persistence global topic policies never affect (#24279)
46c2b74f4c9 is described below

commit 46c2b74f4c9727d6399e9a1b48b031696d8c7cfd
Author: fengyubiao <[email protected]>
AuthorDate: Fri May 16 13:35:30 2025 +0800

    [fix][broker]Global topic policies do not affect after unloading topic and 
persistence global topic policies never affect (#24279)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |   5 +-
 .../pulsar/broker/service/BrokerService.java       |  88 ++++++++++-------
 .../SystemTopicBasedTopicPoliciesService.java      |   2 -
 .../broker/service/TopicPoliciesService.java       |   1 -
 .../broker/service/persistent/PersistentTopic.java |  10 +-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 108 +++++++++++++++++++++
 .../pulsar/broker/service/BrokerServiceTest.java   |   4 +-
 .../broker/service/OneWayReplicatorTest.java       |   2 +-
 .../PersistentTopicInitializeDelayTest.java        |   2 +-
 .../broker/service/ReplicationTopicGcTest.java     |   6 +-
 .../broker/service/TopicPolicyTestUtils.java       |   2 +-
 .../service/persistent/PersistentTopicTest.java    |   2 +-
 12 files changed, 178 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index af18fb807f9..c666a19b61d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -514,10 +514,11 @@ public class PersistentTopicsBase extends AdminResource {
             if (optionalPolicies.isEmpty()) {
                 return CompletableFuture.completedFuture(null);
             }
-            // Query the topic-level policies only if the namespace-level 
policies exist
+            // Query the topic-level policies only if the namespace-level 
policies exist.
+            // Global policies does not affet Replication.
             final var namespacePolicies = optionalPolicies.get();
             return 
pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName,
-                    TopicPoliciesService.GetType.DEFAULT
+                    TopicPoliciesService.GetType.LOCAL_ONLY
             ).thenApply(optionalTopicPolicies -> 
optionalTopicPolicies.map(TopicPolicies::getReplicationClustersSet)
                     .orElse(namespacePolicies.replication_clusters));
         });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ac076715daa..18929d7914d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -98,6 +98,7 @@ import 
org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import 
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -195,7 +196,6 @@ import 
org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.jspecify.annotations.NonNull;
-import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1161,7 +1161,10 @@ public class BrokerService implements Closeable {
                     if (!exists && !createIfMissing) {
                         return 
CompletableFuture.completedFuture(Optional.empty());
                     }
-                    return 
getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> {
+                    // The topic level policies are not needed now, but the 
meaning of calling
+                    // "getTopicPoliciesBypassSystemTopic" will wait for 
system topic policies initialization.
+                    return getTopicPoliciesBypassSystemTopic(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY)
+                            .exceptionally(ex -> {
                         final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
                         final String errorInfo = String.format("Topic creation 
encountered an exception by initialize"
                                 + " topic policies service. topic_name=%s 
error_message=%s", topicName,
@@ -1169,10 +1172,8 @@ public class BrokerService implements Closeable {
                         log.error(errorInfo, rc);
                         throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
                     }).thenCompose(optionalTopicPolicies -> {
-                        final TopicPolicies topicPolicies = 
optionalTopicPolicies.orElse(null);
                         return topics.computeIfAbsent(topicName.toString(),
-                                (tpName) -> 
loadOrCreatePersistentTopic(tpName, createIfMissing, properties,
-                                        topicPolicies));
+                                (tpName) -> 
loadOrCreatePersistentTopic(tpName, createIfMissing, properties));
                     });
                 });
             } else {
@@ -1222,12 +1223,12 @@ public class BrokerService implements Closeable {
         }
     }
 
-    private CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesBypassSystemTopic(@NonNull TopicName topicName) {
+    private CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesBypassSystemTopic(@NonNull TopicName topicName,
+                                                                               
  TopicPoliciesService.GetType type) {
         if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
             return CompletableFuture.completedFuture(Optional.empty());
         }
-        return 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName,
-                TopicPoliciesService.GetType.DEFAULT);
+        return 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName, type);
     }
 
     public CompletableFuture<Void> deleteTopic(String topic, boolean 
forceDelete) {
@@ -1645,7 +1646,7 @@ public class BrokerService implements Closeable {
      * @throws RuntimeException
      */
     protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(final String topic,
-            boolean createIfMissing, Map<String, String> properties, @Nullable 
TopicPolicies topicPolicies) {
+            boolean createIfMissing, Map<String, String> properties) {
         final CompletableFuture<Optional<Topic>> topicFuture = 
FutureUtil.createFutureWithTimeout(
                 
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), 
executor(),
                 () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
@@ -1661,7 +1662,7 @@ public class BrokerService implements Closeable {
 
                     if (topicLoadSemaphore.tryAcquire()) {
                         checkOwnershipAndCreatePersistentTopic(topic, 
createIfMissing, topicFuture,
-                                properties, topicPolicies);
+                                properties);
                         topicFuture.handle((persistentTopic, ex) -> {
                             // release permit and process pending topic
                             topicLoadSemaphore.release();
@@ -1677,7 +1678,7 @@ public class BrokerService implements Closeable {
                         });
                     } else {
                         pendingTopicLoadingQueue.add(new 
TopicLoadingContext(topic,
-                                createIfMissing, topicFuture, properties, 
topicPolicies));
+                                createIfMissing, topicFuture, properties));
                         if (log.isDebugEnabled()) {
                             log.debug("topic-loading for {} added into pending 
queue", topic);
                         }
@@ -1722,7 +1723,7 @@ public class BrokerService implements Closeable {
 
     private void checkOwnershipAndCreatePersistentTopic(final String topic, 
boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
-                                       Map<String, String> properties, 
@Nullable TopicPolicies topicPolicies) {
+                                       Map<String, String> properties) {
         TopicName topicName = TopicName.get(topic);
         pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
                 .thenAccept(isActive -> {
@@ -1736,8 +1737,8 @@ public class BrokerService implements Closeable {
                         }
                         propertiesFuture.thenAccept(finalProperties ->
                                 //TODO add topicName in properties?
-                                createPersistentTopic(topic, createIfMissing, 
topicFuture,
-                                        finalProperties, topicPolicies)
+                                createPersistentTopic0(topic, createIfMissing, 
topicFuture,
+                                        finalProperties)
                         ).exceptionally(throwable -> {
                             log.warn("[{}] Read topic property failed", topic, 
throwable);
                             pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
@@ -1758,17 +1759,10 @@ public class BrokerService implements Closeable {
                 });
     }
 
-
     @VisibleForTesting
     public void createPersistentTopic0(final String topic, boolean 
createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
                                        Map<String, String> properties) {
-        createPersistentTopic(topic, createIfMissing, topicFuture, properties, 
null);
-    }
-
-    private void createPersistentTopic(final String topic, boolean 
createIfMissing,
-                                       CompletableFuture<Optional<Topic>> 
topicFuture,
-                                       Map<String, String> properties, 
@Nullable TopicPolicies topicPolicies) {
         TopicName topicName = TopicName.get(topic);
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
@@ -1787,7 +1781,7 @@ public class BrokerService implements Closeable {
         CompletableFuture<Void> isTopicAlreadyMigrated = 
checkTopicAlreadyMigrated(topicName);
         maxTopicsCheck.thenCompose(partitionedTopicMetadata -> 
validateTopicConsistency(topicName))
                 .thenCompose(__ -> isTopicAlreadyMigrated)
-                .thenCompose(__ -> getManagedLedgerConfig(topicName, 
topicPolicies))
+                .thenCompose(__ -> getManagedLedgerConfig(topicName))
         .thenAccept(managedLedgerConfig -> {
             if (isBrokerEntryMetadataEnabled() || 
isBrokerPayloadProcessorEnabled()) {
                 // init managedLedger interceptor
@@ -1947,30 +1941,44 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@NonNull TopicName topicName) {
-        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
-                getTopicPoliciesBypassSystemTopic(topicName);
-        return topicPoliciesFuture.thenCompose(optionalTopicPolicies ->
-                getManagedLedgerConfig(topicName, 
optionalTopicPolicies.orElse(null)));
-    }
-
-    private CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@NonNull TopicName topicName,
-                                                                          
@Nullable TopicPolicies topicPolicies) {
         requireNonNull(topicName);
         NamespaceName namespace = topicName.getNamespaceObject();
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
         LocalPoliciesResources lpr = 
pulsar.getPulsarResources().getLocalPolicies();
+        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
+                getTopicPoliciesBypassSystemTopic(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY);
+        final CompletableFuture<Optional<TopicPolicies>> 
globalTopicPoliciesFuture =
+                getTopicPoliciesBypassSystemTopic(topicName, 
TopicPoliciesService.GetType.GLOBAL_ONLY);
         final CompletableFuture<Optional<Policies>> nsPolicies = 
nsr.getPoliciesAsync(namespace);
         final CompletableFuture<Optional<LocalPolicies>> lcPolicies = 
lpr.getLocalPoliciesAsync(namespace);
-        return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> 
{
+        return topicPoliciesFuture.thenCombine(globalTopicPoliciesFuture, 
(topicP, globalTopicP) -> {
+            return new ImmutablePair<>(topicP, globalTopicP);
+        }).thenCombine(nsPolicies, (topicPoliciesPair, np) -> {
+            return new ImmutablePair<>(topicPoliciesPair, np);
+        }).thenCombine(lcPolicies, (combined, localPolicies) -> {
+            Optional<TopicPolicies> topicP = combined.getLeft().getLeft();
+            Optional<TopicPolicies> globalTopicP = 
combined.getLeft().getRight();
+            Optional<Policies> policies = combined.getRight();
+
             PersistencePolicies persistencePolicies = null;
             RetentionPolicies retentionPolicies = null;
             OffloadPoliciesImpl topicLevelOffloadPolicies = null;
-            if (topicPolicies != null) {
-                persistencePolicies = topicPolicies.getPersistence();
-                retentionPolicies = topicPolicies.getRetentionPolicies();
-                topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+            if (topicP.isPresent() && topicP.get().getPersistence() != null) {
+                persistencePolicies = topicP.get().getPersistence();
+            } else if (globalTopicP.isPresent() && 
globalTopicP.get().getPersistence() != null) {
+                persistencePolicies = globalTopicP.get().getPersistence();
+            }
+            if (topicP.isPresent() && topicP.get().getRetentionPolicies() != 
null) {
+                retentionPolicies = topicP.get().getRetentionPolicies();
+            } else if (globalTopicP.isPresent() && 
globalTopicP.get().getRetentionPolicies() != null) {
+                retentionPolicies = globalTopicP.get().getRetentionPolicies();
+            }
+            if (topicP.isPresent() && topicP.get().getOffloadPolicies() != 
null) {
+                topicLevelOffloadPolicies = topicP.get().getOffloadPolicies();
+            } else if (globalTopicP.isPresent() && 
globalTopicP.get().getOffloadPolicies() != null) {
+                topicLevelOffloadPolicies = 
globalTopicP.get().getOffloadPolicies();
             }
 
             if (persistencePolicies == null) {
@@ -2123,6 +2131,13 @@ public class BrokerService implements Closeable {
             managedLedgerConfig.setNewEntriesCheckDelayInMillis(
                     
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
             return managedLedgerConfig;
+        }).exceptionally(ex -> {
+            final Throwable rc = FutureUtil.unwrapCompletionException(ex);
+            final String errorInfo = String.format("Topic creation encountered 
an exception by initialize"
+                            + " topic policies service. topic_name=%s 
error_message=%s", topicName,
+                    rc.getMessage());
+            log.error(errorInfo, rc);
+            throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
         });
     }
 
@@ -3219,7 +3234,7 @@ public class BrokerService implements Closeable {
             checkOwnershipAndCreatePersistentTopic(topic,
                     pendingTopic.isCreateIfMissing(),
                     pendingFuture,
-                    pendingTopic.getProperties(), 
pendingTopic.getTopicPolicies());
+                    pendingTopic.getProperties());
             pendingFuture.handle((persistentTopic, ex) -> {
                 // release permit and process next pending topic
                 if (acquiredPermit) {
@@ -3777,6 +3792,5 @@ public class BrokerService implements Closeable {
         private final boolean createIfMissing;
         private final CompletableFuture<Optional<Topic>> topicFuture;
         private final Map<String, String> properties;
-        private final TopicPolicies topicPolicies;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 2000c9a3099..61960311e03 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -296,8 +296,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 if (!inserted || existingFuture != null) {
                     final var partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
                     final var policies = Optional.ofNullable(switch (type) {
-                        case DEFAULT -> 
Optional.ofNullable(policiesCache.get(partitionedTopicName))
-                                .orElseGet(() -> 
globalPoliciesCache.get(partitionedTopicName));
                         case GLOBAL_ONLY -> 
globalPoliciesCache.get(partitionedTopicName);
                         case LOCAL_ONLY -> 
policiesCache.get(partitionedTopicName);
                     });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 9b5d9a28ac2..4b7ed3765bb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -55,7 +55,6 @@ public interface TopicPoliciesService extends AutoCloseable {
      * It controls the behavior of {@link 
TopicPoliciesService#getTopicPoliciesAsync}.
      */
     enum GetType {
-        DEFAULT, // try getting the local topic policies, if not present, then 
get the global policies
         GLOBAL_ONLY, // only get the global policies
         LOCAL_ONLY,  // only get the local policies
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 89b9dfe0e3e..36344bdce28 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4421,9 +4421,13 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 return CompletableFuture.completedFuture(null);
             }
             return 
topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName,
-                    TopicPoliciesService.GetType.DEFAULT
-            ).thenAcceptAsync(optionalPolicies -> 
optionalPolicies.ifPresent(this::onUpdate),
-                    brokerService.getTopicOrderedExecutor());
+                    TopicPoliciesService.GetType.GLOBAL_ONLY)
+            .thenAcceptAsync(optionalPolicies -> 
optionalPolicies.ifPresent(this::onUpdate),
+                    brokerService.getTopicOrderedExecutor())
+            .thenCompose(__ -> 
topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName,
+                    TopicPoliciesService.GetType.LOCAL_ONLY))
+            .thenAcceptAsync(optionalPolicies -> 
optionalPolicies.ifPresent(this::onUpdate),
+                            brokerService.getTopicOrderedExecutor());
         }
         return CompletableFuture.completedFuture(null);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 7ce10b94b72..c7a064e831d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -611,6 +611,114 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().delete(topic, false);
     }
 
+    @Test
+    public void testGlobalPolicyStillAffectsAfterUnloading() throws Exception {
+        // create topic and load it up.
+        final String namespace = myNamespace;
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace + "/tp");
+        final TopicName topicName = TopicName.get(topic);
+        admin.topics().createNonPartitionedTopic(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+        final SystemTopicBasedTopicPoliciesService topicPoliciesService
+                = (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+
+        // Set non-global policy of the limitation of max consumers.
+        // Set global policy of the limitation of max producers.
+        admin.topicPolicies(false).setMaxConsumers(topic, 10);
+        admin.topicPolicies(true).setMaxProducers(topic, 20);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
LOCAL_ONLY).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
GLOBAL_ONLY).join().get()
+                    .getMaxProducerPerTopic(), 20);
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 20);
+        });
+
+        // Reload topic and verify: both global policy and non-global policy 
affect.
+        admin.topics().unload(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 20);
+        });
+
+        // cleanup.
+        admin.topics().delete(topic, false);
+    }
+
+    @Test
+    public void testRetentionGlobalPolicyAffects() throws Exception {
+        // create topic and load it up.
+        final String namespace = myNamespace;
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace + "/tp");
+        final TopicName topicName = TopicName.get(topic);
+        admin.topics().createNonPartitionedTopic(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+        final SystemTopicBasedTopicPoliciesService topicPoliciesService
+                = (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+
+        // Set non-global policy of the limitation of max consumers.
+        // Set global policy of the persistence policies.
+        admin.topicPolicies(false).setMaxConsumers(topic, 10);
+        RetentionPolicies retentionPolicies = new RetentionPolicies(100, 200);
+        admin.topicPolicies(true).setRetention(topic, retentionPolicies);
+        Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
LOCAL_ONLY).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            ManagedLedgerConfig mlConfig = 
persistentTopic.getManagedLedger().getConfig();
+            assertEquals(mlConfig.getRetentionTimeMillis(), 
TimeUnit.MINUTES.toMillis(100));
+            assertEquals(mlConfig.getRetentionSizeInMB(), 200);
+        });
+        PersistencePolicies persistencePolicy = new PersistencePolicies(3, 2, 
1, 4);
+        admin.topicPolicies(true).setPersistence(topic, persistencePolicy);
+        Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
LOCAL_ONLY).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            ManagedLedgerConfig mlConfig = 
persistentTopic.getManagedLedger().getConfig();
+            assertEquals(mlConfig.getRetentionTimeMillis(), 
TimeUnit.MINUTES.toMillis(100));
+            assertEquals(mlConfig.getRetentionSizeInMB(), 200);
+            assertEquals(mlConfig.getEnsembleSize(), 3);
+            assertEquals(mlConfig.getWriteQuorumSize(), 2);
+            assertEquals(mlConfig.getAckQuorumSize(), 1);
+            assertEquals(mlConfig.getThrottleMarkDelete(), 4D);
+        });
+
+        // Reload topic and verify: retention policy of global policy affects.
+        admin.topics().unload(topic);
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            ManagedLedgerConfig mlConfig = 
persistentTopic.getManagedLedger().getConfig();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+            assertEquals(mlConfig.getRetentionTimeMillis(), 
TimeUnit.MINUTES.toMillis(100));
+            assertEquals(mlConfig.getRetentionSizeInMB(), 200);
+            assertEquals(mlConfig.getEnsembleSize(), 3);
+            assertEquals(mlConfig.getWriteQuorumSize(), 2);
+            assertEquals(mlConfig.getAckQuorumSize(), 1);
+            assertEquals(mlConfig.getThrottleMarkDelete(), 4D);
+        });
+
+        // cleanup.
+        admin.topics().delete(topic, false);
+    }
+
     @Test(timeOut = 20000)
     public void testGetSizeBasedBacklogQuotaApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 0bb35f0bd9f..66ae75fcbcd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1181,7 +1181,7 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         // try to create topic which should fail as bundle is disable
         CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService()
-                .loadOrCreatePersistentTopic(topicName, true, null, null);
+                .loadOrCreatePersistentTopic(topicName, true, null);
 
         try {
             futureResult.get();
@@ -1225,7 +1225,7 @@ public class BrokerServiceTest extends BrokerTestBase {
             for (int i = 0; i < 10; i++) {
                 // try to create topic which should fail as bundle is disable
                 CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService()
-                        .loadOrCreatePersistentTopic(topicName + "_" + i, 
false, null, null);
+                        .loadOrCreatePersistentTopic(topicName + "_" + i, 
false, null);
                 loadFutures.add(futureResult);
             }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 99763bdfb59..3c7edaec44d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -1116,7 +1116,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
             
assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(),
 2);
             // Trigger system topic __change_event's initialize.
             
pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://"
 + ns + "/1"),
-                    TopicPoliciesService.GetType.DEFAULT);
+                    TopicPoliciesService.GetType.LOCAL_ONLY);
         });
 
         // Create non-partitioned topic.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
index a563077e012..11a0c8d0bc6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
@@ -109,7 +109,7 @@ public class PersistentTopicInitializeDelayTest extends 
BrokerTestBase {
                     (SystemTopicBasedTopicPoliciesService) 
brokerService.getPulsar().getTopicPoliciesService();
             if 
(topicPoliciesService.getListeners().containsKey(TopicName.get(topic)) ) {
                 
brokerService.getPulsar().getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topic),
-                        TopicPoliciesService.GetType.DEFAULT
+                        TopicPoliciesService.GetType.LOCAL_ONLY
                 ).thenAccept(optionalPolicies -> 
optionalPolicies.ifPresent(this::onUpdate));
             }
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
index 267e8547914..91c667d1d78 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
@@ -119,9 +119,9 @@ public class ReplicationTopicGcTest extends 
OneWayReplicatorTestBase {
                     
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
             // topic policies.
             
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
-                    TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+                    TopicPoliciesService.GetType.LOCAL_ONLY).get().isEmpty());
             
assertTrue(pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
-                    TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+                    TopicPoliciesService.GetType.LOCAL_ONLY).get().isEmpty());
             // schema.
             
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
             
assertTrue(CollectionUtils.isEmpty(pulsar2.getSchemaStorage().getAll(schemaId).get()));
@@ -169,7 +169,7 @@ public class ReplicationTopicGcTest extends 
OneWayReplicatorTestBase {
                     
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
             // topic policies.
             
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
-                    TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+                    TopicPoliciesService.GetType.LOCAL_ONLY).get().isEmpty());
             // schema.
             
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
         });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
index d4275cdfd20..0aa8e070d31 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
@@ -44,7 +44,7 @@ public class TopicPolicyTestUtils {
 
     public static TopicPolicies getTopicPolicies(TopicPoliciesService 
topicPoliciesService, TopicName topicName)
             throws ExecutionException, InterruptedException {
-        return topicPoliciesService.getTopicPoliciesAsync(topicName, 
TopicPoliciesService.GetType.DEFAULT).get()
+        return topicPoliciesService.getTopicPoliciesAsync(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY).get()
                 .orElse(null);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 12b9b0568b7..93a66eb269d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -706,7 +706,7 @@ public class PersistentTopicTest extends BrokerTestBase {
         TopicPolicies policies = new TopicPolicies();
         policies.setRetentionPolicies(retentionPolicies);
         
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService)
-                .getTopicPoliciesAsync(TopicName.get(topic), 
TopicPoliciesService.GetType.DEFAULT);
+                .getTopicPoliciesAsync(TopicName.get(topic), 
TopicPoliciesService.GetType.LOCAL_ONLY);
         persistentTopic.onUpdate(policies);
         verify(persistentTopic, times(1)).checkPersistencePolicies();
         Awaitility.await().untilAsserted(() -> {


Reply via email to