dlogothetis commented on a change in pull request #110: Counter Mechanism
URL: https://github.com/apache/giraph/pull/110#discussion_r343959528
 
 

 ##########
 File path: 
giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
 ##########
 @@ -1794,6 +1808,155 @@ private void doMasterCompute() {
     timerContext.stop();
   }
 
+  /**
+   * Reads the current value of every named counter from the job context,
+   * wraps each value in a CustomCounter, and adds it to the given set.
+   *
+   * @param context Job context from which the counter values are read
+   * @param counterGroupAndNames Map from counter group name to the set of
+   *                             counter names belonging to that group
+   * @param counters Set of CustomCounter which will be populated
+   */
+  private void populateCountersFromContext(Mapper.Context context,
+           Map<String, Set<String>> counterGroupAndNames,
+           Set<CustomCounter> counters) {
+    Counter counter;
+    for (Map.Entry<String, Set<String>> entry :
+            counterGroupAndNames.entrySet()) {
+      String groupName = entry.getKey();
+      for (String counterName: entry.getValue()) {
+        // Every counter built here is tagged with SUM aggregation
+        CustomCounter customCounter = new CustomCounter(groupName, counterName,
+                CustomCounter.Aggregation.SUM);
+        // Look up the counter in the context and copy its current value
+        counter = context.getCounter(groupName, counterName);
+        customCounter.setValue(counter.getValue());
+        counters.add(customCounter);
+      }
+    }
+  }
+
+  /**
+   * Receive the counters from the workers, and aggregate them with the
+   * master counters.
+   * This method is called at the end of each superstep, and after the job
+   * finishes successfully. In order to ensure best-effort counter values
+   * in case of a job failure, we call this at the end of every superstep.
+   * The aggregated counters are stored in a thrift struct
+   */
+  private void aggregateCountersFromWorkersAndMaster() {
+    CustomCounters customCounters = new CustomCounters();
+    // Get the stats from all the worker selected nodes
+    String workerFinishedPath = getWorkerCountersFinishedPath(
+            getApplicationAttempt(), getSuperstep());
+    List<String> workerFinishedPathList = null;
+    long waitForCountersTimeout =
+            SystemTime.get().getMilliseconds() + maxCounterWaitMsecs;
+    // Subtract 1 for the master
+    int numWorkers = BspInputFormat.getMaxTasks(getConfiguration()) - 1;
+    if (numWorkers == 0) {
+      // When the job is run with 1 worker, numWorkers would be 0,
+      // and thus add 1 to it, to get the worker-related counters
+      numWorkers += 1;
+    }
+    // Get the counter values from the zookeeper, written by the workers
+    // We keep retrying until all the workers have written
+    while (SystemTime.get().getMilliseconds() < waitForCountersTimeout) {
+      try {
+        workerFinishedPathList = getZkExt().getChildrenExt(
+                workerFinishedPath, true,
+                false, true);
+        LOG.info(String.format("Fetching counter values from " +
+                        "workers. Got %d out of %d",
+                workerFinishedPathList.size(), numWorkers));
+        if (workerFinishedPathList.size() == numWorkers) {
+          break;
+        }
+      } catch (KeeperException e) {
+        LOG.info("Got Keeper exception, but will retry: ", e);
 
 Review comment:
   nit: make this `LOG.warn()`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to