This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 00d4fd8  thread leakage checker and memory usage reporter #1226 (#1452)
00d4fd8 is described below

commit 00d4fd831180f72e15c620486dd401efdffac7a0
Author: kaisun2000 <[email protected]>
AuthorDate: Fri Oct 9 14:19:12 2020 -0700

    thread leakage checker and memory usage reporter #1226 (#1452)
    
    Add thread leakage checker and memory usage reporter. The two utilities are
    invoked before and after test classes. They help to detect/monitor the
    resource/memory usage of the unit tests.
    
    Co-authored-by: Kai Sun <[email protected]>
---
 .../org/apache/helix/ThreadLeakageChecker.java     | 220 +++++++++++++++++++++
 .../java/org/apache/helix/common/ZkTestBase.java   |  29 ++-
 .../multizk/TestMultiZkHelixJavaApis.java          |  18 ++
 3 files changed, 266 insertions(+), 1 deletion(-)

diff --git 
a/helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java 
b/helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
new file mode 100644
index 0000000..c2d3e41
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+/**
+ * Test utility that inspects the live JVM threads after a test class has run
+ * and reports, per thread category, whether more threads are alive than that
+ * category allows.
+ *
+ * Note: the report depends on the tests running sequentially. Otherwise the
+ * counts cannot be attributed to a single test class.
+ */
+public class ThreadLeakageChecker {
+  /** Walks up from the current thread's group to the top-most thread group. */
+  private static ThreadGroup getRootThreadGroup() {
+    ThreadGroup candidate = Thread.currentThread().getThreadGroup();
+    while (candidate.getParent() != null) {
+      candidate = candidate.getParent();
+    }
+    return candidate;
+  }
+
+  /**
+   * Returns the threads enumerated from the root thread group.
+   *
+   * enumerate() silently ignores threads that do not fit into the array, so
+   * the buffer is doubled until the returned count is smaller than its length.
+   */
+  private static List<Thread> getAllThreads() {
+    ThreadGroup rootThreadGroup = getRootThreadGroup();
+    Thread[] threads = new Thread[32];
+    int count = rootThreadGroup.enumerate(threads);
+    while (count == threads.length) {
+      threads = new Thread[threads.length * 2];
+      count = rootThreadGroup.enumerate(threads);
+    }
+    return Arrays.asList(Arrays.copyOf(threads, count));
+  }
+
+  // Case-insensitive substrings of thread names, one array per category.
+  private static final String[] ZKSERVER_THRD_PATTERN =
+      {"SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"};
+  private static final String[] ZKSESSION_THRD_PATTERN =
+      new String[]{"ZkClient-EventThread", "ZkClient-AsyncCallback",
+          "-EventThread", "-SendThread"};
+  private static final String[] FORKJOIN_THRD_PATTERN =
+      new String[]{"ForkJoinPool"};
+  private static final String[] TIMER_THRD_PATTERN = new String[]{"time"};
+  private static final String[] TASKSTATEMODEL_THRD_PATTERN =
+      new String[]{"TaskStateModel"};
+
+  /*
+   * The two thresholds -- warning and limit -- are mostly empirical.
+   *
+   * ZkServer: the current version has only 4 threads. In case a later version
+   * uses more, we set the limit to 100. The reasoning is that these ZkServer
+   * threads are not deemed as leaking no matter how many there are.
+   *
+   * ZkSession is the ZkClient and native Zookeeper client we have. ZkTestBase
+   * has 12 at start-up time. Thus, if there are more than that, it is the
+   * test code leaking ZkClient.
+   *
+   * ForkJoin is created by using parallel streams or similar Java features.
+   * This is out of our control. Similar to ZkServer, the limit is set to 100
+   * while keeping a small _warningLimit.
+   *
+   * Timer should not happen. The limit is 2 rather than 0 mostly because even
+   * when you cancel the timer thread, it may take some non-deterministic time
+   * for it to go away. So give it some slack here.
+   *
+   * Also note, this ThreadLeakage checker depends on the fact that tests are
+   * running sequentially. Otherwise, the report is not going to be accurate.
+   */
+  private enum ThreadCategory {
+    ZkServer("zookeeper server threads", 4, 100, ZKSERVER_THRD_PATTERN),
+    ZkSession("zkclient/zooKeeper session threads", 12, 12,
+        ZKSESSION_THRD_PATTERN),
+    ForkJoin("fork join pool threads", 2, 100, FORKJOIN_THRD_PATTERN),
+    Timer("timer threads", 0, 2, TIMER_THRD_PATTERN),
+    TaskStateModel("TaskStateModel threads", 0, 0,
+        TASKSTATEMODEL_THRD_PATTERN),
+    Other("Other threads", 0, 2, new String[]{""});
+
+    private final String _description;
+    private final List<String> _pattern;
+    private final int _warningLimit;
+    private final int _limit;
+
+    ThreadCategory(String description, int warningLimit, int limit,
+        String[] patterns) {
+      _description = description;
+      _pattern = Arrays.asList(patterns);
+      _warningLimit = warningLimit;
+      _limit = limit;
+    }
+
+    public String getDescription() {
+      return _description;
+    }
+
+    /**
+     * Builds the predicate deciding whether a thread name belongs to this
+     * category. A named category matches when the name contains any of its
+     * patterns, ignoring case. Other matches exactly the names that no named
+     * category matches.
+     */
+    public Predicate<String> getMatchPred() {
+      // Enum constants are singletons: compare identities, not name strings.
+      if (this != ThreadCategory.Other) {
+        return target -> {
+          String lowerTarget = target.toLowerCase();
+          for (String p : _pattern) {
+            if (lowerTarget.contains(p.toLowerCase())) {
+              return true;
+            }
+          }
+          return false;
+        };
+      }
+
+      List<Predicate<String>> predicateList = new ArrayList<>();
+      for (ThreadCategory threadCategory : ThreadCategory.values()) {
+        if (threadCategory != ThreadCategory.Other) {
+          predicateList.add(threadCategory.getMatchPred());
+        }
+      }
+      return target -> {
+        for (Predicate<String> p : predicateList) {
+          if (p.test(target)) {
+            return false;
+          }
+        }
+        return true;
+      };
+    }
+
+    public int getWarningLimit() {
+      return _warningLimit;
+    }
+
+    public int getLimit() {
+      return _limit;
+    }
+  }
+
+  /**
+   * Reports memory usage and the live threads per category, and checks each
+   * category against its limit.
+   *
+   * @param classname name of the test class that just finished; only used to
+   *                  label the output
+   * @return true if no category holds more threads than its limit, false
+   *         otherwise
+   */
+  public static boolean afterClassCheck(String classname) {
+    ZkTestBase.reportPhysicalMemory();
+    // step 1: get all active threads
+    List<Thread> threads = getAllThreads();
+    System.out.println(
+        classname + " has active threads cnt:" + threads.size());
+
+    // step 2: categorize threads
+    // Start from an empty map so that a failure below degrades to an empty
+    // report instead of a NullPointerException.
+    Map<String, List<Thread>> threadByName = new HashMap<>();
+    try {
+      threadByName = threads.stream()
+          .filter(t -> {
+            // Read the group once: it becomes null as soon as the thread
+            // terminates, which can happen between two successive calls.
+            ThreadGroup group = t.getThreadGroup();
+            return group != null && group.getName() != null
+                && !"system".equals(group.getName());
+          })
+          .collect(Collectors.groupingBy(Thread::getName));
+    } catch (Exception e) {
+      System.out.println("filtering thread failure with exception:" + e);
+    }
+
+    // Build each category's predicate once instead of once per thread name.
+    Map<ThreadCategory, Predicate<String>> matchers = new HashMap<>();
+    for (ThreadCategory category : ThreadCategory.values()) {
+      matchers.put(category, category.getMatchPred());
+    }
+
+    // A name matching the patterns of several categories is counted in each.
+    Map<ThreadCategory, Set<Thread>> threadByCat = new HashMap<>();
+    for (Map.Entry<String, List<Thread>> entry : threadByName.entrySet()) {
+      for (ThreadCategory category : ThreadCategory.values()) {
+        if (matchers.get(category).test(entry.getKey())) {
+          threadByCat.computeIfAbsent(category, k -> new HashSet<>())
+              .addAll(entry.getValue());
+        }
+      }
+    }
+
+    // todo: We should make the following System.out as LOG.INFO once we
+    // achieve 0 thread leakage.
+    // todo: also the calling point of this method would fail the test
+    // step 3: enforce checking policy
+    boolean checkStatus = true;
+    for (ThreadCategory threadCategory : ThreadCategory.values()) {
+      Set<Thread> categoryThreads = threadByCat.get(threadCategory);
+      if (categoryThreads == null) {
+        continue;
+      }
+      int categoryThreadCnt = categoryThreads.size();
+      String summary = threadCategory.getDescription() + " has "
+          + categoryThreadCnt + " thread";
+
+      if (categoryThreadCnt > threadCategory.getLimit()) {
+        checkStatus = false;
+        System.out.println("Failure " + summary);
+      } else if (categoryThreadCnt > threadCategory.getWarningLimit()) {
+        System.out.println("Warning " + summary);
+      } else {
+        // Within the warning limit: no need to dump the thread names.
+        System.out.println(summary);
+        continue;
+      }
+
+      // print first 100 thread names
+      int i = 0;
+      for (Thread t : categoryThreads) {
+        System.out.println(i + " thread:" + t.getName());
+        i++;
+        if (i == 100) {
+          System.out.println(" skipping the rest");
+          break;
+        }
+      }
+    }
+
+    return checkStatus;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java 
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 67fee96..c9741a9 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -44,6 +44,7 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
+import org.apache.helix.ThreadLeakageChecker;
 import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.Pipeline;
@@ -127,8 +128,22 @@ public class ZkTestBase {
   protected Map<String, ClusterSetup> _clusterSetupMap = new HashMap<>();
   protected Map<String, BaseDataAccessor> _baseDataAccessorMap = new 
HashMap<>();
 
+  /**
+   * Prints the machine's total physical memory (in bytes) and the JVM's
+   * total and free heap memory (in MB) to standard output.
+   */
+  static public void reportPhysicalMemory() {
+    java.lang.management.OperatingSystemMXBean os =
+        java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+    // The com.sun.management extension is JVM specific. Check before casting
+    // so that a JVM without it does not abort the test run with a
+    // ClassCastException.
+    if (os instanceof com.sun.management.OperatingSystemMXBean) {
+      long physicalMemorySize =
+          ((com.sun.management.OperatingSystemMXBean) os)
+              .getTotalPhysicalMemorySize();
+      System.out.println(
+          "************ SYSTEM Physical Memory:"  + physicalMemorySize);
+    } else {
+      System.out.println(
+          "************ SYSTEM Physical Memory: unavailable on this JVM");
+    }
+
+    long MB = 1024 * 1024;
+    Runtime runtime = Runtime.getRuntime();
+    long free = runtime.freeMemory()/MB;
+    long total = runtime.totalMemory()/MB;
+    System.out.println(
+        "************ total memory:" + total + " free memory:" + free);
+  }
+
+
   @BeforeSuite
   public void beforeSuite() throws Exception {
+    reportPhysicalMemory();
     // TODO: use logging.properties file to config java.util.logging.Logger 
levels
     java.util.logging.Logger topJavaLogger = 
java.util.logging.Logger.getLogger("");
     topJavaLogger.setLevel(Level.WARNING);
@@ -710,7 +725,8 @@ public class ZkTestBase {
   }
 
   @AfterClass
-  public void cleanupLiveInstanceOwners() {
+  public void cleanupLiveInstanceOwners() throws InterruptedException {
+    String testClassName = this.getShortClassName();
     for (String cluster : _liveInstanceOwners.keySet()) {
       Map<String, HelixZkClient> clientMap = _liveInstanceOwners.get(cluster);
       for (HelixZkClient client : clientMap.values()) {
@@ -719,6 +735,17 @@ public class ZkTestBase {
       clientMap.clear();
     }
     _liveInstanceOwners.clear();
+
+    boolean status = false;
+    try {
+      status = ThreadLeakageChecker.afterClassCheck(testClassName);
+    } catch (Exception e) {
+      LOG.error("ThreadLeakageChecker exception:", e);
+    }
+    // todo: We should fail test here once we achieved 0 leakage and remove 
the following System print
+    if (!status) {
+      System.out.println("---------- Test Class " + testClassName + " thread 
leakage detected! ---------------");
+    }
   }
 
   protected List<LiveInstance> setupLiveInstances(String clusterName, int[] 
liveInstances) {
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
 
b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 8f62207..a8f2c2c 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -43,7 +43,9 @@ import org.apache.helix.HelixManagerProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
+import org.apache.helix.ThreadLeakageChecker;
 import org.apache.helix.api.config.RebalanceConfig;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -83,6 +85,8 @@ import 
org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.routing.RoutingDataManager;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -95,6 +99,7 @@ import org.testng.annotations.Test;
  * This test verifies that all Helix Java APIs work as expected.
  */
 public class TestMultiZkHelixJavaApis {
+  private static Logger LOG = 
LoggerFactory.getLogger(TestMultiZkHelixJavaApis.class);
   private static final int NUM_ZK = 3;
   private static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
   private static final Map<String, HelixZkClient> ZK_CLIENT_MAP = new 
HashMap<>();
@@ -170,6 +175,8 @@ public class TestMultiZkHelixJavaApis {
 
   @AfterClass
   public void afterClass() throws Exception {
+    String testClassName = getClass().getSimpleName();
+
     try {
       // Kill all mock controllers and participants
       MOCK_CONTROLLERS.values().forEach(ClusterControllerManager::syncStop);
@@ -216,6 +223,17 @@ public class TestMultiZkHelixJavaApis {
         
System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
       }
     }
+
+    boolean status = false;
+    try {
+      status = ThreadLeakageChecker.afterClassCheck(testClassName);
+    } catch (Exception e) {
+      LOG.error("ThreadLeakageChecker exception:", e);
+    }
+    // todo: We should fail test here once we achieved 0 leakage and remove 
the following System print
+    if (!status) {
+      System.out.println("---------- Test Class " + testClassName + " thread 
leakage detected! ---------------");
+    }
   }
 
   /**

Reply via email to