This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0d5358aacd3 [fix][admin] Fix offload policy incompatible issue.
(#25149)
0d5358aacd3 is described below
commit 0d5358aacd3d2fb5959dc549470799d9728f4e64
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Jan 16 12:09:10 2026 +0800
[fix][admin] Fix offload policy incompatible issue. (#25149)
---
.../broker/admin/impl/PersistentTopicsBase.java | 6 ++--
.../pulsar/broker/admin/TopicPoliciesTest.java | 36 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index be40132cfbf..865212f48e5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -128,6 +128,7 @@ import
org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -1000,8 +1001,9 @@ public class PersistentTopicsBase extends AdminResource {
.thenApply(op -> {
OffloadPoliciesImpl offloadPolicies =
op.map(TopicPolicies::getOffloadPolicies).orElse(null);
if (applied) {
- OffloadPoliciesImpl namespacePolicy =
- (OffloadPoliciesImpl)
getNamespacePolicies(namespaceName).offload_policies;
+ Policies policies = getNamespacePolicies(namespaceName);
+ OffloadPoliciesImpl namespacePolicy =
(OffloadPoliciesImpl) policies.offload_policies;
+ namespacePolicy =
OffloadPoliciesImpl.oldPoliciesCompatible(namespacePolicy, policies);
offloadPolicies =
OffloadPoliciesImpl.mergeConfiguration(offloadPolicies
, namespacePolicy,
pulsar().getConfiguration().getProperties());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index ab67388193e..3be7acad770 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -97,8 +97,10 @@ import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -4346,4 +4348,38 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
});
}
+
+ @Test
+ public void testGetAppliedOffloadPoliciesWithLegacyNamespacePolicies()
throws Exception {
+ String topicName = testTopic + UUID.randomUUID().toString();
+ admin.topics().createPartitionedTopic(topicName, 3);
+
+ OffloadPolicies initialPolicies =
admin.topics().getOffloadPolicies(topicName, true);
+ assertNull(initialPolicies, "Applied policies should not be null");
+
+ Policies policies = admin.namespaces().getPolicies(myNamespace);
+ policies.offload_policies = null;
+ policies.offload_threshold = 1024 * 1024 * 10L; // 10MB
+
+ pulsar.getConfigurationMetadataStore().put(
+ "/admin/policies/" + myNamespace,
+
org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+ java.util.Optional.empty()
+ ).join();
+
+ Policies updatedPolicies = admin.namespaces().getPolicies(myNamespace);
+ assertNull(updatedPolicies.offload_policies, "offload_policies should
be null for this test case");
+ assertEquals(updatedPolicies.offload_threshold, 1024 * 1024 * 10L);
+
+ OffloadPolicies appliedPolicies =
admin.topics().getOffloadPolicies(topicName, true);
+
+ assertNotNull(appliedPolicies, "Applied policies should not be null");
+
assertEquals(appliedPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long)
(1024 * 1024 * 10L),
+ "Should inherit offload threshold from legacy namespace
policy");
+
+ OffloadPolicies offloadPolicies =
admin.topicPolicies().getOffloadPolicies(topicName, true);
+ assertNotNull(offloadPolicies, "Applied policies should not be null");
+
assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long)
(1024 * 1024 * 10L),
+ "Should inherit offload threshold from legacy namespace
policy");
+ }
}