This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 46c2b74f4c9 [fix][broker]Global topic policies do not affect after
unloading topic and persistence global topic policies never affect (#24279)
46c2b74f4c9 is described below
commit 46c2b74f4c9727d6399e9a1b48b031696d8c7cfd
Author: fengyubiao <[email protected]>
AuthorDate: Fri May 16 13:35:30 2025 +0800
[fix][broker]Global topic policies do not affect after unloading topic and
persistence global topic policies never affect (#24279)
---
.../broker/admin/impl/PersistentTopicsBase.java | 5 +-
.../pulsar/broker/service/BrokerService.java | 88 ++++++++++-------
.../SystemTopicBasedTopicPoliciesService.java | 2 -
.../broker/service/TopicPoliciesService.java | 1 -
.../broker/service/persistent/PersistentTopic.java | 10 +-
.../pulsar/broker/admin/TopicPoliciesTest.java | 108 +++++++++++++++++++++
.../pulsar/broker/service/BrokerServiceTest.java | 4 +-
.../broker/service/OneWayReplicatorTest.java | 2 +-
.../PersistentTopicInitializeDelayTest.java | 2 +-
.../broker/service/ReplicationTopicGcTest.java | 6 +-
.../broker/service/TopicPolicyTestUtils.java | 2 +-
.../service/persistent/PersistentTopicTest.java | 2 +-
12 files changed, 178 insertions(+), 54 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index af18fb807f9..c666a19b61d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -514,10 +514,11 @@ public class PersistentTopicsBase extends AdminResource {
if (optionalPolicies.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
- // Query the topic-level policies only if the namespace-level
policies exist
+ // Query the topic-level policies only if the namespace-level
policies exist.
+ // Global policies do not affect replication.
final var namespacePolicies = optionalPolicies.get();
return
pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName,
- TopicPoliciesService.GetType.DEFAULT
+ TopicPoliciesService.GetType.LOCAL_ONLY
).thenApply(optionalTopicPolicies ->
optionalTopicPolicies.map(TopicPolicies::getReplicationClustersSet)
.orElse(namespacePolicies.replication_clusters));
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ac076715daa..18929d7914d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -98,6 +98,7 @@ import
org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -195,7 +196,6 @@ import
org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.jspecify.annotations.NonNull;
-import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1161,7 +1161,10 @@ public class BrokerService implements Closeable {
if (!exists && !createIfMissing) {
return
CompletableFuture.completedFuture(Optional.empty());
}
- return
getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> {
+ // The topic-level policies are not needed here; the purpose of calling
+ // "getTopicPoliciesBypassSystemTopic" is to wait for system topic policies initialization.
+ return getTopicPoliciesBypassSystemTopic(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY)
+ .exceptionally(ex -> {
final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation
encountered an exception by initialize"
+ " topic policies service. topic_name=%s
error_message=%s", topicName,
@@ -1169,10 +1172,8 @@ public class BrokerService implements Closeable {
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new
ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
- final TopicPolicies topicPolicies =
optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(),
- (tpName) ->
loadOrCreatePersistentTopic(tpName, createIfMissing, properties,
- topicPolicies));
+ (tpName) ->
loadOrCreatePersistentTopic(tpName, createIfMissing, properties));
});
});
} else {
@@ -1222,12 +1223,12 @@ public class BrokerService implements Closeable {
}
}
- private CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesBypassSystemTopic(@NonNull TopicName topicName) {
+ private CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesBypassSystemTopic(@NonNull TopicName topicName,
+
TopicPoliciesService.GetType type) {
if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
return CompletableFuture.completedFuture(Optional.empty());
}
- return
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName,
- TopicPoliciesService.GetType.DEFAULT);
+ return
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName, type);
}
public CompletableFuture<Void> deleteTopic(String topic, boolean
forceDelete) {
@@ -1645,7 +1646,7 @@ public class BrokerService implements Closeable {
* @throws RuntimeException
*/
protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(final String topic,
- boolean createIfMissing, Map<String, String> properties, @Nullable
TopicPolicies topicPolicies) {
+ boolean createIfMissing, Map<String, String> properties) {
final CompletableFuture<Optional<Topic>> topicFuture =
FutureUtil.createFutureWithTimeout(
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()),
executor(),
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
@@ -1661,7 +1662,7 @@ public class BrokerService implements Closeable {
if (topicLoadSemaphore.tryAcquire()) {
checkOwnershipAndCreatePersistentTopic(topic,
createIfMissing, topicFuture,
- properties, topicPolicies);
+ properties);
topicFuture.handle((persistentTopic, ex) -> {
// release permit and process pending topic
topicLoadSemaphore.release();
@@ -1677,7 +1678,7 @@ public class BrokerService implements Closeable {
});
} else {
pendingTopicLoadingQueue.add(new
TopicLoadingContext(topic,
- createIfMissing, topicFuture, properties,
topicPolicies));
+ createIfMissing, topicFuture, properties));
if (log.isDebugEnabled()) {
log.debug("topic-loading for {} added into pending
queue", topic);
}
@@ -1722,7 +1723,7 @@ public class BrokerService implements Closeable {
private void checkOwnershipAndCreatePersistentTopic(final String topic,
boolean createIfMissing,
CompletableFuture<Optional<Topic>>
topicFuture,
- Map<String, String> properties,
@Nullable TopicPolicies topicPolicies) {
+ Map<String, String> properties) {
TopicName topicName = TopicName.get(topic);
pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
.thenAccept(isActive -> {
@@ -1736,8 +1737,8 @@ public class BrokerService implements Closeable {
}
propertiesFuture.thenAccept(finalProperties ->
//TODO add topicName in properties?
- createPersistentTopic(topic, createIfMissing,
topicFuture,
- finalProperties, topicPolicies)
+ createPersistentTopic0(topic, createIfMissing,
topicFuture,
+ finalProperties)
).exceptionally(throwable -> {
log.warn("[{}] Read topic property failed", topic,
throwable);
pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
@@ -1758,17 +1759,10 @@ public class BrokerService implements Closeable {
});
}
-
@VisibleForTesting
public void createPersistentTopic0(final String topic, boolean
createIfMissing,
CompletableFuture<Optional<Topic>>
topicFuture,
Map<String, String> properties) {
- createPersistentTopic(topic, createIfMissing, topicFuture, properties,
null);
- }
-
- private void createPersistentTopic(final String topic, boolean
createIfMissing,
- CompletableFuture<Optional<Topic>>
topicFuture,
- Map<String, String> properties,
@Nullable TopicPolicies topicPolicies) {
TopicName topicName = TopicName.get(topic);
final long topicCreateTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
@@ -1787,7 +1781,7 @@ public class BrokerService implements Closeable {
CompletableFuture<Void> isTopicAlreadyMigrated =
checkTopicAlreadyMigrated(topicName);
maxTopicsCheck.thenCompose(partitionedTopicMetadata ->
validateTopicConsistency(topicName))
.thenCompose(__ -> isTopicAlreadyMigrated)
- .thenCompose(__ -> getManagedLedgerConfig(topicName,
topicPolicies))
+ .thenCompose(__ -> getManagedLedgerConfig(topicName))
.thenAccept(managedLedgerConfig -> {
if (isBrokerEntryMetadataEnabled() ||
isBrokerPayloadProcessorEnabled()) {
// init managedLedger interceptor
@@ -1947,30 +1941,44 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(@NonNull TopicName topicName) {
- final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
- getTopicPoliciesBypassSystemTopic(topicName);
- return topicPoliciesFuture.thenCompose(optionalTopicPolicies ->
- getManagedLedgerConfig(topicName,
optionalTopicPolicies.orElse(null)));
- }
-
- private CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(@NonNull TopicName topicName,
-
@Nullable TopicPolicies topicPolicies) {
requireNonNull(topicName);
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
LocalPoliciesResources lpr =
pulsar.getPulsarResources().getLocalPolicies();
+ final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
+ getTopicPoliciesBypassSystemTopic(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY);
+ final CompletableFuture<Optional<TopicPolicies>>
globalTopicPoliciesFuture =
+ getTopicPoliciesBypassSystemTopic(topicName,
TopicPoliciesService.GetType.GLOBAL_ONLY);
final CompletableFuture<Optional<Policies>> nsPolicies =
nsr.getPoliciesAsync(namespace);
final CompletableFuture<Optional<LocalPolicies>> lcPolicies =
lpr.getLocalPoliciesAsync(namespace);
- return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) ->
{
+ return topicPoliciesFuture.thenCombine(globalTopicPoliciesFuture,
(topicP, globalTopicP) -> {
+ return new ImmutablePair<>(topicP, globalTopicP);
+ }).thenCombine(nsPolicies, (topicPoliciesPair, np) -> {
+ return new ImmutablePair<>(topicPoliciesPair, np);
+ }).thenCombine(lcPolicies, (combined, localPolicies) -> {
+ Optional<TopicPolicies> topicP = combined.getLeft().getLeft();
+ Optional<TopicPolicies> globalTopicP =
combined.getLeft().getRight();
+ Optional<Policies> policies = combined.getRight();
+
PersistencePolicies persistencePolicies = null;
RetentionPolicies retentionPolicies = null;
OffloadPoliciesImpl topicLevelOffloadPolicies = null;
- if (topicPolicies != null) {
- persistencePolicies = topicPolicies.getPersistence();
- retentionPolicies = topicPolicies.getRetentionPolicies();
- topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+ if (topicP.isPresent() && topicP.get().getPersistence() != null) {
+ persistencePolicies = topicP.get().getPersistence();
+ } else if (globalTopicP.isPresent() &&
globalTopicP.get().getPersistence() != null) {
+ persistencePolicies = globalTopicP.get().getPersistence();
+ }
+ if (topicP.isPresent() && topicP.get().getRetentionPolicies() !=
null) {
+ retentionPolicies = topicP.get().getRetentionPolicies();
+ } else if (globalTopicP.isPresent() &&
globalTopicP.get().getRetentionPolicies() != null) {
+ retentionPolicies = globalTopicP.get().getRetentionPolicies();
+ }
+ if (topicP.isPresent() && topicP.get().getOffloadPolicies() !=
null) {
+ topicLevelOffloadPolicies = topicP.get().getOffloadPolicies();
+ } else if (globalTopicP.isPresent() &&
globalTopicP.get().getOffloadPolicies() != null) {
+ topicLevelOffloadPolicies =
globalTopicP.get().getOffloadPolicies();
}
if (persistencePolicies == null) {
@@ -2123,6 +2131,13 @@ public class BrokerService implements Closeable {
managedLedgerConfig.setNewEntriesCheckDelayInMillis(
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
return managedLedgerConfig;
+ }).exceptionally(ex -> {
+ final Throwable rc = FutureUtil.unwrapCompletionException(ex);
+ final String errorInfo = String.format("Topic creation encountered
an exception by initialize"
+ + " topic policies service. topic_name=%s
error_message=%s", topicName,
+ rc.getMessage());
+ log.error(errorInfo, rc);
+ throw FutureUtil.wrapToCompletionException(new
ServiceUnitNotReadyException(errorInfo));
});
}
@@ -3219,7 +3234,7 @@ public class BrokerService implements Closeable {
checkOwnershipAndCreatePersistentTopic(topic,
pendingTopic.isCreateIfMissing(),
pendingFuture,
- pendingTopic.getProperties(),
pendingTopic.getTopicPolicies());
+ pendingTopic.getProperties());
pendingFuture.handle((persistentTopic, ex) -> {
// release permit and process next pending topic
if (acquiredPermit) {
@@ -3777,6 +3792,5 @@ public class BrokerService implements Closeable {
private final boolean createIfMissing;
private final CompletableFuture<Optional<Topic>> topicFuture;
private final Map<String, String> properties;
- private final TopicPolicies topicPolicies;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 2000c9a3099..61960311e03 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -296,8 +296,6 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (!inserted || existingFuture != null) {
final var partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
final var policies = Optional.ofNullable(switch (type) {
- case DEFAULT ->
Optional.ofNullable(policiesCache.get(partitionedTopicName))
- .orElseGet(() ->
globalPoliciesCache.get(partitionedTopicName));
case GLOBAL_ONLY ->
globalPoliciesCache.get(partitionedTopicName);
case LOCAL_ONLY ->
policiesCache.get(partitionedTopicName);
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 9b5d9a28ac2..4b7ed3765bb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -55,7 +55,6 @@ public interface TopicPoliciesService extends AutoCloseable {
* It controls the behavior of {@link
TopicPoliciesService#getTopicPoliciesAsync}.
*/
enum GetType {
- DEFAULT, // try getting the local topic policies, if not present, then
get the global policies
GLOBAL_ONLY, // only get the global policies
LOCAL_ONLY, // only get the local policies
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 89b9dfe0e3e..36344bdce28 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4421,9 +4421,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return CompletableFuture.completedFuture(null);
}
return
topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName,
- TopicPoliciesService.GetType.DEFAULT
- ).thenAcceptAsync(optionalPolicies ->
optionalPolicies.ifPresent(this::onUpdate),
- brokerService.getTopicOrderedExecutor());
+ TopicPoliciesService.GetType.GLOBAL_ONLY)
+ .thenAcceptAsync(optionalPolicies ->
optionalPolicies.ifPresent(this::onUpdate),
+ brokerService.getTopicOrderedExecutor())
+ .thenCompose(__ ->
topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName,
+ TopicPoliciesService.GetType.LOCAL_ONLY))
+ .thenAcceptAsync(optionalPolicies ->
optionalPolicies.ifPresent(this::onUpdate),
+ brokerService.getTopicOrderedExecutor());
}
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 7ce10b94b72..c7a064e831d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -611,6 +611,114 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
admin.topics().delete(topic, false);
}
+ @Test
+ public void testGlobalPolicyStillAffectsAfterUnloading() throws Exception {
+ // create topic and load it up.
+ final String namespace = myNamespace;
+ final String topic = BrokerTestUtil.newUniqueName("persistent://" +
namespace + "/tp");
+ final TopicName topicName = TopicName.get(topic);
+ admin.topics().createNonPartitionedTopic(topic);
+ pulsarClient.newProducer().topic(topic).create().close();
+ final SystemTopicBasedTopicPoliciesService topicPoliciesService
+ = (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+
+ // Set non-global policy of the limitation of max consumers.
+ // Set global policy of the limitation of max producers.
+ admin.topicPolicies(false).setMaxConsumers(topic, 10);
+ admin.topicPolicies(true).setMaxProducers(topic, 20);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName,
LOCAL_ONLY).join().get()
+ .getMaxConsumerPerTopic(), 10);
+ assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName,
GLOBAL_ONLY).join().get()
+ .getMaxProducerPerTopic(), 20);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopics().get(topic).get().get();
+ HierarchyTopicPolicies hierarchyTopicPolicies =
persistentTopic.getHierarchyTopicPolicies();
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 20);
+ });
+
+ // Reload topic and verify: both the global and the non-global policies take effect.
+ admin.topics().unload(topic);
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopics().get(topic).get().get();
+ HierarchyTopicPolicies hierarchyTopicPolicies =
persistentTopic.getHierarchyTopicPolicies();
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+
assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 20);
+ });
+
+ // cleanup.
+ admin.topics().delete(topic, false);
+ }
+
+ @Test
+ public void testRetentionGlobalPolicyAffects() throws Exception {
+ // create topic and load it up.
+ final String namespace = myNamespace;
+ final String topic = BrokerTestUtil.newUniqueName("persistent://" +
namespace + "/tp");
+ final TopicName topicName = TopicName.get(topic);
+ admin.topics().createNonPartitionedTopic(topic);
+ pulsarClient.newProducer().topic(topic).create().close();
+ final SystemTopicBasedTopicPoliciesService topicPoliciesService
+ = (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+
+ // Set non-global policy of the limitation of max consumers.
+ // Set global policy of the retention policies (persistence policies are set later).
+ admin.topicPolicies(false).setMaxConsumers(topic, 10);
+ RetentionPolicies retentionPolicies = new RetentionPolicies(100, 200);
+ admin.topicPolicies(true).setRetention(topic, retentionPolicies);
+ Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName,
LOCAL_ONLY).join().get()
+ .getMaxConsumerPerTopic(), 10);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopics().get(topic).get().get();
+ HierarchyTopicPolicies hierarchyTopicPolicies =
persistentTopic.getHierarchyTopicPolicies();
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+ ManagedLedgerConfig mlConfig =
persistentTopic.getManagedLedger().getConfig();
+ assertEquals(mlConfig.getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(100));
+ assertEquals(mlConfig.getRetentionSizeInMB(), 200);
+ });
+ PersistencePolicies persistencePolicy = new PersistencePolicies(3, 2,
1, 4);
+ admin.topicPolicies(true).setPersistence(topic, persistencePolicy);
+ Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName,
LOCAL_ONLY).join().get()
+ .getMaxConsumerPerTopic(), 10);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopics().get(topic).get().get();
+ HierarchyTopicPolicies hierarchyTopicPolicies =
persistentTopic.getHierarchyTopicPolicies();
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+ ManagedLedgerConfig mlConfig =
persistentTopic.getManagedLedger().getConfig();
+ assertEquals(mlConfig.getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(100));
+ assertEquals(mlConfig.getRetentionSizeInMB(), 200);
+ assertEquals(mlConfig.getEnsembleSize(), 3);
+ assertEquals(mlConfig.getWriteQuorumSize(), 2);
+ assertEquals(mlConfig.getAckQuorumSize(), 1);
+ assertEquals(mlConfig.getThrottleMarkDelete(), 4D);
+ });
+
+ // Reload topic and verify: the global retention and persistence policies still take effect.
+ admin.topics().unload(topic);
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopics().get(topic).get().get();
+ HierarchyTopicPolicies hierarchyTopicPolicies =
persistentTopic.getHierarchyTopicPolicies();
+ ManagedLedgerConfig mlConfig =
persistentTopic.getManagedLedger().getConfig();
+
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+ assertEquals(mlConfig.getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(100));
+ assertEquals(mlConfig.getRetentionSizeInMB(), 200);
+ assertEquals(mlConfig.getEnsembleSize(), 3);
+ assertEquals(mlConfig.getWriteQuorumSize(), 2);
+ assertEquals(mlConfig.getAckQuorumSize(), 1);
+ assertEquals(mlConfig.getThrottleMarkDelete(), 4D);
+ });
+
+ // cleanup.
+ admin.topics().delete(topic, false);
+ }
+
@Test(timeOut = 20000)
public void testGetSizeBasedBacklogQuotaApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 0bb35f0bd9f..66ae75fcbcd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1181,7 +1181,7 @@ public class BrokerServiceTest extends BrokerTestBase {
// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService()
- .loadOrCreatePersistentTopic(topicName, true, null, null);
+ .loadOrCreatePersistentTopic(topicName, true, null);
try {
futureResult.get();
@@ -1225,7 +1225,7 @@ public class BrokerServiceTest extends BrokerTestBase {
for (int i = 0; i < 10; i++) {
// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService()
- .loadOrCreatePersistentTopic(topicName + "_" + i,
false, null, null);
+ .loadOrCreatePersistentTopic(topicName + "_" + i,
false, null);
loadFutures.add(futureResult);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 99763bdfb59..3c7edaec44d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -1116,7 +1116,7 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(),
2);
// Trigger system topic __change_event's initialize.
pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://"
+ ns + "/1"),
- TopicPoliciesService.GetType.DEFAULT);
+ TopicPoliciesService.GetType.LOCAL_ONLY);
});
// Create non-partitioned topic.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
index a563077e012..11a0c8d0bc6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java
@@ -109,7 +109,7 @@ public class PersistentTopicInitializeDelayTest extends
BrokerTestBase {
(SystemTopicBasedTopicPoliciesService)
brokerService.getPulsar().getTopicPoliciesService();
if
(topicPoliciesService.getListeners().containsKey(TopicName.get(topic)) ) {
brokerService.getPulsar().getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topic),
- TopicPoliciesService.GetType.DEFAULT
+ TopicPoliciesService.GetType.LOCAL_ONLY
).thenAccept(optionalPolicies ->
optionalPolicies.ifPresent(this::onUpdate));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
index 267e8547914..91c667d1d78 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcTest.java
@@ -119,9 +119,9 @@ public class ReplicationTopicGcTest extends
OneWayReplicatorTestBase {
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
// topic policies.
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
- TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+ TopicPoliciesService.GetType.LOCAL_ONLY).get().isEmpty());
assertTrue(pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
- TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+ TopicPoliciesService.GetType.LOCAL_ONLY).get().isEmpty());
// schema.
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
assertTrue(CollectionUtils.isEmpty(pulsar2.getSchemaStorage().getAll(schemaId).get()));
@@ -169,7 +169,7 @@ public class ReplicationTopicGcTest extends
OneWayReplicatorTestBase {
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
// topic policies.
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
- TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
+ TopicPoliciesService.GetType.LOCAL_ONLY).get().isEmpty());
// schema.
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
index d4275cdfd20..0aa8e070d31 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
@@ -44,7 +44,7 @@ public class TopicPolicyTestUtils {
public static TopicPolicies getTopicPolicies(TopicPoliciesService
topicPoliciesService, TopicName topicName)
throws ExecutionException, InterruptedException {
- return topicPoliciesService.getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.DEFAULT).get()
+ return topicPoliciesService.getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY).get()
.orElse(null);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 12b9b0568b7..93a66eb269d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -706,7 +706,7 @@ public class PersistentTopicTest extends BrokerTestBase {
TopicPolicies policies = new TopicPolicies();
policies.setRetentionPolicies(retentionPolicies);
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService)
- .getTopicPoliciesAsync(TopicName.get(topic),
TopicPoliciesService.GetType.DEFAULT);
+ .getTopicPoliciesAsync(TopicName.get(topic),
TopicPoliciesService.GetType.LOCAL_ONLY);
persistentTopic.onUpdate(policies);
verify(persistentTopic, times(1)).checkPersistencePolicies();
Awaitility.await().untilAsserted(() -> {