This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f00801999358955ce5e3e45475eeba291070a1b4 Author: Jiwei Guo <[email protected]> AuthorDate: Fri Jan 16 12:09:10 2026 +0800 [fix][admin] Fix offload policy incompatible issue. (#25149) (cherry picked from commit 0d5358aacd3d2fb5959dc549470799d9728f4e64) --- .../broker/admin/impl/PersistentTopicsBase.java | 6 ++-- .../pulsar/broker/admin/TopicPoliciesTest.java | 38 +++++++++++++++++++++- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 5fa9bbb80f2..0ccd075e7c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -128,6 +128,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -1000,8 +1001,9 @@ public class PersistentTopicsBase extends AdminResource { .thenApply(op -> { OffloadPoliciesImpl offloadPolicies = op.map(TopicPolicies::getOffloadPolicies).orElse(null); if (applied) { - OffloadPoliciesImpl namespacePolicy = - (OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies; + Policies policies = getNamespacePolicies(namespaceName); + OffloadPoliciesImpl namespacePolicy = (OffloadPoliciesImpl) policies.offload_policies; + namespacePolicy = OffloadPoliciesImpl.oldPoliciesCompatible(namespacePolicy, policies); offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(offloadPolicies , namespacePolicy, pulsar().getConfiguration().getProperties()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 8d8e7c385b5..a5edcfa5830 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -90,8 +90,10 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; @@ -1287,7 +1289,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { assertEquals(admin.topicPolicies().getDispatchRate(topic, true), brokerDispatchRate); DispatchRate namespaceDispatchRate = DispatchRate.builder() .dispatchThrottlingRateInMsg(10) - .dispatchThrottlingRateInByte(11) + .dispatchThrottlingRateInByte(11) .ratePeriodInSecond(12) .build(); @@ -4127,4 +4129,38 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { admin.topics().deletePartitionedTopic(topic, false); } } + + @Test + public void testGetAppliedOffloadPoliciesWithLegacyNamespacePolicies() throws Exception { + String topicName = testTopic + UUID.randomUUID().toString(); + admin.topics().createPartitionedTopic(topicName, 3); + + OffloadPolicies initialPolicies = admin.topics().getOffloadPolicies(topicName, true); + assertNull(initialPolicies, "Applied policies should not be null"); + + Policies policies = admin.namespaces().getPolicies(myNamespace); + policies.offload_policies = null; + policies.offload_threshold = 1024 * 1024 * 10L; // 10MB + + pulsar.getConfigurationMetadataStore().put( + "/admin/policies/" + myNamespace, + org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), + java.util.Optional.empty() + ).join(); + + Policies updatedPolicies = admin.namespaces().getPolicies(myNamespace); + assertNull(updatedPolicies.offload_policies, "offload_policies should be null for this test case"); + assertEquals(updatedPolicies.offload_threshold, 1024 * 1024 * 10L); + + OffloadPolicies appliedPolicies = admin.topics().getOffloadPolicies(topicName, true); + + assertNotNull(appliedPolicies, "Applied policies should not be null"); + assertEquals(appliedPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long) (1024 * 1024 * 10L), + "Should inherit offload threshold from legacy namespace policy"); + + OffloadPolicies offloadPolicies = admin.topicPolicies().getOffloadPolicies(topicName, true); + assertNotNull(offloadPolicies, "Applied policies should not be null"); + assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long) (1024 * 1024 * 10L), + "Should inherit offload threshold from legacy namespace policy"); + } }
