KYLIN_2464, replace ConcurrentHashMap with ConcurrentMap Signed-off-by: Li Yang <liy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/693c6faf Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/693c6faf Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/693c6faf Branch: refs/heads/master-hbase0.98 Commit: 693c6faf2f38d19816c8e0c9e0a48c6caaaf18cd Parents: 6adb73d Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Fri Feb 24 18:33:48 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Feb 24 18:46:44 2017 +0800 ---------------------------------------------------------------------- .../kylin/common/persistence/ResourceStore.java | 4 ++-- .../common/util/MemoryBudgetController.java | 3 ++- .../org/apache/kylin/cube/CubeDescManager.java | 16 +++++++------- .../java/org/apache/kylin/cube/CubeManager.java | 3 ++- .../apache/kylin/dict/DictionaryManager.java | 3 ++- .../kylin/dict/lookup/SnapshotManager.java | 3 ++- .../org/apache/kylin/job/dao/ExecutableDao.java | 3 ++- .../kylin/job/execution/ExecutableManager.java | 22 +++++++++++--------- .../impl/threadpool/DistributedScheduler.java | 3 ++- .../apache/kylin/metadata/MetadataManager.java | 3 ++- .../badquery/BadQueryHistoryManager.java | 3 ++- .../kylin/metadata/cachesync/Broadcaster.java | 3 ++- .../kylin/metadata/project/ProjectManager.java | 13 ++++++------ .../realization/RealizationRegistry.java | 3 ++- .../metadata/streaming/StreamingManager.java | 3 ++- .../kylin/storage/hybrid/HybridManager.java | 3 ++- .../kylin/source/kafka/KafkaConfigManager.java | 9 ++++---- 17 files changed, 58 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index 77143b0..b0e06f5 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -63,10 +64,9 @@ abstract public class ResourceStore { public static final String CUBE_STATISTICS_ROOT = "/cube_statistics"; public static final String BAD_QUERY_RESOURCE_ROOT = "/bad_query"; - protected static final String DEFAULT_STORE_NAME = "kylin_metadata"; - private static final ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>(); + private static final ConcurrentMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>(); private static final ArrayList<Class<? extends ResourceStore>> knownImpl = new ArrayList<Class<? extends ResourceStore>>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java index ade929c..7a0b919 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java @@ -19,6 +19,7 @@ package org.apache.kylin.common.util; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; @@ -64,7 +65,7 @@ public class MemoryBudgetController { // all budget numbers are in MB private final int totalBudgetMB; - private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>(); + private final ConcurrentMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>(); private int totalReservedMB; private final ReentrantLock lock = new ReentrantLock(); http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java index 1bf7e97..00fa705 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -63,7 +64,7 @@ public class CubeDescManager { public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new JsonSerializer<CubeDesc>(CubeDesc.class); // static cached instances - private static final ConcurrentHashMap<KylinConfig, CubeDescManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeDescManager>(); + private static final ConcurrentMap<KylinConfig, CubeDescManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeDescManager>(); public static CubeDescManager getInstance(KylinConfig config) { CubeDescManager r = CACHE.get(config); @@ -103,14 +104,14 @@ public class CubeDescManager { logger.info("Initializing CubeDescManager with config " + config); this.config = config; this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc"); - + // touch lower level metadata before registering my listener reloadAllCubeDesc(); Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc"); } - + private class CubeDescSyncListener extends Broadcaster.Listener { - + @Override public void onClearAll(Broadcaster broadcaster) throws IOException { clearCache(); @@ -122,7 +123,7 @@ public class CubeDescManager { for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) { if (real instanceof CubeInstance) { String descName = ((CubeInstance) real).getDescName(); - reloadCubeDescLocal(descName); + reloadCubeDescLocal(descName); } } } @@ -132,12 +133,12 @@ public class CubeDescManager { String cubeDescName = cacheKey; CubeDesc cubeDesc = getCubeDesc(cubeDescName); String modelName = cubeDesc == null ? null : cubeDesc.getModel().getName(); - + if (event == Event.DROP) removeLocalCubeDesc(cubeDescName); else reloadCubeDescLocal(cubeDescName); - + for (ProjectInstance prj : ProjectManager.getInstance(config).findProjectsByModel(modelName)) { broadcaster.notifyProjectSchemaUpdate(prj.getName()); } @@ -237,7 +238,6 @@ public class CubeDescManager { return cubeDesc; } - /** * if there is some change need be applied after getting a cubeDesc from front-end, do it here * @param cubeDesc http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 9670b89..073f516 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -33,6 +33,7 @@ import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -89,7 +90,7 @@ public class CubeManager implements IRealizationProvider { private static final Logger logger = LoggerFactory.getLogger(CubeManager.class); // static cached instances - private static final ConcurrentHashMap<KylinConfig, CubeManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeManager>(); + private static final ConcurrentMap<KylinConfig, CubeManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeManager>(); public static CubeManager getInstance(KylinConfig config) { CubeManager r = CACHE.get(config); http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 3ba24cf..427bd14 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.List; import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -59,7 +60,7 @@ public class DictionaryManager { private static final DictionaryInfo NONE_INDICATOR = new DictionaryInfo(); // static cached instances - private static final ConcurrentHashMap<KylinConfig, DictionaryManager> CACHE = new ConcurrentHashMap<KylinConfig, DictionaryManager>(); + private static final ConcurrentMap<KylinConfig, DictionaryManager> CACHE = new ConcurrentHashMap<KylinConfig, DictionaryManager>(); public static DictionaryManager getInstance(KylinConfig config) { DictionaryManager r = CACHE.get(config); http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java index b45d017..a912696 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java @@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup; import java.io.IOException; import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -47,7 +48,7 @@ public class SnapshotManager { private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class); // static cached instances - private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>(); + private static final ConcurrentMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>(); public static SnapshotManager getInstance(KylinConfig config) { SnapshotManager r = SERVICE_CACHE.get(config); http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java index 96505e6..70799d8 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; @@ -43,7 +44,7 @@ public class ExecutableDao { private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class); private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class); private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class); - private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>(); + private static final ConcurrentMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>(); private ResourceStore store; http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 48cedb5..0c86d72 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -18,9 +18,13 @@ package org.apache.kylin.job.execution; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.job.dao.ExecutableDao; @@ -31,18 +35,16 @@ import org.apache.kylin.job.exception.PersistentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** */ public class ExecutableManager { private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class); - private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>(); + private static final ConcurrentMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>(); private final KylinConfig config; private final ExecutableDao executableDao; @@ -347,7 +349,7 @@ public class ExecutableManager { for (AbstractExecutable task : tasks) { if (task.getId().compareTo(stepId) >= 0) { logger.debug("rollback task : " + task); - updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), ""); + updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), ""); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 84e62d5..1f2e958 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -23,6 +23,7 @@ import java.net.UnknownHostException; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -72,7 +73,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private DistributedJobLock jobLock; private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class); - private static final ConcurrentHashMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>(); + private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>(); //keep all segments having running job private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>(); private volatile boolean initialized = false; http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java index 49ec96e..9427ace 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; @@ -65,7 +66,7 @@ public class MetadataManager { public static final Serializer<ExternalFilterDesc> EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>(ExternalFilterDesc.class); // static cached instances - private static final ConcurrentHashMap<KylinConfig, MetadataManager> CACHE = new ConcurrentHashMap<KylinConfig, MetadataManager>(); + private static final ConcurrentMap<KylinConfig, MetadataManager> CACHE = new ConcurrentHashMap<KylinConfig, MetadataManager>(); public static MetadataManager getInstance(KylinConfig config) { MetadataManager r = CACHE.get(config); http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java index 86e282e..c7eb133 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java @@ -21,6 +21,7 @@ package org.apache.kylin.metadata.badquery; import java.io.IOException; import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -35,7 +36,7 @@ public class BadQueryHistoryManager { public static final Serializer<BadQueryHistory> BAD_QUERY_INSTANCE_SERIALIZER = new JsonSerializer<>(BadQueryHistory.class); private static final Logger logger = LoggerFactory.getLogger(BadQueryHistoryManager.class); - private static final ConcurrentHashMap<KylinConfig, BadQueryHistoryManager> CACHE = new ConcurrentHashMap<>(); + private static final ConcurrentMap<KylinConfig, BadQueryHistoryManager> CACHE = new ConcurrentHashMap<>(); private KylinConfig kylinConfig; private BadQueryHistoryManager(KylinConfig config) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 17b644d..5b45d9e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; @@ -63,7 +64,7 @@ public class Broadcaster { public static final String SYNC_PRJ_DATA = "project_data"; // the special entity to indicate project data has change, e.g. cube/raw_table update // static cached instances - private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>(); + private static final ConcurrentMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>(); public static Broadcaster getInstance(KylinConfig config) { http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index ca4f7f1..bb1e3ed 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; @@ -48,7 +49,7 @@ import com.google.common.collect.Lists; public class ProjectManager { private static final Logger logger = LoggerFactory.getLogger(ProjectManager.class); - private static final ConcurrentHashMap<KylinConfig, ProjectManager> CACHE = new ConcurrentHashMap<KylinConfig, ProjectManager>(); + private static final ConcurrentMap<KylinConfig, ProjectManager> CACHE = new ConcurrentHashMap<KylinConfig, ProjectManager>(); public static final Serializer<ProjectInstance> PROJECT_SERIALIZER = new JsonSerializer<ProjectInstance>(ProjectInstance.class); public static ProjectManager getInstance(KylinConfig config) { @@ -98,7 +99,7 @@ public class ProjectManager { } private class ProjectSyncListener extends Broadcaster.Listener { - + @Override public void onClearAll(Broadcaster broadcaster) throws IOException { clearCache(); @@ -107,12 +108,12 @@ public class ProjectManager { @Override public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { String project = cacheKey; - + if (event == Event.DROP) removeProjectLocal(project); else reloadProjectLocal(project); - + broadcaster.notifyProjectSchemaUpdate(project); broadcaster.notifyProjectDataUpdate(project); } @@ -249,7 +250,7 @@ public class ProjectManager { projectMap.remove(norm(proj.getName())); clearL2Cache(); } - + private void removeProjectLocal(String proj) { projectMap.remove(norm(proj)); clearL2Cache(); @@ -393,7 +394,7 @@ public class ProjectManager { } return projects; } - + public ExternalFilterDesc getExternalFilterDesc(String project, String extFilter) { return l2Cache.getExternalFilterDesc(project, extFilter); } http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java index 77e2679..2d1a4a5 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; @@ -37,7 +38,7 @@ import com.google.common.collect.Maps; public class RealizationRegistry { private static final Logger logger = LoggerFactory.getLogger(RealizationRegistry.class); - private static final ConcurrentHashMap<KylinConfig, RealizationRegistry> CACHE = new ConcurrentHashMap<KylinConfig, RealizationRegistry>(); + private static final ConcurrentMap<KylinConfig, RealizationRegistry> CACHE = new ConcurrentHashMap<KylinConfig, RealizationRegistry>(); public static RealizationRegistry getInstance(KylinConfig config) { RealizationRegistry r = CACHE.get(config); http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java index 8cfe87d..48febeb 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -42,7 +43,7 @@ public class StreamingManager { private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class); // static cached instances - private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>(); + private static final ConcurrentMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>(); public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java index 748e873..2d330c0 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -47,7 +48,7 @@ public class HybridManager implements IRealizationProvider { private static final Logger logger = LoggerFactory.getLogger(HybridManager.class); // static cached instances - private static final ConcurrentHashMap<KylinConfig, HybridManager> CACHE = new ConcurrentHashMap<KylinConfig, HybridManager>(); + private static final ConcurrentMap<KylinConfig, HybridManager> CACHE = new ConcurrentHashMap<KylinConfig, HybridManager>(); public static HybridManager getInstance(KylinConfig config) { HybridManager r = CACHE.get(config); http://git-wip-us.apache.org/repos/asf/kylin/blob/693c6faf/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java index 775f052..50295c3 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -43,7 +44,7 @@ public class KafkaConfigManager { private static final Logger logger = LoggerFactory.getLogger(KafkaConfigManager.class); // static cached instances - private static final ConcurrentHashMap<KylinConfig, KafkaConfigManager> CACHE = new ConcurrentHashMap<KylinConfig, KafkaConfigManager>(); + private static final ConcurrentMap<KylinConfig, KafkaConfigManager> CACHE = new ConcurrentHashMap<KylinConfig, KafkaConfigManager>(); private KylinConfig config; @@ -59,7 +60,7 @@ public class KafkaConfigManager { private KafkaConfigManager(KylinConfig config) throws IOException { this.config = config; this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, "kafka"); - + // touch lower level metadata before registering my listener reloadAllKafkaConfig(); Broadcaster.getInstance(config).registerListener(new KafkaSyncListener(), "kafka"); @@ -195,7 +196,7 @@ public class KafkaConfigManager { throw new IllegalArgumentException("No topic info"); } - if (kafkaConfig.getKafkaClusterConfigs() == null || kafkaConfig.getKafkaClusterConfigs().size() ==0) { + if (kafkaConfig.getKafkaClusterConfigs() == null || kafkaConfig.getKafkaClusterConfigs().size() == 0) { throw new IllegalArgumentException("No cluster info"); } @@ -213,7 +214,7 @@ public class KafkaConfigManager { private void removeKafkaConfigLocal(String name) { kafkaMap.remove(name); } - + private void reloadAllKafkaConfig() throws IOException { ResourceStore store = getStore(); logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));