TAJO-1105: Add thread which detects JVM pauses like HADOOP's. (jinho)

Closes #191


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/140083bf
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/140083bf
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/140083bf

Branch: refs/heads/block_iteration
Commit: 140083bfbaf6dafb330eebbc1c7f7447c5292c1e
Parents: 55d68ec
Author: jhkim <[email protected]>
Authored: Thu Oct 9 11:06:27 2014 +0900
Committer: jhkim <[email protected]>
Committed: Thu Oct 9 11:06:27 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/master/TajoMaster.java |  16 +-
 .../org/apache/tajo/util/JvmPauseMonitor.java   | 221 +++++++++++++++++++
 .../java/org/apache/tajo/worker/TajoWorker.java |  16 +-
 4 files changed, 245 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/140083bf/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 51e5598..767e55b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.9.0 - unreleased
 
   NEW FEATURES 
 
+    TAJO-1105: Add thread which detects JVM pauses like HADOOP's. (jinho)
+
     TAJO-704: TajoMaster HA (jaehwa)
 
     TAJO-20: INSERT INTO ... SELECT. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/140083bf/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index e393783..7fec037 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -55,11 +55,7 @@ import org.apache.tajo.master.session.SessionManager;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageManagerFactory;
-import org.apache.tajo.util.ClassUtil;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.VersionInfo;
+import org.apache.tajo.util.*;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.webapp.QueryExecutorServlet;
 import org.apache.tajo.webapp.StaticHttpServer;
@@ -134,6 +130,8 @@ public class TajoMaster extends CompositeService {
 
   private HAService haService;
 
+  private JvmPauseMonitor pauseMonitor;
+
   public TajoMaster() throws Exception {
     super(TajoMaster.class.getName());
   }
@@ -356,6 +354,11 @@ public class TajoMaster extends CompositeService {
     return sqlFuncs;
   }
 
+  private void startJvmPauseMonitor(){
+    pauseMonitor = new JvmPauseMonitor(systemConf);
+    pauseMonitor.start();
+  }
+
   public MasterContext getContext() {
     return this.context;
   }
@@ -364,6 +367,8 @@ public class TajoMaster extends CompositeService {
   public void serviceStart() throws Exception {
     LOG.info("TajoMaster is starting up");
 
+    startJvmPauseMonitor();
+
     // check base tablespace and databases
     checkBaseTBSpaceAndDatabase();
 
@@ -450,6 +455,7 @@ public class TajoMaster extends CompositeService {
 
     RpcChannelFactory.shutdown();
 
+    if(pauseMonitor != null) pauseMonitor.stop();
     super.stop();
     LOG.info("Tajo Master main thread exiting");
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/140083bf/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java
new file mode 100644
index 0000000..3ec6c40
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Daemon;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class which sets up a simple thread which runs in a loop sleeping
+ * for a short interval of time. If the sleep takes significantly longer
+ * than its target time, it implies that the JVM or host machine has
+ * paused processing, which may cause other problems. If such a pause is
+ * detected, the thread logs a message.
+ * This class is borrowed from the following source code :
+ * ${hadoop-common-project}/hadoop-common/src/main/java/org/apache/hadoop/util/
+ * JvmPauseMonitor.java HADOOP-9618
+ */
+@InterfaceAudience.Private
+public class JvmPauseMonitor {
+  private static final Log LOG = LogFactory.getLog(
+      JvmPauseMonitor.class);
+
+  /** The target sleep time */
+  private static final long SLEEP_INTERVAL_MS = 500;
+
+  /** log WARN if we detect a pause longer than this threshold */
+  private final long warnThresholdMs;
+  private static final String WARN_THRESHOLD_KEY =
+      "jvm.pause.warn-threshold.ms";
+  private static final long WARN_THRESHOLD_DEFAULT = 10000;
+
+  /** log INFO if we detect a pause longer than this threshold */
+  private final long infoThresholdMs;
+  private static final String INFO_THRESHOLD_KEY =
+      "jvm.pause.info-threshold.ms";
+  private static final long INFO_THRESHOLD_DEFAULT = 1000;
+
+  private long numGcWarnThresholdExceeded = 0;
+  private long numGcInfoThresholdExceeded = 0;
+  private long totalGcExtraSleepTime = 0;
+
+  private Thread monitorThread;
+  private volatile boolean shouldRun = true;
+
+  public JvmPauseMonitor(Configuration conf) {
+    this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
+    this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
+  }
+
+  public void start() {
+    Preconditions.checkState(monitorThread == null,
+        "Already started");
+    monitorThread = new Daemon(new Monitor());
+    monitorThread.start();
+  }
+
+  public void stop() {
+    shouldRun = false;
+    monitorThread.interrupt();
+    try {
+      monitorThread.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public boolean isStarted() {
+    return monitorThread != null;
+  }
+
+  public long getNumGcWarnThreadholdExceeded() {
+    return numGcWarnThresholdExceeded;
+  }
+
+  public long getNumGcInfoThresholdExceeded() {
+    return numGcInfoThresholdExceeded;
+  }
+
+  public long getTotalGcExtraSleepTime() {
+    return totalGcExtraSleepTime;
+  }
+
+  private String formatMessage(long extraSleepTime,
+                               Map<String, GcTimes> gcTimesAfterSleep,
+                               Map<String, GcTimes> gcTimesBeforeSleep) {
+
+    Set<String> gcBeanNames = Sets.intersection(
+        gcTimesAfterSleep.keySet(),
+        gcTimesBeforeSleep.keySet());
+    List<String> gcDiffs = Lists.newArrayList();
+    for (String name : gcBeanNames) {
+      GcTimes diff = gcTimesAfterSleep.get(name).subtract(
+          gcTimesBeforeSleep.get(name));
+      if (diff.gcCount != 0) {
+        gcDiffs.add("GC pool '" + name + "' had collection(s): " +
+            diff.toString());
+      }
+    }
+
+    String ret = "Detected pause in JVM or host machine (eg GC): " +
+        "pause of approximately " + extraSleepTime + "ms\n";
+    if (gcDiffs.isEmpty()) {
+      ret += "No GCs detected";
+    } else {
+      ret += Joiner.on("\n").join(gcDiffs);
+    }
+    return ret;
+  }
+
+  private Map<String, GcTimes> getGcTimes() {
+    Map<String, GcTimes> map = Maps.newHashMap();
+    List<GarbageCollectorMXBean> gcBeans =
+        ManagementFactory.getGarbageCollectorMXBeans();
+    for (GarbageCollectorMXBean gcBean : gcBeans) {
+      map.put(gcBean.getName(), new GcTimes(gcBean));
+    }
+    return map;
+  }
+
+  private static class GcTimes {
+    private GcTimes(GarbageCollectorMXBean gcBean) {
+      gcCount = gcBean.getCollectionCount();
+      gcTimeMillis = gcBean.getCollectionTime();
+    }
+
+    private GcTimes(long count, long time) {
+      this.gcCount = count;
+      this.gcTimeMillis = time;
+    }
+
+    private GcTimes subtract(GcTimes other) {
+      return new GcTimes(this.gcCount - other.gcCount,
+          this.gcTimeMillis - other.gcTimeMillis);
+    }
+
+    @Override
+    public String toString() {
+      return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
+    }
+
+    private long gcCount;
+    private long gcTimeMillis;
+  }
+
+  private class Monitor implements Runnable {
+    @Override
+    public void run() {
+      Stopwatch sw = new Stopwatch();
+      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
+      while (shouldRun) {
+        sw.reset().start();
+        try {
+          Thread.sleep(SLEEP_INTERVAL_MS);
+        } catch (InterruptedException ie) {
+          return;
+        }
+        long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
+        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
+
+        if (extraSleepTime > warnThresholdMs) {
+          ++numGcWarnThresholdExceeded;
+          LOG.warn(formatMessage(
+              extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+        } else if (extraSleepTime > infoThresholdMs) {
+          ++numGcInfoThresholdExceeded;
+          LOG.info(formatMessage(
+              extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+        }
+        totalGcExtraSleepTime += extraSleepTime;
+        gcTimesBeforeSleep = gcTimesAfterSleep;
+      }
+    }
+  }
+
+  /**
+   * Simple 'main' to facilitate manual testing of the pause monitor.
+   *
+   * This main function just leaks memory into a list. Running this class
+   * with a 1GB heap will very quickly go into "GC hell" and result in
+   * log messages about the GC pauses.
+   */
+  public static void main(String []args) throws Exception {
+    new JvmPauseMonitor(new Configuration()).start();
+    List<String> list = Lists.newArrayList();
+    int i = 0;
+    while (true) {
+      list.add(String.valueOf(i++));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/140083bf/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 280fc2b..b3dae8b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.common.exception.NotImplementedException;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
@@ -45,10 +44,7 @@ import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.HAServiceUtil;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.*;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.webapp.StaticHttpServer;
 
@@ -131,6 +127,8 @@ public class TajoWorker extends CompositeService {
 
   private LocalDirAllocator lDirAllocator;
 
+  private JvmPauseMonitor pauseMonitor;
+
   public TajoWorker() throws Exception {
     super(TajoWorker.class.getName());
   }
@@ -309,12 +307,18 @@ public class TajoWorker extends CompositeService {
     }
   }
 
+  private void startJvmPauseMonitor(){
+    pauseMonitor = new JvmPauseMonitor(systemConf);
+    pauseMonitor.start();
+  }
+
   public WorkerContext getWorkerContext() {
     return workerContext;
   }
 
   @Override
   public void serviceStart() throws Exception {
+    startJvmPauseMonitor();
 
     tajoMasterInfo = new TajoMasterInfo();
     if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
@@ -372,6 +376,8 @@ public class TajoWorker extends CompositeService {
     }
 
     if(deletionService != null) deletionService.stop();
+
+    if(pauseMonitor != null) pauseMonitor.stop();
     super.serviceStop();
     LOG.info("TajoWorker main thread exiting");
   }

Reply via email to