Repository: giraph
Updated Branches:
  refs/heads/trunk beca35e45 -> dc2cd63d0


GIRAPH-856: Log amount of free memory on the command line (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/dc2cd63d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/dc2cd63d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/dc2cd63d

Branch: refs/heads/trunk
Commit: dc2cd63d0e570064f41326bcc09b3d074fe3a01f
Parents: beca35e
Author: Maja Kabiljo <[email protected]>
Authored: Tue Feb 25 15:09:49 2014 -0800
Committer: Maja Kabiljo <[email protected]>
Committed: Tue Feb 25 15:11:07 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  2 ++
 .../giraph/job/CombinedWorkerProgress.java      | 27 ++++++++++++++++
 .../apache/giraph/worker/BspServiceWorker.java  |  1 +
 .../apache/giraph/worker/WorkerProgress.java    | 34 ++++++++++++++++++++
 .../giraph/worker/WorkerProgressWriter.java     |  1 +
 5 files changed, 65 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7708e72..8707dd4 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-856: Log amount of free memory on the command line (majakabiljo)
+
   GIRAPH-860: Giraph jobs can hang forever if HDFS filestamps aren't 
   created (aching)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java 
b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
index 0810040..a0410b4 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
@@ -20,11 +20,21 @@ package org.apache.giraph.job;
 
 import org.apache.giraph.worker.WorkerProgress;
 
+import com.google.common.collect.Iterables;
+
+import java.text.DecimalFormat;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
 /**
  * Class which combines multiple workers' progresses to get overall
  * application progress
  */
+@NotThreadSafe
 public class CombinedWorkerProgress extends WorkerProgress {
+  /** Decimal format which rounds numbers to two decimal places */
+  public static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#.##");
+
   /**
    * How many workers have reported that they are in highest reported
    * superstep
@@ -34,6 +44,10 @@ public class CombinedWorkerProgress extends WorkerProgress {
    * How many workers reported that they finished application
    */
   private int workersDone = 0;
+  /** Minimum amount of free memory on a worker */
+  private double minFreeMemoryMB = Double.MAX_VALUE;
+  /** Name of the worker with min free memory */
+  private int workerWithMinFreeMemory;
 
   /**
    * Constructor
@@ -75,6 +89,15 @@ public class CombinedWorkerProgress extends WorkerProgress {
       if (workerProgress.isStoringDone()) {
         workersDone++;
       }
+
+      if (workerProgress.getFreeMemoryMB() < minFreeMemoryMB) {
+        minFreeMemoryMB = workerProgress.getFreeMemoryMB();
+        workerWithMinFreeMemory = workerProgress.getTaskId();
+      }
+      freeMemoryMB += workerProgress.getFreeMemoryMB();
+    }
+    if (!Iterables.isEmpty(workerProgresses)) {
+      freeMemoryMB /= Iterables.size(workerProgresses);
     }
   }
 
@@ -113,6 +136,10 @@ public class CombinedWorkerProgress extends WorkerProgress 
{
       sb.append(partitionsStored).append(" out of ").append(
           partitionsToStore).append(" partitions stored");
     }
+    sb.append("; min free memory on worker ").append(
+        workerWithMinFreeMemory).append(" - ").append(
+        DECIMAL_FORMAT.format(minFreeMemoryMB)).append("MB, average ").append(
+        DECIMAL_FORMAT.format(freeMemoryMB)).append("MB");
     return sb.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 88a642a..c0b28dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -207,6 +207,7 @@ public class BspServiceWorker<I extends WritableComparable,
     }
     observers = conf.createWorkerObservers();
 
+    WorkerProgress.get().setTaskId(getTaskPartition());
     workerProgressWriter = conf.trackJobProgressOnClient() ?
         new WorkerProgressWriter(myProgressPath, getZkExt()) : null;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
index f7de88b..1a2a6ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.worker;
 
+import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
@@ -82,6 +83,12 @@ public class WorkerProgress implements Writable {
   /** Whether worker finished storing data */
   protected boolean storingDone = false;
 
+  /** Id of the mapper */
+  protected int taskId;
+
+  /** Free memory */
+  protected double freeMemoryMB;
+
   /**
    * Get singleton instance of WorkerProgress.
    *
@@ -253,6 +260,17 @@ public class WorkerProgress implements Writable {
     storingDone = true;
   }
 
+  public synchronized void setTaskId(int taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * Update memory info
+   */
+  public synchronized void updateMemory() {
+    freeMemoryMB = MemoryUtils.freeMemoryMB();
+  }
+
   public synchronized long getCurrentSuperstep() {
     return currentSuperstep;
   }
@@ -317,6 +335,14 @@ public class WorkerProgress implements Writable {
     return currentSuperstep == Long.MAX_VALUE;
   }
 
+  public synchronized int getTaskId() {
+    return taskId;
+  }
+
+  public synchronized double getFreeMemoryMB() {
+    return freeMemoryMB;
+  }
+
   @Override
   public synchronized void write(DataOutput dataOutput) throws IOException {
     dataOutput.writeLong(currentSuperstep);
@@ -340,6 +366,10 @@ public class WorkerProgress implements Writable {
     dataOutput.writeInt(partitionsToStore);
     dataOutput.writeInt(partitionsStored);
     dataOutput.writeBoolean(storingDone);
+
+    dataOutput.writeInt(taskId);
+
+    dataOutput.writeDouble(freeMemoryMB);
   }
 
   @Override
@@ -365,5 +395,9 @@ public class WorkerProgress implements Writable {
     partitionsToStore = dataInput.readInt();
     partitionsStored = dataInput.readInt();
     storingDone = dataInput.readBoolean();
+
+    taskId = dataInput.readInt();
+
+    freeMemoryMB = dataInput.readDouble();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
index 81acc13..95e46e4 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
@@ -50,6 +50,7 @@ public class WorkerProgressWriter {
       public void run() {
         try {
           while (!finished) {
+            WorkerProgress.get().updateMemory();
             WorkerProgress.writeToZnode(zk, myProgressPath);
             double factor = 1 + Math.random();
             Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor));

Reply via email to