This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ada31a96db9 [fix] [broker] Fix nothing changed after removing dynamic 
configs (#22673)
ada31a96db9 is described below

commit ada31a96db9aabbb071f65229be746e61f954696
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Wed May 8 21:41:22 2024 +0800

    [fix] [broker] Fix nothing changed after removing dynamic configs (#22673)
---
 .../pulsar/broker/service/BrokerService.java       | 137 ++++++++++++++-------
 .../admin/AdminApiDynamicConfigurationsTest.java   |  68 ++++++++++
 2 files changed, 160 insertions(+), 45 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6e23deaa6fa..c1b2b9e1da9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -229,8 +229,7 @@ public class BrokerService implements Closeable {
     private final OrderedExecutor topicOrderedExecutor;
     // offline topic backlog cache
     private final ConcurrentOpenHashMap<TopicName, 
PersistentOfflineTopicStats> offlineTopicStatCache;
-    private final ConcurrentOpenHashMap<String, ConfigField> 
dynamicConfigurationMap =
-            prepareDynamicConfigurationMap();
+    private final ConcurrentOpenHashMap<String, ConfigField> 
dynamicConfigurationMap;
     private final ConcurrentOpenHashMap<String, Consumer<?>> 
configRegisteredListeners;
 
     private final ConcurrentLinkedQueue<TopicLoadingContext> 
pendingTopicLoadingQueue;
@@ -313,6 +312,7 @@ public class BrokerService implements Closeable {
 
     public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) 
throws Exception {
         this.pulsar = pulsar;
+        this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
         this.brokerPublishRateLimiter = new 
PublishRateLimiterImpl(pulsar.getMonotonicSnapshotClock());
         this.preciseTopicPublishRateLimitingEnable =
                 
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
@@ -2496,40 +2496,71 @@ public class BrokerService implements Closeable {
 
         if (dynamicConfigResources != null) {
             dynamicConfigResources.getDynamicConfigurationAsync()
-                    .thenAccept(optMap -> {
-                        if (!optMap.isPresent()) {
-                            return;
+                .thenAccept(optMap -> {
+                    // Case some dynamic configs have been removed.
+                    dynamicConfigurationMap.forEach((configKey, fieldWrapper) 
-> {
+                        boolean configRemoved = optMap.isEmpty() || 
!optMap.get().containsKey(configKey);
+                        if (fieldWrapper.lastDynamicValue != null && 
configRemoved) {
+                            configValueChanged(configKey, null);
                         }
-                        Map<String, String> data = optMap.get();
-                        data.forEach((configKey, value) -> {
-                            ConfigField configFieldWrapper = 
dynamicConfigurationMap.get(configKey);
-                            if (configFieldWrapper == null) {
-                                log.warn("{} does not exist in 
dynamicConfigurationMap, skip this config.", configKey);
-                                return;
-                            }
-                            Field configField = configFieldWrapper.field;
-                            Consumer listener = 
configRegisteredListeners.get(configKey);
-                            try {
-                                final Object existingValue;
-                                final Object newValue;
-                                if (configField != null) {
-                                    newValue = 
FieldParser.value(data.get(configKey), configField);
-                                    existingValue = 
configField.get(pulsar.getConfiguration());
-                                    configField.set(pulsar.getConfiguration(), 
newValue);
-                                } else {
-                                    newValue = value;
-                                    existingValue = 
configFieldWrapper.customValue;
-                                    configFieldWrapper.customValue = newValue 
== null ? null : String.valueOf(newValue);
-                                }
-                                log.info("Successfully updated configuration 
{}/{}", configKey, data.get(configKey));
-                                if (listener != null && 
!Objects.equals(existingValue, newValue)) {
-                                    listener.accept(newValue);
-                                }
-                            } catch (Exception e) {
-                                log.error("Failed to update config {}", 
configKey, e);
-                            }
-                        });
                     });
+                    // Some configs have been changed.
+                    if (!optMap.isPresent()) {
+                        return;
+                    }
+                    Map<String, String> data = optMap.get();
+                    data.forEach((configKey, value) -> {
+                        configValueChanged(configKey, value);
+                    });
+                });
+        }
+    }
+
+    private void configValueChanged(String configKey, String newValueStr) {
+        ConfigField configFieldWrapper = 
dynamicConfigurationMap.get(configKey);
+        if (configFieldWrapper == null) {
+            log.warn("{} does not exist in dynamicConfigurationMap, skip this 
config.", configKey);
+            return;
+        }
+        Consumer listener = configRegisteredListeners.get(configKey);
+        try {
+            // Convert existingValue and newValue.
+            final Object existingValue;
+            final Object newValue;
+            if (configFieldWrapper.field != null) {
+                if (StringUtils.isBlank(newValueStr)) {
+                    newValue = configFieldWrapper.defaultValue;
+                } else {
+                    newValue = FieldParser.value(newValueStr, 
configFieldWrapper.field);
+                }
+                existingValue = 
configFieldWrapper.field.get(pulsar.getConfiguration());
+                configFieldWrapper.field.set(pulsar.getConfiguration(), 
newValue);
+            } else {
+                // This case only occurs when it is a customized item.
+                // See: 
https://github.com/apache/pulsar/blob/master/pip/pip-300.md.
+                log.info("Skip update customized dynamic configuration {}/{} 
in memory, only trigger an event"
+                        + " listeners.", configKey, newValueStr);
+                existingValue = configFieldWrapper.lastDynamicValue;
+                newValue = newValueStr == null ? 
configFieldWrapper.defaultValue : newValueStr;
+            }
+            // Record the latest dynamic config.
+            configFieldWrapper.lastDynamicValue = newValueStr;
+
+            if (newValueStr == null) {
+                log.info("Successfully remove the dynamic configuration {}, 
and revert to the default value",
+                        configKey);
+            } else {
+                log.info("Successfully updated configuration {}/{}", 
configKey, newValueStr);
+            }
+
+            if (listener != null && !Objects.equals(existingValue, newValue)) {
+                // So far, all config items that are related to configuration 
listeners have a non-null default value.
+                // And a customized config could already be null before.
+                // So calling "listener.accept(null)" is okay.
+                listener.accept(newValue);
+            }
+        } catch (Exception e) {
+            log.error("Failed to update config {}", configKey, e);
         }
     }
 
@@ -2936,6 +2967,9 @@ public class BrokerService implements Closeable {
      * On notification, listener should first check if config value has been 
changed and after taking appropriate
      * action, listener should update config value with new value if it has 
been changed (so, next time listener can
      * compare values on configMap change).
+     *
+     * Note: The new value passed to the {@code listener} may be a 
null value.
+     *
      * @param <T>
      *
      * @param configKey
@@ -3057,16 +3091,23 @@ public class BrokerService implements Closeable {
         return true;
     }
 
-    private static ConcurrentOpenHashMap<String, ConfigField> 
prepareDynamicConfigurationMap() {
+    private ConcurrentOpenHashMap<String, ConfigField> 
prepareDynamicConfigurationMap() {
         ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
                 ConcurrentOpenHashMap.<String, 
ConfigField>newBuilder().build();
-        for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
-            if (field != null && 
field.isAnnotationPresent(FieldContext.class)) {
-                field.setAccessible(true);
-                if (field.getAnnotation(FieldContext.class).dynamic()) {
-                    dynamicConfigurationMap.put(field.getName(), new 
ConfigField(field));
+        try {
+            for (Field field : ServiceConfiguration.class.getDeclaredFields()) 
{
+                if (field != null && 
field.isAnnotationPresent(FieldContext.class)) {
+                    field.setAccessible(true);
+                    if (field.getAnnotation(FieldContext.class).dynamic()) {
+                        Object defaultValue = 
field.get(pulsar.getConfiguration());
+                        dynamicConfigurationMap.put(field.getName(), new 
ConfigField(field, defaultValue));
+                    }
                 }
             }
+        } catch (IllegalArgumentException | IllegalAccessException ex) {
+            // This error never occurs.
+            log.error("Failed to initialize dynamic configuration map", ex);
+            throw new RuntimeException(ex);
         }
         return dynamicConfigurationMap;
     }
@@ -3348,19 +3389,25 @@ public class BrokerService implements Closeable {
         // field holds the pulsar dynamic configuration.
         final Field field;
 
-        // customValue holds the external dynamic configuration.
-        volatile String customValue;
+        // It is the dynamic config value if set.
+        // It is null if no dynamic config has been set, even if the value 
of "pulsar.config" is present.
+        volatile String lastDynamicValue;
+
+        // The default value of "pulsar.config", which is initialized when the 
broker is starting.
+        // After the dynamic config has been removed, revert the config to 
this default value.
+        final Object defaultValue;
 
         Predicate<String> validator;
 
-        public ConfigField(Field field) {
+        public ConfigField(Field field, Object defaultValue) {
             super();
             this.field = field;
+            this.defaultValue = defaultValue;
         }
 
         public static ConfigField newCustomConfigField(String customValue) {
-            ConfigField configField = new ConfigField(null);
-            configField.customValue = customValue;
+            ConfigField configField = new ConfigField(null, null);
+            configField.lastDynamicValue = customValue;
             return configField;
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java
index 12f231a4d2c..aa7c2d720e3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java
@@ -18,15 +18,18 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.awaitility.Awaitility;
@@ -107,4 +110,69 @@ public class AdminApiDynamicConfigurationsTest extends 
MockedPulsarServiceBaseTe
         allDynamicConfigurations = 
admin.brokers().getAllDynamicConfigurations();
         assertThat(allDynamicConfigurations).doesNotContainKey(key);
     }
+
+    @Test
+    public void testDeleteStringDynamicConfig() throws PulsarAdminException {
+        String syncEventTopic = BrokerTestUtil.newUniqueName(SYSTEM_NAMESPACE 
+ "/tp");
+        // The default value is null;
+        Awaitility.await().untilAsserted(() -> {
+            
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
+        });
+        // Set dynamic config.
+        
admin.brokers().updateDynamicConfiguration("configurationMetadataSyncEventTopic",
 syncEventTopic);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(pulsar.getConfig().getConfigurationMetadataSyncEventTopic(), 
syncEventTopic);
+        });
+        // Remove dynamic config.
+        
admin.brokers().deleteDynamicConfiguration("configurationMetadataSyncEventTopic");
+        Awaitility.await().untilAsserted(() -> {
+            
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
+        });
+    }
+
+    @Test
+    public void testDeleteIntDynamicConfig() throws PulsarAdminException {
+        // Record the default value;
+        int defaultValue = 
pulsar.getConfig().getMaxConcurrentTopicLoadRequest();
+        // Set dynamic config.
+        int newValue = defaultValue + 1000;
+        
admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", 
newValue + "");
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue);
+        });
+        // Verify: it has been reverted to the default value.
+        
admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest");
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), 
defaultValue);
+        });
+    }
+
+    @Test
+    public void testDeleteCustomizedDynamicConfig() throws 
PulsarAdminException {
+        // Register a customized dynamic config.
+        String customizedConfigName = "a123";
+        
pulsar.getBrokerService().registerCustomDynamicConfiguration(customizedConfigName,
 v -> true);
+
+        AtomicReference<Object> currentValue = new AtomicReference<>();
+        
pulsar.getBrokerService().registerConfigurationListener(customizedConfigName, v 
-> {
+            currentValue.set(v);
+        });
+
+        // The default value is null;
+        Awaitility.await().untilAsserted(() -> {
+            assertNull(currentValue.get());
+        });
+
+        // Set dynamic config.
+        admin.brokers().updateDynamicConfiguration(customizedConfigName, 
"xxx");
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(currentValue.get(), "xxx");
+        });
+
+        // Remove dynamic config.
+        admin.brokers().deleteDynamicConfiguration(customizedConfigName);
+        Awaitility.await().untilAsserted(() -> {
+            assertNull(currentValue.get());
+        });
+    }
 }

Reply via email to