This is an automated email from the ASF dual-hosted git repository.

nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bb5295715be [fix][broker] Merge broker offload extra configurations (#25736)
bb5295715be is described below

commit bb5295715be9bca6777f783452d19f3fe89dbcb5
Author: Cong Zhao <[email protected]>
AuthorDate: Tue May 12 10:47:16 2026 +0800

    [fix][broker] Merge broker offload extra configurations (#25736)
---
 .../common/policies/data/OffloadPoliciesImpl.java  | 24 +++++++----
 .../common/policies/data/OffloadPoliciesTest.java  | 49 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index f1c687bb97d..ce1c369897f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -265,13 +265,10 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
             }
         }
 
-        Map<String, String> extraConfigurations = properties.entrySet().stream()
-                .filter(entry -> entry.getKey().toString().startsWith(EXTRA_CONFIG_PREFIX))
-                .collect(Collectors.toMap(
-                        entry -> entry.getKey().toString().replaceFirst(EXTRA_CONFIG_PREFIX, ""),
-                        entry -> entry.getValue().toString()));
-
-        data.getManagedLedgerExtraConfigurations().putAll(extraConfigurations);
+        Map<String, String> extraConfigurations = getExtraConfigurations(properties);
+        if (extraConfigurations != null) {
+            data.getManagedLedgerExtraConfigurations().putAll(extraConfigurations);
+        }
 
         data.compatibleWithBrokerConfigFile(properties);
         return data;
@@ -470,7 +467,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
      */
     private static Object getCompatibleValue(Properties properties, Field field) {
         Object object;
-        if (field.getName().equals("managedLedgerOffloadThresholdInBytes")) {
+        if (field.getName().equals("managedLedgerExtraConfigurations")) {
+            return getExtraConfigurations(properties);
+        } else if (field.getName().equals("managedLedgerOffloadThresholdInBytes")) {
             object = properties.getProperty("managedLedgerOffloadThresholdInBytes",
                     properties.getProperty(OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE));
         } else if (field.getName().equals("managedLedgerOffloadDeletionLagInMillis")) {
@@ -485,6 +484,15 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
         return value((String) object, field);
     }
 
+    private static Map<String, String> getExtraConfigurations(Properties properties) {
+        Map<String, String> extraConfigurations = properties.entrySet().stream()
+                .filter(entry -> entry.getKey().toString().startsWith(EXTRA_CONFIG_PREFIX))
+                .collect(Collectors.toMap(
+                        entry -> entry.getKey().toString().replaceFirst(EXTRA_CONFIG_PREFIX, ""),
+                        entry -> entry.getValue().toString()));
+        return extraConfigurations.isEmpty() ? null : extraConfigurations;
+    }
+
     public static class OffloadPoliciesImplBuilder implements OffloadPolicies.Builder {
         private OffloadPoliciesImpl impl = new OffloadPoliciesImpl();
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index f4ef0722254..a8216e8f8a9 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -333,6 +333,55 @@ public class OffloadPoliciesTest {
         Assert.assertNull(offloadPolicies.getS3ManagedLedgerOffloadRegion());
     }
 
+    @Test
+    public void brokerExtraConfigMergeTest() {
+        final String bucketPrefix = "o-123/c-456";
+        Properties brokerProperties = new Properties();
+        brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
+        brokerProperties.setProperty(EXTRA_CONFIG_PREFIX + "tieredStorageBucketPrefix", bucketPrefix);
+
+        OffloadPoliciesImpl offloadPolicies =
+                OffloadPoliciesImpl.mergeConfiguration(null, null, brokerProperties);
+
+        Assert.assertNotNull(offloadPolicies);
+        assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+                Map.of("tieredStorageBucketPrefix", bucketPrefix));
+    }
+
+    @Test
+    public void higherLevelExtraConfigOverridesBrokerExtraConfigMergeTest() {
+        Properties brokerProperties = new Properties();
+        brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
+        brokerProperties.setProperty(EXTRA_CONFIG_PREFIX + "tieredStorageBucketPrefix", "broker-prefix");
+
+        OffloadPoliciesImpl topicLevelPolicies = new OffloadPoliciesImpl();
+        topicLevelPolicies.getManagedLedgerExtraConfigurations().put("tieredStorageBucketPrefix", "topic-prefix");
+
+        OffloadPoliciesImpl offloadPolicies =
+                OffloadPoliciesImpl.mergeConfiguration(topicLevelPolicies, null, brokerProperties);
+
+        Assert.assertNotNull(offloadPolicies);
+        assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+                Map.of("tieredStorageBucketPrefix", "topic-prefix"));
+    }
+
+    @Test
+    public void emptyHigherLevelExtraConfigOverridesBrokerExtraConfigMergeTest() {
+        Properties brokerProperties = new Properties();
+        brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
+        brokerProperties.setProperty(EXTRA_CONFIG_PREFIX + "tieredStorageBucketPrefix", "broker-prefix");
+
+        OffloadPoliciesImpl topicLevelPolicies = new OffloadPoliciesImpl();
+        topicLevelPolicies.getManagedLedgerExtraConfigurations().put("tieredStorageBucketPrefix", "");
+
+        OffloadPoliciesImpl offloadPolicies =
+                OffloadPoliciesImpl.mergeConfiguration(topicLevelPolicies, null, brokerProperties);
+
+        Assert.assertNotNull(offloadPolicies);
+        assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+                Map.of("tieredStorageBucketPrefix", ""));
+    }
+
 
     @Test
     public void brokerPropertyCompatibleTest() {

Reply via email to