This is an automated email from the ASF dual-hosted git repository.
Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new c0957c80591 [fix][broker] Merge broker offload extra configurations
(#25736)
c0957c80591 is described below
commit c0957c805916de1ab8b7599c76b5c8d4a3b13cc6
Author: Cong Zhao <[email protected]>
AuthorDate: Tue May 12 10:47:16 2026 +0800
[fix][broker] Merge broker offload extra configurations (#25736)
---
.../common/policies/data/OffloadPoliciesImpl.java | 24 +++++++----
.../common/policies/data/OffloadPoliciesTest.java | 49 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 8 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index d4282408176..93f090c9ddf 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -265,13 +265,10 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
}
}
- Map<String, String> extraConfigurations =
properties.entrySet().stream()
- .filter(entry ->
entry.getKey().toString().startsWith(EXTRA_CONFIG_PREFIX))
- .collect(Collectors.toMap(
- entry ->
entry.getKey().toString().replaceFirst(EXTRA_CONFIG_PREFIX, ""),
- entry -> entry.getValue().toString()));
-
- data.getManagedLedgerExtraConfigurations().putAll(extraConfigurations);
+ Map<String, String> extraConfigurations =
getExtraConfigurations(properties);
+ if (extraConfigurations != null) {
+
data.getManagedLedgerExtraConfigurations().putAll(extraConfigurations);
+ }
data.compatibleWithBrokerConfigFile(properties);
return data;
@@ -469,7 +466,9 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
*/
private static Object getCompatibleValue(Properties properties, Field
field) {
Object object;
- if (field.getName().equals("managedLedgerOffloadThresholdInBytes")) {
+ if (field.getName().equals("managedLedgerExtraConfigurations")) {
+ return getExtraConfigurations(properties);
+ } else if
(field.getName().equals("managedLedgerOffloadThresholdInBytes")) {
object =
properties.getProperty("managedLedgerOffloadThresholdInBytes",
properties.getProperty(OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE));
} else if
(field.getName().equals("managedLedgerOffloadDeletionLagInMillis")) {
@@ -484,6 +483,15 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
return value((String) object, field);
}
+ private static Map<String, String> getExtraConfigurations(Properties
properties) {
+ Map<String, String> extraConfigurations =
properties.entrySet().stream()
+ .filter(entry ->
entry.getKey().toString().startsWith(EXTRA_CONFIG_PREFIX))
+ .collect(Collectors.toMap(
+ entry ->
entry.getKey().toString().replaceFirst(EXTRA_CONFIG_PREFIX, ""),
+ entry -> entry.getValue().toString()));
+ return extraConfigurations.isEmpty() ? null : extraConfigurations;
+ }
+
public static class OffloadPoliciesImplBuilder implements
OffloadPolicies.Builder {
private OffloadPoliciesImpl impl = new OffloadPoliciesImpl();
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 70bfa5c377d..0662be3f054 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -333,6 +333,55 @@ public class OffloadPoliciesTest {
Assert.assertNull(offloadPolicies.getS3ManagedLedgerOffloadRegion());
}
+ @Test
+ public void brokerExtraConfigMergeTest() {
+ final String bucketPrefix = "o-123/c-456";
+ Properties brokerProperties = new Properties();
+ brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
+ brokerProperties.setProperty(EXTRA_CONFIG_PREFIX +
"tieredStorageBucketPrefix", bucketPrefix);
+
+ OffloadPoliciesImpl offloadPolicies =
+ OffloadPoliciesImpl.mergeConfiguration(null, null,
brokerProperties);
+
+ Assert.assertNotNull(offloadPolicies);
+ assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+ Map.of("tieredStorageBucketPrefix", bucketPrefix));
+ }
+
+ @Test
+ public void higherLevelExtraConfigOverridesBrokerExtraConfigMergeTest() {
+ Properties brokerProperties = new Properties();
+ brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
+ brokerProperties.setProperty(EXTRA_CONFIG_PREFIX +
"tieredStorageBucketPrefix", "broker-prefix");
+
+ OffloadPoliciesImpl topicLevelPolicies = new OffloadPoliciesImpl();
+
topicLevelPolicies.getManagedLedgerExtraConfigurations().put("tieredStorageBucketPrefix",
"topic-prefix");
+
+ OffloadPoliciesImpl offloadPolicies =
+ OffloadPoliciesImpl.mergeConfiguration(topicLevelPolicies,
null, brokerProperties);
+
+ Assert.assertNotNull(offloadPolicies);
+ assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+ Map.of("tieredStorageBucketPrefix", "topic-prefix"));
+ }
+
+ @Test
+ public void
emptyHigherLevelExtraConfigOverridesBrokerExtraConfigMergeTest() {
+ Properties brokerProperties = new Properties();
+ brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
+ brokerProperties.setProperty(EXTRA_CONFIG_PREFIX +
"tieredStorageBucketPrefix", "broker-prefix");
+
+ OffloadPoliciesImpl topicLevelPolicies = new OffloadPoliciesImpl();
+
topicLevelPolicies.getManagedLedgerExtraConfigurations().put("tieredStorageBucketPrefix",
"");
+
+ OffloadPoliciesImpl offloadPolicies =
+ OffloadPoliciesImpl.mergeConfiguration(topicLevelPolicies,
null, brokerProperties);
+
+ Assert.assertNotNull(offloadPolicies);
+ assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+ Map.of("tieredStorageBucketPrefix", ""));
+ }
+
@Test
public void brokerPropertyCompatibleTest() {