Author: aching
Date: Mon Oct 22 23:12:13 2012
New Revision: 1401120
URL: http://svn.apache.org/viewvc?rev=1401120&view=rev
Log:
GIRAPH-382: ZooKeeperExt should handle ConnectionLossException by
retrying. (aching)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 22 23:12:13 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-382: ZooKeeperExt should handle ConnectionLossException by
+ retrying. (aching)
+
GIRAPH-381: Ensure we get the original exception from
GraphMapper#run(). (aching)
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Mon Oct 22 23:12:13 2012
@@ -161,6 +161,24 @@ public class GiraphConfiguration extends
/** Local ZooKeeper directory to use */
public static final String ZOOKEEPER_DIR = "giraph.zkDir";
+ /** Max attempts for handling ZooKeeper connection loss */
+ public static final String ZOOKEEPER_OPS_MAX_ATTEMPTS =
+ "giraph.zkOpsMaxAttempts";
+ /** Default of 3 attempts for handling ZooKeeper connection loss */
+ public static final int ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT = 3;
+
+ /**
+ * Msecs to wait before retrying a failed ZooKeeper op due to connection
+ * loss.
+ */
+ public static final String ZOOKEEPER_OPS_RETRY_WAIT_MSECS =
+ "giraph.zkOpsRetryWaitMsecs";
+ /**
+ * Default to wait 5 seconds before retrying a failed ZooKeeper op due to
+ * connection loss.
+ */
+ public static final int ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT = 5 * 1000;
+
/** TCP backlog (defaults to number of workers) */
public static final String TCP_BACKLOG = "giraph.tcpBacklog";
/**
@@ -796,6 +814,16 @@ public class GiraphConfiguration extends
ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
}
+ public int getZookeeperOpsMaxAttempts() {
+ return getInt(ZOOKEEPER_OPS_MAX_ATTEMPTS,
+ ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT);
+ }
+
+ public int getZookeeperOpsRetryWaitMsecs() {
+ return getInt(ZOOKEEPER_OPS_RETRY_WAIT_MSECS,
+ ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT);
+ }
+
public boolean getNettyServerUseExecutionHandler() {
return getBoolean(NETTY_SERVER_USE_EXECUTION_HANDLER,
NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Mon Oct 22 23:12:13 2012
@@ -333,8 +333,12 @@ public abstract class BspService<I exten
", " + getTaskPartition() + " on " + serverPortList);
}
try {
- this.zk = new ZooKeeperExt(
- serverPortList, sessionMsecTimeout, this, context);
+ this.zk = new ZooKeeperExt(serverPortList,
+ sessionMsecTimeout,
+ conf.getZookeeperOpsMaxAttempts(),
+ conf.getZookeeperOpsRetryWaitMsecs(),
+ this,
+ context);
connectedEvent.waitForever();
this.fs = FileSystem.get(getConfiguration());
} catch (IOException e) {
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Mon Oct 22 23:12:13 2012
@@ -586,10 +586,11 @@ public class BspServiceMaster<I extends
// Let workers know they can start trying to load the input splits
try {
- getZkExt().create(inputSplitsAllReadyPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ getZkExt().createExt(inputSplitsAllReadyPath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ false);
} catch (KeeperException.NodeExistsException e) {
LOG.info("createInputSplits: Node " +
inputSplitsAllReadyPath + " already exists.");
@@ -1319,10 +1320,11 @@ public class BspServiceMaster<I extends
"(currently not supported)");
}
try {
- getZkExt().create(inputSplitsAllDonePath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ getZkExt().createExt(inputSplitsAllDonePath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ false);
} catch (KeeperException.NodeExistsException e) {
LOG.info("coordinateInputSplits: Node " +
inputSplitsAllDonePath + " already exists.");
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Mon Oct 22 23:12:13 2012
@@ -571,7 +571,7 @@ else[HADOOP_NON_SECURE]*/
LOG.error("unregisterHealth: Got failure, unregistering health on " +
myHealthZnode + " on superstep " + getSuperstep());
try {
- getZkExt().delete(myHealthZnode, -1);
+ getZkExt().deleteExt(myHealthZnode, -1, false);
} catch (InterruptedException e) {
throw new IllegalStateException(
"unregisterHealth: InterruptedException - Couldn't delete " +
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java Mon Oct 22 23:12:13 2012
@@ -37,16 +37,23 @@ import org.apache.zookeeper.ZooKeeper;
/**
* ZooKeeper provides only atomic operations. ZooKeeperExt provides additional
- * non-atomic operations that are useful. All methods of this class should
- * be thread-safe.
+ * non-atomic operations that are useful. It also provides wrappers to
+ * deal with ConnectionLossException. All methods of this class
+ * should be thread-safe.
*/
-public class ZooKeeperExt extends ZooKeeper {
+public class ZooKeeperExt {
/** Internal logger */
private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
/** Length of the ZK sequence number */
private static final int SEQUENCE_NUMBER_LENGTH = 10;
+ /** Internal ZooKeeper */
+ private final ZooKeeper zooKeeper;
/** Ensure we have progress */
private final Progressable progressable;
+ /** Number of max attempts to retry when failing due to connection loss */
+ private final int maxRetryAttempts;
+ /** Milliseconds to wait before trying again due to connection loss */
+ private final int retryWaitMsecs;
/**
* Constructor to connect to ZooKeeper, does not make progress
@@ -61,14 +68,20 @@ public class ZooKeeperExt extends ZooKee
* "/foo/bar" would result in operations being run on
* "/app/a/foo/bar" (from the server perspective).
* @param sessionTimeout Session timeout in milliseconds
+ * @param maxRetryAttempts Max retry attempts during connection loss
+ * @param retryWaitMsecs Msecs to wait when retrying due to connection
+ * loss
* @param watcher A watcher object which will be notified of state changes,
* may also be notified for node events
* @throws IOException
*/
public ZooKeeperExt(String connectString,
int sessionTimeout,
+ int maxRetryAttempts,
+ int retryWaitMsecs,
Watcher watcher) throws IOException {
- this(connectString, sessionTimeout, watcher, null);
+ this(connectString, sessionTimeout, maxRetryAttempts,
+ retryWaitMsecs, watcher, null);
}
/**
@@ -84,6 +97,9 @@ public class ZooKeeperExt extends ZooKee
* "/foo/bar" would result in operations being run on
* "/app/a/foo/bar" (from the server perspective).
* @param sessionTimeout Session timeout in milliseconds
+ * @param maxRetryAttempts Max retry attempts during connection loss
+ * @param retryWaitMsecs Msecs to wait when retrying due to connection
+ * loss
* @param watcher A watcher object which will be notified of state changes,
* may also be notified for node events
* @param progressable Makes progress for longer operations
@@ -91,10 +107,14 @@ public class ZooKeeperExt extends ZooKee
*/
public ZooKeeperExt(String connectString,
int sessionTimeout,
+ int maxRetryAttempts,
+ int retryWaitMsecs,
Watcher watcher,
Progressable progressable) throws IOException {
- super(connectString, sessionTimeout, watcher);
+ this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
this.progressable = progressable;
+ this.maxRetryAttempts = maxRetryAttempts;
+ this.retryWaitMsecs = retryWaitMsecs;
}
/**
@@ -121,34 +141,46 @@ public class ZooKeeperExt extends ZooKee
LOG.debug("createExt: Creating path " + path);
}
- if (!recursive) {
- return create(path, data, acl, createMode);
- }
-
- try {
- return create(path, data, acl, createMode);
- } catch (KeeperException.NoNodeException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("createExt: Cannot directly create node " + path);
- }
- }
-
- int pos = path.indexOf("/", 1);
- for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
+ int attempt = 0;
+ while (attempt < maxRetryAttempts) {
try {
- if (progressable != null) {
- progressable.progress();
+ if (!recursive) {
+ return zooKeeper.create(path, data, acl, createMode);
}
- create(
- path.substring(0, pos), null, acl, CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("createExt: Znode " + path.substring(0, pos) +
- " already exists");
+
+ try {
+ return zooKeeper.create(path, data, acl, createMode);
+ } catch (KeeperException.NoNodeException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("createExt: Cannot directly create node " + path);
+ }
+ }
+
+ int pos = path.indexOf("/", 1);
+ for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
+ try {
+ if (progressable != null) {
+ progressable.progress();
+ }
+ zooKeeper.create(
+ path.substring(0, pos), null, acl, CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("createExt: Znode " + path.substring(0, pos) +
+ " already exists");
+ }
+ }
}
+ return zooKeeper.create(path, data, acl, createMode);
+ } catch (KeeperException.ConnectionLossException e) {
+ LOG.warn("createExt: Connection loss on attempt " + attempt + ", " +
+ "waiting " + retryWaitMsecs + " msecs before retrying.", e);
}
+ ++attempt;
+ Thread.sleep(retryWaitMsecs);
}
- return create(path, data, acl, createMode);
+ throw new IllegalStateException("createExt: Failed to create " + path +
+ " after " + attempt + " tries!");
}
/**
@@ -217,7 +249,7 @@ public class ZooKeeperExt extends ZooKee
if (LOG.isDebugEnabled()) {
LOG.debug("createOrSet: Node exists on path " + path);
}
- setStat = setData(path, data, version);
+ setStat = zooKeeper.setData(path, data, version);
}
return new PathStat(createdPath, setStat);
}
@@ -263,29 +295,189 @@ public class ZooKeeperExt extends ZooKee
*/
public void deleteExt(final String path, int version, boolean recursive)
throws InterruptedException, KeeperException {
- if (!recursive) {
- delete(path, version);
- return;
+ int attempt = 0;
+ while (attempt < maxRetryAttempts) {
+ try {
+ if (!recursive) {
+ zooKeeper.delete(path, version);
+ return;
+ }
+
+ try {
+ zooKeeper.delete(path, version);
+ return;
+ } catch (KeeperException.NotEmptyException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("deleteExt: Cannot directly remove node " + path);
+ }
+ }
+
+ List<String> childList = zooKeeper.getChildren(path, false);
+ for (String child : childList) {
+ if (progressable != null) {
+ progressable.progress();
+ }
+ deleteExt(path + "/" + child, -1, true);
+ }
+
+ zooKeeper.delete(path, version);
+ return;
+ } catch (KeeperException.ConnectionLossException e) {
+ LOG.warn("deleteExt: Connection loss on attempt " +
+ attempt + ", waiting " + retryWaitMsecs +
+ " msecs before retrying.", e);
+ }
+ ++attempt;
+ Thread.sleep(retryWaitMsecs);
}
+ throw new IllegalStateException("deleteExt: Failed to delete " + path +
+ " after " + attempt + " tries!");
+ }
- try {
- delete(path, version);
- return;
- } catch (KeeperException.NotEmptyException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("deleteExt: Cannot directly remove node " + path);
+ /**
+ * Return the stat of the node of the given path. Return null if no such a
+ * node exists.
+ * <p>
+ * If the watch is true and the call is successful (no exception is thrown),
+ * a watch will be left on the node with the given path. The watch will be
+ * triggered by a successful operation that creates/delete the node or sets
+ * the data on the node.
+ *
+ * @param path
+ * the node path
+ * @param watch
+ * whether need to watch this node
+ * @return the stat of the node of the given path; return null if no such a
+ * node exists.
+ * @throws KeeperException If the server signals an error
+ * @throws InterruptedException If the server transaction is interrupted.
+ */
+ public Stat exists(String path, boolean watch) throws KeeperException,
+ InterruptedException {
+ int attempt = 0;
+ while (attempt < maxRetryAttempts) {
+ try {
+ return zooKeeper.exists(path, watch);
+ } catch (KeeperException.ConnectionLossException e) {
+ LOG.warn("exists: Connection loss on attempt " +
+ attempt + ", waiting " + retryWaitMsecs +
+ " msecs before retrying.", e);
}
+ ++attempt;
+ Thread.sleep(retryWaitMsecs);
}
+ throw new IllegalStateException("exists: Failed to check " + path +
+ " after " + attempt + " tries!");
+ }
- List<String> childList = getChildren(path, false);
- for (String child : childList) {
- if (progressable != null) {
- progressable.progress();
+ /**
+ * Return the stat of the node of the given path. Return null if no such a
+ * node exists.
+ * <p>
+ * If the watch is non-null and the call is successful (no exception is
+ * thrown), a watch will be left on the node with the given path. The
+ * watch will be triggered by a successful operation that
+ * creates/delete the node or sets the data on the node.
+ *
+ * @param path the node path
+ * @param watcher explicit watcher
+ * @return the stat of the node of the given path; return null if no such a
+ * node exists.
+ * @throws KeeperException If the server signals an error
+ * @throws InterruptedException If the server transaction is interrupted.
+ * @throws IllegalArgumentException if an invalid path is specified
+ */
+ public Stat exists(final String path, Watcher watcher)
+ throws KeeperException, InterruptedException {
+ int attempt = 0;
+ while (attempt < maxRetryAttempts) {
+ try {
+ return zooKeeper.exists(path, watcher);
+ } catch (KeeperException.ConnectionLossException e) {
+ LOG.warn("exists: Connection loss on attempt " +
+ attempt + ", waiting " + retryWaitMsecs +
+ " msecs before retrying.", e);
}
- deleteExt(path + "/" + child, -1, true);
+ ++attempt;
+ Thread.sleep(retryWaitMsecs);
}
+ throw new IllegalStateException("exists: Failed to check " + path +
+ " after " + attempt + " tries!");
+ }
- delete(path, version);
+ /**
+ * Return the data and the stat of the node of the given path.
+ * <p>
+ * If the watch is non-null and the call is successful (no exception is
+ * thrown), a watch will be left on the node with the given path. The watch
+ * will be triggered by a successful operation that sets data on the node, or
+ * deletes the node.
+ * <p>
+ * A KeeperException with error code KeeperException.NoNode will be thrown
+ * if no node with the given path exists.
+ *
+ * @param path the given path
+ * @param watcher explicit watcher
+ * @param stat the stat of the node
+ * @return the data of the node
+ * @throws KeeperException If the server signals an error with a non-zero
+ * error code
+ * @throws InterruptedException If the server transaction is interrupted.
+ * @throws IllegalArgumentException if an invalid path is specified
+ */
+ public byte[] getData(final String path, Watcher watcher, Stat stat)
+ throws KeeperException, InterruptedException {
+ int attempt = 0;
+ while (attempt < maxRetryAttempts) {
+ try {
+ return zooKeeper.getData(path, watcher, stat);
+ } catch (KeeperException.ConnectionLossException e) {
+ LOG.warn("getData: Connection loss on attempt " +
+ attempt + ", waiting " + retryWaitMsecs +
+ " msecs before retrying.", e);
+ }
+ ++attempt;
+ Thread.sleep(retryWaitMsecs);
+ }
+ throw new IllegalStateException("getData: Failed to get " + path +
+ " after " + attempt + " tries!");
+ }
+
+ /**
+ * Return the data and the stat of the node of the given path.
+ * <p>
+ * If the watch is true and the call is successful (no exception is
+ * thrown), a watch will be left on the node with the given path. The watch
+ * will be triggered by a successful operation that sets data on the node, or
+ * deletes the node.
+ * <p>
+ * A KeeperException with error code KeeperException.NoNode will be thrown
+ * if no node with the given path exists.
+ *
+ * @param path the given path
+ * @param watch whether need to watch this node
+ * @param stat the stat of the node
+ * @return the data of the node
+ * @throws KeeperException If the server signals an error with a non-zero
+ * error code
+ * @throws InterruptedException If the server transaction is interrupted.
+ */
+ public byte[] getData(String path, boolean watch, Stat stat)
+ throws KeeperException, InterruptedException {
+ int attempt = 0;
+ while (attempt < maxRetryAttempts) {
+ try {
+ return zooKeeper.getData(path, watch, stat);
+ } catch (KeeperException.ConnectionLossException e) {
+ LOG.warn("getData: Connection loss on attempt " +
+ attempt + ", waiting " + retryWaitMsecs +
+ " msecs before retrying.", e);
+ }
+ ++attempt;
+ Thread.sleep(retryWaitMsecs);
+ }
+ throw new IllegalStateException("getData: Failed to get " + path +
+ " after " + attempt + " tries!");
}
/**
@@ -306,37 +498,62 @@ public class ZooKeeperExt extends ZooKee
boolean watch,
boolean sequenceSorted,
boolean fullPath) throws KeeperException, InterruptedException {
- List<String> childList = getChildren(path, watch);
- /* Sort children according to the sequence number, if desired */
- if (sequenceSorted) {
- Collections.sort(childList, new Comparator<String>() {
- public int compare(String s1, String s2) {
- if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
- (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
- throw new RuntimeException(
- "getChildrenExt: Invalid length for sequence " +
- " sorting > " +
- SEQUENCE_NUMBER_LENGTH +
- " for s1 (" +
- s1.length() + ") or s2 (" + s2.length() + ")");
+ int attempt = 0;
+ while (attempt < maxRetryAttempts) {
+ try {
+ List<String> childList = zooKeeper.getChildren(path, watch);
+ /* Sort children according to the sequence number, if desired */
+ if (sequenceSorted) {
+ Collections.sort(childList, new Comparator<String>() {
+ public int compare(String s1, String s2) {
+ if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
+ (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
+ throw new RuntimeException(
+ "getChildrenExt: Invalid length for sequence " +
+ " sorting > " +
+ SEQUENCE_NUMBER_LENGTH +
+ " for s1 (" +
+ s1.length() + ") or s2 (" + s2.length() + ")");
+ }
+ int s1sequenceNumber = Integer.parseInt(
+ s1.substring(s1.length() -
+ SEQUENCE_NUMBER_LENGTH));
+ int s2sequenceNumber = Integer.parseInt(
+ s2.substring(s2.length() -
+ SEQUENCE_NUMBER_LENGTH));
+ return s1sequenceNumber - s2sequenceNumber;
+ }
+ });
+ }
+ if (fullPath) {
+ List<String> fullChildList = new ArrayList<String>();
+ for (String child : childList) {
+ fullChildList.add(path + "/" + child);
}
- int s1sequenceNumber = Integer.parseInt(
- s1.substring(s1.length() -
- SEQUENCE_NUMBER_LENGTH));
- int s2sequenceNumber = Integer.parseInt(
- s2.substring(s2.length() -
- SEQUENCE_NUMBER_LENGTH));
- return s1sequenceNumber - s2sequenceNumber;
+ return fullChildList;
}
- });
- }
- if (fullPath) {
- List<String> fullChildList = new ArrayList<String>();
- for (String child : childList) {
- fullChildList.add(path + "/" + child);
+ return childList;
+ } catch (KeeperException.ConnectionLossException e) {
+ LOG.warn("getChildrenExt: Connection loss on attempt " +
+ attempt + ", waiting " + retryWaitMsecs +
+ " msecs before retrying.", e);
}
- return fullChildList;
+ ++attempt;
+ Thread.sleep(retryWaitMsecs);
}
- return childList;
+ throw new IllegalStateException("createExt: Failed to create " + path +
+ " after " + attempt + " tries!");
+ }
+
+ /**
+ * Close this client object. Once the client is closed, its session becomes
+ * invalid. All the ephemeral nodes in the ZooKeeper server associated with
+ * the session will be removed. The watches left on those nodes (and on
+ * their parents) will be triggered.
+ *
+ * @throws InterruptedException
+ */
+ public void close() throws InterruptedException {
+ zooKeeper.close();
}
}
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java Mon Oct 22 23:12:13 2012
@@ -346,25 +346,23 @@ public class BspCase implements Watcher
numWorkers = 3;
}
try {
-
cleanupTemporaryFiles();
if (zkList == null) {
return;
}
ZooKeeperExt zooKeeperExt =
- new ZooKeeperExt(zkList, 30 * 1000, this);
- List<String> rootChildren = zooKeeperExt.getChildren("/", false);
+ new ZooKeeperExt(zkList, 30 * 1000, 0, 0, this);
+ List<String> rootChildren =
+ zooKeeperExt.getChildrenExt("/", false, false, true);
for (String rootChild : rootChildren) {
- if (rootChild.startsWith("_hadoopBsp")) {
+ if (rootChild.startsWith("/_hadoopBsp")) {
List<String> children =
- zooKeeperExt.getChildren("/" + rootChild, false);
+ zooKeeperExt.getChildrenExt(rootChild, false, false, true);
for (String child: children) {
if (child.contains("job_local_")) {
- System.out.println("Cleaning up /_hadoopBsp/" +
- child);
- zooKeeperExt.deleteExt(
- "/_hadoopBsp/" + child, -1, true);
+ System.out.println("Cleaning up " + child);
+ zooKeeperExt.deleteExt(child, -1, true);
}
}
}
Modified:
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java?rev=1401120&r1=1401119&r2=1401120&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestZooKeeperExt.java Mon Oct 22 23:12:13 2012
@@ -33,131 +33,141 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+/**
+ * Test the ZooKeeperExt class.
+ */
public class TestZooKeeperExt implements Watcher {
- /** ZooKeeperExt instance */
- private ZooKeeperExt zooKeeperExt = null;
- /** ZooKeeper server list */
- private String zkList = System.getProperty("prop.zookeeper.list");
-
- public static final String BASE_PATH = "/_zooKeeperExtTest";
- public static final String FIRST_PATH = "/_first";
-
- public void process(WatchedEvent event) {
+ /** ZooKeeperExt instance */
+ private ZooKeeperExt zooKeeperExt = null;
+ /** ZooKeeper server list */
+ private String zkList = System.getProperty("prop.zookeeper.list");
+
+ public static final String BASE_PATH = "/_zooKeeperExtTest";
+ public static final String FIRST_PATH = "/_first";
+
+ public void process(WatchedEvent event) {
+ return;
+ }
+
+ @Before
+ public void setUp() {
+ try {
+ if (zkList == null) {
return;
- }
-
- @Before
- public void setUp() {
- try {
- if (zkList == null) {
- return;
- }
- zooKeeperExt =
- new ZooKeeperExt(zkList, 30 * 1000, this);
- zooKeeperExt.deleteExt(BASE_PATH, -1, true);
- } catch (KeeperException.NoNodeException e) {
- System.out.println("Clean start: No node " + BASE_PATH);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @After
- public void tearDown() {
- if (zooKeeperExt == null) {
- return;
- }
- try {
- zooKeeperExt.close();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testCreateExt() throws KeeperException, InterruptedException {
- if (zooKeeperExt == null) {
- System.out.println(
- "testCreateExt: No prop.zookeeper.list set, skipping test");
- return;
- }
- System.out.println("Created: " +
- zooKeeperExt.createExt(
- BASE_PATH + FIRST_PATH,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true));
- zooKeeperExt.delete(BASE_PATH + FIRST_PATH, -1);
- zooKeeperExt.delete(BASE_PATH, -1);
- }
-
- @Test
- public void testDeleteExt() throws KeeperException, InterruptedException {
- if (zooKeeperExt == null) {
- System.out.println(
- "testDeleteExt: No prop.zookeeper.list set, skipping test");
- return;
- }
- zooKeeperExt.create(BASE_PATH,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- zooKeeperExt.create(BASE_PATH + FIRST_PATH,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- try {
- zooKeeperExt.deleteExt(BASE_PATH, -1, false);
- } catch (KeeperException.NotEmptyException e) {
- System.out.println(
- "Correctly failed to delete since not recursive");
- }
- zooKeeperExt.deleteExt(BASE_PATH, -1, true);
- }
-
- @Test
- public void testGetChildrenExt()
- throws KeeperException, InterruptedException {
- if (zooKeeperExt == null) {
- System.out.println(
- "testGetChildrenExt: No prop.zookeeper.list set, skipping test");
- return;
- }
- zooKeeperExt.create(BASE_PATH,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- zooKeeperExt.create(BASE_PATH + "/b",
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL);
- zooKeeperExt.create(BASE_PATH + "/a",
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL);
- zooKeeperExt.create(BASE_PATH + "/d",
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL);
- zooKeeperExt.create(BASE_PATH + "/c",
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL);
- List<String> fullPathList =
- zooKeeperExt.getChildrenExt(BASE_PATH, false, false, true);
- for (String fullPath : fullPathList) {
- assertTrue(fullPath.contains(BASE_PATH + "/"));
- }
- List<String> sequenceOrderedList =
- zooKeeperExt.getChildrenExt(BASE_PATH, false, true, true);
- for (String fullPath : sequenceOrderedList) {
- assertTrue(fullPath.contains(BASE_PATH + "/"));
- }
- assertEquals(4, sequenceOrderedList.size());
- assertTrue(sequenceOrderedList.get(0).contains("/b"));
- assertTrue(sequenceOrderedList.get(1).contains("/a"));
- assertTrue(sequenceOrderedList.get(2).contains("/d"));
- assertTrue(sequenceOrderedList.get(3).contains("/c"));
- }
+ }
+ zooKeeperExt =
+ new ZooKeeperExt(zkList, 30 * 1000, 0, 0, this);
+ zooKeeperExt.deleteExt(BASE_PATH, -1, true);
+ } catch (KeeperException.NoNodeException e) {
+ System.out.println("Clean start: No node " + BASE_PATH);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ if (zooKeeperExt == null) {
+ return;
+ }
+ try {
+ zooKeeperExt.close();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testCreateExt() throws KeeperException, InterruptedException {
+ if (zooKeeperExt == null) {
+ System.out.println(
+ "testCreateExt: No prop.zookeeper.list set, skipping test");
+ return;
+ }
+ System.out.println("Created: " +
+ zooKeeperExt.createExt(
+ BASE_PATH + FIRST_PATH,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true));
+ zooKeeperExt.deleteExt(BASE_PATH + FIRST_PATH, -1, false);
+ zooKeeperExt.deleteExt(BASE_PATH, -1, false);
+ }
+
+ @Test
+ public void testDeleteExt() throws KeeperException, InterruptedException {
+ if (zooKeeperExt == null) {
+ System.out.println(
+ "testDeleteExt: No prop.zookeeper.list set, skipping test");
+ return;
+ }
+ zooKeeperExt.createExt(BASE_PATH,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ false);
+ zooKeeperExt.createExt(BASE_PATH + FIRST_PATH,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ false);
+ try {
+ zooKeeperExt.deleteExt(BASE_PATH, -1, false);
+ } catch (KeeperException.NotEmptyException e) {
+ System.out.println(
+ "Correctly failed to delete since not recursive");
+ }
+ zooKeeperExt.deleteExt(BASE_PATH, -1, true);
+ }
+
+ @Test
+ public void testGetChildrenExt()
+ throws KeeperException, InterruptedException {
+ if (zooKeeperExt == null) {
+ System.out.println(
+ "testGetChildrenExt: No prop.zookeeper.list set, skipping test");
+ return;
+ }
+ zooKeeperExt.createExt(BASE_PATH,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ false);
+ zooKeeperExt.createExt(BASE_PATH + "/b",
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL,
+ false);
+ zooKeeperExt.createExt(BASE_PATH + "/a",
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL,
+ false);
+ zooKeeperExt.createExt(BASE_PATH + "/d",
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL,
+ false);
+ zooKeeperExt.createExt(BASE_PATH + "/c",
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL,
+ false);
+ List<String> fullPathList =
+ zooKeeperExt.getChildrenExt(BASE_PATH, false, false, true);
+ for (String fullPath : fullPathList) {
+ assertTrue(fullPath.contains(BASE_PATH + "/"));
+ }
+ List<String> sequenceOrderedList =
+ zooKeeperExt.getChildrenExt(BASE_PATH, false, true, true);
+ for (String fullPath : sequenceOrderedList) {
+ assertTrue(fullPath.contains(BASE_PATH + "/"));
+ }
+ assertEquals(4, sequenceOrderedList.size());
+ assertTrue(sequenceOrderedList.get(0).contains("/b"));
+ assertTrue(sequenceOrderedList.get(1).contains("/a"));
+ assertTrue(sequenceOrderedList.get(2).contains("/d"));
+ assertTrue(sequenceOrderedList.get(3).contains("/c"));
+ }
}