This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch dev-metadata
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git

commit ca057cb003bd6ee89bf4022bbf678463dd8c6419
Author: ken.lj <ken.lj...@gmail.com>
AuthorDate: Sun Sep 9 17:04:26 2018 +0800

    Refactor dynamic config:
    1. Extract common methods to AbstractDynamicConfiguration.
    2. Unify the strategy used when the config server cannot be reached at
    startup: start using the local snapshot and try to connect in the background.
---
 .../dynamic/AbstractDynamicConfiguration.java      | 35 ++++++++++++++-----
 .../dubbo/config/dynamic/DynamicConfiguration.java |  4 +--
 .../support/apollo/ApolloDynamicConfiguration.java | 34 ++++++++++--------
 .../archaius/ArchaiusDynamicConfiguration.java     | 40 +++++++++-------------
 .../sources/ZooKeeperConfigurationSource.java      | 26 ++++++++++----
 5 files changed, 85 insertions(+), 54 deletions(-)

diff --git 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
index 84129d6..e459bcc 100644
--- 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
+++ 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java
@@ -18,36 +18,51 @@ package org.apache.dubbo.config.dynamic;
 
 import org.apache.dubbo.common.URL;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 /**
  *
  */
-public abstract class AbstractDynamicConfiguration implements 
DynamicConfiguration {
+public abstract class AbstractDynamicConfiguration<TargetConfigListener> 
implements DynamicConfiguration {
     protected URL url;
+    /**
+     * One key can register multiple target listeners, but one target listener 
only maps to one configuration listener
+     */
+    private ConcurrentMap<String, ConcurrentMap<ConfigurationListener, 
TargetConfigListener>> listenerToTargetListenerMap;
 
     public AbstractDynamicConfiguration() {
     }
 
     @Override
     public void addListener(String key, ConfigurationListener listener) {
-
+        ConcurrentMap<ConfigurationListener, TargetConfigListener> listeners = 
listenerToTargetListenerMap.computeIfAbsent(key, k -> new 
ConcurrentHashMap<>());
+        TargetConfigListener targetListener = 
listeners.computeIfAbsent(listener, k -> createTargetConfigListener(key, 
listener));
+        addTargetListener(key, targetListener);
     }
 
     @Override
     public String getConfig(String key, String group) {
-        return null;
+        return getConfig(key, group, null);
     }
 
     @Override
     public String getConfig(String key, String group, ConfigurationListener 
listener) {
-        return null;
+        return getConfig(key, group, 0l, listener);
     }
 
     @Override
-    public String getConfig(String key, String group, long timeout) {
-        return null;
+    public String getConfig(String key, String group, long timeout, 
ConfigurationListener listener) {
+        try {
+            if (listener != null) {
+                this.addListener(key, listener);
+            }
+            return getInternalProperty(key, group, timeout, listener);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
     }
 
-
     public URL getUrl() {
         return url;
     }
@@ -56,6 +71,10 @@ public abstract class AbstractDynamicConfiguration 
implements DynamicConfigurati
         this.url = url;
     }
 
-    protected abstract String getInternalProperty(String key, String group, 
long timeout);
+    protected abstract String getInternalProperty(String key, String group, 
long timeout, ConfigurationListener listener);
+
+    protected abstract void addTargetListener(String key, TargetConfigListener 
listener);
+
+    protected abstract TargetConfigListener createTargetConfigListener(String 
key, ConfigurationListener listener);
 
 }
diff --git 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/DynamicConfiguration.java
 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/DynamicConfiguration.java
index 8d05aca..799f670 100644
--- 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/DynamicConfiguration.java
+++ 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/DynamicConfiguration.java
@@ -25,7 +25,7 @@ import org.apache.dubbo.common.extension.SPI;
 @SPI("zookeeper")
 public interface DynamicConfiguration {
 
-    public void init();
+    void init();
 
     URL getUrl();
 
@@ -35,7 +35,7 @@ public interface DynamicConfiguration {
 
     String getConfig(String key, String group);
 
-    String getConfig(String key, String group, long timeout);
+    String getConfig(String key, String group, long timeout, 
ConfigurationListener listener);
 
     String getConfig(String key, String group, ConfigurationListener listener);
 
diff --git 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
index 7b718f4..489bf47 100644
--- 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
+++ 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java
@@ -36,10 +36,11 @@ import java.util.Set;
 /**
  *
  */
-public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration {
+public class ApolloDynamicConfiguration extends 
AbstractDynamicConfiguration<ConfigChangeListener> {
     private static final String APOLLO_ENV_KEY = "env";
     private static final String APOLLO_ADDR_KEY = "apollo.meta";
     private static final String APOLLO_CLUSTER_KEY = "apollo.cluster";
+    private static final String APPLO_DEFAULT_NAMESPACE = "dubbo";
     /**
      * support two namespaces: application -> dubbo
      */
@@ -70,7 +71,7 @@ public class ApolloDynamicConfiguration extends 
AbstractDynamicConfiguration {
             System.setProperty(APOLLO_CLUSTER_KEY, configCluster);
         }
 
-        dubboConfig = ConfigService.getConfig("dubbo");
+        dubboConfig = 
ConfigService.getConfig(url.getParameter(Constants.CONFIG_NAMESPACE_KEY, 
APPLO_DEFAULT_NAMESPACE));
         appConfig = ConfigService.getAppConfig();
     }
 
@@ -83,16 +84,8 @@ public class ApolloDynamicConfiguration extends 
AbstractDynamicConfiguration {
     }
 
     @Override
-    public String getConfig(String key, String group, ConfigurationListener 
listener) {
-        Set<String> keys = new HashSet<>(1);
-        keys.add(key);
-        this.appConfig.addChangeListener(new ApolloListener(listener), keys);
-        this.dubboConfig.addChangeListener(new ApolloListener(listener), keys);
-        return getInternalProperty(key, group, 0L);
-    }
-
-    @Override
-    protected String getInternalProperty(String key, String group, long 
timeout) {
+    protected String getInternalProperty(String key, String group, long 
timeout, ConfigurationListener listener) {
+        // FIXME According to Apollo, if it fails to get a value from one 
namespace, it will keep logging warning msg. They are working to improve it.
         String value = appConfig.getProperty(key, null);
         if (value == null) {
             value = dubboConfig.getProperty(key, null);
@@ -101,6 +94,19 @@ public class ApolloDynamicConfiguration extends 
AbstractDynamicConfiguration {
         return value;
     }
 
+    @Override
+    protected void addTargetListener(String key, ConfigChangeListener 
listener) {
+        Set<String> keys = new HashSet<>(1);
+        keys.add(key);
+        this.appConfig.addChangeListener(listener, keys);
+        this.dubboConfig.addChangeListener(listener, keys);
+    }
+
+    @Override
+    protected ConfigChangeListener createTargetConfigListener(String key, 
ConfigurationListener listener) {
+        return new ApolloListener(listener);
+    }
+
     public ConfigChangeType getChangeType(PropertyChangeType changeType) {
         if (changeType.equals(PropertyChangeType.DELETED)) {
             return ConfigChangeType.DELETED;
@@ -121,12 +127,12 @@ public class ApolloDynamicConfiguration extends 
AbstractDynamicConfiguration {
             this.listener = listener;
         }
 
-        // FIXME will Apollo consider an empty value ("") as deleted?
+        // FIXME will Apollo consider an empty value "" as deleted?
         @Override
         public void onChange(ConfigChangeEvent changeEvent) {
             for (String key : changeEvent.changedKeys()) {
                 ConfigChange change = changeEvent.getChange(key);
-                // Maybe we no longer need to identify the type of change. 
Because there's no scenario that a callback will subscribe for both 
configurators and routers
+                // TODO Maybe we no longer need to identify the type of 
change. Because there's no scenario that a callback will subscribe for both 
configurators and routers
                 if 
(change.getPropertyName().endsWith(Constants.CONFIGURATORS_SUFFIX)) {
                     listener.process(new 
org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), 
ConfigType.CONFIGURATORS, getChangeType(change.getChangeType())));
                 } else {
diff --git 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
index 979e884..feb177a 100644
--- 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
+++ 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java
@@ -33,7 +33,7 @@ import 
org.apache.dubbo.config.dynamic.support.archaius.sources.ZooKeeperConfigu
 /**
  * Archaius supports various sources and it's extensiable: JDBC, ZK, 
Properties, ..., so should we make it extensiable?
  */
-public class ArchaiusDynamicConfiguration extends AbstractDynamicConfiguration 
{
+public class ArchaiusDynamicConfiguration extends 
AbstractDynamicConfiguration<Runnable> {
 
     public ArchaiusDynamicConfiguration() {
     }
@@ -43,51 +43,45 @@ public class ArchaiusDynamicConfiguration extends 
AbstractDynamicConfiguration {
         //  String address = env.getCompositeConf().getString(ADDRESS_KEY);
         //  String app = env.getCompositeConf().getString(APP_KEY);
 
-        String address = url.getAddress();
+        String address = url.getParameter(Constants.CONFIG_ADDRESS_KEY, 
url.getAddress());
         if (!address.equals(Constants.ANYHOST_VALUE)) {
             
System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_SOURCE_ADDRESS_KEY, 
address);
         }
-        
System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_CONFIG_ROOT_PATH_KEY, 
ZooKeeperConfigurationSource.DEFAULT_CONFIG_ROOT_PATH);
+        
System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_CONFIG_ROOT_PATH_KEY, 
url.getParameter(Constants.CONFIG_NAMESPACE_KEY, 
ZooKeeperConfigurationSource.DEFAULT_CONFIG_ROOT_PATH));
 
         try {
             ZooKeeperConfigurationSource zkConfigSource = new 
ZooKeeperConfigurationSource();
             zkConfigSource.start();
+            /*if (!zkConfigSource.isConnected()) {
+                // we can check the status of config center here, and decide 
to fail or continue if we cannot reach the config server.
+            }*/
             DynamicWatchedConfiguration zkDynamicConfig = new 
DynamicWatchedConfiguration(zkConfigSource);
             ConfigurationManager.install(zkDynamicConfig);
         } catch (Exception e) {
-            e.printStackTrace();
+            throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
     @Override
-    public void addListener(String key, ConfigurationListener listener) {
-        DynamicStringProperty prop = DynamicPropertyFactory.getInstance()
-                .getStringProperty(key, null);
-        prop.addCallback(new ArchaiusListener(key, listener));
-    }
-
-    @Override
-    public String getConfig(String key, String group) {
-        return getConfig(key, group, null);
+    protected String getInternalProperty(String key, String group, long 
timeout, ConfigurationListener listener) {
+        return DynamicPropertyFactory.getInstance()
+                .getStringProperty(key, null)
+                .get();
     }
 
     @Override
-    public String getConfig(String key, String group, ConfigurationListener 
listener) {
+    protected void addTargetListener(String key, Runnable runnable) {
         DynamicStringProperty prop = DynamicPropertyFactory.getInstance()
                 .getStringProperty(key, null);
-        if (listener != null) {
-            prop.addCallback(new ArchaiusListener(key, listener));
-        }
-        return prop.get();
+        prop.addCallback(runnable);
     }
 
     @Override
-    protected String getInternalProperty(String key, String group, long 
timeout) {
-        return DynamicPropertyFactory.getInstance()
-                .getStringProperty(key, null)
-                .get();
+    protected Runnable createTargetConfigListener(String key, 
ConfigurationListener listener) {
+        return new ArchaiusListener(key, listener);
     }
 
+
     private class ArchaiusListener implements Runnable {
         private ConfigurationListener listener;
         private URL url;
@@ -103,7 +97,7 @@ public class ArchaiusDynamicConfiguration extends 
AbstractDynamicConfiguration {
                 type = ConfigType.CONFIGURATORS;
             } else {
                 /**
-                 * Works for any router rules:
+                 * used for all router rules:
                  * {@link Constants.ROUTERS_SUFFIX}
                  * {@link 
org.apache.dubbo.rpc.cluster.router.tag.TagRouter.TAGRULE_DATAID}
                  */
diff --git 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
index c95033b..6e91624 100644
--- 
a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
+++ 
b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java
@@ -38,7 +38,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -57,6 +56,7 @@ public class ZooKeeperConfigurationSource implements 
WatchedConfigurationSource,
 
     private final String configRootPath;
     private final TreeCache treeCache;
+    private boolean connected = false;
 
     private final Charset charset = Charset.forName("UTF-8");
 
@@ -75,10 +75,18 @@ public class ZooKeeperConfigurationSource implements 
WatchedConfigurationSource,
         if (connectString == null) {
             throw new IllegalArgumentException("connectString==null, must 
specify the address to connect for zookeeper archaius source.");
         }
+
         CuratorFramework client = 
CuratorFrameworkFactory.newClient(connectString, sessionTimeout, connectTimeout,
                 new ExponentialBackoffRetry(1000, 3));
         client.start();
-
+        try {
+            connected = client.blockUntilConnected(connectTimeout * 4, 
TimeUnit.MILLISECONDS);
+            if (!connected) {
+                logger.warn("Cannot connect to ConfigCenter at zookeeper " + 
connectString + " in " + connectTimeout * 4 + "ms");
+            }
+        } catch (InterruptedException e) {
+            logger.error("The thread was interrupted unexpectedly when try 
connecting to zookeeper " + connectString + " as ConfigCenter, ", e);
+        }
         this.client = client;
         this.configRootPath = configRootPath;
         this.treeCache = new TreeCache(client, configRootPath);
@@ -103,16 +111,12 @@ public class ZooKeeperConfigurationSource implements 
WatchedConfigurationSource,
      */
     public void start() throws Exception {
         // create the watcher for future configuration updatess
-        CountDownLatch latch = new CountDownLatch(1);
         treeCache.getListenable().addListener(new TreeCacheListener() {
             public void childEvent(CuratorFramework aClient, TreeCacheEvent 
event)
                     throws Exception {
 
                 TreeCacheEvent.Type type = event.getType();
                 ChildData data = event.getData();
-                if (type == TreeCacheEvent.Type.INITIALIZED) {
-                    latch.countDown();
-                }
 
                 // TODO, ignore other event types
                 if (data == null) {
@@ -152,7 +156,6 @@ public class ZooKeeperConfigurationSource implements 
WatchedConfigurationSource,
 
         // passing true to trigger an initial rebuild upon starting.  
(blocking call)
         treeCache.start();
-        latch.await(60 * 1000, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -174,6 +177,11 @@ public class ZooKeeperConfigurationSource implements 
WatchedConfigurationSource,
 
         Map<String, Object> all = new HashMap<>();
 
+        if (!connected) {
+            logger.warn("ConfigServer is not connected yet, zookeeper don't 
support local snapshot yet, so there's no old data to use!");
+            return all;
+        }
+
         Map<String, ChildData> dataMap = 
treeCache.getCurrentChildren(configRootPath);
         if (dataMap != null && dataMap.size() > 0) {
             dataMap.forEach((childPath, v) -> {
@@ -220,4 +228,8 @@ public class ZooKeeperConfigurationSource implements 
WatchedConfigurationSource,
             logger.error("IOException should not have been thrown.", exc);
         }
     }
+
+    public boolean isConnected() {
+        return connected;
+    }
 }

Reply via email to