This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f2c43675f69 [fix] [broker] Fix nothing changed after removing dynamic configs (#22673)
f2c43675f69 is described below

commit f2c43675f699828e677f7d21b13f6cf1bd48dd23
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Wed May 8 21:41:22 2024 +0800

    [fix] [broker] Fix nothing changed after removing dynamic configs (#22673)
    
    (cherry picked from commit ada31a96db9aabbb071f65229be746e61f954696)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      |   8 +-
 .../pulsar/broker/service/BrokerService.java       | 138 +++++++++++++++------
 .../admin/AdminApiDynamicConfigurationsTest.java   |  40 ++++++
 3 files changed, 141 insertions(+), 45 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 57650758bbc..ea793cabff5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -264,7 +264,7 @@ public class BrokersBase extends AdminResource {
             @ApiResponse(code = 403, message = "You don't have admin 
permission to get configuration")})
     public void getDynamicConfigurationName(@Suspended AsyncResponse 
asyncResponse) {
         validateSuperUserAccessAsync()
-                .thenAccept(__ -> 
asyncResponse.resume(BrokerService.getDynamicConfiguration()))
+                .thenAccept(__ -> 
asyncResponse.resume(pulsar().getBrokerService().getDynamicConfiguration()))
                 .exceptionally(ex -> {
                     LOG.error("[{}] Failed to get all dynamic configuration 
names.", clientAppId(), ex);
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -297,11 +297,11 @@ public class BrokersBase extends AdminResource {
      */
     private synchronized CompletableFuture<Void> 
persistDynamicConfigurationAsync(
             String configName, String configValue) {
-        if (!BrokerService.validateDynamicConfiguration(configName, 
configValue)) {
+        if 
(!pulsar().getBrokerService().validateDynamicConfiguration(configName, 
configValue)) {
             return FutureUtil
                     .failedFuture(new 
RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value"));
         }
-        if (BrokerService.isDynamicConfiguration(configName)) {
+        if (pulsar().getBrokerService().isDynamicConfiguration(configName)) {
             return 
dynamicConfigurationResources().setDynamicConfigurationWithCreateAsync(old -> {
                 Map<String, String> configurationMap = 
old.orElseGet(Maps::newHashMap);
                 configurationMap.put(configName, configValue);
@@ -526,7 +526,7 @@ public class BrokersBase extends AdminResource {
     }
 
     private CompletableFuture<Void> 
internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
-        if (!BrokerService.isDynamicConfiguration(configName)) {
+        if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) {
             throw new RestException(Status.PRECONDITION_FAILED, " Can't update 
non-dynamic configuration");
         } else {
             return 
dynamicConfigurationResources().setDynamicConfigurationAsync(old -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2adacecb1fa..bcfe531e39f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -218,8 +218,7 @@ public class BrokerService implements Closeable {
     private final OrderedExecutor topicOrderedExecutor;
     // offline topic backlog cache
     private final ConcurrentOpenHashMap<TopicName, 
PersistentOfflineTopicStats> offlineTopicStatCache;
-    private static final ConcurrentOpenHashMap<String, ConfigField> 
dynamicConfigurationMap =
-            prepareDynamicConfigurationMap();
+    private final ConcurrentOpenHashMap<String, ConfigField> 
dynamicConfigurationMap;
     private final ConcurrentOpenHashMap<String, Consumer<?>> 
configRegisteredListeners;
 
     private final ConcurrentLinkedQueue<TopicLoadingContext> 
pendingTopicLoadingQueue;
@@ -292,6 +291,7 @@ public class BrokerService implements Closeable {
 
     public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) 
throws Exception {
         this.pulsar = pulsar;
+        this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
         this.preciseTopicPublishRateLimitingEnable =
                 
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
         this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
@@ -2517,37 +2517,73 @@ public class BrokerService implements Closeable {
 
         if (dynamicConfigResources != null) {
             dynamicConfigResources.getDynamicConfigurationAsync()
-                    .thenAccept(optMap -> {
-                        if (!optMap.isPresent()) {
-                            return;
+                .thenAccept(optMap -> {
+                    // Case some dynamic configs have been removed.
+                    dynamicConfigurationMap.forEach((configKey, fieldWrapper) 
-> {
+                        boolean configRemoved = optMap.isEmpty() || 
!optMap.get().containsKey(configKey);
+                        if (fieldWrapper.lastDynamicValue != null && 
configRemoved) {
+                            configValueChanged(configKey, null);
                         }
-                        Map<String, String> data = optMap.get();
-                        data.forEach((configKey, value) -> {
-                            ConfigField configFieldWrapper = 
dynamicConfigurationMap.get(configKey);
-                            if (configFieldWrapper == null) {
-                                log.warn("{} does not exist in 
dynamicConfigurationMap, skip this config.", configKey);
-                                return;
-                            }
-                            Field configField = configFieldWrapper.field;
-                            Object newValue = 
FieldParser.value(data.get(configKey), configField);
-                            if (configField != null) {
-                                Consumer listener = 
configRegisteredListeners.get(configKey);
-                                try {
-                                    Object existingValue = 
configField.get(pulsar.getConfiguration());
-                                    configField.set(pulsar.getConfiguration(), 
newValue);
-                                    log.info("Successfully updated 
configuration {}/{}", configKey,
-                                            data.get(configKey));
-                                    if (listener != null && 
!existingValue.equals(newValue)) {
-                                        listener.accept(newValue);
-                                    }
-                                } catch (Exception e) {
-                                    log.error("Failed to update config {}/{}", 
configKey, newValue);
-                                }
-                            } else {
-                                log.error("Found non-dynamic field in 
dynamicConfigMap {}/{}", configKey, newValue);
-                            }
-                        });
                     });
+                    // Some configs have been changed.
+                    if (!optMap.isPresent()) {
+                        return;
+                    }
+                    Map<String, String> data = optMap.get();
+                    data.forEach((configKey, value) -> {
+                        configValueChanged(configKey, value);
+                    });
+                });
+        }
+    }
+
+    private void configValueChanged(String configKey, String newValueStr) {
+        ConfigField configFieldWrapper = 
dynamicConfigurationMap.get(configKey);
+        if (configFieldWrapper == null) {
+            log.warn("{} does not exist in dynamicConfigurationMap, skip this 
config.", configKey);
+            return;
+        }
+        Consumer listener = configRegisteredListeners.get(configKey);
+        try {
+            // Convert existingValue and newValue.
+            final Object existingValue;
+            final Object newValue;
+            if (configFieldWrapper.field != null) {
+                if (StringUtils.isBlank(newValueStr)) {
+                    newValue = configFieldWrapper.defaultValue;
+                } else {
+                    newValue = FieldParser.value(newValueStr, 
configFieldWrapper.field);
+                }
+                existingValue = 
configFieldWrapper.field.get(pulsar.getConfiguration());
+                configFieldWrapper.field.set(pulsar.getConfiguration(), 
newValue);
+            } else {
+                // This case only occurs when it is a customized item.
+                // Since 
https://github.com/apache/pulsar/blob/master/pip/pip-300.md has not been 
cherry-picked, this
+                // case should never occur.
+                log.error("Skip update customized dynamic configuration {}/{} 
in memory, only trigger an event"
+                        + " listeners. Since PIP-300 has net been 
cherry-picked, this case should never occur",
+                        configKey, newValueStr);
+                existingValue = configFieldWrapper.lastDynamicValue;
+                newValue = newValueStr == null ? 
configFieldWrapper.defaultValue : newValueStr;
+            }
+            // Record the latest dynamic config.
+            configFieldWrapper.lastDynamicValue = newValueStr;
+
+            if (newValueStr == null) {
+                log.info("Successfully remove the dynamic configuration {}, 
and revert to the default value",
+                        configKey);
+            } else {
+                log.info("Successfully updated configuration {}/{}", 
configKey, newValueStr);
+            }
+
+            if (listener != null && !Objects.equals(existingValue, newValue)) {
+                // So far, all config items that related to configuration 
listeners, their default value is not null.
+                // And the customized config can be null before.
+                // So call "listener.accept(null)" is okay.
+                listener.accept(newValue);
+            }
+        } catch (Exception e) {
+            log.error("Failed to update config {}", configKey, e);
         }
     }
 
@@ -2958,6 +2994,9 @@ public class BrokerService implements Closeable {
      * On notification, listener should first check if config value has been 
changed and after taking appropriate
      * action, listener should update config value with new value if it has 
been changed (so, next time listener can
      * compare values on configMap change).
+     *
+     * Note: The new value that the {@param listener} may accept could be a 
null value.
+     *
      * @param <T>
      *
      * @param configKey
@@ -3048,7 +3087,7 @@ public class BrokerService implements Closeable {
         return delayedDeliveryTrackerFactory;
     }
 
-    public static List<String> getDynamicConfiguration() {
+    public List<String> getDynamicConfiguration() {
         return dynamicConfigurationMap.keys();
     }
 
@@ -3061,27 +3100,34 @@ public class BrokerService implements Closeable {
         return configMap;
     }
 
-    public static boolean isDynamicConfiguration(String key) {
+    public boolean isDynamicConfiguration(String key) {
         return dynamicConfigurationMap.containsKey(key);
     }
 
-    public static boolean validateDynamicConfiguration(String key, String 
value) {
+    public boolean validateDynamicConfiguration(String key, String value) {
         if (dynamicConfigurationMap.containsKey(key) && 
dynamicConfigurationMap.get(key).validator != null) {
             return dynamicConfigurationMap.get(key).validator.test(value);
         }
         return true;
     }
 
-    private static ConcurrentOpenHashMap<String, ConfigField> 
prepareDynamicConfigurationMap() {
+    private ConcurrentOpenHashMap<String, ConfigField> 
prepareDynamicConfigurationMap() {
         ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
                 ConcurrentOpenHashMap.<String, 
ConfigField>newBuilder().build();
-        for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
-            if (field != null && 
field.isAnnotationPresent(FieldContext.class)) {
-                field.setAccessible(true);
-                if (field.getAnnotation(FieldContext.class).dynamic()) {
-                    dynamicConfigurationMap.put(field.getName(), new 
ConfigField(field));
+        try {
+            for (Field field : ServiceConfiguration.class.getDeclaredFields()) 
{
+                if (field != null && 
field.isAnnotationPresent(FieldContext.class)) {
+                    field.setAccessible(true);
+                    if (field.getAnnotation(FieldContext.class).dynamic()) {
+                        Object defaultValue = 
field.get(pulsar.getConfiguration());
+                        dynamicConfigurationMap.put(field.getName(), new 
ConfigField(field, defaultValue));
+                    }
                 }
             }
+        } catch (IllegalArgumentException | IllegalAccessException ex) {
+            // This error never occurs.
+            log.error("Failed to initialize dynamic configuration map", ex);
+            throw new RuntimeException(ex);
         }
         return dynamicConfigurationMap;
     }
@@ -3361,11 +3407,21 @@ public class BrokerService implements Closeable {
 
     private static class ConfigField {
         final Field field;
+
+        // It is the dynamic config value if set.
+        // It is null if no dynamic config has been set, even if the value of "pulsar.config" is present.
+        volatile String lastDynamicValue;
+
+        // The default value of "pulsar.config", which is initialized when the 
broker is starting.
+        // After the dynamic config has been removed, revert the config to 
this default value.
+        final Object defaultValue;
+
         Predicate<String> validator;
 
-        public ConfigField(Field field) {
+        public ConfigField(Field field, Object defaultValue) {
             super();
             this.field = field;
+            this.defaultValue = defaultValue;
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java
index c9a07dc966d..d7a21e6741e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java
@@ -20,12 +20,16 @@ package org.apache.pulsar.broker.admin;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.fail;
 import java.util.Map;
 import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -69,4 +73,40 @@ public class AdminApiDynamicConfigurationsTest extends 
MockedPulsarServiceBaseTe
             }
         }
     }
+
+    @Test
+    public void testDeleteStringDynamicConfig() throws PulsarAdminException {
+        String syncEventTopic = BrokerTestUtil.newUniqueName(SYSTEM_NAMESPACE 
+ "/tp");
+        // The default value is null;
+        Awaitility.await().untilAsserted(() -> {
+            
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
+        });
+        // Set dynamic config.
+        
admin.brokers().updateDynamicConfiguration("configurationMetadataSyncEventTopic",
 syncEventTopic);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(pulsar.getConfig().getConfigurationMetadataSyncEventTopic(), 
syncEventTopic);
+        });
+        // Remove dynamic config.
+        
admin.brokers().deleteDynamicConfiguration("configurationMetadataSyncEventTopic");
+        Awaitility.await().untilAsserted(() -> {
+            
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
+        });
+    }
+
+    @Test
+    public void testDeleteIntDynamicConfig() throws PulsarAdminException {
+        // Record the default value;
+        int defaultValue = 
pulsar.getConfig().getMaxConcurrentTopicLoadRequest();
+        // Set dynamic config.
+        int newValue = defaultValue + 1000;
+        
admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", 
newValue + "");
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue);
+        });
+        // Verify: it has been reverted to the default value.
+        
admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest");
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), 
defaultValue);
+        });
+    }
 }

Reply via email to