TAJO-1105: Add thread which detects JVM pauses like HADOOP's. (jinho) Closes #191
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/140083bf Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/140083bf Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/140083bf Branch: refs/heads/block_iteration Commit: 140083bfbaf6dafb330eebbc1c7f7447c5292c1e Parents: 55d68ec Author: jhkim <[email protected]> Authored: Thu Oct 9 11:06:27 2014 +0900 Committer: jhkim <[email protected]> Committed: Thu Oct 9 11:06:27 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/master/TajoMaster.java | 16 +- .../org/apache/tajo/util/JvmPauseMonitor.java | 221 +++++++++++++++++++ .../java/org/apache/tajo/worker/TajoWorker.java | 16 +- 4 files changed, 245 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/140083bf/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 51e5598..767e55b 100644 --- a/CHANGES +++ b/CHANGES @@ -4,6 +4,8 @@ Release 0.9.0 - unreleased NEW FEATURES + TAJO-1105: Add thread which detects JVM pauses like HADOOP's. (jinho) + TAJO-704: TajoMaster HA (jaehwa) TAJO-20: INSERT INTO ... SELECT. 
(Hyoungjun Kim via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/140083bf/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index e393783..7fec037 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -55,11 +55,7 @@ import org.apache.tajo.master.session.SessionManager; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.AbstractStorageManager; import org.apache.tajo.storage.StorageManagerFactory; -import org.apache.tajo.util.ClassUtil; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.StringUtils; -import org.apache.tajo.util.VersionInfo; +import org.apache.tajo.util.*; import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.webapp.QueryExecutorServlet; import org.apache.tajo.webapp.StaticHttpServer; @@ -134,6 +130,8 @@ public class TajoMaster extends CompositeService { private HAService haService; + private JvmPauseMonitor pauseMonitor; + public TajoMaster() throws Exception { super(TajoMaster.class.getName()); } @@ -356,6 +354,11 @@ public class TajoMaster extends CompositeService { return sqlFuncs; } + private void startJvmPauseMonitor(){ + pauseMonitor = new JvmPauseMonitor(systemConf); + pauseMonitor.start(); + } + public MasterContext getContext() { return this.context; } @@ -364,6 +367,8 @@ public class TajoMaster extends CompositeService { public void serviceStart() throws Exception { LOG.info("TajoMaster is starting up"); + startJvmPauseMonitor(); + // check base tablespace and databases checkBaseTBSpaceAndDatabase(); @@ -450,6 +455,7 @@ public class TajoMaster extends CompositeService { RpcChannelFactory.shutdown(); + 
if(pauseMonitor != null) pauseMonitor.stop(); super.stop(); LOG.info("Tajo Master main thread exiting"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/140083bf/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java new file mode 100644 index 0000000..3ec6c40 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.util; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Daemon; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Class which sets up a simple thread which runs in a loop sleeping + * for a short interval of time. If the sleep takes significantly longer + * than its target time, it implies that the JVM or host machine has + * paused processing, which may cause other problems. If such a pause is + * detected, the thread logs a message. 
+ * This class is borrowed from the following source code : + * ${hadoop-common-project}/hadoop-common/src/main/java/org/apache/hadoop/util/ + * JvmPauseMonitor.java HADOOP-9618 + */ +@InterfaceAudience.Private +public class JvmPauseMonitor { + private static final Log LOG = LogFactory.getLog( + JvmPauseMonitor.class); + + /** The target sleep time */ + private static final long SLEEP_INTERVAL_MS = 500; + + /** log WARN if we detect a pause longer than this threshold */ + private final long warnThresholdMs; + private static final String WARN_THRESHOLD_KEY = + "jvm.pause.warn-threshold.ms"; + private static final long WARN_THRESHOLD_DEFAULT = 10000; + + /** log INFO if we detect a pause longer than this threshold */ + private final long infoThresholdMs; + private static final String INFO_THRESHOLD_KEY = + "jvm.pause.info-threshold.ms"; + private static final long INFO_THRESHOLD_DEFAULT = 1000; + + private long numGcWarnThresholdExceeded = 0; + private long numGcInfoThresholdExceeded = 0; + private long totalGcExtraSleepTime = 0; + + private Thread monitorThread; + private volatile boolean shouldRun = true; + + public JvmPauseMonitor(Configuration conf) { + this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT); + this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT); + } + + public void start() { + Preconditions.checkState(monitorThread == null, + "Already started"); + monitorThread = new Daemon(new Monitor()); + monitorThread.start(); + } + + public void stop() { + shouldRun = false; + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public boolean isStarted() { + return monitorThread != null; + } + + public long getNumGcWarnThreadholdExceeded() { + return numGcWarnThresholdExceeded; + } + + public long getNumGcInfoThresholdExceeded() { + return numGcInfoThresholdExceeded; + } + + public long getTotalGcExtraSleepTime() { + return
totalGcExtraSleepTime; + } + + private String formatMessage(long extraSleepTime, + Map<String, GcTimes> gcTimesAfterSleep, + Map<String, GcTimes> gcTimesBeforeSleep) { + + Set<String> gcBeanNames = Sets.intersection( + gcTimesAfterSleep.keySet(), + gcTimesBeforeSleep.keySet()); + List<String> gcDiffs = Lists.newArrayList(); + for (String name : gcBeanNames) { + GcTimes diff = gcTimesAfterSleep.get(name).subtract( + gcTimesBeforeSleep.get(name)); + if (diff.gcCount != 0) { + gcDiffs.add("GC pool '" + name + "' had collection(s): " + + diff.toString()); + } + } + + String ret = "Detected pause in JVM or host machine (eg GC): " + + "pause of approximately " + extraSleepTime + "ms\n"; + if (gcDiffs.isEmpty()) { + ret += "No GCs detected"; + } else { + ret += Joiner.on("\n").join(gcDiffs); + } + return ret; + } + + private Map<String, GcTimes> getGcTimes() { + Map<String, GcTimes> map = Maps.newHashMap(); + List<GarbageCollectorMXBean> gcBeans = + ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + map.put(gcBean.getName(), new GcTimes(gcBean)); + } + return map; + } + + private static class GcTimes { + private GcTimes(GarbageCollectorMXBean gcBean) { + gcCount = gcBean.getCollectionCount(); + gcTimeMillis = gcBean.getCollectionTime(); + } + + private GcTimes(long count, long time) { + this.gcCount = count; + this.gcTimeMillis = time; + } + + private GcTimes subtract(GcTimes other) { + return new GcTimes(this.gcCount - other.gcCount, + this.gcTimeMillis - other.gcTimeMillis); + } + + @Override + public String toString() { + return "count=" + gcCount + " time=" + gcTimeMillis + "ms"; + } + + private long gcCount; + private long gcTimeMillis; + } + + private class Monitor implements Runnable { + @Override + public void run() { + Stopwatch sw = new Stopwatch(); + Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes(); + while (shouldRun) { + sw.reset().start(); + try { + Thread.sleep(SLEEP_INTERVAL_MS); + } catch 
(InterruptedException ie) { + return; + } + long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS; + Map<String, GcTimes> gcTimesAfterSleep = getGcTimes(); + + if (extraSleepTime > warnThresholdMs) { + ++numGcWarnThresholdExceeded; + LOG.warn(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } else if (extraSleepTime > infoThresholdMs) { + ++numGcInfoThresholdExceeded; + LOG.info(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } + totalGcExtraSleepTime += extraSleepTime; + gcTimesBeforeSleep = gcTimesAfterSleep; + } + } + } + + /** + * Simple 'main' to facilitate manual testing of the pause monitor. + * + * This main function just leaks memory into a list. Running this class + * with a 1GB heap will very quickly go into "GC hell" and result in + * log messages about the GC pauses. + */ + public static void main(String []args) throws Exception { + new JvmPauseMonitor(new Configuration()).start(); + List<String> list = Lists.newArrayList(); + int i = 0; + while (true) { + list.add(String.valueOf(i++)); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/140083bf/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 280fc2b..b3dae8b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.common.exception.NotImplementedException; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; 
@@ -45,10 +44,7 @@ import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.HAServiceUtil; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.*; import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.webapp.StaticHttpServer; @@ -131,6 +127,8 @@ public class TajoWorker extends CompositeService { private LocalDirAllocator lDirAllocator; + private JvmPauseMonitor pauseMonitor; + public TajoWorker() throws Exception { super(TajoWorker.class.getName()); } @@ -309,12 +307,18 @@ public class TajoWorker extends CompositeService { } } + private void startJvmPauseMonitor(){ + pauseMonitor = new JvmPauseMonitor(systemConf); + pauseMonitor.start(); + } + public WorkerContext getWorkerContext() { return workerContext; } @Override public void serviceStart() throws Exception { + startJvmPauseMonitor(); tajoMasterInfo = new TajoMasterInfo(); if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { @@ -372,6 +376,8 @@ public class TajoWorker extends CompositeService { } if(deletionService != null) deletionService.stop(); + + if(pauseMonitor != null) pauseMonitor.stop(); super.serviceStop(); LOG.info("TajoWorker main thread exiting"); }
