Updated Branches:
  refs/heads/trunk 9325fa986 -> 4d520645f

GIRAPH-838: setup time & total time counter also include time spent waiting for 
machines (pavanka via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4d520645
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4d520645
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4d520645

Branch: refs/heads/trunk
Commit: 4d520645ff53b9a354fef57074c7661f3bc46e39
Parents: 9325fa9
Author: Maja Kabiljo <[email protected]>
Authored: Mon Feb 3 09:51:28 2014 -0800
Committer: Maja Kabiljo <[email protected]>
Committed: Mon Feb 3 09:51:28 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  3 +++
 .../giraph/bsp/CentralizedServiceMaster.java    | 12 +++++++++++
 .../apache/giraph/counters/GiraphTimers.java    | 22 +++++++++++++++++---
 .../apache/giraph/master/BspServiceMaster.java  | 10 ++-------
 .../org/apache/giraph/master/MasterThread.java  | 14 ++++++++++---
 5 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index cef754e..b707cc2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-838: setup time & total time counter also include time spent waiting for machines
+  (pavanka via majakabiljo)
+
   GIRAPH-839: NettyWorkerAggregatorRequestProcessor tries to reuse request objects 
   (pavanka via majakabiljo)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 999888d..bda967d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -21,11 +21,13 @@ package org.apache.giraph.bsp;
 import org.apache.giraph.master.MasterAggregatorHandler;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.zookeeper.KeeperException;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * At most, there will be one active master at a time, but many threads can
@@ -58,6 +60,16 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
   MasterInfo getMasterInfo();
 
   /**
+   * Check all the {@link org.apache.giraph.worker.WorkerInfo} objects to ensure
+   * that a minimum number of good workers exists out of the total that have
+   * reported.
+   *
+   * @return List of healthy workers such that the minimum has been
+   *         met, otherwise null
+   */
+  List<WorkerInfo> checkWorkers();
+
+  /**
    * Create the {@link BspInputSplit} objects from the index range based on the
    * user-defined VertexInputFormat.  The {@link BspInputSplit} objects will
    * processed by the workers later on during the INPUT_SUPERSTEP.

http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
index cbf2470..56915b6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
@@ -39,18 +39,22 @@ public class GiraphTimers extends HadoopCountersBase {
   public static final String TOTAL_MS_NAME = "Total (ms)";
   /** Counter name for shutdown msec */
   public static final String SHUTDOWN_MS_NAME = "Shutdown (ms)";
+  /** Counter name for initialize msec */
+  public static final String INITIALIZE_MS_NAME = "Initialize (ms)";
 
   /** Singleton instance for everyone to use */
   private static GiraphTimers INSTANCE;
 
   /** Setup time in msec */
   private static final int SETUP_MS = 0;
-  /** Total time in msec */
+  /** Total time in msec (doesn't include initialize time) */
   private static final int TOTAL_MS = 1;
   /** Shutdown time in msec */
   private static final int SHUTDOWN_MS = 2;
+  /** Total time it takes to get minimum machines */
+  private static final int INITIALIZE_MS = 3;
   /** How many whole job counters we have */
-  private static final int NUM_COUNTERS = 3;
+  private static final int NUM_COUNTERS = 4;
 
   /** superstep time in msec */
   private final Map<Long, GiraphHadoopCounter> superstepMsec;
@@ -69,6 +73,7 @@ public class GiraphTimers extends HadoopCountersBase {
     jobCounters[SETUP_MS] = getCounter(SETUP_MS_NAME);
     jobCounters[TOTAL_MS] = getCounter(TOTAL_MS_NAME);
     jobCounters[SHUTDOWN_MS] = getCounter(SHUTDOWN_MS_NAME);
+    jobCounters[INITIALIZE_MS] = getCounter(INITIALIZE_MS_NAME);
     superstepMsec = Maps.newHashMap();
   }
 
@@ -124,7 +129,8 @@ public class GiraphTimers extends HadoopCountersBase {
   }
 
   /**
-   * Get counter for total time in milliseconds.
+   * Get counter for total time in milliseconds (doesn't include initialize
+   * time).
    *
    * @return Counter for total time in milliseconds.
    */
@@ -142,6 +148,16 @@ public class GiraphTimers extends HadoopCountersBase {
   }
 
   /**
+   * Get counter for the time spent initializing, which includes waiting
+   * for the minimum number of workers to become available before the
+   * setup step.
+   * @return Counter for initialize time in milliseconds
+   */
+  public GiraphHadoopCounter getInitializeMs() {
+    return jobCounters[INITIALIZE_MS];
+  }
+
+  /**
    * Get map of superstep to msec counter.
    *
    * @return mapping of superstep to msec counter.

http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index cfee4c5..90dc9f3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -459,14 +459,8 @@ public class BspServiceMaster<I extends WritableComparable,
     }
   }
 
-  /**
-   * Check all the {@link WorkerInfo} objects to ensure that a minimum
-   * number of good workers exists out of the total that have reported.
-   *
-   * @return List of of healthy workers such that the minimum has been
-   *         met, otherwise null
-   */
-  private List<WorkerInfo> checkWorkers() {
+  @Override
+  public List<WorkerInfo> checkWorkers() {
     boolean failJob = true;
     long failWorkerCheckMsecs =
         SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4d520645/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index ec1733c..15dbe07 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -93,15 +93,23 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
     // 3. Run all supersteps until complete
     try {
       long startMillis = System.currentTimeMillis();
+      long initializeMillis = 0;
       long endMillis = 0;
       bspServiceMaster.setup();
       if (bspServiceMaster.becomeMaster()) {
+        // First call to checkWorkers waits for all pending resources.
+        // If these resources are still available at subsequent calls it just
+        // reads zookeeper for the list of healthy workers.
+        bspServiceMaster.checkWorkers();
+        initializeMillis = System.currentTimeMillis();
+        GiraphTimers.getInstance().getInitializeMs().increment(
+            initializeMillis - startMillis);
         // Attempt to create InputSplits if necessary. Bail out if that fails.
         if (bspServiceMaster.getRestartedSuperstep() !=
             BspService.UNSET_SUPERSTEP ||
             (bspServiceMaster.createVertexInputSplits() != -1 &&
                 bspServiceMaster.createEdgeInputSplits() != -1)) {
-          long setupMillis = System.currentTimeMillis() - startMillis;
+          long setupMillis = System.currentTimeMillis() - initializeMillis;
           GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
           setupSecs = setupMillis / 1000.0d;
           SuperstepState superstepState = SuperstepState.INITIAL;
@@ -169,11 +177,11 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
               (System.currentTimeMillis() - endMillis) /
               1000.0d + " seconds.");
           LOG.info("total: Took " +
-              ((System.currentTimeMillis() - startMillis) /
+              ((System.currentTimeMillis() - initializeMillis) /
               1000.0d) + " seconds.");
         }
         GiraphTimers.getInstance().getTotalMs().
-          increment(System.currentTimeMillis() - startMillis);
+          increment(System.currentTimeMillis() - initializeMillis);
       }
       bspServiceMaster.postApplication();
       // CHECKSTYLE: stop IllegalCatchCheck

Reply via email to