Updated Branches: refs/heads/trunk e5f1024d4 -> 06d64a29d
GIRAPH-522: JMap Dumper (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/06d64a29 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/06d64a29 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/06d64a29 Branch: refs/heads/trunk Commit: 06d64a29dcea6e10d22165f80300e007eb9d000a Parents: e5f1024 Author: Nitay Joffe <[email protected]> Authored: Sun Feb 17 07:15:29 2013 -0500 Committer: Nitay Joffe <[email protected]> Committed: Sun Feb 17 17:34:23 2013 -0500 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/conf/GiraphConfiguration.java | 9 + .../org/apache/giraph/conf/GiraphConstants.java | 15 ++ .../org/apache/giraph/graph/GraphTaskManager.java | 8 +- .../org/apache/giraph/master/BspServiceMaster.java | 6 +- .../main/java/org/apache/giraph/utils/JMap.java | 84 +++++++++++ .../org/apache/giraph/utils/JMapHistoDumper.java | 117 +++++++++++++++ .../org/apache/giraph/worker/BspServiceWorker.java | 22 ++-- 8 files changed, 248 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 1705e22..2fb626e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-522: JMap Dumper (nitay) + GIRAPH-517: Use stable hcatalog 0.5.0-incubating (nitay) GIRAPH-503: Refactor platform-independent CLI argument parsing in GiraphRunner into a separate class (ereisman) http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 96fada4..ddeaeb7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -150,6 +150,15 @@ public class GiraphConfiguration extends Configuration } /** + * Check whether to enable jmap dumping thread. + * + * @return true if jmap dumper is enabled. + */ + public boolean isJMapHistogramDumpEnabled() { + return getBoolean(JMAP_ENABLE, JMAP_ENABLE_DEFAULT); + } + + /** * Add a class to a property that is a list of classes. If the property does * not exist it will be created. * http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index d04f601..e0aeba3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -114,6 +114,21 @@ public interface GiraphConstants { /** Default to not use thread-level debugging */ boolean LOG_THREAD_LAYOUT_DEFAULT = false; + /** Configuration key to enable jmap printing */ + String JMAP_ENABLE = "giraph.jmap.histo.enable"; + /** Default value for enabling jmap */ + boolean JMAP_ENABLE_DEFAULT = false; + + /** Configuration key for msec to sleep between calls */ + String JMAP_SLEEP_MILLIS = "giraph.jmap.histo.msec"; + /** Default msec to sleep between calls */ + int JMAP_SLEEP_MILLIS_DEFAULT = 30000; + + /** Configuration key for how many lines to print */ + String JMAP_PRINT_LINES = "giraph.jmap.histo.print_lines"; + /** Default lines of output to print */ + int JMAP_PRINT_LINES_DEFAULT = 30; + /** * Minimum percent of the maximum number of workers that have responded * in order to continue progressing. (float) http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 3624728..20fa5c5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -25,8 +25,6 @@ import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.messages.MessageStoreByPartition; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.partition.PartitionOwner; -import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.master.BspServiceMaster; import org.apache.giraph.master.MasterAggregatorUsage; import org.apache.giraph.master.MasterThread; @@ -36,11 +34,13 @@ import org.apache.giraph.metrics.GiraphTimer; import org.apache.giraph.metrics.GiraphTimerContext; import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; +import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.partition.PartitionStats; +import org.apache.giraph.time.SystemTime; +import org.apache.giraph.time.Time; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.utils.ReflectionUtils; -import org.apache.giraph.time.SystemTime; -import org.apache.giraph.time.Time; import org.apache.giraph.vertex.Vertex; import org.apache.giraph.worker.BspServiceWorker; import org.apache.giraph.worker.WorkerAggregatorUsage; http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 1f4a184..10a0afd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -52,6 +52,7 @@ import org.apache.giraph.metrics.GiraphTimerContext; import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.metrics.WorkerSuperstepMetrics; +import org.apache.giraph.utils.JMapHistoDumper; import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; @@ -206,7 +207,10 @@ public class BspServiceMaster<I extends WritableComparable, GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT); masterGraphPartitioner = getGraphPartitionerFactory().createMasterGraphPartitioner(); - observers = getConfiguration().createMasterObservers(); + if (conf.isJMapHistogramDumpEnabled()) { + conf.addMasterObserverClass(JMapHistoDumper.class); + } + observers = conf.createMasterObservers(); GiraphMetrics.get().addSuperstepResetObserver(this); GiraphStats.init(context); http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java new file mode 100644 index 0000000..69d1ab3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.lang.management.ManagementFactory; +import java.util.Date; + +/** + * Helper to run jmap and print the output + */ +public class JMap { + /** The command to run */ + public static final String CMD = "jmap "; + /** Arguments to pass in to command */ + public static final String ARGS = " -histo "; + + /** Do not construct */ + protected JMap() { } + + /** + * Get the process ID of the current running process + * + * @return Integer process ID + */ + public static int getProcessId() { + String processId = ManagementFactory.getRuntimeMXBean().getName(); + if (processId.contains("@")) { + processId = processId.substring(0, processId.indexOf("@")); + } + return Integer.parseInt(processId); + } + + /** + * Run jmap, print numLines of output from it to stderr. + * + * @param numLines Number of lines to print + */ + public static void heapHistogramDump(int numLines) { + heapHistogramDump(numLines, System.err); + } + + /** + * Run jmap, print numLines of output from it to stream passed in. + * + * @param numLines Number of lines to print + * @param printStream Stream to print to + */ + public static void heapHistogramDump(int numLines, PrintStream printStream) { + try { + Process p = Runtime.getRuntime().exec(CMD + ARGS + getProcessId()); + BufferedReader in = new BufferedReader( + new InputStreamReader(p.getInputStream())); + printStream.println("JMap histo dump at " + new Date()); + String line = in.readLine(); + for (int i = 0; i < numLines && line != null; ++i) { + printStream.println("--\t" + line); + line = in.readLine(); + } + in.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java new file mode 100644 index 0000000..256f5d1 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.master.MasterObserver; +import org.apache.giraph.worker.WorkerObserver; +import org.apache.log4j.Logger; + +/** + * An observer for both worker and master that periodically dumps the memory + * usage using jmap tool. + */ +public class JMapHistoDumper implements MasterObserver, WorkerObserver { + /** Logger */ + private static final Logger LOG = Logger.getLogger(JMapHistoDumper.class); + + /** How many msec to sleep between calls */ + private int sleepMillis; + /** How many lines of output to print */ + private int linesToPrint; + + /** The jmap printing thread */ + private Thread thread; + /** Halt jmap thread */ + private boolean stop = false; + + @Override + public void preApplication() { + // This is called by both WorkerObserver and MasterObserver + startJMapThread(); + } + + @Override + public void postApplication() { + // This is called by both WorkerObserver and MasterObserver + joinJMapThread(); + } + + /** + * Join the jmap thread + */ + private void joinJMapThread() { + stop = true; + try { + thread.join(sleepMillis + 5000); + } catch (InterruptedException e) { + LOG.error("Failed to join jmap thread"); + } + } + + /** + * Start the jmap thread + */ + public void startJMapThread() { + stop = false; + thread = new Thread(new Runnable() { + @Override + public void run() { + while (!stop) { + JMap.heapHistogramDump(linesToPrint); + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException e) { + LOG.warn("JMap histogram sleep interrupted", e); + } + } + } + }); + thread.start(); + } + + @Override + public void preSuperstep(long superstep) { } + + @Override + public void postSuperstep(long superstep) { } + + @Override + public void applicationFailed(Exception e) { } + + @Override + public void preSuperstep() { } + + @Override + public void postSuperstep() { } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration configuration) { + sleepMillis = configuration.getInt(GiraphConstants.JMAP_SLEEP_MILLIS, + GiraphConstants.JMAP_SLEEP_MILLIS_DEFAULT); + linesToPrint = configuration.getInt(GiraphConstants.JMAP_PRINT_LINES, + GiraphConstants.JMAP_PRINT_LINES_DEFAULT); + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 3256a02..3b510b2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -31,6 +31,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClient; import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; import org.apache.giraph.comm.netty.NettyWorkerServer; import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; import org.apache.giraph.bsp.BspService; import org.apache.giraph.graph.GraphTaskManager; @@ -40,6 +41,7 @@ import org.apache.giraph.graph.InputSplitEvents; import org.apache.giraph.graph.FinishedSuperstepStats; import org.apache.giraph.graph.AddressesAndPartitionsWritable; import org.apache.giraph.graph.GlobalStats; +import org.apache.giraph.utils.JMapHistoDumper; import org.apache.giraph.vertex.Vertex; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.VertexWriter; @@ -173,28 +175,28 @@ public class BspServiceWorker<I extends WritableComparable, GraphTaskManager<I, V, E, M> graphTaskManager) throws IOException, InterruptedException { super(serverPortList, sessionMsecTimeout, context, graphTaskManager); + ImmutableClassesGiraphConfiguration conf = getConfiguration(); partitionExchangeChildrenChanged = new PredicateLock(context); registerBspEvent(partitionExchangeChildrenChanged); workerGraphPartitioner = getGraphPartitionerFactory().createWorkerGraphPartitioner(); workerInfo = new WorkerInfo(); - workerServer = - new NettyWorkerServer<I, V, E, M>(getConfiguration(), this, context); + workerServer = new NettyWorkerServer<I, V, E, M>(conf, this, context); workerInfo.setInetSocketAddress(workerServer.getMyAddress()); workerInfo.setTaskId(getTaskPartition()); - workerClient = - new NettyWorkerClient<I, V, E, M>(context, getConfiguration(), this); + workerClient = new NettyWorkerClient<I, V, E, M>(context, conf, this); workerAggregatorRequestProcessor = - new NettyWorkerAggregatorRequestProcessor(getContext(), - getConfiguration(), this); + new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this); - this.workerContext = getConfiguration().createWorkerContext(null); + workerContext = conf.createWorkerContext(null); - aggregatorHandler = - new WorkerAggregatorHandler(this, getConfiguration(), context); + aggregatorHandler = new WorkerAggregatorHandler(this, conf, context); - observers = getConfiguration().createWorkerObservers(); + if (conf.isJMapHistogramDumpEnabled()) { + conf.addWorkerObserverClass(JMapHistoDumper.class); + } + observers = conf.createWorkerObservers(); GiraphMetrics.get().addSuperstepResetObserver(this); }
