Repository: giraph Updated Branches: refs/heads/trunk beca35e45 -> dc2cd63d0
GIRAPH-856: Log amount of free memory on the command line (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/dc2cd63d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/dc2cd63d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/dc2cd63d Branch: refs/heads/trunk Commit: dc2cd63d0e570064f41326bcc09b3d074fe3a01f Parents: beca35e Author: Maja Kabiljo <[email protected]> Authored: Tue Feb 25 15:09:49 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Tue Feb 25 15:11:07 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../giraph/job/CombinedWorkerProgress.java | 27 ++++++++++++++++ .../apache/giraph/worker/BspServiceWorker.java | 1 + .../apache/giraph/worker/WorkerProgress.java | 34 ++++++++++++++++++++ .../giraph/worker/WorkerProgressWriter.java | 1 + 5 files changed, 65 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 7708e72..8707dd4 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-856: Log amount of free memory on the command line (majakabiljo) + GIRAPH-860: Giraph jobs can hang forever if HDFS filestamps aren't created (aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java index 0810040..a0410b4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java @@ -20,11 +20,21 @@ package org.apache.giraph.job; import org.apache.giraph.worker.WorkerProgress; +import com.google.common.collect.Iterables; + +import java.text.DecimalFormat; + +import javax.annotation.concurrent.NotThreadSafe; + /** * Class which combines multiple workers' progresses to get overall * application progress */ +@NotThreadSafe public class CombinedWorkerProgress extends WorkerProgress { + /** Decimal format which rounds numbers to two decimal places */ + public static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#.##"); + /** * How many workers have reported that they are in highest reported * superstep @@ -34,6 +44,10 @@ public class CombinedWorkerProgress extends WorkerProgress { * How many workers reported that they finished application */ private int workersDone = 0; + /** Minimum amount of free memory on a worker */ + private double minFreeMemoryMB = Double.MAX_VALUE; + /** Name of the worker with min free memory */ + private int workerWithMinFreeMemory; /** * Constructor @@ -75,6 +89,15 @@ public class CombinedWorkerProgress extends WorkerProgress { if (workerProgress.isStoringDone()) { workersDone++; } + + if (workerProgress.getFreeMemoryMB() < minFreeMemoryMB) { + minFreeMemoryMB = workerProgress.getFreeMemoryMB(); + workerWithMinFreeMemory = workerProgress.getTaskId(); + } + freeMemoryMB += workerProgress.getFreeMemoryMB(); + } + if (!Iterables.isEmpty(workerProgresses)) { + freeMemoryMB /= Iterables.size(workerProgresses); } } @@ -113,6 +136,10 @@ public class CombinedWorkerProgress extends WorkerProgress { sb.append(partitionsStored).append(" out of ").append( partitionsToStore).append(" partitions stored"); } + sb.append("; min free memory on worker ").append( + workerWithMinFreeMemory).append(" - ").append( + DECIMAL_FORMAT.format(minFreeMemoryMB)).append("MB, average ").append( + DECIMAL_FORMAT.format(freeMemoryMB)).append("MB"); return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 88a642a..c0b28dd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -207,6 +207,7 @@ public class BspServiceWorker<I extends WritableComparable, } observers = conf.createWorkerObservers(); + WorkerProgress.get().setTaskId(getTaskPartition()); workerProgressWriter = conf.trackJobProgressOnClient() ? new WorkerProgressWriter(myProgressPath, getZkExt()) : null; http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java index f7de88b..1a2a6ee 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java @@ -18,6 +18,7 @@ package org.apache.giraph.worker; +import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; @@ -82,6 +83,12 @@ public class WorkerProgress implements Writable { /** Whether worker finished storing data */ protected boolean storingDone = false; + /** Id of the mapper */ + protected int taskId; + + /** Free memory */ + protected double freeMemoryMB; + /** * Get singleton instance of WorkerProgress. * @@ -253,6 +260,17 @@ public class WorkerProgress implements Writable { storingDone = true; } + public synchronized void setTaskId(int taskId) { + this.taskId = taskId; + } + + /** + * Update memory info + */ + public synchronized void updateMemory() { + freeMemoryMB = MemoryUtils.freeMemoryMB(); + } + public synchronized long getCurrentSuperstep() { return currentSuperstep; } @@ -317,6 +335,14 @@ public class WorkerProgress implements Writable { return currentSuperstep == Long.MAX_VALUE; } + public synchronized int getTaskId() { + return taskId; + } + + public synchronized double getFreeMemoryMB() { + return freeMemoryMB; + } + @Override public synchronized void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(currentSuperstep); @@ -340,6 +366,10 @@ public class WorkerProgress implements Writable { dataOutput.writeInt(partitionsToStore); dataOutput.writeInt(partitionsStored); dataOutput.writeBoolean(storingDone); + + dataOutput.writeInt(taskId); + + dataOutput.writeDouble(freeMemoryMB); } @Override @@ -365,5 +395,9 @@ public class WorkerProgress implements Writable { partitionsToStore = dataInput.readInt(); partitionsStored = dataInput.readInt(); storingDone = dataInput.readBoolean(); + + taskId = dataInput.readInt(); + + freeMemoryMB = dataInput.readDouble(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dc2cd63d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java index 81acc13..95e46e4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java @@ -50,6 +50,7 @@ public class WorkerProgressWriter { public void run() { try { while (!finished) { + WorkerProgress.get().updateMemory(); WorkerProgress.writeToZnode(zk, myProgressPath); double factor = 1 + Math.random(); Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor));
