Updated Branches: refs/heads/master 13d19e914 -> 4a46ae055
[HELIX-210] Add support to set data with expect version in BaseDataAccessor, rb=13581 Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/4a46ae05 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/4a46ae05 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/4a46ae05 Branch: refs/heads/master Commit: 4a46ae055a877737c8eeda63e7c0917d48dc9d2b Parents: 13d19e9 Author: zzhang <[email protected]> Authored: Fri Aug 16 13:53:40 2013 -0700 Committer: zzhang <[email protected]> Committed: Fri Aug 16 13:53:40 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/BaseDataAccessor.java | 43 +- .../helix/manager/zk/HelixGroupCommit.java | 8 +- .../helix/manager/zk/ZkBaseDataAccessor.java | 173 +++++--- .../manager/zk/ZkCacheBaseDataAccessor.java | 38 +- .../src/test/java/org/apache/helix/Mocks.java | 7 + .../manager/zk/TestZkBaseDataAccessor.java | 413 ++++++++++++++----- 6 files changed, 494 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java index 7c65460..9154724 100644 --- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java @@ -36,17 +36,18 @@ public interface BaseDataAccessor<T> /** * This will always attempt to create the znode, if it exists it will return false. Will * create parents if they do not exist. 
For performance reasons, it may try to create - * child first and only if it fails it will try to create parent + * child first and only if it fails it will try to create parents * * @param path path to the ZNode to create * @param record the data to write to the ZNode + * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return true if creation succeeded, false otherwise (e.g. if the ZNode exists) */ boolean create(String path, T record, int options); /** - * This will always attempt to set the data on existing node. If the znode does not - * exist it will create it. + * This will always attempt to set the data on existing node. If the ZNode does not + * exist it will create it and all its parents ZNodes if necessary * * @param path path to the ZNode to set * @param record the data to write to the ZNode @@ -56,18 +57,31 @@ public interface BaseDataAccessor<T> boolean set(String path, T record, int options); /** - * This will attempt to merge with existing data by calling znrecord.merge and if it - * does not exist it will create it znode + * This will attempt to set the data on existing node only if version matches. + * If the ZNode does not exist it will create it and all its parent ZNodes only if expected version is -1 + * + * @param path path to the ZNode to set + * @param record the data to write to the ZNode + * @param options Set the type of ZNode see the valid values in {@link AccessOption} + * @param expectVersion the expected version of the data to be overwritten, -1 means match any version + * @return true if data was successfully set, false otherwise (e.g. if the version mismatches) + */ + boolean set(String path, T record, int expectVersion, int options); + + /** + * This will attempt to update the data using the updater. If the ZNode + * does not exist it will create it and all its parent ZNodes. + * Updater will be invoked with null value if node does not exist. 
* * @param path path to the ZNode to update * @param updater an update routine for the data to merge in * @param options Set the type of ZNode see the valid values in {@link AccessOption} - * @return true if data merge succeeded, false otherwise + * @return true if data update succeeded, false otherwise */ boolean update(String path, DataUpdater<T> updater, int options); /** - * This will remove znode and all its child nodes if any + * This will remove the ZNode and all its descendants if any * * @param path path to the root ZNode to remove * @param options Set the type of ZNode see the valid values in {@link AccessOption} @@ -79,8 +93,8 @@ public interface BaseDataAccessor<T> * Use it when creating children under a parent node. This will use async api for better * performance. If the child already exists it will return false. * - * @param parentPath paths to the immediate parent ZNodes - * @param record List of data to write to each of the children + * @param paths the paths to the children ZNodes + * @param record List of data to write to each of the path * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return For each child: true if creation succeeded, false otherwise (e.g. if the child exists) */ @@ -90,7 +104,7 @@ public interface BaseDataAccessor<T> * can set multiple children under a parent node. This will use async api for better * performance. If this child does not exist it will create it. * - * @param parentPath paths to the immediate parent ZNodes + * @param paths the paths to the children ZNodes * @param record List of data with which to overwrite the corresponding ZNodes * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return For each child: true if the data was set, false otherwise @@ -101,10 +115,10 @@ public interface BaseDataAccessor<T> * Can update multiple nodes using async api for better performance. If a child does not * exist it will create it. 
* - * @param parentPath paths to the immediate parent ZNodes - * @param updaters List of update routines for records to merge in + * @param paths the paths to the children ZNodes + * @param updaters List of update routines for records to update * @param options Set the type of ZNode see the valid values in {@link AccessOption} - * @return For each child, true if the data was merged in, false otherwise + * @return For each child, true if the data is updated successfully, false otherwise */ boolean[] updateChildren(List<String> paths, List<DataUpdater<T>> updaters, int options); @@ -121,6 +135,7 @@ public interface BaseDataAccessor<T> * Get the {@link T} corresponding to the path * * @param path path to the ZNode + * @param stat retrieve the stat of the ZNode * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return the record data stored at the ZNode */ @@ -130,6 +145,7 @@ public interface BaseDataAccessor<T> * Get List of {@link T} corresponding to the paths using async api * * @param paths paths to the ZNodes + * @param stats retrieve a list of stats for the ZNodes * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return List of record data stored at each ZNode */ @@ -224,6 +240,7 @@ public interface BaseDataAccessor<T> void unsubscribeChildChanges(String path, IZkChildListener listener); /** + * TODO refactor this. 
reset() should not be in data accessor * reset the cache if any, when session expiry happens */ void reset(); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java index 6de5d82..e95c2bf 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.I0Itec.zkclient.DataUpdater; import org.I0Itec.zkclient.exception.ZkBadVersionException; import org.I0Itec.zkclient.exception.ZkNoNodeException; +import org.apache.helix.AccessOption; import org.apache.log4j.Logger; import org.apache.zookeeper.data.Stat; @@ -108,6 +109,9 @@ public class HelixGroupCommit<T> T merged = null; Stat readStat = new Stat(); + + // to create a new znode, we need set version to -1 + readStat.setVersion(-1); try { // accessor will fallback to zk if not found in cache @@ -115,7 +119,7 @@ public class HelixGroupCommit<T> } catch (ZkNoNodeException e) { - // OK. 
+ // OK } // updater should handler merged == null @@ -149,7 +153,7 @@ public class HelixGroupCommit<T> it.remove(); } // System.out.println("size:"+ processed.size()); - accessor.set(mergedKey, merged, null, null, readStat.getVersion(), options); + accessor.set(mergedKey, merged, readStat.getVersion(), options); } catch (ZkBadVersionException e) { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java index 77e2b5a..6f4e65c 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java @@ -19,7 +19,6 @@ package org.apache.helix.manager.zk; * under the License. 
*/ -import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -56,6 +55,31 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { OK, NODE_EXISTS, ERROR } + + /** + * struct holding return information + * + */ + public class AccessResult + { + RetCode _retCode; + List<String> _pathCreated; + + Stat _stat; + + /** + * used by update only + */ + T _updatedValue; + + public AccessResult() + { + _retCode = RetCode.ERROR; + _pathCreated = new ArrayList<String>(); + _stat = new Stat(); + _updatedValue = null; + } + } private static Logger LOG = Logger.getLogger(ZkBaseDataAccessor.class); @@ -72,19 +96,22 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> @Override public boolean create(String path, T record, int options) { - return create(path, record, null, options) == RetCode.OK; + AccessResult result = doCreate(path, record, options); + return result._retCode == RetCode.OK; } /** * sync create */ - public RetCode create(String path, T record, List<String> pathCreated, int options) + public AccessResult doCreate(String path, T record, int options) { + AccessResult result = new AccessResult(); CreateMode mode = AccessOption.getMode(options); if (mode == null) { LOG.error("Invalid create mode. 
options: " + options); - return RetCode.ERROR; + result._retCode = RetCode.ERROR; + return result; } boolean retry; @@ -94,10 +121,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> try { _zkClient.create(path, record, mode); - if (pathCreated != null) - pathCreated.add(path); + result._pathCreated.add(path); - return RetCode.OK; + result._retCode = RetCode.OK; + return result; } catch (ZkNoNodeException e) { @@ -105,7 +132,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> String parentPath = HelixUtil.getZkParentPath(path); try { - RetCode rc = create(parentPath, null, pathCreated, AccessOption.PERSISTENT); + AccessResult res = doCreate(parentPath, null, AccessOption.PERSISTENT); + result._pathCreated.addAll(res._pathCreated); + RetCode rc = res._retCode; if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS) { // if parent node created/exists, retry @@ -115,23 +144,27 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> catch (Exception e1) { LOG.error("Exception while creating path: " + parentPath, e1); - return RetCode.ERROR; + result._retCode = RetCode.ERROR; + return result; } } catch (ZkNodeExistsException e) { LOG.warn("Node already exists. 
path: " + path); - return RetCode.NODE_EXISTS; + result._retCode = RetCode.NODE_EXISTS; + return result; } catch (Exception e) { LOG.error("Exception while creating path: " + path, e); - return RetCode.ERROR; + result._retCode = RetCode.ERROR; + return result; } } while (retry); - return RetCode.OK; + result._retCode = RetCode.OK; + return result; } /** @@ -140,27 +173,43 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> @Override public boolean set(String path, T record, int options) { - return set(path, record, null, null, -1, options); + return set(path, record, -1, options); } /** * sync set + */ + @Override + public boolean set(String path, T record, int expectVersion, int options) + { + try + { + AccessResult result = doSet(path, record, expectVersion, options); + return result._retCode == RetCode.OK; + } + catch (ZkBadVersionException e) + { + return false; + } + } + + /** + * sync set * - * @param setstat - * : if node is created instead of set, stat will NOT be set */ - public boolean set(String path, - T record, - List<String> pathsCreated, - Stat setstat, - int expectVersion, - int options) + public AccessResult doSet(String path, + T record, + int expectVersion, + int options) { + AccessResult result = new AccessResult(); + CreateMode mode = AccessOption.getMode(options); if (mode == null) { LOG.error("Invalid set mode. options: " + options); - return false; + result._retCode = RetCode.ERROR; + return result; } boolean retry; @@ -169,36 +218,43 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> retry = false; try { - // _zkClient.writeData(path, record); - Stat setStat = _zkClient.writeDataGetStat(path, record, expectVersion); - if (setstat != null) - DataTree.copyStat(setStat, setstat); + Stat stat = _zkClient.writeDataGetStat(path, record, expectVersion); + DataTree.copyStat(stat, result._stat); } catch (ZkNoNodeException e) { - // node not exists, try create. 
in this case, stat will not be set + // node not exists, try create if expectedVersion == -1; in this case, stat will not be set + if (expectVersion != -1) + { + LOG.error("Could not create node if expectVersion != -1, was " + expectVersion); + result._retCode = RetCode.ERROR; + return result; + } try { - RetCode rc = create(path, record, pathsCreated, options); - // if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS) - // retry = true; + // may create recursively + AccessResult res = doCreate(path, record, options); + result._pathCreated.addAll(res._pathCreated); + RetCode rc = res._retCode; switch (rc) { case OK: // not set stat if node is created (instead of set) break; - case NODE_EXISTS: + case NODE_EXISTS: retry = true; break; default: LOG.error("Fail to set path by creating: " + path); - return false; + result._retCode = RetCode.ERROR; + return result; } } catch (Exception e1) { LOG.error("Exception while setting path by creating: " + path, e); - return false; + result._retCode = RetCode.ERROR; + return result; } } catch (ZkBadVersionException e) @@ -208,12 +264,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> catch (Exception e) { LOG.error("Exception while setting path: " + path, e); - return false; + result._retCode = RetCode.ERROR; + return result; } } while (retry); - return true; + result._retCode = RetCode.OK; + return result; } /** @@ -222,25 +280,25 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> @Override public boolean update(String path, DataUpdater<T> updater, int options) { - return update(path, updater, null, null, options) != null; + AccessResult result = doUpdate(path, updater, options); + return result._retCode == RetCode.OK; } /** * sync update * - * @return: updatedData on success, or null on fail */ - public T update(String path, - DataUpdater<T> updater, - List<String> createPaths, - Stat stat, - int options) + public AccessResult doUpdate(String path, + DataUpdater<T> updater, + int options) { 
+ AccessResult result = new AccessResult(); CreateMode mode = AccessOption.getMode(options); if (mode == null) { LOG.error("Invalid update mode. options: " + options); - return null; + result._retCode = RetCode.ERROR; + return result; } boolean retry; @@ -254,10 +312,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> T oldData = (T) _zkClient.readData(path, readStat); T newData = updater.update(oldData); Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion()); - if (stat != null) - { - DataTree.copyStat(setStat, stat); - } + DataTree.copyStat(setStat, result._stat); updatedData = newData; } @@ -267,11 +322,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> } catch (ZkNoNodeException e) { - // node not exist, try create + // node not exist, try create, pass null to updater try { T newData = updater.update(null); - RetCode rc = create(path, newData, createPaths, options); + AccessResult res = doCreate(path, newData, options); + result._pathCreated.addAll(res._pathCreated); + RetCode rc = res._retCode; switch (rc) { case OK: @@ -282,24 +339,29 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> break; default: LOG.error("Fail to update path by creating: " + path); - return null; + result._retCode = RetCode.ERROR; + return result; } } catch (Exception e1) { LOG.error("Exception while updating path by creating: " + path, e1); - return null; + result._retCode = RetCode.ERROR; + return result; } } catch (Exception e) { LOG.error("Exception while updating path: " + path, e); - return null; + result._retCode = RetCode.ERROR; + return result; } } while (retry); - return updatedData; + result._retCode = RetCode.OK; + result._updatedValue = updatedData; + return result; } /** @@ -516,13 +578,12 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> try { // optimize on common path - _zkClient.delete(path); + return _zkClient.delete(path); } catch (ZkException e) { - 
_zkClient.deleteRecursive(path); + return _zkClient.deleteRecursive(path); } - return true; } /** http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java index 869871c..e9e0587 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java @@ -276,11 +276,10 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> try { cache.lockWrite(); - List<String> pathsCreated = new ArrayList<String>(); - RetCode rc = _baseAccessor.create(serverPath, data, pathsCreated, options); - boolean success = (rc == RetCode.OK); + ZkBaseDataAccessor<T>.AccessResult result = _baseAccessor.doCreate(serverPath, data, options); + boolean success = (result._retCode == RetCode.OK); - updateCache(cache, pathsCreated, success, serverPath, data, ZNode.ZERO_STAT); + updateCache(cache, result._pathCreated, success, serverPath, data, ZNode.ZERO_STAT); return success; } @@ -297,6 +296,12 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> @Override public boolean set(String path, T data, int options) { + return set(path, data, -1, options); + } + + @Override + public boolean set(String path, T data, int expectVersion, int options) + { String clientPath = path; String serverPath = prependChroot(clientPath); @@ -306,15 +311,17 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> try { cache.lockWrite(); - Stat setStat = new Stat(); - List<String> pathsCreated = new ArrayList<String>(); - boolean success = - _baseAccessor.set(serverPath, data, pathsCreated, setStat, -1, options); 
+ ZkBaseDataAccessor<T>.AccessResult result = _baseAccessor.doSet(serverPath, data, expectVersion, options); + boolean success = result._retCode == RetCode.OK; - updateCache(cache, pathsCreated, success, serverPath, data, setStat); + updateCache(cache, result._pathCreated, success, serverPath, data, result._stat); return success; } + catch (Exception e) + { + return false; + } finally { cache.unlockWrite(); @@ -322,9 +329,9 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> } // no cache - return _baseAccessor.set(serverPath, data, options); + return _baseAccessor.set(serverPath, data, expectVersion, options); } - + @Override public boolean update(String path, DataUpdater<T> updater, int options) { @@ -338,12 +345,9 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> try { cache.lockWrite(); - Stat setStat = new Stat(); - List<String> pathsCreated = new ArrayList<String>(); - T updateData = - _baseAccessor.update(serverPath, updater, pathsCreated, setStat, options); - boolean success = (updateData != null); - updateCache(cache, pathsCreated, success, serverPath, updateData, setStat); + ZkBaseDataAccessor<T>.AccessResult result = _baseAccessor.doUpdate(serverPath, updater, options); + boolean success = (result._retCode == RetCode.OK); + updateCache(cache, result._pathCreated, success, serverPath, result._updatedValue, result._stat); return success; } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/test/java/org/apache/helix/Mocks.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java index e30c09b..d688561 100644 --- a/helix-core/src/test/java/org/apache/helix/Mocks.java +++ b/helix-core/src/test/java/org/apache/helix/Mocks.java @@ -195,6 +195,13 @@ public class Mocks { } + @Override + public boolean set(String path, ZNRecord record, int 
options, int expectVersion) + { + // TODO Auto-generated method stub + return false; + } + // @Override // public boolean subscribe(String path, IZkListener listener) { // // TODO Auto-generated method stub http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java index f97e018..b120105 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java @@ -28,11 +28,14 @@ import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; +import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.ZNRecordUpdater; import org.apache.helix.ZkUnitTestBase; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor.AccessResult; +import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode; import org.apache.helix.manager.zk.ZkClient; import org.apache.zookeeper.data.Stat; import org.testng.Assert; @@ -41,128 +44,338 @@ import org.testng.annotations.Test; public class TestZkBaseDataAccessor extends ZkUnitTestBase { + @Test - public void testSyncZkBaseDataAccessor() + public void testSyncSet() { - System.out.println("START TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis())); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String testName = className + "_" + methodName; - String root = "TestZkBaseDataAccessor_syn"; 
- ZkClient zkClient = new ZkClient(ZK_ADDR); - zkClient.setZkSerializer(new ZNRecordSerializer()); - zkClient.deleteRecursive("/" + root); + System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis())); + + String path = String.format("/%s/%s", testName, "msg_0"); + ZNRecord record = new ZNRecord("msg_0"); + BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + boolean success = accessor.set(path, record, AccessOption.PERSISTENT); + Assert.assertTrue(success); + ZNRecord getRecord = _gZkClient.readData(path); + Assert.assertNotNull(getRecord); + Assert.assertEquals(getRecord.getId(), "msg_0"); + + System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis())); - BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient); + } + + @Test + public void testSyncSetWithVersion() + { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String testName = className + "_" + methodName; - // test sync create - for (int i = 0; i < 10; i++) + System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis())); + + String path = String.format("/%s/%s", testName, "msg_0"); + ZNRecord record = new ZNRecord("msg_0"); + BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // set persistent + boolean success = accessor.set(path, record, 0, AccessOption.PERSISTENT); + Assert.assertFalse(success, "Should fail since version not match"); + try { + _gZkClient.readData(path, false); + Assert.fail("Should get no node exception"); + } catch (Exception e) { - String msgId = "msg_" + i; - String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId); - boolean success = accessor.create(path, new ZNRecord(msgId), AccessOption.PERSISTENT); - Assert.assertTrue(success, "Should succeed in create"); + // OK } - - // test get what we created - for (int i = 0; i < 
10; i++) + + success = accessor.set(path, record, -1, AccessOption.PERSISTENT); + Assert.assertTrue(success); + ZNRecord getRecord = _gZkClient.readData(path); + Assert.assertNotNull(getRecord); + Assert.assertEquals(getRecord.getId(), "msg_0"); + + // set ephemeral + path = String.format("/%s/%s", testName, "msg_1"); + record = new ZNRecord("msg_1"); + success = accessor.set(path, record, 0, AccessOption.EPHEMERAL); + Assert.assertFalse(success); + try { + _gZkClient.readData(path, false); + Assert.fail("Should get no node exception"); + } catch (Exception e) { - String msgId = "msg_" + i; - String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId); - ZNRecord record = zkClient.readData(path); - Assert.assertEquals(record.getId(), msgId, "Should get what we created"); + // OK } + + success = accessor.set(path, record, -1, AccessOption.EPHEMERAL); + Assert.assertTrue(success); + getRecord = _gZkClient.readData(path); + Assert.assertNotNull(getRecord); + Assert.assertEquals(getRecord.getId(), "msg_1"); + + + record.setSimpleField("key0", "value0"); + success = accessor.set(path, record, 0, AccessOption.PERSISTENT); + Assert.assertTrue(success, "Should pass. 
AccessOption.PERSISTENT is ignored"); + getRecord = _gZkClient.readData(path); + Assert.assertNotNull(getRecord); + Assert.assertEquals(getRecord.getSimpleFields().size(), 1); + Assert.assertNotNull(getRecord.getSimpleField("key0")); + Assert.assertEquals(getRecord.getSimpleField("key0"), "value0"); + + System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testSyncDoSet() + { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String testName = className + "_" + methodName; - // test sync set - for (int i = 0; i < 10; i++) - { - String msgId = "msg_" + i; - String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId); - ZNRecord newRecord = new ZNRecord(msgId); - newRecord.setSimpleField("key1", "value1"); - boolean success = accessor.set(path, newRecord, AccessOption.PERSISTENT); - Assert.assertTrue(success, "Should succeed in set"); - } + System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis())); + + String path = String.format("/%s/%s/%s", testName, "msg_0", "submsg_0"); + ZNRecord record = new ZNRecord("submsg_0"); + ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); - // test get what we set - for (int i = 0; i < 10; i++) - { - String msgId = "msg_" + i; - String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId); - ZNRecord record = zkClient.readData(path); - Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set"); - Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set"); - } + AccessResult result = accessor.doSet(path, record, -1, AccessOption.PERSISTENT); + Assert.assertEquals(result._retCode, RetCode.OK); + Assert.assertEquals(result._pathCreated.size(), 3); + Assert.assertTrue(result._pathCreated.contains(String.format("/%s", testName))); + 
Assert.assertTrue(result._pathCreated.contains(String.format("/%s/%s", testName, "msg_0")));
+    Assert.assertTrue(result._pathCreated.contains(path));
-    // test sync update
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      ZNRecord newRecord = new ZNRecord(msgId);
-      newRecord.setSimpleField("key2", "value2");
-      boolean success = accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
-      Assert.assertTrue(success, "Should succeed in update");
-    }
+    Assert.assertTrue(_gZkClient.exists(String.format("/%s", testName)));
+    Assert.assertTrue(_gZkClient.exists(String.format("/%s/%s", testName, "msg_0")));
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "submsg_0");
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * create() succeeds on a path that does not exist yet; a second create() on the
+   * same path returns false and the stored record keeps its original (empty) simple fields.
+   */
+  @Test
+  public void testSyncCreate()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
-    // test get what we updated
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      ZNRecord record = zkClient.readData(path);
-      Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
-      Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
-    }
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
-    // test sync get
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      ZNRecord record = accessor.get(path, null, 0);
-      Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
-      Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
-      Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
-    }
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    boolean success = accessor.create(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
-    // test sync exist
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      boolean exists = accessor.exists(path, 0);
-      Assert.assertTrue(exists, "Should exist");
-    }
+    record.setSimpleField("key0", "value0");
+    success = accessor.create(path, record, AccessOption.PERSISTENT);
+    Assert.assertFalse(success, "Should fail since node already exists");
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 0);
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * update() succeeds on a path that does not exist yet, applies a later update to the
+   * existing node, and returns false without changing the stored data when the updater throws.
+   */
+  @Test
+  public void testSyncUpdate()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
-    // test getStat()
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      Stat stat = accessor.getStat(path, 0);
-      Assert.assertNotNull(stat, "Stat should exist");
-      Assert.assertEquals(stat.getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1");
-    }
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    boolean success = accessor.update(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
-    // test sync remove
-    for (int i = 0; i < 10; i++)
+    record.setSimpleField("key0", "value0");
+    success = accessor.update(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 1);
+    Assert.assertNotNull(getRecord.getSimpleField("key0"));
+    Assert.assertEquals(getRecord.getSimpleField("key0"), "value0");
+
+    // test throw exception from updater
+    success = accessor.update(path, new DataUpdater<ZNRecord>()
     {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      boolean success = accessor.remove(path, 0);
-      Assert.assertTrue(success, "Should remove");
-    }
+
+      @Override
+      public ZNRecord update(ZNRecord currentData)
+      {
+        throw new RuntimeException("IGNORABLE: test throw exception from updater");
+      }
+    }, AccessOption.PERSISTENT);
+    Assert.assertFalse(success);
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 1);
-    // test get what we removed
-    for (int i = 0; i < 10; i++)
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * remove() returns false for a path that does not exist and true once the node has
+   * been created, after which the path no longer exists.
+   */
+  @Test
+  public void testSyncRemove()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    boolean success = accessor.remove(path, 0);
+    Assert.assertFalse(success);
+
+    success = accessor.create(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
+
+    success = accessor.remove(path, 0);
+    Assert.assertTrue(success);
+    Assert.assertFalse(_gZkClient.exists(path));
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * get() returns null for a missing path (and throws when THROW_EXCEPTION_IFNOTEXIST is
+   * requested); after create, set and update it returns the current record and fills the
+   * supplied Stat with versions 0, 1 and 2 respectively.
+   */
+  @Test
+  public void testSyncGet()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    Stat stat = new Stat();
+    ZNRecord getRecord = accessor.get(path, stat, 0);
+    Assert.assertNull(getRecord);
+
+    try {
+      accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      Assert.fail("Should throw exception if not exist");
+    } catch (Exception e)
     {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      boolean exists = zkClient.exists(path);
-      Assert.assertFalse(exists, "Should be removed");
+      // OK: expected, the node has not been created yet
     }
+
+    boolean success = accessor.create(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+
+    getRecord = accessor.get(path, stat, 0);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
+    Assert.assertEquals(stat.getVersion(), 0);
+
+    record.setSimpleField("key0", "value0");
+    success = accessor.set(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+
+    getRecord = accessor.get(path, stat, 0);
+    Assert.assertNotNull(getRecord);
+    // check the record read back from ZooKeeper, not the local one we just mutated
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 1);
+    Assert.assertNotNull(getRecord.getSimpleField("key0"));
+    Assert.assertEquals(getRecord.getSimpleField("key0"), "value0");
+    Assert.assertEquals(stat.getVersion(), 1);
+
+    ZNRecord newRecord = new ZNRecord("msg_0");
+    newRecord.setSimpleField("key1", "value1");
+    success = accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+
+    getRecord = accessor.get(path, stat, 0);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 2);
+    Assert.assertNotNull(getRecord.getSimpleField("key0"));
+    Assert.assertEquals(getRecord.getSimpleField("key0"), "value0");
+    Assert.assertNotNull(getRecord.getSimpleField("key1"));
+    Assert.assertEquals(getRecord.getSimpleField("key1"), "value1");
+    Assert.assertEquals(stat.getVersion(), 2);
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * exists() returns false before the node is created and true after an ephemeral
+   * node has been created at the path.
+   */
+  @Test
+  public void testSyncExist()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    boolean success = accessor.exists(path, 0);
+    Assert.assertFalse(success);
+
+    success = accessor.create(path, record, AccessOption.EPHEMERAL);
+    Assert.assertTrue(success);
+
+    success = accessor.exists(path, 0);
+    Assert.assertTrue(success);
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+
+  /**
+   * getStat() returns null for a missing path; after an ephemeral node is created it
+   * returns a Stat with version 0 and a non-zero ephemeral owner.
+   */
+  @Test
+  public void testSyncGetStat()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    Stat stat = accessor.getStat(path, 0);
+    Assert.assertNull(stat);
+
+    boolean success = accessor.create(path, record, AccessOption.EPHEMERAL);
+    Assert.assertTrue(success);
+
+    stat = accessor.getStat(path, 0);
+    Assert.assertNotNull(stat);
+    Assert.assertEquals(stat.getVersion(), 0);
+    // compare by value: assertNotSame(owner, 0) would compare boxed references and could never fail
+    Assert.assertTrue(stat.getEphemeralOwner() != 0, "Ephemeral node should have a non-zero owner");
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
-    zkClient.close();
-    System.out.println("END TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
   }
   @Test
