This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 42fce1a1c499f8ae351b59a9986e7ded2e15b998
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Jan 16 12:09:10 2026 +0800

    [fix][admin] Fix offload policy incompatible issue. (#25149)
    
    (cherry picked from commit 0d5358aacd3d2fb5959dc549470799d9728f4e64)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++--
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 36 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index be40132cfbf..865212f48e5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -128,6 +128,7 @@ import 
org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -1000,8 +1001,9 @@ public class PersistentTopicsBase extends AdminResource {
             .thenApply(op -> {
                 OffloadPoliciesImpl offloadPolicies = 
op.map(TopicPolicies::getOffloadPolicies).orElse(null);
                 if (applied) {
-                    OffloadPoliciesImpl namespacePolicy =
-                            (OffloadPoliciesImpl) 
getNamespacePolicies(namespaceName).offload_policies;
+                    Policies policies = getNamespacePolicies(namespaceName);
+                    OffloadPoliciesImpl namespacePolicy = 
(OffloadPoliciesImpl) policies.offload_policies;
+                    namespacePolicy = 
OffloadPoliciesImpl.oldPoliciesCompatible(namespacePolicy, policies);
                     offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(offloadPolicies
                             , namespacePolicy, 
pulsar().getConfiguration().getProperties());
                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 6c690e267b0..a0b54cdb2ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -92,8 +92,10 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -4321,4 +4323,38 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
         });
     }
+
+    @Test
+    public void testGetAppliedOffloadPoliciesWithLegacyNamespacePolicies() 
throws Exception {
+        String topicName = testTopic + UUID.randomUUID().toString();
+        admin.topics().createPartitionedTopic(topicName, 3);
+
+        OffloadPolicies initialPolicies = 
admin.topics().getOffloadPolicies(topicName, true);
+        assertNull(initialPolicies, "Applied policies should not be null");
+
+        Policies policies = admin.namespaces().getPolicies(myNamespace);
+        policies.offload_policies = null;
+        policies.offload_threshold = 1024 * 1024 * 10L; // 10MB
+
+        pulsar.getConfigurationMetadataStore().put(
+                "/admin/policies/" + myNamespace,
+                
org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+                java.util.Optional.empty()
+        ).join();
+
+        Policies updatedPolicies = admin.namespaces().getPolicies(myNamespace);
+        assertNull(updatedPolicies.offload_policies, "offload_policies should 
be null for this test case");
+        assertEquals(updatedPolicies.offload_threshold, 1024 * 1024 * 10L);
+
+        OffloadPolicies appliedPolicies = 
admin.topics().getOffloadPolicies(topicName, true);
+
+        assertNotNull(appliedPolicies, "Applied policies should not be null");
+        
assertEquals(appliedPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long) 
(1024 * 1024 * 10L),
+                "Should inherit offload threshold from legacy namespace 
policy");
+
+        OffloadPolicies offloadPolicies = 
admin.topicPolicies().getOffloadPolicies(topicName, true);
+        assertNotNull(offloadPolicies, "Applied policies should not be null");
+        
assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long) 
(1024 * 1024 * 10L),
+                "Should inherit offload threshold from legacy namespace 
policy");
+    }
 }

Reply via email to