KYLIN-2834 fix bug in Broadcaster, lost listener after cache wipe
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ecf4819e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ecf4819e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ecf4819e Branch: refs/heads/ranger Commit: ecf4819eb39b8d22ea82c70d6679200c0b2602bb Parents: 26c03fe Author: Li Yang <liy...@apache.org> Authored: Sat Sep 2 06:49:34 2017 +0800 Committer: Hongbin Ma <m...@kyligence.io> Committed: Tue Sep 5 16:58:30 2017 +0800 ---------------------------------------------------------------------- .../kylin/metadata/cachesync/Broadcaster.java | 49 +++++++++----- .../metadata/cachesync/BroadcasterTest.java | 70 ++++++++++++++++++++ .../kylin/rest/controller/CubeController.java | 2 +- .../rest/controller2/CubeControllerV2.java | 2 +- .../apache/kylin/rest/service/CacheService.java | 12 ++-- .../apache/kylin/rest/service/CubeService.java | 27 +++----- 6 files changed, 121 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 4b0ef57..26e6f49 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -103,8 +103,9 @@ public class Broadcaster { // ============================================================================ - private KylinConfig config; + static final Map<String, List<Listener>> staticListenerMap = Maps.newConcurrentMap(); + private KylinConfig config; private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>(); private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap(); private AtomicLong counter = new AtomicLong(); @@ -158,31 +159,40 @@ public class Broadcaster { }); } + // static listener survives cache wipe and goes after normal listeners + public void registerStaticListener(Listener listener, String... entities) { + doRegisterListener(staticListenerMap, listener, entities); + } + public void registerListener(Listener listener, String... entities) { - synchronized (listenerMap) { + doRegisterListener(listenerMap, listener, entities); + } + + private static void doRegisterListener(Map<String, List<Listener>> lmap, Listener listener, String... entities) { + synchronized (lmap) { // ignore re-registration - List<Listener> all = listenerMap.get(SYNC_ALL); + List<Listener> all = lmap.get(SYNC_ALL); if (all != null && all.contains(listener)) { return; } for (String entity : entities) { if (!StringUtils.isBlank(entity)) - addListener(entity, listener); + addListener(lmap, entity, listener); } - addListener(SYNC_ALL, listener); - addListener(SYNC_PRJ_SCHEMA, listener); - addListener(SYNC_PRJ_DATA, listener); + addListener(lmap, SYNC_ALL, listener); + addListener(lmap, SYNC_PRJ_SCHEMA, listener); + addListener(lmap, SYNC_PRJ_DATA, listener); } } - private void addListener(String entity, Listener listener) { - List<Listener> list = listenerMap.get(entity); + private static void addListener(Map<String, List<Listener>> lmap, String entity, Listener listener) { + List<Listener> list = lmap.get(entity); if (list == null) { list = new ArrayList<>(); + lmap.put(entity, list); } list.add(listener); - listenerMap.put(entity, list); } public void notifyClearAll() throws IOException { @@ -198,15 +208,19 @@ public class Broadcaster { } public void notifyListener(String entity, Event event, String cacheKey) throws IOException { - List<Listener> list = listenerMap.get(entity); - if (list == null) + // prevents concurrent modification exception + List<Listener> list = Lists.newArrayList(); + List<Listener> l1 = listenerMap.get(entity); // normal listeners first + if (l1 != null) + list.addAll(l1); + List<Listener> l2 = staticListenerMap.get(entity); // static listeners second + if (l2 != null) + list.addAll(l2); + if (list.isEmpty()) return; - logger.trace("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey - + ", listeners=" + list); + logger.debug("Broadcasting" + event + ", " + entity + ", " + cacheKey); - // prevents concurrent modification exception - list = Lists.newArrayList(list); switch (entity) { case SYNC_ALL: for (Listener l : list) { @@ -233,8 +247,7 @@ public class Broadcaster { break; } - logger.debug( - "Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey); + logger.debug("Done broadcasting" + event + ", " + entity + ", " + cacheKey); } /** http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java new file mode 100644 index 0000000..88cf404 --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.cachesync; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.cachesync.Broadcaster.Event; +import org.apache.kylin.metadata.cachesync.Broadcaster.Listener; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class BroadcasterTest extends LocalFileMetadataTestCase { + + @Before + public void setUp() throws Exception { + this.createTestMetadata(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testBasics() throws IOException { + Broadcaster broadcaster = Broadcaster.getInstance(getTestConfig()); + final AtomicInteger i = new AtomicInteger(0); + + broadcaster.registerStaticListener(new Listener() { + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) + throws IOException { + Assert.assertEquals(2, i.incrementAndGet()); + } + }, "test"); + + broadcaster.registerListener(new Listener() { + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) + throws IOException { + Assert.assertEquals(1, i.incrementAndGet()); + } + }, "test"); + + broadcaster.notifyListener("test", Event.UPDATE, ""); + + Broadcaster.staticListenerMap.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 14014fc..20cab7e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -571,7 +571,7 @@ public class CubeController extends BasicController { // Get info of given table. try { - hr = cubeService.getHTableInfo(tableName); + hr = cubeService.getHTableInfo(cubeName, tableName); } catch (IOException e) { logger.error("Failed to calcuate size of HTable \"" + tableName + "\".", e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java index 292b633..e8337ab 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java @@ -460,7 +460,7 @@ public class CubeControllerV2 extends BasicController { // Get info of given table. try { - hr = cubeService.getHTableInfo(tableName); + hr = cubeService.getHTableInfo(cubeName, tableName); } catch (IOException e) { logger.error("Failed to calculate size of HTable \"" + tableName + "\".", e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java index 7758987..536b338 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -29,6 +29,7 @@ import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.query.QueryDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; @@ -38,7 +39,7 @@ import net.sf.ehcache.CacheManager; /** */ @Component("cacheService") -public class CacheService extends BasicService { +public class CacheService extends BasicService implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(CacheService.class); private static QueryDataSource queryDataSource = new QueryDataSource(); @@ -92,6 +93,11 @@ public class CacheService extends BasicService { this.cubeService = cubeService; } + @Override + public void afterPropertiesSet() throws Exception { + Broadcaster.getInstance(getConfig()).registerStaticListener(cacheSyncListener, "cube"); + } + public void wipeProjectCache(String project) { if (project == null) annouceWipeCache("all", "update", "all"); @@ -106,10 +112,6 @@ public class CacheService extends BasicService { public void notifyMetadataChange(String entity, Event event, String cacheKey) throws IOException { Broadcaster broadcaster = Broadcaster.getInstance(getConfig()); - - // broadcaster can be clearCache() too, make sure listener is registered; re-registration will be ignored - broadcaster.registerListener(cacheSyncListener, "cube"); - broadcaster.notifyListener(entity, event, cacheKey); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index e79bab9..0fcee44 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -52,7 +52,6 @@ import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.ForbiddenException; -import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.msg.Message; import org.apache.kylin.rest.msg.MsgPicker; import org.apache.kylin.rest.request.MetricsRequest; @@ -417,8 +416,9 @@ public class CubeService extends BasicService implements InitializingBean { * if error happens * @throws IOException Exception when HTable resource is not closed correctly. */ - public HBaseResponse getHTableInfo(String tableName) throws IOException { - HBaseResponse hr = htableInfoCache.getIfPresent(tableName); + public HBaseResponse getHTableInfo(String cubeName, String tableName) throws IOException { + String key = cubeName + "/" + tableName; + HBaseResponse hr = htableInfoCache.getIfPresent(key); if (null != hr) { return hr; } @@ -426,6 +426,8 @@ public class CubeService extends BasicService implements InitializingBean { hr = new HBaseResponse(); if ("hbase".equals(getConfig().getMetadataUrl().getScheme())) { try { + logger.debug("Loading HTable info " + cubeName + ", " + tableName); + // use reflection to isolate NoClassDef errors when HBase is not available hr = (HBaseResponse) Class.forName("org.apache.kylin.rest.service.HBaseInfoUtil")// .getMethod("getHBaseInfo", new Class[] { String.class, KylinConfig.class })// @@ -435,7 +437,7 @@ public class CubeService extends BasicService implements InitializingBean { } } - htableInfoCache.put(tableName, hr); + htableInfoCache.put(key, hr); return hr; } @@ -716,8 +718,7 @@ public class CubeService extends BasicService implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { - Broadcaster.getInstance(getConfig()).registerListener(new HTableInfoSyncListener(), "cube"); - logger.info("HTableInfoSyncListener is on."); + Broadcaster.getInstance(getConfig()).registerStaticListener(new HTableInfoSyncListener(), "cube"); } private class HTableInfoSyncListener extends Broadcaster.Listener { @@ -730,17 +731,11 @@ public class CubeService extends BasicService implements InitializingBean { public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) throws IOException { String cubeName = cacheKey; - - CubeInstance cube = getCubeManager().getCube(cubeName); - if (null == cube) { - throw new InternalErrorException("Cannot find cube " + cubeName); - } - - List<String> htableNameList = Lists.newArrayListWithExpectedSize(cube.getSegments().size()); - for (CubeSegment segment : cube.getSegments()) { - htableNameList.add(segment.getStorageLocationIdentifier()); + String keyPrefix = cubeName + "/"; + for (String k : htableInfoCache.asMap().keySet()) { + if (k.startsWith(keyPrefix)) + htableInfoCache.invalidate(k); } - htableInfoCache.invalidateAll(htableNameList); } } }