KYLIN-3090 Refactor to consolidate all caches and managers under KylinConfig


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b8d79870
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b8d79870
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b8d79870

Branch: refs/heads/master
Commit: b8d7987090d96a763569a20a3f1665a3a612b565
Parents: bace2d2
Author: Li Yang <liy...@apache.org>
Authored: Thu Dec 7 19:10:34 2017 +0800
Committer: Dong Li <lid...@apache.org>
Committed: Mon Dec 11 10:16:35 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  43 ++-
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../kylin/common/persistence/ResourceStore.java |   4 +
 .../persistence/RootPersistentEntity.java       |   9 +
 .../kylin/common/util/AutoReadWriteLock.java    |  66 ++++
 .../common/util/AbstractKylinTestCase.java      |  34 --
 .../common/util/AutoReadWriteLockTest.java      |  43 +++
 .../org/apache/kylin/cube/CubeDescManager.java  |  51 +--
 .../java/org/apache/kylin/cube/CubeManager.java |  38 +--
 .../org/apache/kylin/cube/cuboid/Cuboid.java    |  77 ++---
 .../apache/kylin/cube/cuboid/CuboidManager.java |  76 +++++
 .../upgrade/common/CubeMetadataUpgrade.java     |  10 +-
 .../kylin/cube/AggregationGroupRuleTest.java    |   2 -
 .../org/apache/kylin/cube/CubeDescTest.java     |   4 +-
 .../apache/kylin/cube/CubeManagerCacheTest.java |   5 -
 .../kylin/cube/common/RowKeySplitterTest.java   |   2 -
 .../kylin/cube/cuboid/CuboidSchedulerTest.java  |   4 +-
 .../apache/kylin/cube/cuboid/CuboidTest.java    |   2 -
 .../apache/kylin/cube/kv/RowKeyDecoderTest.java |   2 -
 .../apache/kylin/cube/kv/RowKeyEncoderTest.java |   2 -
 .../apache/kylin/dict/DictionaryManager.java    |  44 +--
 .../kylin/dict/lookup/SnapshotManager.java      |  46 +--
 .../org/apache/kylin/job/dao/ExecutableDao.java |  34 +-
 .../kylin/job/execution/ExecutableManager.java  |  36 +-
 .../impl/threadpool/DistributedScheduler.java   |  45 +--
 .../kylin/metadata/TableMetadataManager.java    |  58 +---
 .../kylin/metadata/TempStatementManager.java    |  48 +--
 .../kylin/metadata/acl/TableACLManager.java     |  41 +--
 .../badquery/BadQueryHistoryManager.java        |  42 +--
 .../kylin/metadata/cachesync/Broadcaster.java   |  40 +--
 .../metadata/cachesync/CachedCrudAssist.java    | 139 ++++++++
 .../kylin/metadata/draft/DraftManager.java      |  32 +-
 .../kylin/metadata/model/DataModelManager.java  |  47 +--
 .../kylin/metadata/project/ProjectInstance.java |   5 +
 .../kylin/metadata/project/ProjectManager.java  | 334 ++++++-------------
 .../realization/RealizationRegistry.java        |  38 +--
 .../metadata/streaming/StreamingManager.java    |  60 +---
 .../gtrecord/GTCubeStorageQueryBase.java        |   2 +-
 .../kylin/storage/hybrid/HybridManager.java     |  37 +-
 .../engine/mr/steps/MergeCuboidMapperTest.java  |   8 +-
 .../rest/controller/ProjectController.java      |   3 +-
 .../java/org/apache/kylin/rest/msg/Message.java |   4 +
 .../kylin/rest/service/ProjectService.java      |  25 --
 .../rest/service/DiagnosisServiceTest.java      |   2 -
 .../rest/controller/ProjectControllerTest.java  |  20 +-
 .../kylin/source/kafka/KafkaConfigManager.java  |  54 +--
 .../hbase/util/ExtendCubeToHybridCLI.java       |  11 +-
 .../cube/MeasureTypeOnlyAggrInBaseTest.java     |   6 +-
 .../hbase/steps/RowValueDecoderTest.java        |   2 -
 .../kylin/tool/ExtendCubeToHybridCLI.java       |  11 +-
 .../apache/kylin/tool/KylinLogExtractor.java    |   6 +-
 .../apache/kylin/tool/CubeMetaIngesterTest.java |  10 +-
 52 files changed, 686 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index d014262..c638ec6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -26,11 +26,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.StringReader;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.nio.ByteOrder;
 import java.nio.charset.Charset;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
@@ -382,6 +384,8 @@ public class KylinConfig extends KylinConfigBase {
     }
 
     // 
============================================================================
+    
+    Map<Class, Object> managersCache = new ConcurrentHashMap<>();
 
     private KylinConfig() {
         super();
@@ -391,6 +395,43 @@ public class KylinConfig extends KylinConfigBase {
         super(props, force);
     }
 
+    public <T> T getManager(Class<T> clz) {
+        KylinConfig base = base();
+        if (base != this)
+            return base.getManager(clz);
+        
+        Object mgr = managersCache.get(clz);
+        if (mgr != null)
+            return (T) mgr;
+        
+        synchronized (this) {
+            mgr = managersCache.get(clz);
+            if (mgr != null)
+                return (T) mgr;
+            
+            try {
+                // new manager via static Manager.newInstance()
+                Method method = clz.getDeclaredMethod("newInstance", 
KylinConfig.class);
+                method.setAccessible(true); // override accessibility
+                mgr = method.invoke(null, this);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            managersCache.put(clz, mgr);
+        }
+        return (T) mgr;
+    }
+    
+    public void clearManagers() {
+        KylinConfig base = base();
+        if (base != this) {
+            base.clearManagers();
+            return;
+        }
+        
+        managersCache.clear();
+    }
+
     public Properties exportToProperties() {
         Properties all = getAllProperties();
         Properties copy = new Properties();
@@ -432,7 +473,7 @@ public class KylinConfig extends KylinConfigBase {
     public synchronized void reloadFromSiteProperties() {
         reloadKylinConfig(buildSiteProperties());
     }
-
+    
     public KylinConfig base() {
         return this;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2d74684..1302247 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -329,6 +329,10 @@ abstract public class KylinConfigBase implements 
Serializable {
     public String getHBaseMappingAdapter() {
         return getOptional("kylin.metadata.hbasemapping-adapter");
     }
+    
+    public boolean isCheckCopyOnWrite() {
+        return 
Boolean.parseBoolean(getOptional("kylin.metadata.check-copy-on-write", 
"false"));
+    }
 
     // 
============================================================================
     // DICTIONARY & SNAPSHOT

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 86aa8d1..629e12e 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -118,6 +118,10 @@ abstract public class ResourceStore {
     protected ResourceStore(KylinConfig kylinConfig) {
         this.kylinConfig = kylinConfig;
     }
+    
+    final public KylinConfig getConfig() {
+        return kylinConfig;
+    }
 
     /**
      * List resources and sub-folders under a given folder, return null if 
given path is not a folder

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
index 06e481d..aa35482 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
@@ -98,6 +98,15 @@ abstract public class RootPersistentEntity implements 
AclEntity, Serializable {
     public void updateRandomUuid() {
         setUuid(UUID.randomUUID().toString());
     }
+    
+    /**
+     * The name as a part of the resource path used to save the entity.
+     * 
+     * E.g. /resource-root-dir/{RESOURCE_NAME}.json
+     */
+    public String resourceName() {
+        return uuid;
+    }
 
     @Override
     public int hashCode() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/util/AutoReadWriteLock.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/AutoReadWriteLock.java 
b/core-common/src/main/java/org/apache/kylin/common/util/AutoReadWriteLock.java
new file mode 100644
index 0000000..f4518bb
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/util/AutoReadWriteLock.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.Closeable;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class AutoReadWriteLock {
+    
+    public interface AutoLock extends Closeable {
+        // a close() that don't throw exception
+        public void close();
+    }
+    
+    // 
============================================================================
+    
+    final private ReentrantReadWriteLock rwlock;
+
+    public AutoReadWriteLock() {
+        this(new ReentrantReadWriteLock());
+    }
+    
+    public AutoReadWriteLock(ReentrantReadWriteLock rwlock) {
+        this.rwlock = rwlock;
+    }
+    
+    public ReentrantReadWriteLock innerLock() {
+        return rwlock;
+    }
+    
+    public AutoLock lockForRead() {
+        rwlock.readLock().lock();
+        return new AutoLock() {
+            @Override
+            public void close() {
+                rwlock.readLock().unlock();
+            }
+        };
+    }
+    
+    public AutoLock lockForWrite() {
+        rwlock.writeLock().lock();
+        return new AutoLock() {
+            @Override
+            public void close() {
+                rwlock.writeLock().unlock();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
 
b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
index 620e135..37790d2 100644
--- 
a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
+++ 
b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.common.util;
 
-import java.lang.reflect.Method;
-
 import org.apache.kylin.common.KylinConfig;
 
 /**
@@ -28,22 +26,6 @@ import org.apache.kylin.common.KylinConfig;
  */
 public abstract class AbstractKylinTestCase {
 
-    public static final String[] SERVICES_WITH_CACHE = { //
-            "org.apache.kylin.cube.CubeManager", //
-            "org.apache.kylin.cube.CubeDescManager", //
-            "org.apache.kylin.dict.lookup.SnapshotManager", //
-            "org.apache.kylin.dict.DictionaryManager", //
-            "org.apache.kylin.storage.hybrid.HybridManager", //
-            "org.apache.kylin.metadata.realization.RealizationRegistry", //
-            "org.apache.kylin.metadata.project.ProjectManager", //
-            "org.apache.kylin.metadata.MetadataManager", //
-            "org.apache.kylin.metadata.cachesync.Broadcaster", //
-            "org.apache.kylin.metadata.badquery.BadQueryHistoryManager", //
-            "org.apache.kylin.job.impl.threadpool.DistributedScheduler", //
-            "org.apache.kylin.job.execution.ExecutableManager", //
-            "org.apache.kylin.job.dao.ExecutableDao" //
-    };
-
     public abstract void createTestMetadata(String... overlayMetadataDirs) 
throws Exception;
 
     public abstract void cleanupTestMetadata() throws Exception;
@@ -53,24 +35,8 @@ public abstract class AbstractKylinTestCase {
     }
 
     public static void staticCleanupTestMetadata() {
-        cleanupCache();
         System.clearProperty(KylinConfig.KYLIN_CONF);
         KylinConfig.destroyInstance();
     }
 
-    private static void cleanupCache() {
-
-        for (String serviceClass : SERVICES_WITH_CACHE) {
-            try {
-                Class<?> cls = Class.forName(serviceClass);
-                Method method = cls.getDeclaredMethod("clearCache");
-                method.invoke(null);
-            } catch (ClassNotFoundException e) {
-                // acceptable because lower module test does have CubeManager 
etc on classpath
-            } catch (Exception e) {
-                System.err.println("Error clean up cache " + serviceClass);
-                e.printStackTrace();
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/test/java/org/apache/kylin/common/util/AutoReadWriteLockTest.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/util/AutoReadWriteLockTest.java
 
b/core-common/src/test/java/org/apache/kylin/common/util/AutoReadWriteLockTest.java
new file mode 100644
index 0000000..56cfd93
--- /dev/null
+++ 
b/core-common/src/test/java/org/apache/kylin/common/util/AutoReadWriteLockTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class AutoReadWriteLockTest {
+    
+    @Test
+    public void testBasics() {
+        AutoReadWriteLock lock = new AutoReadWriteLock(new 
ReentrantReadWriteLock());
+        try (AutoLock al = lock.lockForRead()) {
+            Assert.assertTrue(lock.innerLock().getReadHoldCount() == 1);
+        }
+        Assert.assertTrue(lock.innerLock().getReadHoldCount() == 0);
+        
+        try (AutoLock al = lock.lockForWrite()) {
+            Assert.assertTrue(lock.innerLock().getWriteHoldCount() == 1);
+        }
+        Assert.assertTrue(lock.innerLock().getWriteHoldCount() == 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 9e42eab..f724549 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -22,15 +22,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.CubeMetadataValidator;
 import org.apache.kylin.cube.model.validation.ValidateContext;
@@ -63,35 +61,13 @@ public class CubeDescManager {
 
     public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new 
JsonSerializer<CubeDesc>(CubeDesc.class);
 
-    // static cached instances
-    private static final ConcurrentMap<KylinConfig, CubeDescManager> CACHE = 
new ConcurrentHashMap<KylinConfig, CubeDescManager>();
-
     public static CubeDescManager getInstance(KylinConfig config) {
-        CubeDescManager r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (CubeDescManager.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-            try {
-                r = new CubeDescManager(config);
-                CACHE.put(config, r);
-                if (CACHE.size() > 1) {
-                    logger.warn("More than one singleton exist");
-                }
-                return r;
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to init 
CubeDescManager from " + config, e);
-            }
-        }
+        return config.getManager(CubeDescManager.class);
     }
 
-    public static void clearCache() {
-        CACHE.clear();
+    // called by reflection
+    static CubeDescManager newInstance(KylinConfig config) throws IOException {
+        return new CubeDescManager(config);
     }
 
     // 
============================================================================
@@ -113,12 +89,6 @@ public class CubeDescManager {
     private class CubeDescSyncListener extends Broadcaster.Listener {
 
         @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            clearCache();
-            Cuboid.clearCache();
-        }
-
-        @Override
         public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {
             for (IRealization real : 
ProjectManager.getInstance(config).listAllRealizations(project)) {
                 if (real instanceof CubeInstance) {
@@ -166,7 +136,7 @@ public class CubeDescManager {
         CubeDesc ndesc = loadCubeDesc(CubeDesc.concatResourcePath(name), 
false);
 
         cubeDescMap.putLocal(ndesc.getName(), ndesc);
-        Cuboid.clearCache(ndesc.getName()); // avoid calling 
CubeDesc.getInitialCuboidScheduler() for late initializing CuboidScheduler
+        clearCuboidCache(ndesc.getName());
 
         // if related cube is in DESCBROKEN state before, change it back to 
DISABLED
         CubeManager cubeManager = CubeManager.getInstance(config);
@@ -292,13 +262,18 @@ public class CubeDescManager {
         String path = cubeDesc.getResourcePath();
         getStore().deleteResource(path);
         cubeDescMap.remove(cubeDesc.getName());
-        Cuboid.clearCache(cubeDesc.getName()); // avoid calling 
CubeDesc.getInitialCuboidScheduler() for late initializing CuboidScheduler
+        clearCuboidCache(cubeDesc.getName());
     }
 
     // remove cubeDesc
     public void removeLocalCubeDesc(String name) throws IOException {
         cubeDescMap.removeLocal(name);
-        Cuboid.clearCache(name);
+        clearCuboidCache(name);
+    }
+    
+    private void clearCuboidCache(String descName) {
+        // avoid calling CubeDesc.getInitialCuboidScheduler() for late 
initializing CuboidScheduler
+        CuboidManager.getInstance(config).clearCache(descName);
     }
 
     private void reloadAllCubeDesc() throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 95da80a..e00735c 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -91,39 +91,13 @@ public class CubeManager implements IRealizationProvider {
 
     private static final Logger logger = 
LoggerFactory.getLogger(CubeManager.class);
 
-    // static cached instances
-    private static final ConcurrentMap<KylinConfig, CubeManager> CACHE = new 
ConcurrentHashMap<KylinConfig, CubeManager>();
-
     public static CubeManager getInstance(KylinConfig config) {
-        CubeManager r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (CubeManager.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-            try {
-                r = new CubeManager(config);
-                CACHE.put(config, r);
-                if (CACHE.size() > 1) {
-                    logger.warn("More than one singleton exist");
-                    for (KylinConfig kylinConfig : CACHE.keySet()) {
-                        logger.warn("type: " + kylinConfig.getClass() + " 
reference: "
-                                + System.identityHashCode(kylinConfig.base()));
-                    }
-                }
-                return r;
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to init CubeManager 
from " + config, e);
-            }
-        }
+        return config.getManager(CubeManager.class);
     }
 
-    public static void clearCache() {
-        CACHE.clear();
+    // called by reflection
+    static CubeManager newInstance(KylinConfig config) throws IOException {
+        return new CubeManager(config);
     }
 
     // 
============================================================================
@@ -148,10 +122,6 @@ public class CubeManager implements IRealizationProvider {
     }
 
     private class CubeSyncListener extends Broadcaster.Listener {
-        @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            clearCache();
-        }
 
         @Override
         public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index efd2e2e..1110a5d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -23,10 +23,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
@@ -40,14 +40,10 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Maps;
 
 @SuppressWarnings("serial")
 public class Cuboid implements Comparable<Cuboid>, Serializable {
 
-    // TODO Should the cache be inside CuboidScheduler?
-    private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = 
Maps.newConcurrentMap();
-
     // smaller is better
     public final static Comparator<Long> cuboidSelectComparator = new 
Comparator<Long>() {
         @Override
@@ -56,38 +52,12 @@ public class Cuboid implements Comparable<Cuboid>, 
Serializable {
         }
     };
 
-    // this is the only entry point for query to find the right cuboid for a 
segment
-    public static Cuboid identifyCuboid(CubeSegment cubeSegment, 
Set<TblColRef> dimensions,
-            Collection<FunctionDesc> metrics) {
-        return identifyCuboid(cubeSegment.getCuboidScheduler(), dimensions, 
metrics);
-    }
-
-    // this is the only entry point for query to find the right cuboid for a 
cube instance
-    public static Cuboid identifyCuboid(CubeInstance cubeInstance, 
Set<TblColRef> dimensions,
-            Collection<FunctionDesc> metrics) {
-        return identifyCuboid(cubeInstance.getCuboidScheduler(), dimensions, 
metrics);
-    }
-
-    public static Cuboid identifyCuboid(CuboidScheduler cuboidScheduler, 
Set<TblColRef> dimensions,
+    public static Cuboid findCuboid(CuboidScheduler cuboidScheduler, 
Set<TblColRef> dimensions,
             Collection<FunctionDesc> metrics) {
-        long cuboidID = identifyCuboidId(cuboidScheduler.getCubeDesc(), 
dimensions, metrics);
+        long cuboidID = toCuboidId(cuboidScheduler.getCubeDesc(), dimensions, 
metrics);
         return Cuboid.findById(cuboidScheduler, cuboidID);
     }
 
-    public static long identifyCuboidId(CubeDesc cubeDesc, Set<TblColRef> 
dimensions, Collection<FunctionDesc> metrics) {
-        for (FunctionDesc metric : metrics) {
-            if (metric.getMeasureType().onlyAggrInBaseCuboid())
-                return Cuboid.getBaseCuboidId(cubeDesc);
-        }
-
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return cuboidID;
-    }
-
     public static Cuboid findById(CuboidScheduler cuboidScheduler, byte[] 
cuboidID) {
         return findById(cuboidScheduler, Bytes.toLong(cuboidID));
     }
@@ -106,18 +76,27 @@ public class Cuboid implements Comparable<Cuboid>, 
Serializable {
     }
 
     public static Cuboid findById(CuboidScheduler cuboidScheduler, long 
cuboidID) {
-        Map<Long, Cuboid> cubeCache = 
CUBOID_CACHE.get(cuboidScheduler.getCuboidCacheKey());
-        if (cubeCache == null) {
-            cubeCache = Maps.newConcurrentMap();
-            CUBOID_CACHE.put(cuboidScheduler.getCuboidCacheKey(), cubeCache);
+        KylinConfig config = cuboidScheduler.getCubeDesc().getConfig();
+        return CuboidManager.getInstance(config).findById(cuboidScheduler, 
cuboidID);
+    }
+
+    public static void clearCache(CubeInstance cubeInstance) {
+        KylinConfig config = cubeInstance.getConfig();
+        CuboidManager.getInstance(config).clearCache(cubeInstance);
+    }
+
+    public static long toCuboidId(CubeDesc cubeDesc, Set<TblColRef> 
dimensions, Collection<FunctionDesc> metrics) {
+        for (FunctionDesc metric : metrics) {
+            if (metric.getMeasureType().onlyAggrInBaseCuboid())
+                return Cuboid.getBaseCuboidId(cubeDesc);
         }
-        Cuboid cuboid = cubeCache.get(cuboidID);
-        if (cuboid == null) {
-            long validCuboidID = cuboidScheduler.findBestMatchCuboid(cuboidID);
-            cuboid = new Cuboid(cuboidScheduler.getCubeDesc(), cuboidID, 
validCuboidID);
-            cubeCache.put(cuboidID, cuboid);
+
+        long cuboidID = 0;
+        for (TblColRef column : dimensions) {
+            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
+            cuboidID |= 1L << index;
         }
-        return cuboid;
+        return cuboidID;
     }
 
     public static long getBaseCuboidId(CubeDesc cube) {
@@ -128,18 +107,6 @@ public class Cuboid implements Comparable<Cuboid>, 
Serializable {
         return findById(cube.getInitialCuboidScheduler(), 
getBaseCuboidId(cube));
     }
 
-    public static void clearCache() {
-        CUBOID_CACHE.clear();
-    }
-
-    public static void clearCache(String cacheKey) {
-        CUBOID_CACHE.remove(cacheKey);
-    }
-    
-    public static void clearCache(CubeInstance cubeInstance) {
-        
CUBOID_CACHE.remove(cubeInstance.getCuboidScheduler().getCuboidCacheKey());
-    }
-
     // 
============================================================================
 
     private CubeDesc cubeDesc;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java
new file mode 100644
index 0000000..6efc87f
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A cuboid cache.
+ */
+public class CuboidManager {
+
+    public static CuboidManager getInstance(KylinConfig config) {
+        return config.getManager(CuboidManager.class);
+    }
+
+    // called by reflection
+    static CuboidManager newInstance(KylinConfig config) throws IOException {
+        return new CuboidManager(config);
+    }
+    
+    // 
============================================================================
+
+    @SuppressWarnings("unused")
+    final private KylinConfig config;
+    final private Map<String, Map<Long, Cuboid>> schedulerCuboidCache = 
Maps.newConcurrentMap();
+    
+    private CuboidManager(KylinConfig config) {
+        this.config = config;
+    }
+    
+    public Cuboid findById(CuboidScheduler cuboidScheduler, long cuboidID) {
+        Map<Long, Cuboid> cubeCache = 
schedulerCuboidCache.get(cuboidScheduler.getCuboidCacheKey());
+        if (cubeCache == null) {
+            cubeCache = Maps.newConcurrentMap();
+            schedulerCuboidCache.put(cuboidScheduler.getCuboidCacheKey(), 
cubeCache);
+        }
+        Cuboid cuboid = cubeCache.get(cuboidID);
+        if (cuboid == null) {
+            long validCuboidID = cuboidScheduler.findBestMatchCuboid(cuboidID);
+            cuboid = new Cuboid(cuboidScheduler.getCubeDesc(), cuboidID, 
validCuboidID);
+            cubeCache.put(cuboidID, cuboid);
+        }
+        return cuboid;
+    }
+
+    public void clearCache(String cacheKey) {
+        schedulerCuboidCache.remove(cacheKey);
+    }
+    
+    public void clearCache(CubeInstance cubeInstance) {
+        
schedulerCuboidCache.remove(cubeInstance.getCuboidScheduler().getCuboidCacheKey());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java
index f2f6b57..7c7ea0e 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java
@@ -63,23 +63,17 @@ public abstract class CubeMetadataUpgrade {
     }
 
     public void clear() {
-        DataModelManager.clearCache();
-        CubeDescManager.clearCache();
-        CubeManager.clearCache();
-        ProjectManager.clearCache();
+        config.clearManagers();
     }
 
     public void verify() {
         
logger.info("=================================================================");
         logger.info("The changes are applied, now it's time to verify the new 
metadata store by reloading all metadata:");
         
logger.info("=================================================================");
-        DataModelManager.clearCache();
+        config.clearManagers();
         DataModelManager.getInstance(config);
-        CubeDescManager.clearCache();
         CubeDescManager.getInstance(config);
-        CubeManager.clearCache();
         CubeManager.getInstance(config);
-        ProjectManager.clearCache();
         ProjectManager.getInstance(config);
         //cleanup();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
index 1444ee1..b40dded 100644
--- 
a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
+++ 
b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
@@ -32,7 +32,6 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.cube.model.validation.rule.AggregationGroupRule;
-import org.apache.kylin.metadata.model.DataModelManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,7 +40,6 @@ public class AggregationGroupRuleTest extends 
LocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         this.createTestMetadata();
-        DataModelManager.clearCache();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java
index ae7b17c..36ac5c3 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java
@@ -376,7 +376,7 @@ public class CubeDescTest extends LocalFileMetadataTestCase 
{
         }
 
         thrown.expect(TooManyCuboidException.class);
-        CubeDescManager.clearCache();
+        getTestConfig().clearManagers();
         CubeDesc cubeDesc = CubeDescManager.getInstance(getTestConfig())
                 .getCubeDesc("ut_cube_desc_combination_int_overflow");
         cubeDesc.init(getTestConfig());
@@ -391,7 +391,7 @@ public class CubeDescTest extends LocalFileMetadataTestCase 
{
 
         thrown.expect(IllegalArgumentException.class);
         thrown.expectMessage("Too many rowkeys (78) in CubeDesc, please try to 
reduce dimension number or adopt derived dimensions");
-        CubeDescManager.clearCache();
+        getTestConfig().clearManagers();
         CubeDesc cubeDesc = 
CubeDescManager.getInstance(getTestConfig()).getCubeDesc("ut_78_rowkeys");
         cubeDesc.init(getTestConfig());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
index 52b9042..3881943 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.DataModelManager;
-import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.junit.After;
 import org.junit.Before;
@@ -41,9 +39,6 @@ public class CubeManagerCacheTest extends 
LocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         this.createTestMetadata();
-        DataModelManager.clearCache();
-        CubeManager.clearCache();
-        ProjectManager.clearCache();
         cubeManager = CubeManager.getInstance(getTestConfig());
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
index 6d32586..e4a426d 100644
--- 
a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
+++ 
b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.metadata.model.DataModelManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +32,6 @@ public class RowKeySplitterTest extends 
LocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         this.createTestMetadata();
-        DataModelManager.clearCache();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java
index 09200b8..87bf9c3 100644
--- 
a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java
+++ 
b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java
@@ -31,7 +31,6 @@ import java.util.Set;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.DataModelManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,7 +46,6 @@ public class CuboidSchedulerTest extends 
LocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         this.createTestMetadata();
-        DataModelManager.clearCache();
     }
 
     @After
@@ -330,7 +328,7 @@ public class CuboidSchedulerTest extends 
LocalFileMetadataTestCase {
                 f.renameTo(new File(path.substring(0, path.length() - 4)));
             }
         }
-        CubeDescManager.clearCache();
+        getTestConfig().clearManagers();
         CubeDesc cube = 
getCubeDescManager().getCubeDesc("ut_large_dimension_number");
         CuboidScheduler scheduler = cube.getInitialCuboidScheduler();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
index 0a77bdc..3c7dd66 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.DataModelManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,7 +60,6 @@ public class CuboidTest extends LocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         this.createTestMetadata();
-        DataModelManager.clearCache();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
index 459e734..d9f9265 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
@@ -29,7 +29,6 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.DataModelManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -40,7 +39,6 @@ public class RowKeyDecoderTest extends 
LocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         this.createTestMetadata();
-        DataModelManager.clearCache();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
index dcd883e..b34197e 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
@@ -29,7 +29,6 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.DataModelManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -40,7 +39,6 @@ public class RowKeyEncoderTest extends 
LocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         this.createTestMetadata();
-        DataModelManager.clearCache();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index e97899c..292c61c 100755
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -24,8 +24,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -35,7 +33,6 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableSignature;
@@ -55,28 +52,13 @@ public class DictionaryManager {
 
     private static final DictionaryInfo NONE_INDICATOR = new DictionaryInfo();
 
-    // static cached instances
-    private static final ConcurrentMap<KylinConfig, DictionaryManager> CACHE = 
new ConcurrentHashMap<KylinConfig, DictionaryManager>();
-
     public static DictionaryManager getInstance(KylinConfig config) {
-        DictionaryManager r = CACHE.get(config);
-        if (r == null) {
-            synchronized (DictionaryManager.class) {
-                r = CACHE.get(config);
-                if (r == null) {
-                    r = new DictionaryManager(config);
-                    CACHE.put(config, r);
-                    if (CACHE.size() > 1) {
-                        logger.warn("More than one singleton exist");
-                    }
-                }
-            }
-        }
-        return r;
+        return config.getManager(DictionaryManager.class);
     }
 
-    public static void clearCache() {
-        CACHE.clear();
+    // called by reflection
+    static DictionaryManager newInstance(KylinConfig config) throws 
IOException {
+        return new DictionaryManager(config);
     }
 
     // 
============================================================================
@@ -176,7 +158,7 @@ public class DictionaryManager {
     }
 
     private String checkDupByContent(DictionaryInfo dictInfo, 
Dictionary<String> dict) throws IOException {
-        ResourceStore store = DataModelManager.getInstance(config).getStore();
+        ResourceStore store = getStore();
         NavigableSet<String> existings = 
store.listResources(dictInfo.getResourceDir());
         if (existings == null)
             return null;
@@ -343,7 +325,7 @@ public class DictionaryManager {
     }
 
     private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException {
-        final ResourceStore store = 
DataModelManager.getInstance(config).getStore();
+        final ResourceStore store = getStore();
         final List<DictionaryInfo> allResources = 
store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, 
DictionaryInfoSerializer.INFO_SERIALIZER);
 
         TableSignature input = dictInfo.getInput();
@@ -357,7 +339,7 @@ public class DictionaryManager {
     }
 
     private DictionaryInfo findLargestDictInfo(DictionaryInfo dictInfo) throws 
IOException {
-        final ResourceStore store = 
DataModelManager.getInstance(config).getStore();
+        final ResourceStore store = getStore();
         final List<DictionaryInfo> allResources = 
store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, 
DictionaryInfoSerializer.INFO_SERIALIZER);
 
         DictionaryInfo largestDict = null;
@@ -376,7 +358,7 @@ public class DictionaryManager {
 
     public void removeDictionary(String resourcePath) throws IOException {
         logger.info("Remvoing dict: " + resourcePath);
-        ResourceStore store = DataModelManager.getInstance(config).getStore();
+        ResourceStore store = getStore();
         store.deleteResource(resourcePath);
         dictCache.invalidate(resourcePath);
     }
@@ -386,7 +368,7 @@ public class DictionaryManager {
         info.setSourceTable(srcTable);
         info.setSourceColumn(srcCol);
 
-        ResourceStore store = DataModelManager.getInstance(config).getStore();
+        ResourceStore store = getStore();
         NavigableSet<String> existings = 
store.listResources(info.getResourceDir());
         if (existings == null)
             return;
@@ -396,7 +378,7 @@ public class DictionaryManager {
     }
 
     void save(DictionaryInfo dict) throws IOException {
-        ResourceStore store = DataModelManager.getInstance(config).getStore();
+        ResourceStore store = getStore();
         String path = dict.getResourcePath();
         logger.info("Saving dictionary at " + path);
 
@@ -412,11 +394,15 @@ public class DictionaryManager {
     }
 
     DictionaryInfo load(String resourcePath, boolean loadDictObj) throws 
IOException {
-        ResourceStore store = DataModelManager.getInstance(config).getStore();
+        ResourceStore store = getStore();
 
         logger.info("DictionaryManager(" + System.identityHashCode(this) + ") 
loading DictionaryInfo(loadDictObj:" + loadDictObj + ") at " + resourcePath);
         DictionaryInfo info = store.getResource(resourcePath, 
DictionaryInfo.class, loadDictObj ? DictionaryInfoSerializer.FULL_SERIALIZER : 
DictionaryInfoSerializer.INFO_SERIALIZER);
         return info;
     }
 
+    private ResourceStore getStore() {
+        return ResourceStore.getStore(config);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index c10deb4..5192805 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -20,14 +20,11 @@ package org.apache.kylin.dict.lookup;
 
 import java.io.IOException;
 import java.util.NavigableSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableSignature;
@@ -47,39 +44,21 @@ public class SnapshotManager {
 
     private static final Logger logger = 
LoggerFactory.getLogger(SnapshotManager.class);
 
-    // static cached instances
-    private static final ConcurrentMap<KylinConfig, SnapshotManager> 
SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
-
     public static SnapshotManager getInstance(KylinConfig config) {
-        SnapshotManager r = SERVICE_CACHE.get(config);
-        if (r == null) {
-            synchronized (SnapshotManager.class) {
-                r = SERVICE_CACHE.get(config);
-                if (r == null) {
-                    r = new SnapshotManager(config);
-                    SERVICE_CACHE.put(config, r);
-                    if (SERVICE_CACHE.size() > 1) {
-                        logger.warn("More than one singleton exist");
-                    }
-                }
-            }
-        }
-        return r;
+        return config.getManager(SnapshotManager.class);
     }
 
-    public static void clearCache() {
-        synchronized (SERVICE_CACHE) {
-            SERVICE_CACHE.clear();
-        }
+    // called by reflection
+    static SnapshotManager newInstance(KylinConfig config) throws IOException {
+        return new SnapshotManager(config);
     }
 
     // 
============================================================================
 
     private KylinConfig config;
-    private LoadingCache<String, SnapshotTable> snapshotCache; // resource
 
-    // path ==>
-    // SnapshotTable
+    // path ==> SnapshotTable
+    private LoadingCache<String, SnapshotTable> snapshotCache; // resource
 
     private SnapshotManager(KylinConfig config) {
         this.config = config;
@@ -116,7 +95,7 @@ public class SnapshotManager {
     }
 
     public void removeSnapshot(String resourcePath) throws IOException {
-        ResourceStore store = 
DataModelManager.getInstance(this.config).getStore();
+        ResourceStore store = getStore();
         store.deleteResource(resourcePath);
         snapshotCache.invalidate(resourcePath);
     }
@@ -171,7 +150,7 @@ public class SnapshotManager {
     }
 
     private String checkDupByInfo(SnapshotTable snapshot) throws IOException {
-        ResourceStore store = 
DataModelManager.getInstance(this.config).getStore();
+        ResourceStore store = getStore();
         String resourceDir = snapshot.getResourceDir();
         NavigableSet<String> existings = store.listResources(resourceDir);
         if (existings == null)
@@ -189,7 +168,7 @@ public class SnapshotManager {
     }
 
     private String checkDupByContent(SnapshotTable snapshot) throws 
IOException {
-        ResourceStore store = 
DataModelManager.getInstance(this.config).getStore();
+        ResourceStore store = getStore();
         String resourceDir = snapshot.getResourceDir();
         NavigableSet<String> existings = store.listResources(resourceDir);
         if (existings == null)
@@ -205,14 +184,14 @@ public class SnapshotManager {
     }
 
     private void save(SnapshotTable snapshot) throws IOException {
-        ResourceStore store = 
DataModelManager.getInstance(this.config).getStore();
+        ResourceStore store = getStore();
         String path = snapshot.getResourcePath();
         store.putResource(path, snapshot, 
SnapshotTableSerializer.FULL_SERIALIZER);
     }
 
     private SnapshotTable load(String resourcePath, boolean loadData) throws 
IOException {
         logger.info("Loading snapshotTable from " + resourcePath + ", with 
loadData: " + loadData);
-        ResourceStore store = 
DataModelManager.getInstance(this.config).getStore();
+        ResourceStore store = getStore();
 
         SnapshotTable table = store.getResource(resourcePath, 
SnapshotTable.class, loadData ? SnapshotTableSerializer.FULL_SERIALIZER : 
SnapshotTableSerializer.INFO_SERIALIZER);
 
@@ -222,4 +201,7 @@ public class SnapshotManager {
         return table;
     }
 
+    private ResourceStore getStore() {
+        return ResourceStore.getStore(this.config);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java 
b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 16875b1..a396b92 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -23,15 +23,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.model.DataModelManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,36 +41,23 @@ public class ExecutableDao {
     private static final Serializer<ExecutablePO> JOB_SERIALIZER = new 
JsonSerializer<ExecutablePO>(ExecutablePO.class);
     private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER 
= new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
     private static final Logger logger = 
LoggerFactory.getLogger(ExecutableDao.class);
-    private static final ConcurrentMap<KylinConfig, ExecutableDao> CACHE = new 
ConcurrentHashMap<KylinConfig, ExecutableDao>();
-
-    private ResourceStore store;
 
     public static ExecutableDao getInstance(KylinConfig config) {
-        ExecutableDao r = CACHE.get(config);
-        if (r == null) {
-            synchronized (ExecutableDao.class) {
-                r = CACHE.get(config);
-                if (r == null) {
-                    r = new ExecutableDao(config);
-                    CACHE.put(config, r);
-                    if (CACHE.size() > 1) {
-                        logger.warn("More than one singleton exist");
-                    }
-                }
-            }
-        }
-        return r;
+        return config.getManager(ExecutableDao.class);
     }
 
-    public static void clearCache() {
-        synchronized (CACHE) {
-            CACHE.clear();
-        }
+    // called by reflection
+    static ExecutableDao newInstance(KylinConfig config) throws IOException {
+        return new ExecutableDao(config);
     }
 
+    // 
============================================================================
+    
+    private ResourceStore store;
+
     private ExecutableDao(KylinConfig config) {
         logger.info("Using metadata url: " + config);
-        this.store = DataModelManager.getInstance(config).getStore();
+        this.store = ResourceStore.getStore(config);
     }
 
     private String pathOfJob(ExecutablePO job) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index bab8c30..9f67a2b 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -22,13 +22,12 @@ import static 
org.apache.kylin.job.constant.ExecutableConstants.MR_JOB_ID;
 import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID;
 import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.IllegalFormatException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -51,8 +50,18 @@ import com.google.common.collect.Maps;
 public class ExecutableManager {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ExecutableManager.class);
-    private static final ConcurrentMap<KylinConfig, ExecutableManager> CACHE = 
new ConcurrentHashMap<KylinConfig, ExecutableManager>();
 
+    public static ExecutableManager getInstance(KylinConfig config) {
+        return config.getManager(ExecutableManager.class);
+    }
+
+    // called by reflection
+    static ExecutableManager newInstance(KylinConfig config) throws 
IOException {
+        return new ExecutableManager(config);
+    }
+
+    // 
============================================================================
+    
     private final KylinConfig config;
     private final ExecutableDao executableDao;
 
@@ -62,27 +71,6 @@ public class ExecutableManager {
         this.executableDao = ExecutableDao.getInstance(config);
     }
 
-    public static ExecutableManager getInstance(KylinConfig config) {
-        ExecutableManager r = CACHE.get(config);
-        if (r == null) {
-            synchronized (ExecutableManager.class) {
-                r = CACHE.get(config);
-                if (r == null) {
-                    r = new ExecutableManager(config);
-                    CACHE.put(config, r);
-                    if (CACHE.size() > 1) {
-                        logger.warn("More than one singleton exist");
-                    }
-                }
-            }
-        }
-        return r;
-    }
-
-    public static void clearCache() {
-        CACHE.clear();
-    }
-
     private static ExecutablePO parse(AbstractExecutable executable) {
         ExecutablePO result = new ExecutablePO();
         result.setName(executable.getName());

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 6d41c5e..2c934d0 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -22,8 +22,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -63,6 +61,22 @@ import com.google.common.collect.Maps;
  *  3. add all the job servers and query servers to the 
kylin.server.cluster-servers
  */
 public class DistributedScheduler implements Scheduler<AbstractExecutable>, 
ConnectionStateListener {
+    private static final Logger logger = 
LoggerFactory.getLogger(DistributedScheduler.class);
+    
+    private final static String SEGMENT_ID = "segmentId";
+    public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // 
note ZookeeperDistributedLock will ensure zk path prefix: 
/${kylin.env.zookeeper-base-path}/metadata
+
+    public static DistributedScheduler getInstance(KylinConfig config) {
+        return config.getManager(DistributedScheduler.class);
+    }
+
+    // called by reflection
+    static DistributedScheduler newInstance(KylinConfig config) throws 
IOException {
+        return new DistributedScheduler();
+    }
+
+    // 
============================================================================
+    
     private ExecutableManager executableManager;
     private FetcherRunner fetcher;
     private ScheduledExecutorService fetcherPool;
@@ -72,8 +86,6 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
     private DistributedLock jobLock;
     private Closeable lockWatch;
 
-    private static final Logger logger = 
LoggerFactory.getLogger(DistributedScheduler.class);
-    private static final ConcurrentMap<KylinConfig, DistributedScheduler> 
CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
     //keep all segments having running job
     private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>();
     private volatile boolean initialized = false;
@@ -81,31 +93,6 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
     private JobEngineConfig jobEngineConfig;
     private String serverName;
 
-    private final static String SEGMENT_ID = "segmentId";
-    public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // 
note ZookeeperDistributedLock will ensure zk path prefix: 
/${kylin.env.zookeeper-base-path}/metadata
-
-    //only for it test
-    public static DistributedScheduler getInstance(KylinConfig config) {
-        DistributedScheduler r = CACHE.get(config);
-        if (r == null) {
-            synchronized (DistributedScheduler.class) {
-                r = CACHE.get(config);
-                if (r == null) {
-                    r = new DistributedScheduler();
-                    CACHE.put(config, r);
-                    if (CACHE.size() > 1) {
-                        logger.warn("More than one singleton exist");
-                    }
-                }
-            }
-        }
-        return r;
-    }
-
-    public static void clearCache() {
-        CACHE.clear();
-    }
-
     private class FetcherRunner implements Runnable {
         @Override
         synchronized public void run() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
index 84133af..af275a5 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
@@ -26,12 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.RawResource;
@@ -50,8 +45,6 @@ import org.apache.kylin.metadata.project.ProjectManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -67,43 +60,13 @@ public class TableMetadataManager {
     public static final Serializer<ExternalFilterDesc> 
EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>(
             ExternalFilterDesc.class);
 
-    // static cached instances
-    private static final ConcurrentMap<KylinConfig, TableMetadataManager> 
CACHE = new ConcurrentHashMap<KylinConfig, TableMetadataManager>();
-
     public static TableMetadataManager getInstance(KylinConfig config) {
-        TableMetadataManager r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (TableMetadataManager.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-            try {
-                r = new TableMetadataManager(config);
-                CACHE.put(config, r);
-                if (CACHE.size() > 1) {
-                    logger.warn("More than one singleton exist, current keys: 
{}", StringUtils
-                            
.join(Iterators.transform(CACHE.keySet().iterator(), new Function<KylinConfig, 
String>() {
-                                @Nullable
-                                @Override
-                                public String apply(@Nullable KylinConfig 
input) {
-                                    return 
String.valueOf(System.identityHashCode(input));
-                                }
-                            }), ","));
-                }
-
-                return r;
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to init 
TableMetadataManager from " + config, e);
-            }
-        }
+        return config.getManager(TableMetadataManager.class);
     }
 
-    public static void clearCache() {
-        CACHE.clear();
+    // called by reflection
+    static TableMetadataManager newInstance(KylinConfig config) throws 
IOException {
+        return new TableMetadataManager(config);
     }
 
     // 
============================================================================
@@ -148,7 +111,6 @@ public class TableMetadataManager {
             return globalTables;
         }
         
-        
         ProjectInstance project = 
ProjectManager.getInstance(config).getProject(prj);
         Set<String> prjTableNames = project.getTables();
 
@@ -359,10 +321,6 @@ public class TableMetadataManager {
     }
 
     private class SrcTableSyncListener extends Broadcaster.Listener {
-        @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            clearCache();
-        }
 
         @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
@@ -387,10 +345,6 @@ public class TableMetadataManager {
     }
 
     private class SrcTableExtSyncListener extends Broadcaster.Listener {
-        @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            clearCache();
-        }
 
         @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
@@ -403,10 +357,6 @@ public class TableMetadataManager {
     }
 
     private class ExtFilterSyncListener extends Broadcaster.Listener {
-        @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            clearCache();
-        }
 
         @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java
index 6487eef..30ff934 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java
@@ -21,12 +21,7 @@ package org.apache.kylin.metadata;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -39,49 +34,19 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
 
 public class TempStatementManager {
     private static final Logger logger = 
LoggerFactory.getLogger(TempStatementManager.class);
-    private static final ConcurrentMap<KylinConfig, TempStatementManager> 
CACHE = new ConcurrentHashMap<>();
     public static final Serializer<TempStatementEntity> 
TEMP_STATEMENT_SERIALIZER = new JsonSerializer<>(
             TempStatementEntity.class);
 
     public static TempStatementManager getInstance(KylinConfig config) {
-        TempStatementManager r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (TempStatementManager.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-            try {
-                r = new TempStatementManager(config);
-                CACHE.put(config, r);
-                if (CACHE.size() > 1) {
-                    logger.warn("More than one singleton exist, current keys: 
{}", StringUtils
-                            
.join(Iterators.transform(CACHE.keySet().iterator(), new Function<KylinConfig, 
String>() {
-                                @Nullable
-                                @Override
-                                public String apply(@Nullable KylinConfig 
input) {
-                                    return 
String.valueOf(System.identityHashCode(input));
-                                }
-                            }), ","));
-                }
-
-                return r;
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to init 
TableMetadataManager from " + config, e);
-            }
-        }
+        return config.getManager(TempStatementManager.class);
     }
 
-    public static void clearCache() {
-        CACHE.clear();
+    // called by reflection
+    static TempStatementManager newInstance(KylinConfig config) throws 
IOException {
+        return new TempStatementManager(config);
     }
 
     // 
============================================================================
@@ -195,10 +160,6 @@ public class TempStatementManager {
     }
 
     private class TempStatementSyncListener extends Broadcaster.Listener {
-        @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            clearCache();
-        }
 
         @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, 
Broadcaster.Event event, String cacheKey)
@@ -210,6 +171,7 @@ public class TempStatementManager {
         }
     }
 
+    @SuppressWarnings("serial")
     @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
     private static class TempStatementEntity extends RootPersistentEntity {
         private static final String DEFAULT_SESSION_ID = "DEFAULT_SESSION";

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java
index 9c802b4..905fa16 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java
@@ -20,8 +20,6 @@ package org.apache.kylin.metadata.acl;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -41,40 +39,13 @@ public class TableACLManager {
     private static final Serializer<TableACL> TABLE_ACL_SERIALIZER = new 
JsonSerializer<>(TableACL.class);
     private static final String DIR_PREFIX = "/table_acl/";
 
-    // static cached instances
-    private static final ConcurrentMap<KylinConfig, TableACLManager> CACHE = 
new ConcurrentHashMap<>();
-
     public static TableACLManager getInstance(KylinConfig config) {
-        TableACLManager r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (TableACLManager.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-            try {
-                r = new TableACLManager(config);
-                CACHE.put(config, r);
-                if (CACHE.size() > 1) {
-                    logger.warn("More than one singleton exist");
-                }
-                return r;
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to init 
CubeDescManager from " + config, e);
-            }
-        }
+        return config.getManager(TableACLManager.class);
     }
 
-    public static void clearCache() {
-        CACHE.clear();
-    }
-
-    public static void clearCache(KylinConfig kylinConfig) {
-        if (kylinConfig != null)
-            CACHE.remove(kylinConfig);
+    // called by reflection
+    static TableACLManager newInstance(KylinConfig config) throws IOException {
+        return new TableACLManager(config);
     }
 
     // 
============================================================================
@@ -92,10 +63,6 @@ public class TableACLManager {
     }
 
     private class TableACLSyncListener extends Broadcaster.Listener {
-        @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            clearCache();
-        }
 
         @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, 
Broadcaster.Event event, String cacheKey) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
index a916254..c04e9eb 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
@@ -20,8 +20,6 @@ package org.apache.kylin.metadata.badquery;
 
 import java.io.IOException;
 import java.util.NavigableSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -35,8 +33,18 @@ import org.slf4j.LoggerFactory;
 public class BadQueryHistoryManager {
     public static final Serializer<BadQueryHistory> 
BAD_QUERY_INSTANCE_SERIALIZER = new JsonSerializer<>(BadQueryHistory.class);
     private static final Logger logger = 
LoggerFactory.getLogger(BadQueryHistoryManager.class);
+    
+    public static BadQueryHistoryManager getInstance(KylinConfig config) {
+        return config.getManager(BadQueryHistoryManager.class);
+    }
+
+    // called by reflection
+    static BadQueryHistoryManager newInstance(KylinConfig config) throws 
IOException {
+        return new BadQueryHistoryManager(config);
+    }
+    
+    // 
============================================================================
 
-    private static final ConcurrentMap<KylinConfig, BadQueryHistoryManager> 
CACHE = new ConcurrentHashMap<>();
     private KylinConfig kylinConfig;
 
     private BadQueryHistoryManager(KylinConfig config) throws IOException {
@@ -44,34 +52,6 @@ public class BadQueryHistoryManager {
         this.kylinConfig = config;
     }
 
-    public static BadQueryHistoryManager getInstance(KylinConfig config) {
-        BadQueryHistoryManager r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (BadQueryHistoryManager.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-            try {
-                r = new BadQueryHistoryManager(config);
-                CACHE.put(config, r);
-                if (CACHE.size() > 1) {
-                    logger.warn("More than one singleton exist");
-                }
-                return r;
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to init 
BadQueryHistoryManager from " + config, e);
-            }
-        }
-    }
-
-    public static void clearCache() {
-        CACHE.clear();
-    }
-
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.kylinConfig);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 6558692..05910ea 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -24,8 +24,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -67,39 +65,13 @@ public class Broadcaster {
     public static final String SYNC_PRJ_DATA = "project_data"; // the special 
entity to indicate project data has change, e.g. cube/raw_table update
     public static final String SYNC_PRJ_ACL = "project_acl"; // the special 
entity to indicate query ACL has change, e.g. table_acl/learn_kylin update
 
-    // static cached instances
-    private static final ConcurrentMap<KylinConfig, Broadcaster> CACHE = new 
ConcurrentHashMap<KylinConfig, Broadcaster>();
-
     public static Broadcaster getInstance(KylinConfig config) {
-
-        synchronized (CACHE) {
-            Broadcaster r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-
-            r = new Broadcaster(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one singleton exist");
-            }
-            return r;
-        }
+        return config.getManager(Broadcaster.class);
     }
-
-    // call Broadcaster.getInstance().notifyClearAll() to clear cache
-    public static void clearCache() {
-        synchronized (CACHE) {
-            CACHE.clear();
-        }
-    }
-
-    public static void clearCache(KylinConfig kylinConfig) {
-        if (kylinConfig != null) {
-            synchronized (CACHE) {
-                CACHE.remove(kylinConfig);
-            }
-        }
+    
+    // called by reflection
+    static Broadcaster newInstance(KylinConfig config) {
+        return new Broadcaster(config);
     }
 
     // 
============================================================================
@@ -260,7 +232,7 @@ public class Broadcaster {
             for (Listener l : list) {
                 l.onClearAll(this);
             }
-            clearCache(); // clear broadcaster too in the end
+            config.clearManagers(); // clear all registered managers in config
             break;
         case SYNC_PRJ_SCHEMA:
             ProjectManager.getInstance(config).clearL2Cache();

Reply via email to