Author: aching
Date: Mon Oct 22 23:12:13 2012
New Revision: 1401120

URL: http://svn.apache.org/viewvc?rev=1401120&view=rev
Log:
GIRAPH-382: ZooKeeperExt should handle ConnectionLossException by
retrying. (aching)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 22 23:12:13 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-382: ZooKeeperExt should handle ConnectionLossException by
+  retrying. (aching)
+
   GIRAPH-381: Ensure we get the original exception from
   GraphMapper#run(). (aching)
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Mon Oct 22 23:12:13 2012
@@ -161,6 +161,24 @@ public class GiraphConfiguration extends
   /** Local ZooKeeper directory to use */
   public static final String ZOOKEEPER_DIR = "giraph.zkDir";
 
+  /** Max attempts for handling ZooKeeper connection loss */
+  public static final String ZOOKEEPER_OPS_MAX_ATTEMPTS =
+      "giraph.zkOpsMaxAttempts";
+  /** Default of 3 attempts for handling ZooKeeper connection loss */
+  public static final int ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT = 3;
+
+  /**
+   * Msecs to wait before retrying a failed ZooKeeper op due to connection
+   * loss.
+   */
+  public static final String ZOOKEEPER_OPS_RETRY_WAIT_MSECS =
+      "giraph.zkOpsRetryWaitMsecs";
+  /**
+   * Default to wait 5 seconds before retrying a failed ZooKeeper op due to
+   * connection loss.
+   */
+  public static final int ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT = 5 * 1000;
+
   /** TCP backlog (defaults to number of workers) */
   public static final String TCP_BACKLOG = "giraph.tcpBacklog";
   /**
@@ -796,6 +814,16 @@ public class GiraphConfiguration extends
         ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
   }
 
+  public int getZookeeperOpsMaxAttempts() {
+    return getInt(ZOOKEEPER_OPS_MAX_ATTEMPTS,
+        ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT);
+  }
+
+  public int getZookeeperOpsRetryWaitMsecs() {
+    return getInt(ZOOKEEPER_OPS_RETRY_WAIT_MSECS,
+        ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT);
+  }
+
   public boolean getNettyServerUseExecutionHandler() {
     return getBoolean(NETTY_SERVER_USE_EXECUTION_HANDLER,
         NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Mon Oct 22 23:12:13 2012
@@ -333,8 +333,12 @@ public abstract class BspService<I exten
           ", " + getTaskPartition() + " on " + serverPortList);
     }
     try {
-      this.zk = new ZooKeeperExt(
-          serverPortList, sessionMsecTimeout, this, context);
+      this.zk = new ZooKeeperExt(serverPortList,
+                                 sessionMsecTimeout,
+                                 conf.getZookeeperOpsMaxAttempts(),
+                                 conf.getZookeeperOpsRetryWaitMsecs(),
+                                 this,
+                                 context);
       connectedEvent.waitForever();
       this.fs = FileSystem.get(getConfiguration());
     } catch (IOException e) {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Mon Oct 22 23:12:13 2012
@@ -586,10 +586,11 @@ public class BspServiceMaster<I extends 
 
     // Let workers know they can start trying to load the input splits
     try {
-      getZkExt().create(inputSplitsAllReadyPath,
-          null,
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT);
+      getZkExt().createExt(inputSplitsAllReadyPath,
+                           null,
+                           Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT,
+                           false);
     } catch (KeeperException.NodeExistsException e) {
       LOG.info("createInputSplits: Node " +
           inputSplitsAllReadyPath + " already exists.");
@@ -1319,10 +1320,11 @@ public class BspServiceMaster<I extends 
             "(currently not supported)");
       }
       try {
-        getZkExt().create(inputSplitsAllDonePath,
-            null,
-            Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
+        getZkExt().createExt(inputSplitsAllDonePath,
+                             null,
+                             Ids.OPEN_ACL_UNSAFE,
+                             CreateMode.PERSISTENT,
+                             false);
       } catch (KeeperException.NodeExistsException e) {
         LOG.info("coordinateInputSplits: Node " +
             inputSplitsAllDonePath + " already exists.");

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Mon Oct 22 23:12:13 2012
@@ -571,7 +571,7 @@ else[HADOOP_NON_SECURE]*/
     LOG.error("unregisterHealth: Got failure, unregistering health on " +
         myHealthZnode + " on superstep " + getSuperstep());
     try {
-      getZkExt().delete(myHealthZnode, -1);
+      getZkExt().deleteExt(myHealthZnode, -1, false);
     } catch (InterruptedException e) {
       throw new IllegalStateException(
           "unregisterHealth: InterruptedException - Couldn't delete " +

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java Mon Oct 22 23:12:13 2012
@@ -37,16 +37,23 @@ import org.apache.zookeeper.ZooKeeper;
 
 /**
  * ZooKeeper provides only atomic operations.  ZooKeeperExt provides additional
- * non-atomic operations that are useful.  All methods of this class should
- * be thread-safe.
+ * non-atomic operations that are useful.  It also provides wrappers to
+ * deal with ConnectionLossException.  All methods of this class
+ * should be thread-safe.
  */
-public class ZooKeeperExt extends ZooKeeper {
+public class ZooKeeperExt {
   /** Internal logger */
   private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
   /** Length of the ZK sequence number */
   private static final int SEQUENCE_NUMBER_LENGTH = 10;
+  /** Internal ZooKeeper */
+  private final ZooKeeper zooKeeper;
   /** Ensure we have progress */
   private final Progressable progressable;
+  /** Number of max attempts to retry when failing due to connection loss */
+  private final int maxRetryAttempts;
+  /** Milliseconds to wait before trying again due to connection loss */
+  private final int retryWaitMsecs;
 
   /**
    * Constructor to connect to ZooKeeper, does not make progress
@@ -61,14 +68,20 @@ public class ZooKeeperExt extends ZooKee
    *        "/foo/bar" would result in operations being run on
    *        "/app/a/foo/bar" (from the server perspective).
    * @param sessionTimeout Session timeout in milliseconds
+   * @param maxRetryAttempts Max retry attempts during connection loss
+   * @param retryWaitMsecs Msecs to wait when retrying due to connection
+   *        loss
    * @param watcher A watcher object which will be notified of state changes,
    *        may also be notified for node events
    * @throws IOException
    */
   public ZooKeeperExt(String connectString,
                       int sessionTimeout,
+                      int maxRetryAttempts,
+                      int retryWaitMsecs,
                       Watcher watcher) throws IOException {
-    this(connectString, sessionTimeout, watcher, null);
+    this(connectString, sessionTimeout, maxRetryAttempts,
+        retryWaitMsecs, watcher, null);
   }
 
   /**
@@ -84,6 +97,9 @@ public class ZooKeeperExt extends ZooKee
    *        "/foo/bar" would result in operations being run on
    *        "/app/a/foo/bar" (from the server perspective).
    * @param sessionTimeout Session timeout in milliseconds
+   * @param maxRetryAttempts Max retry attempts during connection loss
+   * @param retryWaitMsecs Msecs to wait when retrying due to connection
+   *        loss
    * @param watcher A watcher object which will be notified of state changes,
    *        may also be notified for node events
    * @param progressable Makes progress for longer operations
@@ -91,10 +107,14 @@ public class ZooKeeperExt extends ZooKee
    */
   public ZooKeeperExt(String connectString,
       int sessionTimeout,
+      int maxRetryAttempts,
+      int retryWaitMsecs,
       Watcher watcher,
       Progressable progressable) throws IOException {
-    super(connectString, sessionTimeout, watcher);
+    this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
     this.progressable = progressable;
+    this.maxRetryAttempts = maxRetryAttempts;
+    this.retryWaitMsecs = retryWaitMsecs;
   }
 
   /**
@@ -121,34 +141,46 @@ public class ZooKeeperExt extends ZooKee
       LOG.debug("createExt: Creating path " + path);
     }
 
-    if (!recursive) {
-      return create(path, data, acl, createMode);
-    }
-
-    try {
-      return create(path, data, acl, createMode);
-    } catch (KeeperException.NoNodeException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("createExt: Cannot directly create node " + path);
-      }
-    }
-
-    int pos = path.indexOf("/", 1);
-    for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
       try {
-        if (progressable != null) {
-          progressable.progress();
+        if (!recursive) {
+          return zooKeeper.create(path, data, acl, createMode);
         }
-        create(
-            path.substring(0, pos), null, acl, CreateMode.PERSISTENT);
-      } catch (KeeperException.NodeExistsException e) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("createExt: Znode " + path.substring(0, pos) +
-              " already exists");
+
+        try {
+          return zooKeeper.create(path, data, acl, createMode);
+        } catch (KeeperException.NoNodeException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("createExt: Cannot directly create node " + path);
+          }
+        }
+
+        int pos = path.indexOf("/", 1);
+        for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
+          try {
+            if (progressable != null) {
+              progressable.progress();
+            }
+            zooKeeper.create(
+                path.substring(0, pos), null, acl, CreateMode.PERSISTENT);
+          } catch (KeeperException.NodeExistsException e) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("createExt: Znode " + path.substring(0, pos) +
+                  " already exists");
+            }
+          }
         }
+        return zooKeeper.create(path, data, acl, createMode);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("createExt: Connection loss on attempt " + attempt + ", " +
+            "waiting " + retryWaitMsecs + " msecs before retrying.", e);
       }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
     }
-    return create(path, data, acl, createMode);
+    throw new IllegalStateException("createExt: Failed to create " + path  +
+        " after " + attempt + " tries!");
   }
 
   /**
@@ -217,7 +249,7 @@ public class ZooKeeperExt extends ZooKee
       if (LOG.isDebugEnabled()) {
         LOG.debug("createOrSet: Node exists on path " + path);
       }
-      setStat = setData(path, data, version);
+      setStat = zooKeeper.setData(path, data, version);
     }
     return new PathStat(createdPath, setStat);
   }
@@ -263,29 +295,189 @@ public class ZooKeeperExt extends ZooKee
    */
   public void deleteExt(final String path, int version, boolean recursive)
     throws InterruptedException, KeeperException {
-    if (!recursive) {
-      delete(path, version);
-      return;
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        if (!recursive) {
+          zooKeeper.delete(path, version);
+          return;
+        }
+
+        try {
+          zooKeeper.delete(path, version);
+          return;
+        } catch (KeeperException.NotEmptyException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("deleteExt: Cannot directly remove node " + path);
+          }
+        }
+
+        List<String> childList = zooKeeper.getChildren(path, false);
+        for (String child : childList) {
+          if (progressable != null) {
+            progressable.progress();
+          }
+          deleteExt(path + "/" + child, -1, true);
+        }
+
+        zooKeeper.delete(path, version);
+        return;
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("deleteExt: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
     }
+    throw new IllegalStateException("deleteExt: Failed to delete " + path  +
+        " after " + attempt + " tries!");
+  }
 
-    try {
-      delete(path, version);
-      return;
-    } catch (KeeperException.NotEmptyException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("deleteExt: Cannot directly remove node " + path);
+  /**
+   * Return the stat of the node of the given path. Return null if no such a
+   * node exists.
+   * <p>
+   * If the watch is true and the call is successful (no exception is thrown),
+   * a watch will be left on the node with the given path. The watch will be
+   * triggered by a successful operation that creates/delete the node or sets
+   * the data on the node.
+   *
+   * @param path
+   *                the node path
+   * @param watch
+   *                whether need to watch this node
+   * @return the stat of the node of the given path; return null if no such a
+   *         node exists.
+   * @throws KeeperException If the server signals an error
+   * @throws InterruptedException If the server transaction is interrupted.
+   */
+  public Stat exists(String path, boolean watch) throws KeeperException,
+      InterruptedException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        return zooKeeper.exists(path, watch);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("exists: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
       }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
     }
+    throw new IllegalStateException("exists: Failed to check " + path  +
+        " after " + attempt + " tries!");
+  }
 
-    List<String> childList = getChildren(path, false);
-    for (String child : childList) {
-      if (progressable != null) {
-        progressable.progress();
+  /**
+   * Return the stat of the node of the given path. Return null if no such a
+   * node exists.
+   * <p>
+   * If the watch is non-null and the call is successful (no exception is
+   * thrown), a watch will be left on the node with the given path. The
+   * watch will be triggered by a successful operation that
+   * creates/delete the node or sets the data on the node.
+   *
+   * @param path the node path
+   * @param watcher explicit watcher
+   * @return the stat of the node of the given path; return null if no such a
+   *         node exists.
+   * @throws KeeperException If the server signals an error
+   * @throws InterruptedException If the server transaction is interrupted.
+   * @throws IllegalArgumentException if an invalid path is specified
+   */
+  public Stat exists(final String path, Watcher watcher)
+    throws KeeperException, InterruptedException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        return zooKeeper.exists(path, watcher);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("exists: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
       }
-      deleteExt(path + "/" + child, -1, true);
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
     }
+    throw new IllegalStateException("exists: Failed to check " + path  +
+        " after " + attempt + " tries!");
+  }
 
-    delete(path, version);
+  /**
+   * Return the data and the stat of the node of the given path.
+   * <p>
+   * If the watch is non-null and the call is successful (no exception is
+   * thrown), a watch will be left on the node with the given path. The watch
+   * will be triggered by a successful operation that sets data on the node, or
+   * deletes the node.
+   * <p>
+   * A KeeperException with error code KeeperException.NoNode will be thrown
+   * if no node with the given path exists.
+   *
+   * @param path the given path
+   * @param watcher explicit watcher
+   * @param stat the stat of the node
+   * @return the data of the node
+   * @throws KeeperException If the server signals an error with a non-zero
+   *         error code
+   * @throws InterruptedException If the server transaction is interrupted.
+   * @throws IllegalArgumentException if an invalid path is specified
+   */
+  public byte[] getData(final String path, Watcher watcher, Stat stat)
+    throws KeeperException, InterruptedException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        return zooKeeper.getData(path, watcher, stat);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("getData: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
+    }
+    throw new IllegalStateException("getData: Failed to get " + path  +
+        " after " + attempt + " tries!");
+  }
+
+  /**
+   * Return the data and the stat of the node of the given path.
+   * <p>
+   * If the watch is true and the call is successful (no exception is
+   * thrown), a watch will be left on the node with the given path. The watch
+   * will be triggered by a successful operation that sets data on the node, or
+   * deletes the node.
+   * <p>
+   * A KeeperException with error code KeeperException.NoNode will be thrown
+   * if no node with the given path exists.
+   *
+   * @param path the given path
+   * @param watch whether need to watch this node
+   * @param stat the stat of the node
+   * @return the data of the node
+   * @throws KeeperException If the server signals an error with a non-zero
+   *         error code
+   * @throws InterruptedException If the server transaction is interrupted.
+   */
+  public byte[] getData(String path, boolean watch, Stat stat)
+    throws KeeperException, InterruptedException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        return zooKeeper.getData(path, watch, stat);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("getData: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
+    }
+    throw new IllegalStateException("getData: Failed to get " + path  +
+        " after " + attempt + " tries!");
   }
 
   /**
@@ -306,37 +498,62 @@ public class ZooKeeperExt extends ZooKee
       boolean watch,
       boolean sequenceSorted,
       boolean fullPath) throws KeeperException, InterruptedException {
-    List<String> childList = getChildren(path, watch);
-    /* Sort children according to the sequence number, if desired */
-    if (sequenceSorted) {
-      Collections.sort(childList, new Comparator<String>() {
-        public int compare(String s1, String s2) {
-          if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
-              (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
-            throw new RuntimeException(
-                "getChildrenExt: Invalid length for sequence " +
-                    " sorting > " +
-                    SEQUENCE_NUMBER_LENGTH +
-                    " for s1 (" +
-                    s1.length() + ") or s2 (" + s2.length() + ")");
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        List<String> childList = zooKeeper.getChildren(path, watch);
+        /* Sort children according to the sequence number, if desired */
+        if (sequenceSorted) {
+          Collections.sort(childList, new Comparator<String>() {
+            public int compare(String s1, String s2) {
+              if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
+                  (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
+                throw new RuntimeException(
+                    "getChildrenExt: Invalid length for sequence " +
+                        " sorting > " +
+                        SEQUENCE_NUMBER_LENGTH +
+                        " for s1 (" +
+                        s1.length() + ") or s2 (" + s2.length() + ")");
+              }
+              int s1sequenceNumber = Integer.parseInt(
+                  s1.substring(s1.length() -
+                      SEQUENCE_NUMBER_LENGTH));
+              int s2sequenceNumber = Integer.parseInt(
+                  s2.substring(s2.length() -
+                      SEQUENCE_NUMBER_LENGTH));
+              return s1sequenceNumber - s2sequenceNumber;
+            }
+          });
+        }
+        if (fullPath) {
+          List<String> fullChildList = new ArrayList<String>();
+          for (String child : childList) {
+            fullChildList.add(path + "/" + child);
           }
-          int s1sequenceNumber = Integer.parseInt(
-              s1.substring(s1.length() -
-                  SEQUENCE_NUMBER_LENGTH));
-          int s2sequenceNumber = Integer.parseInt(
-              s2.substring(s2.length() -
-                  SEQUENCE_NUMBER_LENGTH));
-          return s1sequenceNumber - s2sequenceNumber;
+          return fullChildList;
         }
-      });
-    }
-    if (fullPath) {
-      List<String> fullChildList = new ArrayList<String>();
-      for (String child : childList) {
-        fullChildList.add(path + "/" + child);
+        return childList;
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("getChildrenExt: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
       }
-      return fullChildList;
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
     }
-    return childList;
+    throw new IllegalStateException("getChildrenExt: Failed to get " + path +
+        " after " + attempt + " tries!");
+  }
+
+  /**
+   * Close this client object. Once the client is closed, its session becomes
+   * invalid. All the ephemeral nodes in the ZooKeeper server associated with
+   * the session will be removed. The watches left on those nodes (and on
+   * their parents) will be triggered.
+   *
+   * @throws InterruptedException
+   */
+  public void close() throws InterruptedException {
+    zooKeeper.close();
   }
 }

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java Mon Oct 22 23:12:13 2012
@@ -346,25 +346,23 @@ public class BspCase implements Watcher 
       numWorkers = 3;
     }
     try {
-
       cleanupTemporaryFiles();
 
       if (zkList == null) {
         return;
       }
       ZooKeeperExt zooKeeperExt =
-          new ZooKeeperExt(zkList, 30 * 1000, this);
-      List<String> rootChildren = zooKeeperExt.getChildren("/", false);
+          new ZooKeeperExt(zkList, 30 * 1000, 1, 0, this); // need >= 1 attempt
+      List<String> rootChildren =
+          zooKeeperExt.getChildrenExt("/", false, false, true);
       for (String rootChild : rootChildren) {
-        if (rootChild.startsWith("_hadoopBsp")) {
+        if (rootChild.startsWith("/_hadoopBsp")) {
           List<String> children =
-              zooKeeperExt.getChildren("/" + rootChild, false);
+              zooKeeperExt.getChildrenExt(rootChild, false, false, true);
           for (String child: children) {
             if (child.contains("job_local_")) {
-              System.out.println("Cleaning up /_hadoopBsp/" +
-                  child);
-              zooKeeperExt.deleteExt(
-                  "/_hadoopBsp/" + child, -1, true);
+              System.out.println("Cleaning up " + child);
+              zooKeeperExt.deleteExt(child, -1, true);
             }
           }
         }

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java Mon Oct 22 23:12:13 2012
@@ -33,131 +33,141 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+/**
+ * Test the ZooKeeperExt class.
+ */
 public class TestZooKeeperExt implements Watcher {
-    /** ZooKeeperExt instance */
-    private ZooKeeperExt zooKeeperExt = null;
-    /** ZooKeeper server list */
-    private String zkList = System.getProperty("prop.zookeeper.list");
-
-    public static final String BASE_PATH = "/_zooKeeperExtTest";
-    public static final String FIRST_PATH = "/_first";
-
-    public void process(WatchedEvent event) {
+  /** ZooKeeperExt instance */
+  private ZooKeeperExt zooKeeperExt = null;
+  /** ZooKeeper server list */
+  private String zkList = System.getProperty("prop.zookeeper.list");
+
+  public static final String BASE_PATH = "/_zooKeeperExtTest";
+  public static final String FIRST_PATH = "/_first";
+
+  public void process(WatchedEvent event) {
+    return;
+  }
+
+  @Before
+  public void setUp() {
+    try {
+      if (zkList == null) {
         return;
-    }
-
-    @Before
-    public void setUp() {
-        try {
-            if (zkList == null) {
-                return;
-            }
-            zooKeeperExt =
-                new ZooKeeperExt(zkList, 30 * 1000, this);
-            zooKeeperExt.deleteExt(BASE_PATH, -1, true);
-        } catch (KeeperException.NoNodeException e) {
-            System.out.println("Clean start: No node " + BASE_PATH);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @After
-    public void tearDown() {
-        if (zooKeeperExt == null) {
-            return;
-        }
-        try {
-            zooKeeperExt.close();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Test
-    public void testCreateExt() throws KeeperException, InterruptedException {
-        if (zooKeeperExt == null) {
-            System.out.println(
-                "testCreateExt: No prop.zookeeper.list set, skipping test");
-            return;
-        }
-        System.out.println("Created: " +
-            zooKeeperExt.createExt(
-                BASE_PATH + FIRST_PATH,
-                null,
-                Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT,
-                true));
-        zooKeeperExt.delete(BASE_PATH + FIRST_PATH, -1);
-        zooKeeperExt.delete(BASE_PATH, -1);
-    }
-
-    @Test
-    public void testDeleteExt() throws KeeperException, InterruptedException {
-        if (zooKeeperExt == null) {
-            System.out.println(
-                "testDeleteExt: No prop.zookeeper.list set, skipping test");
-            return;
-        }
-        zooKeeperExt.create(BASE_PATH,
-                              null,
-                              Ids.OPEN_ACL_UNSAFE,
-                              CreateMode.PERSISTENT);
-        zooKeeperExt.create(BASE_PATH + FIRST_PATH,
-                                null,
-                                Ids.OPEN_ACL_UNSAFE,
-                                CreateMode.PERSISTENT);
-        try {
-            zooKeeperExt.deleteExt(BASE_PATH, -1, false);
-        } catch (KeeperException.NotEmptyException e) {
-            System.out.println(
-                "Correctly failed to delete since not recursive");
-        }
-        zooKeeperExt.deleteExt(BASE_PATH, -1, true);
-    }
-
-    @Test
-    public void testGetChildrenExt()
-        throws KeeperException, InterruptedException {
-        if (zooKeeperExt == null) {
-           System.out.println(
-               "testGetChildrenExt: No prop.zookeeper.list set, skipping test");
-           return;
-        }
-        zooKeeperExt.create(BASE_PATH,
-                              null,
-                              Ids.OPEN_ACL_UNSAFE,
-                              CreateMode.PERSISTENT);
-        zooKeeperExt.create(BASE_PATH + "/b",
-                null,
-                Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT_SEQUENTIAL);
-        zooKeeperExt.create(BASE_PATH + "/a",
-                null,
-                Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT_SEQUENTIAL);
-        zooKeeperExt.create(BASE_PATH + "/d",
-                null,
-                Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT_SEQUENTIAL);
-        zooKeeperExt.create(BASE_PATH + "/c",
-                null,
-                Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT_SEQUENTIAL);
-        List<String> fullPathList =
-            zooKeeperExt.getChildrenExt(BASE_PATH, false, false, true);
-        for (String fullPath : fullPathList) {
-            assertTrue(fullPath.contains(BASE_PATH + "/"));
-        }
-        List<String> sequenceOrderedList =
-            zooKeeperExt.getChildrenExt(BASE_PATH, false, true, true);
-        for (String fullPath : sequenceOrderedList) {
-            assertTrue(fullPath.contains(BASE_PATH + "/"));
-        }
-        assertEquals(4, sequenceOrderedList.size());
-        assertTrue(sequenceOrderedList.get(0).contains("/b"));
-        assertTrue(sequenceOrderedList.get(1).contains("/a"));
-        assertTrue(sequenceOrderedList.get(2).contains("/d"));
-        assertTrue(sequenceOrderedList.get(3).contains("/c"));
-    }
+      }
+      zooKeeperExt =
+          new ZooKeeperExt(zkList, 30 * 1000, 0, 0, this);
+      zooKeeperExt.deleteExt(BASE_PATH, -1, true);
+    } catch (KeeperException.NoNodeException e) {
+      System.out.println("Clean start: No node " + BASE_PATH);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @After
+  public void tearDown() {
+    if (zooKeeperExt == null) {
+      return;
+    }
+    try {
+      zooKeeperExt.close();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testCreateExt() throws KeeperException, InterruptedException {
+    if (zooKeeperExt == null) {
+      System.out.println(
+          "testCreateExt: No prop.zookeeper.list set, skipping test");
+      return;
+    }
+    System.out.println("Created: " +
+                           zooKeeperExt.createExt(
+                               BASE_PATH + FIRST_PATH,
+                               null,
+                               Ids.OPEN_ACL_UNSAFE,
+                               CreateMode.PERSISTENT,
+                               true));
+    zooKeeperExt.deleteExt(BASE_PATH + FIRST_PATH, -1, false);
+    zooKeeperExt.deleteExt(BASE_PATH, -1, false);
+  }
+
+  @Test
+  public void testDeleteExt() throws KeeperException, InterruptedException {
+    if (zooKeeperExt == null) {
+      System.out.println(
+          "testDeleteExt: No prop.zookeeper.list set, skipping test");
+      return;
+    }
+    zooKeeperExt.createExt(BASE_PATH,
+                           null,
+                           Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT,
+                           false);
+    zooKeeperExt.createExt(BASE_PATH + FIRST_PATH,
+                           null,
+                           Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT,
+                           false);
+    try {
+      zooKeeperExt.deleteExt(BASE_PATH, -1, false);
+    } catch (KeeperException.NotEmptyException e) {
+      System.out.println(
+          "Correctly failed to delete since not recursive");
+    }
+    zooKeeperExt.deleteExt(BASE_PATH, -1, true);
+  }
+
+  @Test
+  public void testGetChildrenExt()
+      throws KeeperException, InterruptedException {
+    if (zooKeeperExt == null) {
+      System.out.println(
+          "testGetChildrenExt: No prop.zookeeper.list set, skipping test");
+      return;
+    }
+    zooKeeperExt.createExt(BASE_PATH,
+                           null,
+                           Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT,
+                           false);
+    zooKeeperExt.createExt(BASE_PATH + "/b",
+                           null,
+                           Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT_SEQUENTIAL,
+                           false);
+    zooKeeperExt.createExt(BASE_PATH + "/a",
+                           null,
+                           Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT_SEQUENTIAL,
+                           false);
+    zooKeeperExt.createExt(BASE_PATH + "/d",
+                           null,
+                           Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT_SEQUENTIAL,
+                           false);
+    zooKeeperExt.createExt(BASE_PATH + "/c",
+                           null,
+                           Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT_SEQUENTIAL,
+                           false);
+    List<String> fullPathList =
+        zooKeeperExt.getChildrenExt(BASE_PATH, false, false, true);
+    for (String fullPath : fullPathList) {
+      assertTrue(fullPath.contains(BASE_PATH + "/"));
+    }
+    List<String> sequenceOrderedList =
+        zooKeeperExt.getChildrenExt(BASE_PATH, false, true, true);
+    for (String fullPath : sequenceOrderedList) {
+      assertTrue(fullPath.contains(BASE_PATH + "/"));
+    }
+    assertEquals(4, sequenceOrderedList.size());
+    assertTrue(sequenceOrderedList.get(0).contains("/b"));
+    assertTrue(sequenceOrderedList.get(1).contains("/a"));
+    assertTrue(sequenceOrderedList.get(2).contains("/d"));
+    assertTrue(sequenceOrderedList.get(3).contains("/c"));
+  }
 }


Reply via email to