Author: aching
Date: Thu Jul 26 04:06:35 2012
New Revision: 1365878

URL: http://svn.apache.org/viewvc?rev=1365878&view=rev
Log:
GIRAPH-267: Jobs can get killed for not reporting status during INPUT
SUPERSTEP (netj via aching).


Removed:
    giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
    giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Jul 26 04:06:35 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-267: Jobs can get killed for not reporting status during
+  INPUT SUPERSTEP (netj via aching).
+
   GIRAPH-266: Average aggregators don't calculate real average
   (majakabiljo via aching).
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Thu Jul 26 04:06:35 2012
@@ -208,37 +208,27 @@ public abstract class BspService<I exten
   /** Private ZooKeeper instance that implements the service */
   private final ZooKeeperExt zk;
   /** Has the Connection occurred? */
-  private final BspEvent connectedEvent = new PredicateLock();
+  private final BspEvent connectedEvent;
   /** Has worker registration changed (either healthy or unhealthy) */
-  private final BspEvent workerHealthRegistrationChanged =
-      new PredicateLock();
+  private final BspEvent workerHealthRegistrationChanged;
   /** InputSplits are ready for consumption by workers */
-  private final BspEvent inputSplitsAllReadyChanged =
-      new PredicateLock();
+  private final BspEvent inputSplitsAllReadyChanged;
   /** InputSplit reservation or finished notification and synchronization */
-  private final BspEvent inputSplitsStateChanged =
-      new PredicateLock();
+  private final BspEvent inputSplitsStateChanged;
   /** InputSplits are done being processed by workers */
-  private final BspEvent inputSplitsAllDoneChanged =
-      new PredicateLock();
+  private final BspEvent inputSplitsAllDoneChanged;
   /** InputSplit done by a worker finished notification and synchronization */
-  private final BspEvent inputSplitsDoneStateChanged =
-      new PredicateLock();
+  private final BspEvent inputSplitsDoneStateChanged;
   /** Are the partition assignments to workers ready? */
-  private final BspEvent partitionAssignmentsReadyChanged =
-      new PredicateLock();
+  private final BspEvent partitionAssignmentsReadyChanged;
   /** Application attempt changed */
-  private final BspEvent applicationAttemptChanged =
-      new PredicateLock();
+  private final BspEvent applicationAttemptChanged;
   /** Superstep finished synchronization */
-  private final BspEvent superstepFinished =
-      new PredicateLock();
+  private final BspEvent superstepFinished;
   /** Master election changed for any waited on attempt */
-  private final BspEvent masterElectionChildrenChanged =
-      new PredicateLock();
+  private final BspEvent masterElectionChildrenChanged;
   /** Cleaned up directory children changed*/
-  private final BspEvent cleanedUpChildrenChanged =
-      new PredicateLock();
+  private final BspEvent cleanedUpChildrenChanged;
   /** Registered list of BspEvents */
   private final List<BspEvent> registeredBspEvents =
       new ArrayList<BspEvent>();
@@ -284,6 +274,18 @@ public abstract class BspService<I exten
       int sessionMsecTimeout,
       Mapper<?, ?, ?, ?>.Context context,
       GraphMapper<I, V, E, M> graphMapper) {
+    this.connectedEvent = new PredicateLock(context);
+    this.workerHealthRegistrationChanged = new PredicateLock(context);
+    this.inputSplitsAllReadyChanged = new PredicateLock(context);
+    this.inputSplitsStateChanged = new PredicateLock(context);
+    this.inputSplitsAllDoneChanged = new PredicateLock(context);
+    this.inputSplitsDoneStateChanged = new PredicateLock(context);
+    this.partitionAssignmentsReadyChanged = new PredicateLock(context);
+    this.applicationAttemptChanged = new PredicateLock(context);
+    this.superstepFinished = new PredicateLock(context);
+    this.masterElectionChildrenChanged = new PredicateLock(context);
+    this.cleanedUpChildrenChanged = new PredicateLock(context);
+
     registerBspEvent(connectedEvent);
     registerBspEvent(workerHealthRegistrationChanged);
     registerBspEvent(inputSplitsAllReadyChanged);

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Thu Jul 26 04:06:35 2012
@@ -121,8 +121,7 @@ public class BspServiceMaster<I extends 
   /** Last finalized checkpoint */
   private long lastCheckpointedSuperstep = -1;
   /** State of the superstep changed */
-  private final BspEvent superstepStateChanged =
-      new PredicateLock();
+  private final BspEvent superstepStateChanged;
   /** Master graph partitioner */
   private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
   /** All the partition stats from the last superstep */
@@ -147,6 +146,7 @@ public class BspServiceMaster<I extends 
       Mapper<?, ?, ?, ?>.Context context,
       GraphMapper<I, V, E, M> graphMapper) {
     super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    superstepStateChanged = new PredicateLock(context);
     registerBspEvent(superstepStateChanged);
 
     maxWorkers =

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Jul 26 04:06:35 2012
@@ -106,8 +106,7 @@ public class BspServiceWorker<I extends 
   private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
       new HashMap<Integer, Partition<I, V, E, M>>();
   /** Have the partition exchange children (workers) changed? */
-  private final BspEvent partitionExchangeChildrenChanged =
-      new PredicateLock();
+  private final BspEvent partitionExchangeChildrenChanged;
   /** Max vertices per partition before sending */
   private final int maxVerticesPerPartition;
   /** Worker Context */
@@ -139,6 +138,7 @@ public class BspServiceWorker<I extends 
     GraphState<I, V, E, M> graphState)
     throws IOException, InterruptedException {
     super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
     maxVerticesPerPartition =
         getConfiguration().getInt(
@@ -654,7 +654,7 @@ public class BspServiceWorker<I extends 
     }
 
     // At this point all vertices have been sent to their destinations.
-    // Move them to the worker, creating creating the empty partitions
+    // Move them to the worker, creating the empty partitions
     movePartitionsToWorker(commService);
     for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
       if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&

Modified: giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java Thu Jul 26 04:06:35 2012
@@ -23,14 +23,20 @@ import java.util.concurrent.locks.Condit
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
 /**
- * A lock with a predicate that was be used to synchronize events.
+ * A lock with a predicate that can be used to synchronize events and keep the
+ * job context updated while waiting.
  */
 public class PredicateLock implements BspEvent {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(PredicateLock.class);
+  /** Msecs to refresh the progress meter */
+  private static final int MSEC_PERIOD = 10000;
+  /** Progressable for reporting progress (Job context) */
+  protected final Progressable progressable;
   /** Lock */
   private Lock lock = new ReentrantLock();
   /** Condition associated with lock */
@@ -38,6 +44,15 @@ public class PredicateLock implements Bs
   /** Predicate */
   private boolean eventOccurred = false;
 
+  /**
+   * Constructor.
+   *
+   * @param progressable used to report progress() (usually a Mapper.Context)
+   */
+  public PredicateLock(Progressable progressable) {
+    this.progressable = progressable;
+  }
+
   @Override
   public void reset() {
     lock.lock();
@@ -111,6 +126,8 @@ public class PredicateLock implements Bs
 
   @Override
   public void waitForever() {
-    waitMsecs(-1);
+    while (!waitMsecs(MSEC_PERIOD)) {
+      progressable.progress();
+    }
   }
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java Thu Jul 26 04:06:35 2012
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
 
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.util.Progressable;
 import org.junit.Test;
 
 /**
@@ -43,12 +44,25 @@ public class TestPredicateLock {
         }
     }
 
+    private Progressable stubContext;
+
+    private Progressable getStubProgressable() {
+        if (stubContext == null)
+            stubContext = new Progressable() {
+                @Override
+                    public void progress() {
+                        System.out.println("progress received");
+                    }
+            };
+        return stubContext;
+    }
+
     /**
      * Make sure the the event is not signaled.
      */
   @Test
     public void testWaitMsecsNoEvent() {
-        BspEvent event = new PredicateLock();
+        BspEvent event = new PredicateLock(getStubProgressable());
         boolean gotPredicate = event.waitMsecs(50);
         assertFalse(gotPredicate);
     }
@@ -58,7 +72,7 @@ public class TestPredicateLock {
      */
   @Test
     public void testEvent() {
-        BspEvent event = new PredicateLock();
+        BspEvent event = new PredicateLock(getStubProgressable());
         event.signal();
         boolean gotPredicate = event.waitMsecs(-1);
         assertTrue(gotPredicate );
@@ -74,7 +88,7 @@ public class TestPredicateLock {
   @Test
     public void testWaitMsecs() {
         System.out.println("testWaitMsecs:");
-        BspEvent event = new PredicateLock();
+        BspEvent event = new PredicateLock(getStubProgressable());
         Thread signalThread = new SignalThread(event);
         signalThread.start();
         boolean gotPredicate = event.waitMsecs(2000);


Reply via email to