#754 Change broadcast threads to daemon threads

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a4e465d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a4e465d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a4e465d

Branch: refs/heads/master
Commit: 5a4e465dd8b8fda640db74e595491a2b2dd3c9db
Parents: 4d2a548
Author: auphyroc99 <[email protected]>
Authored: Tue Jun 20 14:44:01 2017 +0800
Committer: Roger Shi <[email protected]>
Committed: Tue Jun 20 15:01:11 2017 +0800

----------------------------------------------------------------------
 .../kylin/metadata/cachesync/Broadcaster.java   | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5a4e465d/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 4fbfc7c..c9e1130 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -114,26 +114,28 @@ public class Broadcaster {
             @Override
             public void run() {
                 final Map<String, RestClient> restClientMap = Maps.newHashMap();
-                final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+                final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());
 
                 while (true) {
                     try {
                         final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
                         String[] restServers = config.getRestServers();
-                        logger.info("Servers in the cluster: " + Arrays.toString(restServers));
+                        logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
                         for (final String node : restServers) {
                             if (restClientMap.containsKey(node) == false) {
                                 restClientMap.put(node, new RestClient(node));
                             }
                         }
 
-                        logger.info("Announcing new broadcast event: " + broadcastEvent);
+                        logger.debug("Announcing new broadcast event: " + broadcastEvent);
                         for (final String node : restServers) {
                             wipingCachePool.execute(new Runnable() {
                                 @Override
                                 public void run() {
                                     try {
-                                        restClientMap.get(node).wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
+                                        restClientMap.get(node).wipeCache(broadcastEvent.getEntity(),
+                                                broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                                     } catch (IOException e) {
                                         logger.warn("Thread failed during wipe cache at " + broadcastEvent, e);
                                     }
@@ -192,7 +194,8 @@ public class Broadcaster {
         if (list == null)
             return;
 
-        logger.trace("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list);
+        logger.trace("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey
+                + ", listeners=" + list);
 
         // prevents concurrent modification exception
         list = Lists.newArrayList(list);
@@ -222,7 +225,8 @@ public class Broadcaster {
             break;
         }
 
-        logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
+        logger.debug(
+                "Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey);
     }
 
     /**
@@ -279,7 +283,8 @@ public class Broadcaster {
         public void onProjectDataChange(Broadcaster broadcaster, String project) throws IOException {
         }
 
-        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
+                throws IOException {
         }
     }
 
@@ -343,7 +348,8 @@ public class Broadcaster {
 
         @Override
         public String toString() {
-            return Objects.toStringHelper(this).add("entity", entity).add("event", event).add("cacheKey", cacheKey).toString();
+            return Objects.toStringHelper(this).add("entity", entity).add("event", event).add("cacheKey", cacheKey)
+                    .toString();
         }
 
     }

Reply via email to