This should fix it.  It passed the local unit tests.  Let me know.

Avery

On 11/15/11 1:03 PM, Avery Ching wrote:
Yes, I think I broke it.  Sorry.  Let me get you a diff to test quickly.

Avery

On 11/15/11 12:42 PM, Sebastian Schelter wrote:
Hi,

I updated to the latest trunk (after the GIRAPH-11 commit) and wanted to
continue to work on GIRAPH-51 where I use a small toy graph to test
SimpleShortestPathVertex.

Unfortunately my code no longer worked, and I think I tracked it down
to the fact that vertices that voted to halt are no longer reactivated
when new messages arrive.

In SimpleShortestPathVertex every vertex always votes to halt and only
gets reactivated when a shorter path to it has been found. However my
test run always finished after superstep 0.
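
As a rough illustration of the vote-to-halt pattern described above, here is
a toy sketch in plain Java. It is not the actual SimpleShortestPathVertex
code and uses no Giraph classes: the class name ToyShortestPaths, the
adjacency-matrix graph, and the convention that a weight of 0 means "no
edge" are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical toy BSP loop; NOT Giraph's implementation. */
class ToyShortestPaths {
    /** Returns shortest distances from source; w[v][u] > 0 is an edge v -> u. */
    static double[] run(double[][] w, int source) {
        int n = w.length;
        double[] dist = new double[n];
        Arrays.fill(dist, Double.POSITIVE_INFINITY);
        boolean[] halted = new boolean[n];
        List<List<Double>> inbox = emptyBoxes(n);
        for (int superstep = 0; ; superstep++) {
            List<List<Double>> outbox = emptyBoxes(n);
            long sentMessages = 0;
            for (int v = 0; v < n; v++) {
                // A halted vertex runs again only if messages arrived for it.
                if (halted[v] && inbox.get(v).isEmpty()) {
                    continue;
                }
                double min = (superstep == 0 && v == source)
                    ? 0.0 : Double.POSITIVE_INFINITY;
                for (double m : inbox.get(v)) {
                    min = Math.min(min, m);
                }
                if (min < dist[v]) {
                    // Shorter path found: record it and tell the neighbors.
                    dist[v] = min;
                    for (int u = 0; u < n; u++) {
                        if (w[v][u] > 0) {
                            outbox.get(u).add(min + w[v][u]);
                            sentMessages++;
                        }
                    }
                }
                halted[v] = true; // every vertex always votes to halt
            }
            // All vertices have halted; stop only if nothing is in flight.
            if (sentMessages == 0) {
                return dist;
            }
            inbox = outbox;
        }
    }

    private static List<List<Double>> emptyBoxes(int n) {
        List<List<Double>> boxes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            boxes.add(new ArrayList<>());
        }
        return boxes;
    }

    public static void main(String[] args) {
        double[][] w = {{0, 1, 5}, {0, 0, 2}, {0, 0, 0}};
        System.out.println(Arrays.toString(run(w, 0)));
    }
}
```

If the sentMessages check were replaced by a constant 0, this loop would
return right after superstep 0, which matches the symptom described here.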

I don't know too much about Giraph's internals yet, but my guess is that
the number of sent messages is not tracked correctly anymore. Therefore
Giraph finishes the algorithm (as all vertices voted to halt) although
there should still be messages in the pipeline.

I think I tracked it down to this behavior:

GraphMapper declares a variable workerSentMessages = 0 and never
increases it. This variable is passed to
BspServiceWorker.finishSuperstep(), which writes it to ZooKeeper and uses
it to compute the GlobalStats afterwards, which in turn are used to decide
whether a new superstep has to be scheduled. Because the counter is never
increased, the algorithm always stops as soon as all vertices have voted
to halt.
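
To make the suspected failure mode concrete, here is a minimal sketch of
the kind of termination check described above. The class HaltDecision and
its shouldHalt method are hypothetical names chosen for illustration; they
are not taken from Giraph, and the real decision is made from the
aggregated GlobalStats.

```java
/** Hypothetical illustration of a BSP termination rule; NOT Giraph code. */
class HaltDecision {
    /**
     * Stop only when every vertex has voted to halt AND no messages were
     * sent in the superstep that just finished.
     */
    static boolean shouldHalt(long totalVertices,
                              long finishedVertices,
                              long sentMessages) {
        return finishedVertices == totalVertices && sentMessages == 0;
    }

    public static void main(String[] args) {
        // All 3 vertices halted, but 2 messages are still pending.
        System.out.println(shouldHalt(3, 3, 2));
        // Same superstep, with the message count wrongly reported as 0.
        System.out.println(shouldHalt(3, 3, 0));
    }
}
```

With a rule of this shape, a message count that is always reported as 0
makes the check succeed as soon as all vertices have voted to halt, even
if messages are still waiting to be delivered.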

It would be great if someone could confirm or disprove this speculation
and help me continue working on GIRAPH-51.

--sebastian


Index: src/main/java/org/apache/giraph/graph/BspServiceWorker.java
===================================================================
--- src/main/java/org/apache/giraph/graph/BspServiceWorker.java (revision 1202424)
+++ src/main/java/org/apache/giraph/graph/BspServiceWorker.java (working copy)
@@ -548,7 +548,7 @@
         workerGraphPartitioner.finalizePartitionStats(
             partitionStatsList, workerPartitionMap);
 
-        finishSuperstep(partitionStatsList, 0);
+        finishSuperstep(partitionStatsList);
     }
 
     /**
@@ -773,8 +773,7 @@
     }
 
     @Override
-    public boolean finishSuperstep(List<PartitionStats> partitionStatsList,
-                                   long workersSentMessages) {
+    public boolean finishSuperstep(List<PartitionStats> partitionStatsList) {
         // This barrier blocks until success (or the master signals it to
         // restart).
         //
@@ -785,8 +784,9 @@
         // of this worker
         // 3. Let the master know it is finished.
         // 4. Then it waits for the master to say whether to stop or not.
+        long workerSentMessages = 0;
         try {
-            commService.flush(getContext());
+            workerSentMessages = commService.flush(getContext());
         } catch (IOException e) {
             throw new IllegalStateException(
                 "finishSuperstep: flush failed", e);
@@ -807,7 +807,7 @@
             workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
                                       Base64.encodeBytes(partitionStatsBytes));
             workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
-                                      workersSentMessages);
+                                      workerSentMessages);
         } catch (JSONException e) {
             throw new RuntimeException(e);
         }
Index: src/main/java/org/apache/giraph/graph/GraphMapper.java
===================================================================
--- src/main/java/org/apache/giraph/graph/GraphMapper.java      (revision 1202424)
+++ src/main/java/org/apache/giraph/graph/GraphMapper.java      (working copy)
@@ -512,7 +512,6 @@
 
         List<PartitionStats> partitionStatsList =
             new ArrayList<PartitionStats>();
-        long workerSentMessages = 0;
         do {
             long superstep = serviceWorker.getSuperstep();
 
@@ -556,7 +555,6 @@
             context.progress();
 
             partitionStatsList.clear();
-            workerSentMessages = 0;
             for (Partition<I, V, E, M> partition :
                     serviceWorker.getPartitionMap().values()) {
                 PartitionStats partitionStats =
@@ -593,8 +591,7 @@
                          " maxMem=" + Runtime.getRuntime().maxMemory() +
                          " freeMem=" + Runtime.getRuntime().freeMemory());
             }
-        } while (!serviceWorker.finishSuperstep(partitionStatsList,
-                                                workerSentMessages));
+        } while (!serviceWorker.finishSuperstep(partitionStatsList));
         if (LOG.isInfoEnabled()) {
             LOG.info("map: BSP application done " +
                      "(global vertices marked done)");
Index: src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
===================================================================
--- src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java   (revision 1202424)
+++ src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java   (working copy)
@@ -106,11 +106,9 @@
      * worker level statistics after the computation.
      *
      * @param partitionStatsList All the partition stats for this worker
-     * @param workersSentMessages Number of messages sent on this worker
      * @return true if this is the last superstep, false otherwise
      */
-    boolean finishSuperstep(List<PartitionStats> partitionStatsList,
-                            long workersSentMessages);
+    boolean finishSuperstep(List<PartitionStats> partitionStatsList);
     /**
      * Get the partition that a vertex index would belong to
      *
