Updated Branches:
  refs/heads/trunk e5f1024d4 -> 06d64a29d

GIRAPH-522: JMap Dumper (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/06d64a29
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/06d64a29
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/06d64a29

Branch: refs/heads/trunk
Commit: 06d64a29dcea6e10d22165f80300e007eb9d000a
Parents: e5f1024
Author: Nitay Joffe <[email protected]>
Authored: Sun Feb 17 07:15:29 2013 -0500
Committer: Nitay Joffe <[email protected]>
Committed: Sun Feb 17 17:34:23 2013 -0500

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../apache/giraph/conf/GiraphConfiguration.java    |    9 +
 .../org/apache/giraph/conf/GiraphConstants.java    |   15 ++
 .../org/apache/giraph/graph/GraphTaskManager.java  |    8 +-
 .../org/apache/giraph/master/BspServiceMaster.java |    6 +-
 .../main/java/org/apache/giraph/utils/JMap.java    |   84 +++++++++++
 .../org/apache/giraph/utils/JMapHistoDumper.java   |  117 +++++++++++++++
 .../org/apache/giraph/worker/BspServiceWorker.java |   22 ++--
 8 files changed, 248 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 1705e22..2fb626e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-522: JMap Dumper (nitay)
+
   GIRAPH-517: Use stable hcatalog 0.5.0-incubating (nitay)
 
   GIRAPH-503: Refactor platform-independent CLI argument parsing in GiraphRunner into a separate class (ereisman)

http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 96fada4..ddeaeb7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -150,6 +150,15 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Check whether the jmap heap histogram dumper is enabled.
+   *
+   * @return {@code JMAP_ENABLE} value, {@code JMAP_ENABLE_DEFAULT} if unset
+   */
+  public boolean isJMapHistogramDumpEnabled() {
+    return getBoolean(JMAP_ENABLE, JMAP_ENABLE_DEFAULT);
+  }
+
+  /**
    * Add a class to a property that is a list of classes. If the property does
    * not exist it will be created.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index d04f601..e0aeba3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -114,6 +114,21 @@ public interface GiraphConstants {
   /** Default to not use thread-level debugging */
   boolean LOG_THREAD_LAYOUT_DEFAULT = false;
 
+  /** Configuration key to enable jmap heap histogram printing */
+  String JMAP_ENABLE = "giraph.jmap.histo.enable";
+  /** Default value for enabling jmap (disabled) */
+  boolean JMAP_ENABLE_DEFAULT = false;
+
+  /** Configuration key for msec to sleep between jmap calls */
+  String JMAP_SLEEP_MILLIS = "giraph.jmap.histo.msec";
+  /** Default msec to sleep between jmap calls (30 seconds) */
+  int JMAP_SLEEP_MILLIS_DEFAULT = 30000;
+
+  /** Configuration key for how many lines of jmap output to print */
+  String JMAP_PRINT_LINES = "giraph.jmap.histo.print_lines";
+  /** Default lines of jmap output to print */
+  int JMAP_PRINT_LINES_DEFAULT = 30;
+
   /**
    * Minimum percent of the maximum number of workers that have responded
    * in order to continue progressing. (float)

http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 3624728..20fa5c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -25,8 +25,6 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.master.BspServiceMaster;
 import org.apache.giraph.master.MasterAggregatorUsage;
 import org.apache.giraph.master.MasterThread;
@@ -36,11 +34,13 @@ import org.apache.giraph.metrics.GiraphTimer;
 import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.time.SystemTime;
-import org.apache.giraph.time.Time;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.giraph.worker.WorkerAggregatorUsage;

http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 1f4a184..10a0afd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -52,6 +52,7 @@ import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
@@ -206,7 +207,10 @@ public class BspServiceMaster<I extends WritableComparable,
         GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
     masterGraphPartitioner =
         getGraphPartitionerFactory().createMasterGraphPartitioner();
-    observers = getConfiguration().createMasterObservers();
+    if (conf.isJMapHistogramDumpEnabled()) {
+      conf.addMasterObserverClass(JMapHistoDumper.class);
+    }
+    observers = conf.createMasterObservers();
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
     GiraphStats.init(context);

http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java
new file mode 100644
index 0000000..69d1ab3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+
+/**
+ * Helper to run the JDK {@code jmap} tool against the current JVM and print
+ * the first lines of its heap histogram.
+ *
+ * The {@code jmap} executable is launched by name, so it must be resolvable
+ * by the running process (e.g. on its PATH).
+ */
+public class JMap {
+  /** The command to run */
+  public static final String CMD = "jmap ";
+  /** Arguments to pass in to command */
+  public static final String ARGS = " -histo ";
+
+  /** Do not construct */
+  protected JMap() { }
+
+  /**
+   * Get the process ID of the current running process.
+   *
+   * Parses the text before the first '@' of the runtime MXBean name (the
+   * whole name if it has no '@').
+   *
+   * @return Integer process ID
+   * @throws NumberFormatException if that text is not a decimal integer
+   */
+  public static int getProcessId() {
+    String jvmName = ManagementFactory.getRuntimeMXBean().getName();
+    int atIndex = jvmName.indexOf('@');
+    String processId = atIndex < 0 ? jvmName : jvmName.substring(0, atIndex);
+    return Integer.parseInt(processId);
+  }
+
+  /**
+   * Run jmap, print numLines of output from it to stderr.
+   *
+   * @param numLines Number of lines to print
+   */
+  public static void heapHistogramDump(int numLines) {
+    heapHistogramDump(numLines, System.err);
+  }
+
+  /**
+   * Run jmap, print numLines of output from it to stream passed in.
+   *
+   * A timestamp header is printed first and every jmap line is prefixed with
+   * "--\t". The reader and the jmap subprocess are released on every path,
+   * so periodic callers do not accumulate open pipes or lingering processes.
+   * I/O failures are reported on printStream instead of being thrown.
+   *
+   * @param numLines Number of lines to print
+   * @param printStream Stream to print to
+   */
+  public static void heapHistogramDump(int numLines, PrintStream printStream) {
+    Process process = null;
+    BufferedReader in = null;
+    try {
+      process = Runtime.getRuntime().exec(CMD + ARGS + getProcessId());
+      in = new BufferedReader(
+          new InputStreamReader(process.getInputStream()));
+      printStream.println("JMap histo dump at " + new Date());
+      String line = in.readLine();
+      for (int i = 0; i < numLines && line != null; ++i) {
+        printStream.println("--\t" + line);
+        line = in.readLine();
+      }
+    } catch (IOException e) {
+      e.printStackTrace(printStream);
+    } finally {
+      closeQuietly(in);
+      if (process != null) {
+        // Output past numLines is not needed; do not leave jmap (or its
+        // remaining pipes) around once we are done reading.
+        process.destroy();
+      }
+    }
+  }
+
+  /**
+   * Close the reader, ignoring failures since the dump is best-effort.
+   *
+   * @param reader Reader to close, may be null
+   */
+  private static void closeQuietly(BufferedReader reader) {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        // Nothing useful to do; the dump is best-effort.
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
new file mode 100644
index 0000000..256f5d1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.worker.WorkerObserver;
+import org.apache.log4j.Logger;
+
+/**
+ * An observer for both worker and master that periodically dumps the memory
+ * usage using jmap tool.
+ *
+ * Dumping happens on a daemon thread started by {@link #preApplication()}
+ * and stopped by {@link #postApplication()}.
+ */
+public class JMapHistoDumper implements MasterObserver, WorkerObserver {
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(JMapHistoDumper.class);
+
+  /** Configuration handed to setConf, returned by getConf */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** How many msec to sleep between calls */
+  private int sleepMillis;
+  /** How many lines of output to print */
+  private int linesToPrint;
+
+  /** The jmap printing thread */
+  private Thread thread;
+  /** Halt jmap thread; volatile as it is set from outside that thread */
+  private volatile boolean stop = false;
+
+  @Override
+  public void preApplication() {
+    // This is called by both WorkerObserver and MasterObserver
+    startJMapThread();
+  }
+
+  @Override
+  public void postApplication() {
+    // This is called by both WorkerObserver and MasterObserver
+    joinJMapThread();
+  }
+
+  /**
+   * Ask the jmap thread to stop and wait a bounded time for it to finish.
+   */
+  private void joinJMapThread() {
+    stop = true;
+    if (thread == null) {
+      // Never started, so there is nothing to wait for
+      return;
+    }
+    // Cut a pending sleep short instead of waiting out the whole interval
+    thread.interrupt();
+    try {
+      thread.join(sleepMillis + 5000);
+    } catch (InterruptedException e) {
+      LOG.error("Failed to join jmap thread");
+      // Keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Start the jmap thread
+   */
+  public void startJMapThread() {
+    stop = false;
+    thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stop) {
+          JMap.heapHistogramDump(linesToPrint);
+          try {
+            Thread.sleep(sleepMillis);
+          } catch (InterruptedException e) {
+            // An interrupt during shutdown is expected, only report others
+            if (!stop) {
+              LOG.warn("JMap histogram sleep interrupted", e);
+            }
+          }
+        }
+      }
+    });
+    // Daemon so the dumper cannot keep the JVM alive on its own if
+    // postApplication() is never reached
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+  @Override
+  public void preSuperstep(long superstep) { }
+
+  @Override
+  public void postSuperstep(long superstep) { }
+
+  @Override
+  public void applicationFailed(Exception e) { }
+
+  @Override
+  public void preSuperstep() { }
+
+  @Override
+  public void postSuperstep() { }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    conf = configuration;
+    sleepMillis = configuration.getInt(GiraphConstants.JMAP_SLEEP_MILLIS,
+        GiraphConstants.JMAP_SLEEP_MILLIS_DEFAULT);
+    linesToPrint = configuration.getInt(GiraphConstants.JMAP_PRINT_LINES,
+        GiraphConstants.JMAP_PRINT_LINES_DEFAULT);
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06d64a29/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 3256a02..3b510b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -31,6 +31,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClient;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerServer;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.graph.GraphTaskManager;
@@ -40,6 +41,7 @@ import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.GlobalStats;
+import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
@@ -173,28 +175,28 @@ public class BspServiceWorker<I extends WritableComparable,
     GraphTaskManager<I, V, E, M> graphTaskManager)
     throws IOException, InterruptedException {
     super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
+    ImmutableClassesGiraphConfiguration conf = getConfiguration();
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
     workerInfo = new WorkerInfo();
-    workerServer =
-        new NettyWorkerServer<I, V, E, M>(getConfiguration(), this, context);
+    workerServer = new NettyWorkerServer<I, V, E, M>(conf, this, context);
     workerInfo.setInetSocketAddress(workerServer.getMyAddress());
     workerInfo.setTaskId(getTaskPartition());
-    workerClient =
-        new NettyWorkerClient<I, V, E, M>(context, getConfiguration(), this);
+    workerClient = new NettyWorkerClient<I, V, E, M>(context, conf, this);
 
     workerAggregatorRequestProcessor =
-        new NettyWorkerAggregatorRequestProcessor(getContext(),
-            getConfiguration(), this);
+        new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
 
-    this.workerContext = getConfiguration().createWorkerContext(null);
+    workerContext = conf.createWorkerContext(null);
 
-    aggregatorHandler =
-        new WorkerAggregatorHandler(this, getConfiguration(), context);
+    aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
 
-    observers = getConfiguration().createWorkerObservers();
+    if (conf.isJMapHistogramDumpEnabled()) {
+      conf.addWorkerObserverClass(JMapHistoDumper.class);
+    }
+    observers = conf.createWorkerObservers();
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
   }

Reply via email to