#754 Change broadcast threads to daemon threads
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a4e465d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a4e465d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a4e465d
Branch: refs/heads/master
Commit: 5a4e465dd8b8fda640db74e595491a2b2dd3c9db
Parents: 4d2a548
Author: auphyroc99 <[email protected]>
Authored: Tue Jun 20 14:44:01 2017 +0800
Committer: Roger Shi <[email protected]>
Committed: Tue Jun 20 15:01:11 2017 +0800
----------------------------------------------------------------------
 .../kylin/metadata/cachesync/Broadcaster.java | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a4e465d/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 4fbfc7c..c9e1130 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -114,26 +114,28 @@ public class Broadcaster {
         @Override
         public void run() {
             final Map<String, RestClient> restClientMap = Maps.newHashMap();
-            final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+            final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
+                    new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
             while (true) {
                 try {
                     final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
                     String[] restServers = config.getRestServers();
-                    logger.info("Servers in the cluster: " + Arrays.toString(restServers));
+                    logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
                     for (final String node : restServers) {
                         if (restClientMap.containsKey(node) == false) {
                             restClientMap.put(node, new RestClient(node));
                         }
                     }
-                    logger.info("Announcing new broadcast event: " + broadcastEvent);
+                    logger.debug("Announcing new broadcast event: " + broadcastEvent);
                     for (final String node : restServers) {
                         wipingCachePool.execute(new Runnable() {
                             @Override
                             public void run() {
                                 try {
-                                    restClientMap.get(node).wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+                                    restClientMap.get(node).wipeCache(broadcastEvent.getEntity(),
+                                            broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                                 } catch (IOException e) {
                                     logger.warn("Thread failed during wipe cache at " + broadcastEvent, e);
                                 }
@@ -192,7 +194,8 @@ public class Broadcaster {
         if (list == null)
             return;
-        logger.trace("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list);
+        logger.trace("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey
+                + ", listeners=" + list);
         // prevents concurrent modification exception
         list = Lists.newArrayList(list);
@@ -222,7 +225,8 @@ public class Broadcaster {
             break;
         }
-        logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
+        logger.debug(
+                "Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
     }
     /**
@@ -279,7 +283,8 @@ public class Broadcaster {
         public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
         }
-        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
+                throws IOException {
         }
     }
@@ -343,7 +348,8 @@ public class Broadcaster {
         @Override
         public String toString() {
-            return Objects.toStringHelper(this).add("entity", entity).add("event", event).add("cacheKey", cacheKey).toString();
+            return Objects.toStringHelper(this).add("entity", entity).add("event", event).add("cacheKey", cacheKey)
+                    .toString();
         }
     }
