KYLIN-3090 Refactor to consolidate all caches and managers under KylinConfig
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b8d79870 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b8d79870 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b8d79870 Branch: refs/heads/master Commit: b8d7987090d96a763569a20a3f1665a3a612b565 Parents: bace2d2 Author: Li Yang <liy...@apache.org> Authored: Thu Dec 7 19:10:34 2017 +0800 Committer: Dong Li <lid...@apache.org> Committed: Mon Dec 11 10:16:35 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/KylinConfig.java | 43 ++- .../apache/kylin/common/KylinConfigBase.java | 4 + .../kylin/common/persistence/ResourceStore.java | 4 + .../persistence/RootPersistentEntity.java | 9 + .../kylin/common/util/AutoReadWriteLock.java | 66 ++++ .../common/util/AbstractKylinTestCase.java | 34 -- .../common/util/AutoReadWriteLockTest.java | 43 +++ .../org/apache/kylin/cube/CubeDescManager.java | 51 +-- .../java/org/apache/kylin/cube/CubeManager.java | 38 +-- .../org/apache/kylin/cube/cuboid/Cuboid.java | 77 ++--- .../apache/kylin/cube/cuboid/CuboidManager.java | 76 +++++ .../upgrade/common/CubeMetadataUpgrade.java | 10 +- .../kylin/cube/AggregationGroupRuleTest.java | 2 - .../org/apache/kylin/cube/CubeDescTest.java | 4 +- .../apache/kylin/cube/CubeManagerCacheTest.java | 5 - .../kylin/cube/common/RowKeySplitterTest.java | 2 - .../kylin/cube/cuboid/CuboidSchedulerTest.java | 4 +- .../apache/kylin/cube/cuboid/CuboidTest.java | 2 - .../apache/kylin/cube/kv/RowKeyDecoderTest.java | 2 - .../apache/kylin/cube/kv/RowKeyEncoderTest.java | 2 - .../apache/kylin/dict/DictionaryManager.java | 44 +-- .../kylin/dict/lookup/SnapshotManager.java | 46 +-- .../org/apache/kylin/job/dao/ExecutableDao.java | 34 +- .../kylin/job/execution/ExecutableManager.java | 36 +- .../impl/threadpool/DistributedScheduler.java | 45 +-- .../kylin/metadata/TableMetadataManager.java | 58 +--- .../kylin/metadata/TempStatementManager.java | 48 +-- .../kylin/metadata/acl/TableACLManager.java | 41 +-- .../badquery/BadQueryHistoryManager.java | 42 +-- .../kylin/metadata/cachesync/Broadcaster.java | 40 +-- .../metadata/cachesync/CachedCrudAssist.java | 139 ++++++++ .../kylin/metadata/draft/DraftManager.java | 32 +- .../kylin/metadata/model/DataModelManager.java | 47 +-- .../kylin/metadata/project/ProjectInstance.java | 5 + .../kylin/metadata/project/ProjectManager.java | 334 ++++++------------- .../realization/RealizationRegistry.java | 38 +-- .../metadata/streaming/StreamingManager.java | 60 +--- .../gtrecord/GTCubeStorageQueryBase.java | 2 +- .../kylin/storage/hybrid/HybridManager.java | 37 +- .../engine/mr/steps/MergeCuboidMapperTest.java | 8 +- .../rest/controller/ProjectController.java | 3 +- .../java/org/apache/kylin/rest/msg/Message.java | 4 + .../kylin/rest/service/ProjectService.java | 25 -- .../rest/service/DiagnosisServiceTest.java | 2 - .../rest/controller/ProjectControllerTest.java | 20 +- .../kylin/source/kafka/KafkaConfigManager.java | 54 +-- .../hbase/util/ExtendCubeToHybridCLI.java | 11 +- .../cube/MeasureTypeOnlyAggrInBaseTest.java | 6 +- .../hbase/steps/RowValueDecoderTest.java | 2 - .../kylin/tool/ExtendCubeToHybridCLI.java | 11 +- .../apache/kylin/tool/KylinLogExtractor.java | 6 +- .../apache/kylin/tool/CubeMetaIngesterTest.java | 10 +- 52 files changed, 686 insertions(+), 1082 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index d014262..c638ec6 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -26,11 +26,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.StringReader; +import java.lang.reflect.Method; import java.net.URL; import java.nio.ByteOrder; import java.nio.charset.Charset; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; @@ -382,6 +384,8 @@ public class KylinConfig extends KylinConfigBase { } // ============================================================================ + + Map<Class, Object> managersCache = new ConcurrentHashMap<>(); private KylinConfig() { super(); @@ -391,6 +395,43 @@ public class KylinConfig extends KylinConfigBase { super(props, force); } + public <T> T getManager(Class<T> clz) { + KylinConfig base = base(); + if (base != this) + return base.getManager(clz); + + Object mgr = managersCache.get(clz); + if (mgr != null) + return (T) mgr; + + synchronized (this) { + mgr = managersCache.get(clz); + if (mgr != null) + return (T) mgr; + + try { + // new manager via static Manager.newInstance() + Method method = clz.getDeclaredMethod("newInstance", KylinConfig.class); + method.setAccessible(true); // override accessibility + mgr = method.invoke(null, this); + } catch (Exception e) { + throw new RuntimeException(e); + } + managersCache.put(clz, mgr); + } + return (T) mgr; + } + + public void clearManagers() { + KylinConfig base = base(); + if (base != this) { + base.clearManagers(); + return; + } + + managersCache.clear(); + } + public Properties exportToProperties() { Properties all = getAllProperties(); Properties copy = new Properties(); @@ -432,7 +473,7 @@ public class KylinConfig extends KylinConfigBase { public synchronized void reloadFromSiteProperties() { reloadKylinConfig(buildSiteProperties()); } - + public KylinConfig base() { return this; } http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 2d74684..1302247 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -329,6 +329,10 @@ abstract public class KylinConfigBase implements Serializable { public String getHBaseMappingAdapter() { return getOptional("kylin.metadata.hbasemapping-adapter"); } + + public boolean isCheckCopyOnWrite() { + return Boolean.parseBoolean(getOptional("kylin.metadata.check-copy-on-write", "false")); + } // ============================================================================ // DICTIONARY & SNAPSHOT http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index 86aa8d1..629e12e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -118,6 +118,10 @@ abstract public class ResourceStore { protected ResourceStore(KylinConfig kylinConfig) { this.kylinConfig = kylinConfig; } + + final public KylinConfig getConfig() { + return kylinConfig; + } /** * List resources and sub-folders under a given folder, return null if given path is not a folder http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java index 06e481d..aa35482 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java @@ -98,6 +98,15 @@ abstract public class RootPersistentEntity implements AclEntity, Serializable { public void updateRandomUuid() { setUuid(UUID.randomUUID().toString()); } + + /** + * The name as a part of the resource path used to save the entity. + * + * E.g. /resource-root-dir/{RESOURCE_NAME}.json + */ + public String resourceName() { + return uuid; + } @Override public int hashCode() { http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/main/java/org/apache/kylin/common/util/AutoReadWriteLock.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/AutoReadWriteLock.java b/core-common/src/main/java/org/apache/kylin/common/util/AutoReadWriteLock.java new file mode 100644 index 0000000..f4518bb --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/AutoReadWriteLock.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.util; + +import java.io.Closeable; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class AutoReadWriteLock { + + public interface AutoLock extends Closeable { + // a close() that don't throw exception + public void close(); + } + + // ============================================================================ + + final private ReentrantReadWriteLock rwlock; + + public AutoReadWriteLock() { + this(new ReentrantReadWriteLock()); + } + + public AutoReadWriteLock(ReentrantReadWriteLock rwlock) { + this.rwlock = rwlock; + } + + public ReentrantReadWriteLock innerLock() { + return rwlock; + } + + public AutoLock lockForRead() { + rwlock.readLock().lock(); + return new AutoLock() { + @Override + public void close() { + rwlock.readLock().unlock(); + } + }; + } + + public AutoLock lockForWrite() { + rwlock.writeLock().lock(); + return new AutoLock() { + @Override + public void close() { + rwlock.writeLock().unlock(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java index 620e135..37790d2 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java @@ -18,8 +18,6 @@ package org.apache.kylin.common.util; -import java.lang.reflect.Method; - import org.apache.kylin.common.KylinConfig; /** @@ -28,22 +26,6 @@ import org.apache.kylin.common.KylinConfig; */ public abstract class AbstractKylinTestCase { - public static final String[] SERVICES_WITH_CACHE = { // - "org.apache.kylin.cube.CubeManager", // - "org.apache.kylin.cube.CubeDescManager", // - "org.apache.kylin.dict.lookup.SnapshotManager", // - "org.apache.kylin.dict.DictionaryManager", // - "org.apache.kylin.storage.hybrid.HybridManager", // - "org.apache.kylin.metadata.realization.RealizationRegistry", // - "org.apache.kylin.metadata.project.ProjectManager", // - "org.apache.kylin.metadata.MetadataManager", // - "org.apache.kylin.metadata.cachesync.Broadcaster", // - "org.apache.kylin.metadata.badquery.BadQueryHistoryManager", // - "org.apache.kylin.job.impl.threadpool.DistributedScheduler", // - "org.apache.kylin.job.execution.ExecutableManager", // - "org.apache.kylin.job.dao.ExecutableDao" // - }; - public abstract void createTestMetadata(String... overlayMetadataDirs) throws Exception; public abstract void cleanupTestMetadata() throws Exception; @@ -53,24 +35,8 @@ public abstract class AbstractKylinTestCase { } public static void staticCleanupTestMetadata() { - cleanupCache(); System.clearProperty(KylinConfig.KYLIN_CONF); KylinConfig.destroyInstance(); } - private static void cleanupCache() { - - for (String serviceClass : SERVICES_WITH_CACHE) { - try { - Class<?> cls = Class.forName(serviceClass); - Method method = cls.getDeclaredMethod("clearCache"); - method.invoke(null); - } catch (ClassNotFoundException e) { - // acceptable because lower module test does have CubeManager etc on classpath - } catch (Exception e) { - System.err.println("Error clean up cache " + serviceClass); - e.printStackTrace(); - } - } - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-common/src/test/java/org/apache/kylin/common/util/AutoReadWriteLockTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AutoReadWriteLockTest.java b/core-common/src/test/java/org/apache/kylin/common/util/AutoReadWriteLockTest.java new file mode 100644 index 0000000..56cfd93 --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/util/AutoReadWriteLockTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.util; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock; +import org.junit.Assert; +import org.junit.Test; + + +public class AutoReadWriteLockTest { + + @Test + public void testBasics() { + AutoReadWriteLock lock = new AutoReadWriteLock(new ReentrantReadWriteLock()); + try (AutoLock al = lock.lockForRead()) { + Assert.assertTrue(lock.innerLock().getReadHoldCount() == 1); + } + Assert.assertTrue(lock.innerLock().getReadHoldCount() == 0); + + try (AutoLock al = lock.lockForWrite()) { + Assert.assertTrue(lock.innerLock().getWriteHoldCount() == 1); + } + Assert.assertTrue(lock.innerLock().getWriteHoldCount() == 0); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java index 9e42eab..f724549 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java @@ -22,15 +22,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.cuboid.CuboidManager; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.validation.CubeMetadataValidator; import org.apache.kylin.cube.model.validation.ValidateContext; @@ -63,35 +61,13 @@ public class CubeDescManager { public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new JsonSerializer<CubeDesc>(CubeDesc.class); - // static cached instances - private static final ConcurrentMap<KylinConfig, CubeDescManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeDescManager>(); - public static CubeDescManager getInstance(KylinConfig config) { - CubeDescManager r = CACHE.get(config); - if (r != null) { - return r; - } - - synchronized (CubeDescManager.class) { - r = CACHE.get(config); - if (r != null) { - return r; - } - try { - r = new CubeDescManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - return r; - } catch (IOException e) { - throw new IllegalStateException("Failed to init CubeDescManager from " + config, e); - } - } + return config.getManager(CubeDescManager.class); } - public static void clearCache() { - CACHE.clear(); + // called by reflection + static CubeDescManager newInstance(KylinConfig config) throws IOException { + return new CubeDescManager(config); } // ============================================================================ @@ -113,12 +89,6 @@ public class CubeDescManager { private class CubeDescSyncListener extends Broadcaster.Listener { @Override - public void onClearAll(Broadcaster broadcaster) throws IOException { - clearCache(); - Cuboid.clearCache(); - } - - @Override public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException { for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) { if (real instanceof CubeInstance) { @@ -166,7 +136,7 @@ public class CubeDescManager { CubeDesc ndesc = loadCubeDesc(CubeDesc.concatResourcePath(name), false); cubeDescMap.putLocal(ndesc.getName(), ndesc); - Cuboid.clearCache(ndesc.getName()); // avoid calling CubeDesc.getInitialCuboidScheduler() for late initializing CuboidScheduler + clearCuboidCache(ndesc.getName()); // if related cube is in DESCBROKEN state before, change it back to DISABLED CubeManager cubeManager = CubeManager.getInstance(config); @@ -292,13 +262,18 @@ public class CubeDescManager { String path = cubeDesc.getResourcePath(); getStore().deleteResource(path); cubeDescMap.remove(cubeDesc.getName()); - Cuboid.clearCache(cubeDesc.getName()); // avoid calling CubeDesc.getInitialCuboidScheduler() for late initializing CuboidScheduler + clearCuboidCache(cubeDesc.getName()); } // remove cubeDesc public void removeLocalCubeDesc(String name) throws IOException { cubeDescMap.removeLocal(name); - Cuboid.clearCache(name); + clearCuboidCache(name); + } + + private void clearCuboidCache(String descName) { + // avoid calling CubeDesc.getInitialCuboidScheduler() for late initializing CuboidScheduler + CuboidManager.getInstance(config).clearCache(descName); } private void reloadAllCubeDesc() throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 95da80a..e00735c 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -91,39 +91,13 @@ public class CubeManager implements IRealizationProvider { private static final Logger logger = LoggerFactory.getLogger(CubeManager.class); - // static cached instances - private static final ConcurrentMap<KylinConfig, CubeManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeManager>(); - public static CubeManager getInstance(KylinConfig config) { - CubeManager r = CACHE.get(config); - if (r != null) { - return r; - } - - synchronized (CubeManager.class) { - r = CACHE.get(config); - if (r != null) { - return r; - } - try { - r = new CubeManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - for (KylinConfig kylinConfig : CACHE.keySet()) { - logger.warn("type: " + kylinConfig.getClass() + " reference: " - + System.identityHashCode(kylinConfig.base())); - } - } - return r; - } catch (IOException e) { - throw new IllegalStateException("Failed to init CubeManager from " + config, e); - } - } + return config.getManager(CubeManager.class); } - public static void clearCache() { - CACHE.clear(); + // called by reflection + static CubeManager newInstance(KylinConfig config) throws IOException { + return new CubeManager(config); } // ============================================================================ @@ -148,10 +122,6 @@ public class CubeManager implements IRealizationProvider { } private class CubeSyncListener extends Broadcaster.Listener { - @Override - public void onClearAll(Broadcaster broadcaster) throws IOException { - clearCache(); - } @Override public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java index efd2e2e..1110a5d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java @@ -23,10 +23,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.Set; import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; @@ -40,14 +40,10 @@ import org.apache.kylin.metadata.model.TblColRef; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Maps; @SuppressWarnings("serial") public class Cuboid implements Comparable<Cuboid>, Serializable { - // TODO Should the cache be inside CuboidScheduler? - private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = Maps.newConcurrentMap(); - // smaller is better public final static Comparator<Long> cuboidSelectComparator = new Comparator<Long>() { @Override @@ -56,38 +52,12 @@ public class Cuboid implements Comparable<Cuboid>, Serializable { } }; - // this is the only entry point for query to find the right cuboid for a segment - public static Cuboid identifyCuboid(CubeSegment cubeSegment, Set<TblColRef> dimensions, - Collection<FunctionDesc> metrics) { - return identifyCuboid(cubeSegment.getCuboidScheduler(), dimensions, metrics); - } - - // this is the only entry point for query to find the right cuboid for a cube instance - public static Cuboid identifyCuboid(CubeInstance cubeInstance, Set<TblColRef> dimensions, - Collection<FunctionDesc> metrics) { - return identifyCuboid(cubeInstance.getCuboidScheduler(), dimensions, metrics); - } - - public static Cuboid identifyCuboid(CuboidScheduler cuboidScheduler, Set<TblColRef> dimensions, + public static Cuboid findCuboid(CuboidScheduler cuboidScheduler, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { - long cuboidID = identifyCuboidId(cuboidScheduler.getCubeDesc(), dimensions, metrics); + long cuboidID = toCuboidId(cuboidScheduler.getCubeDesc(), dimensions, metrics); return Cuboid.findById(cuboidScheduler, cuboidID); } - public static long identifyCuboidId(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { - for (FunctionDesc metric : metrics) { - if (metric.getMeasureType().onlyAggrInBaseCuboid()) - return Cuboid.getBaseCuboidId(cubeDesc); - } - - long cuboidID = 0; - for (TblColRef column : dimensions) { - int index = cubeDesc.getRowkey().getColumnBitIndex(column); - cuboidID |= 1L << index; - } - return cuboidID; - } - public static Cuboid findById(CuboidScheduler cuboidScheduler, byte[] cuboidID) { return findById(cuboidScheduler, Bytes.toLong(cuboidID)); } @@ -106,18 +76,27 @@ public class Cuboid implements Comparable<Cuboid>, Serializable { } public static Cuboid findById(CuboidScheduler cuboidScheduler, long cuboidID) { - Map<Long, Cuboid> cubeCache = CUBOID_CACHE.get(cuboidScheduler.getCuboidCacheKey()); - if (cubeCache == null) { - cubeCache = Maps.newConcurrentMap(); - CUBOID_CACHE.put(cuboidScheduler.getCuboidCacheKey(), cubeCache); + KylinConfig config = cuboidScheduler.getCubeDesc().getConfig(); + return CuboidManager.getInstance(config).findById(cuboidScheduler, cuboidID); + } + + public static void clearCache(CubeInstance cubeInstance) { + KylinConfig config = cubeInstance.getConfig(); + CuboidManager.getInstance(config).clearCache(cubeInstance); + } + + public static long toCuboidId(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { + for (FunctionDesc metric : metrics) { + if (metric.getMeasureType().onlyAggrInBaseCuboid()) + return Cuboid.getBaseCuboidId(cubeDesc); } - Cuboid cuboid = cubeCache.get(cuboidID); - if (cuboid == null) { - long validCuboidID = cuboidScheduler.findBestMatchCuboid(cuboidID); - cuboid = new Cuboid(cuboidScheduler.getCubeDesc(), cuboidID, validCuboidID); - cubeCache.put(cuboidID, cuboid); + + long cuboidID = 0; + for (TblColRef column : dimensions) { + int index = cubeDesc.getRowkey().getColumnBitIndex(column); + cuboidID |= 1L << index; } - return cuboid; + return cuboidID; } public static long getBaseCuboidId(CubeDesc cube) { @@ -128,18 +107,6 @@ public class Cuboid implements Comparable<Cuboid>, Serializable { return findById(cube.getInitialCuboidScheduler(), getBaseCuboidId(cube)); } - public static void clearCache() { - CUBOID_CACHE.clear(); - } - - public static void clearCache(String cacheKey) { - CUBOID_CACHE.remove(cacheKey); - } - - public static void clearCache(CubeInstance cubeInstance) { - CUBOID_CACHE.remove(cubeInstance.getCuboidScheduler().getCuboidCacheKey()); - } - // ============================================================================ private CubeDesc cubeDesc; http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java new file mode 100644 index 0000000..6efc87f --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.cuboid; + +import java.io.IOException; +import java.util.Map; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; + +import com.google.common.collect.Maps; + +/** + * A cuboid cache. + */ +public class CuboidManager { + + public static CuboidManager getInstance(KylinConfig config) { + return config.getManager(CuboidManager.class); + } + + // called by reflection + static CuboidManager newInstance(KylinConfig config) throws IOException { + return new CuboidManager(config); + } + + // ============================================================================ + + @SuppressWarnings("unused") + final private KylinConfig config; + final private Map<String, Map<Long, Cuboid>> schedulerCuboidCache = Maps.newConcurrentMap(); + + private CuboidManager(KylinConfig config) { + this.config = config; + } + + public Cuboid findById(CuboidScheduler cuboidScheduler, long cuboidID) { + Map<Long, Cuboid> cubeCache = schedulerCuboidCache.get(cuboidScheduler.getCuboidCacheKey()); + if (cubeCache == null) { + cubeCache = Maps.newConcurrentMap(); + schedulerCuboidCache.put(cuboidScheduler.getCuboidCacheKey(), cubeCache); + } + Cuboid cuboid = cubeCache.get(cuboidID); + if (cuboid == null) { + long validCuboidID = cuboidScheduler.findBestMatchCuboid(cuboidID); + cuboid = new Cuboid(cuboidScheduler.getCubeDesc(), cuboidID, validCuboidID); + cubeCache.put(cuboidID, cuboid); + } + return cuboid; + } + + public void clearCache(String cacheKey) { + schedulerCuboidCache.remove(cacheKey); + } + + public void clearCache(CubeInstance cubeInstance) { + schedulerCuboidCache.remove(cubeInstance.getCuboidScheduler().getCuboidCacheKey()); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java b/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java index f2f6b57..7c7ea0e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/upgrade/common/CubeMetadataUpgrade.java @@ -63,23 +63,17 @@ public abstract class CubeMetadataUpgrade { } public void clear() { - DataModelManager.clearCache(); - CubeDescManager.clearCache(); - CubeManager.clearCache(); - ProjectManager.clearCache(); + config.clearManagers(); } public void verify() { logger.info("================================================================="); logger.info("The changes are applied, now it's time to verify the new metadata store by reloading all metadata:"); logger.info("================================================================="); - DataModelManager.clearCache(); + config.clearManagers(); DataModelManager.getInstance(config); - CubeDescManager.clearCache(); CubeDescManager.getInstance(config); - CubeManager.clearCache(); CubeManager.getInstance(config); - ProjectManager.clearCache(); ProjectManager.getInstance(config); //cleanup(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java index 1444ee1..b40dded 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java @@ -32,7 +32,6 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.validation.IValidatorRule; import org.apache.kylin.cube.model.validation.ValidateContext; import org.apache.kylin.cube.model.validation.rule.AggregationGroupRule; -import org.apache.kylin.metadata.model.DataModelManager; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -41,7 +40,6 @@ public class AggregationGroupRuleTest extends LocalFileMetadataTestCase { @Before public void setUp() throws Exception { this.createTestMetadata(); - DataModelManager.clearCache(); } @After http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java index ae7b17c..36ac5c3 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java @@ -376,7 +376,7 @@ public class CubeDescTest extends LocalFileMetadataTestCase { } thrown.expect(TooManyCuboidException.class); - CubeDescManager.clearCache(); + getTestConfig().clearManagers(); CubeDesc cubeDesc = CubeDescManager.getInstance(getTestConfig()) .getCubeDesc("ut_cube_desc_combination_int_overflow"); cubeDesc.init(getTestConfig()); @@ -391,7 +391,7 @@ public class CubeDescTest extends LocalFileMetadataTestCase { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Too many rowkeys (78) in CubeDesc, please try to reduce dimension number or adopt derived dimensions"); - CubeDescManager.clearCache(); + getTestConfig().clearManagers(); CubeDesc cubeDesc = CubeDescManager.getInstance(getTestConfig()).getCubeDesc("ut_78_rowkeys"); cubeDesc.init(getTestConfig()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java index 52b9042..3881943 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java @@ -23,8 +23,6 @@ import static org.junit.Assert.assertEquals; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.model.DataModelManager; -import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.junit.After; import org.junit.Before; @@ -41,9 +39,6 @@ public class CubeManagerCacheTest extends LocalFileMetadataTestCase { @Before public void setUp() throws Exception { this.createTestMetadata(); - DataModelManager.clearCache(); - CubeManager.clearCache(); - ProjectManager.clearCache(); cubeManager = CubeManager.getInstance(getTestConfig()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java index 6d32586..e4a426d 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/common/RowKeySplitterTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.metadata.model.DataModelManager; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -33,7 +32,6 @@ public class RowKeySplitterTest extends LocalFileMetadataTestCase { @Before public void setUp() throws Exception { this.createTestMetadata(); - DataModelManager.clearCache(); } @After http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java index 09200b8..87bf9c3 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java @@ -31,7 +31,6 @@ import java.util.Set; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.model.DataModelManager; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -47,7 +46,6 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Before public void setUp() throws Exception { this.createTestMetadata(); - DataModelManager.clearCache(); } @After @@ -330,7 +328,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { f.renameTo(new File(path.substring(0, path.length() - 4))); } } - CubeDescManager.clearCache(); + getTestConfig().clearManagers(); CubeDesc cube = getCubeDescManager().getCubeDesc("ut_large_dimension_number"); CuboidScheduler scheduler = cube.getInitialCuboidScheduler(); http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java index 0a77bdc..3c7dd66 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.model.DataModelManager; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -61,7 +60,6 @@ public class CuboidTest extends LocalFileMetadataTestCase { @Before public void setUp() throws Exception { this.createTestMetadata(); - DataModelManager.clearCache(); } @After http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java index 459e734..d9f9265 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java @@ -29,7 +29,6 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.model.DataModelManager; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -40,7 +39,6 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase { @Before public void setUp() throws Exception { this.createTestMetadata(); - DataModelManager.clearCache(); } @After http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java index dcd883e..b34197e 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java @@ -29,7 +29,6 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.model.DataModelManager; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -40,7 +39,6 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase { @Before public void setUp() throws Exception { this.createTestMetadata(); - DataModelManager.clearCache(); } @After http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index e97899c..292c61c 100755 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -24,8 +24,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.List; import java.util.NavigableSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -35,7 +33,6 @@ import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.DataModelManager; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.IReadableTable.TableSignature; @@ -55,28 +52,13 @@ public class DictionaryManager { private static final DictionaryInfo NONE_INDICATOR = new DictionaryInfo(); - // static cached instances - private static final ConcurrentMap<KylinConfig, DictionaryManager> CACHE = new ConcurrentHashMap<KylinConfig, DictionaryManager>(); - public static DictionaryManager getInstance(KylinConfig config) { - DictionaryManager r = CACHE.get(config); - if (r == null) { - synchronized (DictionaryManager.class) { - r = CACHE.get(config); - if (r == null) { - r = new DictionaryManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - } - } - return r; + return config.getManager(DictionaryManager.class); } - public static void clearCache() { - CACHE.clear(); + // called by reflection + static DictionaryManager newInstance(KylinConfig config) throws IOException { + return new DictionaryManager(config); } // ============================================================================ @@ -176,7 +158,7 @@ public class DictionaryManager { } private String checkDupByContent(DictionaryInfo dictInfo, Dictionary<String> dict) throws IOException { - ResourceStore store = DataModelManager.getInstance(config).getStore(); + ResourceStore store = getStore(); NavigableSet<String> existings = store.listResources(dictInfo.getResourceDir()); if (existings == null) return null; @@ -343,7 +325,7 @@ public class DictionaryManager { } private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException { - final ResourceStore store = DataModelManager.getInstance(config).getStore(); + final ResourceStore store = getStore(); final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); TableSignature input = dictInfo.getInput(); @@ -357,7 +339,7 @@ public class DictionaryManager { } private DictionaryInfo findLargestDictInfo(DictionaryInfo dictInfo) throws IOException { - final ResourceStore store = DataModelManager.getInstance(config).getStore(); + final ResourceStore store = getStore(); final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); DictionaryInfo largestDict = null; @@ -376,7 +358,7 @@ public class DictionaryManager { public void removeDictionary(String resourcePath) throws IOException { logger.info("Remvoing dict: " + resourcePath); - ResourceStore store = DataModelManager.getInstance(config).getStore(); + ResourceStore store = getStore(); store.deleteResource(resourcePath); dictCache.invalidate(resourcePath); } @@ -386,7 +368,7 @@ public class DictionaryManager { info.setSourceTable(srcTable); info.setSourceColumn(srcCol); - ResourceStore store = DataModelManager.getInstance(config).getStore(); + ResourceStore store = getStore(); NavigableSet<String> existings = store.listResources(info.getResourceDir()); if (existings == null) return; @@ -396,7 +378,7 @@ public class DictionaryManager { } void save(DictionaryInfo dict) throws IOException { - ResourceStore store = DataModelManager.getInstance(config).getStore(); + ResourceStore store = getStore(); String path = dict.getResourcePath(); logger.info("Saving dictionary at " + path); @@ -412,11 +394,15 @@ public class DictionaryManager { } DictionaryInfo load(String resourcePath, boolean loadDictObj) throws IOException { - ResourceStore store = DataModelManager.getInstance(config).getStore(); + ResourceStore store = getStore(); logger.info("DictionaryManager(" + System.identityHashCode(this) + ") loading DictionaryInfo(loadDictObj:" + loadDictObj + ") at " + resourcePath); DictionaryInfo info = store.getResource(resourcePath, DictionaryInfo.class, loadDictObj ? DictionaryInfoSerializer.FULL_SERIALIZER : DictionaryInfoSerializer.INFO_SERIALIZER); return info; } + private ResourceStore getStore() { + return ResourceStore.getStore(config); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java index c10deb4..5192805 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java @@ -20,14 +20,11 @@ package org.apache.kylin.dict.lookup; import java.io.IOException; import java.util.NavigableSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.metadata.model.DataModelManager; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.IReadableTable.TableSignature; @@ -47,39 +44,21 @@ public class SnapshotManager { private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class); - // static cached instances - private static final ConcurrentMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>(); - public static SnapshotManager getInstance(KylinConfig config) { - SnapshotManager r = SERVICE_CACHE.get(config); - if (r == null) { - synchronized (SnapshotManager.class) { - r = SERVICE_CACHE.get(config); - if (r == null) { - r = new SnapshotManager(config); - SERVICE_CACHE.put(config, r); - if (SERVICE_CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - } - } - return r; + return config.getManager(SnapshotManager.class); } - public static void clearCache() { - synchronized (SERVICE_CACHE) { - SERVICE_CACHE.clear(); - } + // called by reflection + static SnapshotManager newInstance(KylinConfig config) throws IOException { + return new SnapshotManager(config); } // ============================================================================ private KylinConfig config; - private LoadingCache<String, SnapshotTable> snapshotCache; // resource - // path ==> - // SnapshotTable + // path ==> SnapshotTable + private LoadingCache<String, SnapshotTable> snapshotCache; // resource private SnapshotManager(KylinConfig config) { this.config = config; @@ -116,7 +95,7 @@ public class SnapshotManager { } public void removeSnapshot(String resourcePath) throws IOException { - ResourceStore store = DataModelManager.getInstance(this.config).getStore(); + ResourceStore store = getStore(); store.deleteResource(resourcePath); snapshotCache.invalidate(resourcePath); } @@ -171,7 +150,7 @@ public class SnapshotManager { } private String checkDupByInfo(SnapshotTable snapshot) throws IOException { - ResourceStore store = DataModelManager.getInstance(this.config).getStore(); + ResourceStore store = getStore(); String resourceDir = snapshot.getResourceDir(); NavigableSet<String> existings = store.listResources(resourceDir); if (existings == null) @@ -189,7 +168,7 @@ public class SnapshotManager { } private String checkDupByContent(SnapshotTable snapshot) throws IOException { - ResourceStore store = DataModelManager.getInstance(this.config).getStore(); + ResourceStore store = getStore(); String resourceDir = snapshot.getResourceDir(); NavigableSet<String> existings = store.listResources(resourceDir); if (existings == null) @@ -205,14 +184,14 @@ public class SnapshotManager { } private void save(SnapshotTable snapshot) throws IOException { - ResourceStore store = DataModelManager.getInstance(this.config).getStore(); + ResourceStore store = getStore(); String path = snapshot.getResourcePath(); store.putResource(path, snapshot, SnapshotTableSerializer.FULL_SERIALIZER); } private SnapshotTable load(String resourcePath, boolean loadData) throws IOException { logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData); - ResourceStore store = DataModelManager.getInstance(this.config).getStore(); + ResourceStore store = getStore(); SnapshotTable table = store.getResource(resourcePath, SnapshotTable.class, loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER); @@ -222,4 +201,7 @@ public class SnapshotManager { return table; } + private ResourceStore getStore() { + return ResourceStore.getStore(this.config); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java index 16875b1..a396b92 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java @@ -23,15 +23,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.NavigableSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.job.exception.PersistentException; -import org.apache.kylin.metadata.model.DataModelManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,36 +41,23 @@ public class ExecutableDao { private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class); private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class); private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class); - private static final ConcurrentMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>(); - - private ResourceStore store; public static ExecutableDao getInstance(KylinConfig config) { - ExecutableDao r = CACHE.get(config); - if (r == null) { - synchronized (ExecutableDao.class) { - r = CACHE.get(config); - if (r == null) { - r = new ExecutableDao(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - } - } - return r; + return config.getManager(ExecutableDao.class); } - public static void clearCache() { - synchronized (CACHE) { - CACHE.clear(); - } + // called by reflection + static ExecutableDao newInstance(KylinConfig config) throws IOException { + return new ExecutableDao(config); } + // ============================================================================ + + private ResourceStore store; + private ExecutableDao(KylinConfig config) { logger.info("Using metadata url: " + config); - this.store = DataModelManager.getInstance(config).getStore(); + this.store = ResourceStore.getStore(config); } private String pathOfJob(ExecutablePO job) { http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index bab8c30..9f67a2b 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -22,13 +22,12 @@ import static org.apache.kylin.job.constant.ExecutableConstants.MR_JOB_ID; import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID; import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL; +import java.io.IOException; import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.IllegalFormatException; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -51,8 +50,18 @@ import com.google.common.collect.Maps; public class ExecutableManager { private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class); - private static final ConcurrentMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>(); + public static ExecutableManager getInstance(KylinConfig config) { + return config.getManager(ExecutableManager.class); + } + + // called by reflection + static ExecutableManager newInstance(KylinConfig config) throws IOException { + return new ExecutableManager(config); + } + + // ============================================================================ + private final KylinConfig config; private final ExecutableDao executableDao; @@ -62,27 +71,6 @@ public class ExecutableManager { this.executableDao = ExecutableDao.getInstance(config); } - public static ExecutableManager getInstance(KylinConfig config) { - ExecutableManager r = CACHE.get(config); - if (r == null) { - synchronized (ExecutableManager.class) { - r = CACHE.get(config); - if (r == null) { - r = new ExecutableManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - } - } - return r; - } - - public static void clearCache() { - CACHE.clear(); - } - private static ExecutablePO parse(AbstractExecutable executable) { ExecutablePO result = new ExecutablePO(); result.setName(executable.getName()); http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 6d41c5e..2c934d0 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -22,8 +22,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -63,6 +61,22 @@ import com.google.common.collect.Maps; * 3. add all the job servers and query servers to the kylin.server.cluster-servers */ public class DistributedScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener { + private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class); + + private final static String SEGMENT_ID = "segmentId"; + public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /${kylin.env.zookeeper-base-path}/metadata + + public static DistributedScheduler getInstance(KylinConfig config) { + return config.getManager(DistributedScheduler.class); + } + + // called by reflection + static DistributedScheduler newInstance(KylinConfig config) throws IOException { + return new DistributedScheduler(); + } + + // ============================================================================ + private ExecutableManager executableManager; private FetcherRunner fetcher; private ScheduledExecutorService fetcherPool; @@ -72,8 +86,6 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private DistributedLock jobLock; private Closeable lockWatch; - private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class); - private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>(); //keep all segments having running job private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>(); private volatile boolean initialized = false; @@ -81,31 +93,6 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private JobEngineConfig jobEngineConfig; private String serverName; - private final static String SEGMENT_ID = "segmentId"; - public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /${kylin.env.zookeeper-base-path}/metadata - - //only for it test - public static DistributedScheduler getInstance(KylinConfig config) { - DistributedScheduler r = CACHE.get(config); - if (r == null) { - synchronized (DistributedScheduler.class) { - r = CACHE.get(config); - if (r == null) { - r = new DistributedScheduler(); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - } - } - return r; - } - - public static void clearCache() { - CACHE.clear(); - } - private class FetcherRunner implements Runnable { @Override synchronized public void run() { http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java index 84133af..af275a5 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java @@ -26,12 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import javax.annotation.Nullable; - -import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RawResource; @@ -50,8 +45,6 @@ import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -67,43 +60,13 @@ public class TableMetadataManager { public static final Serializer<ExternalFilterDesc> EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>( ExternalFilterDesc.class); - // static cached instances - private static final ConcurrentMap<KylinConfig, TableMetadataManager> CACHE = new ConcurrentHashMap<KylinConfig, TableMetadataManager>(); - public static TableMetadataManager getInstance(KylinConfig config) { - TableMetadataManager r = CACHE.get(config); - if (r != null) { - return r; - } - - synchronized (TableMetadataManager.class) { - r = CACHE.get(config); - if (r != null) { - return r; - } - try { - r = new TableMetadataManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist, current keys: {}", StringUtils - .join(Iterators.transform(CACHE.keySet().iterator(), new Function<KylinConfig, String>() { - @Nullable - @Override - public String apply(@Nullable KylinConfig input) { - return String.valueOf(System.identityHashCode(input)); - } - }), ",")); - } - - return r; - } catch (IOException e) { - throw new IllegalStateException("Failed to init TableMetadataManager from " + config, e); - } - } + return config.getManager(TableMetadataManager.class); } - public static void clearCache() { - CACHE.clear(); + // called by reflection + static TableMetadataManager newInstance(KylinConfig config) throws IOException { + return new TableMetadataManager(config); } // ============================================================================ @@ -148,7 +111,6 @@ public class TableMetadataManager { return globalTables; } - ProjectInstance project = ProjectManager.getInstance(config).getProject(prj); Set<String> prjTableNames = project.getTables(); @@ -359,10 +321,6 @@ public class TableMetadataManager { } private class SrcTableSyncListener extends Broadcaster.Listener { - @Override - public void onClearAll(Broadcaster broadcaster) throws IOException { - clearCache(); - } @Override public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) @@ -387,10 +345,6 @@ public class TableMetadataManager { } private class SrcTableExtSyncListener extends Broadcaster.Listener { - @Override - public void onClearAll(Broadcaster broadcaster) throws IOException { - clearCache(); - } @Override public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) @@ -403,10 +357,6 @@ public class TableMetadataManager { } private class ExtFilterSyncListener extends Broadcaster.Listener { - @Override - public void onClearAll(Broadcaster broadcaster) throws IOException { - clearCache(); - } @Override public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java index 6487eef..30ff934 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java @@ -21,12 +21,7 @@ package org.apache.kylin.metadata; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import javax.annotation.Nullable; - -import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; @@ -39,49 +34,19 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Function; -import com.google.common.collect.Iterators; public class TempStatementManager { private static final Logger logger = LoggerFactory.getLogger(TempStatementManager.class); - private static final ConcurrentMap<KylinConfig, TempStatementManager> CACHE = new ConcurrentHashMap<>(); public static final Serializer<TempStatementEntity> TEMP_STATEMENT_SERIALIZER = new JsonSerializer<>( TempStatementEntity.class); public static TempStatementManager getInstance(KylinConfig config) { - TempStatementManager r = CACHE.get(config); - if (r != null) { - return r; - } - - synchronized (TempStatementManager.class) { - r = CACHE.get(config); - if (r != null) { - return r; - } - try { - r = new TempStatementManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist, current keys: {}", StringUtils - .join(Iterators.transform(CACHE.keySet().iterator(), new Function<KylinConfig, String>() { - @Nullable - @Override - public String apply(@Nullable KylinConfig input) { - return String.valueOf(System.identityHashCode(input)); - } - }), ",")); - } - - return r; - } catch (IOException e) { - throw new IllegalStateException("Failed to init TableMetadataManager from " + config, e); - } - } + return config.getManager(TempStatementManager.class); } - public static void clearCache() { - CACHE.clear(); + // called by reflection + static TempStatementManager newInstance(KylinConfig config) throws IOException { + return new TempStatementManager(config); } // ============================================================================ @@ -195,10 +160,6 @@ public class TempStatementManager { } private class TempStatementSyncListener extends Broadcaster.Listener { - @Override - public void onClearAll(Broadcaster broadcaster) throws IOException { - clearCache(); - } @Override public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) @@ -210,6 +171,7 @@ public class TempStatementManager { } } + @SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) private static class TempStatementEntity extends RootPersistentEntity { private static final String DEFAULT_SESSION_ID = "DEFAULT_SESSION"; http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java index 9c802b4..905fa16 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/acl/TableACLManager.java @@ -20,8 +20,6 @@ package org.apache.kylin.metadata.acl; import java.io.IOException; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; @@ -41,40 +39,13 @@ public class TableACLManager { private static final Serializer<TableACL> TABLE_ACL_SERIALIZER = new JsonSerializer<>(TableACL.class); private static final String DIR_PREFIX = "/table_acl/"; - // static cached instances - private static final ConcurrentMap<KylinConfig, TableACLManager> CACHE = new ConcurrentHashMap<>(); - public static TableACLManager getInstance(KylinConfig config) { - TableACLManager r = CACHE.get(config); - if (r != null) { - return r; - } - - synchronized (TableACLManager.class) { - r = CACHE.get(config); - if (r != null) { - return r; - } - try { - r = new TableACLManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - return r; - } catch (IOException e) { - throw new IllegalStateException("Failed to init CubeDescManager from " + config, e); - } - } + return config.getManager(TableACLManager.class); } - public static void clearCache() { - CACHE.clear(); - } - - public static void clearCache(KylinConfig kylinConfig) { - if (kylinConfig != null) - CACHE.remove(kylinConfig); + // called by reflection + static TableACLManager newInstance(KylinConfig config) throws IOException { + return new TableACLManager(config); } // ============================================================================ @@ -92,10 +63,6 @@ public class TableACLManager { } private class TableACLSyncListener extends Broadcaster.Listener { - @Override - public void onClearAll(Broadcaster broadcaster) throws IOException { - clearCache(); - } @Override public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java index a916254..c04e9eb 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java @@ -20,8 +20,6 @@ package org.apache.kylin.metadata.badquery; import java.io.IOException; import java.util.NavigableSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -35,8 +33,18 @@ import org.slf4j.LoggerFactory; public class BadQueryHistoryManager { public static final Serializer<BadQueryHistory> BAD_QUERY_INSTANCE_SERIALIZER = new JsonSerializer<>(BadQueryHistory.class); private static final Logger logger = LoggerFactory.getLogger(BadQueryHistoryManager.class); + + public static BadQueryHistoryManager getInstance(KylinConfig config) { + return config.getManager(BadQueryHistoryManager.class); + } + + // called by reflection + static BadQueryHistoryManager newInstance(KylinConfig config) throws IOException { + return new BadQueryHistoryManager(config); + } + + // ============================================================================ - private static final ConcurrentMap<KylinConfig, BadQueryHistoryManager> CACHE = new ConcurrentHashMap<>(); private KylinConfig kylinConfig; private BadQueryHistoryManager(KylinConfig config) throws IOException { @@ -44,34 +52,6 @@ public class BadQueryHistoryManager { this.kylinConfig = config; } - public static BadQueryHistoryManager getInstance(KylinConfig config) { - BadQueryHistoryManager r = CACHE.get(config); - if (r != null) { - return r; - } - - synchronized (BadQueryHistoryManager.class) { - r = CACHE.get(config); - if (r != null) { - return r; - } - try { - r = new BadQueryHistoryManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - return r; - } catch (IOException e) { - throw new IllegalStateException("Failed to init BadQueryHistoryManager from " + config, e); - } - } - } - - public static void clearCache() { - CACHE.clear(); - } - private ResourceStore getStore() { return ResourceStore.getStore(this.kylinConfig); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b8d79870/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 6558692..05910ea 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -24,8 +24,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingDeque; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; @@ -67,39 +65,13 @@ public class Broadcaster { public static final String SYNC_PRJ_DATA = "project_data"; // the special entity to indicate project data has change, e.g. cube/raw_table update public static final String SYNC_PRJ_ACL = "project_acl"; // the special entity to indicate query ACL has change, e.g. table_acl/learn_kylin update - // static cached instances - private static final ConcurrentMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>(); - public static Broadcaster getInstance(KylinConfig config) { - - synchronized (CACHE) { - Broadcaster r = CACHE.get(config); - if (r != null) { - return r; - } - - r = new Broadcaster(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - return r; - } + return config.getManager(Broadcaster.class); } - - // call Broadcaster.getInstance().notifyClearAll() to clear cache - public static void clearCache() { - synchronized (CACHE) { - CACHE.clear(); - } - } - - public static void clearCache(KylinConfig kylinConfig) { - if (kylinConfig != null) { - synchronized (CACHE) { - CACHE.remove(kylinConfig); - } - } + + // called by reflection + static Broadcaster newInstance(KylinConfig config) { + return new Broadcaster(config); } // ============================================================================ @@ -260,7 +232,7 @@ public class Broadcaster { for (Listener l : list) { l.onClearAll(this); } - clearCache(); // clear broadcaster too in the end + config.clearManagers(); // clear all registered managers in config break; case SYNC_PRJ_SCHEMA: ProjectManager.getInstance(config).clearL2Cache();