This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch zooscalability in repository https://gitbox.apache.org/repos/asf/helix.git
commit e3a33851db2089fecb92d1a60d29fb9c55a90b1e Author: Hunter Lee <[email protected]> AuthorDate: Fri Mar 13 17:19:34 2020 -0700 Reformat ZkBaseDataAccessor (#893) Changelist: 1. Add generic type markers to Builder (<T>) 2. Fix a bug in validate function 3. Default to ZNRecordSerializer to preserve existing behavior --- .../helix/manager/zk/ZkBaseDataAccessor.java | 301 +++++++++++---------- 1 file changed, 159 insertions(+), 142 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java index 1d60c7b..32f33f8 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java @@ -52,6 +52,7 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkException; import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; @@ -59,6 +60,7 @@ import org.apache.zookeeper.server.DataTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { // Designates which mode ZkBaseDataAccessor should be created in. If not specified, it will be @@ -139,7 +141,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { _usesExternalZkClient = true; } - private ZkBaseDataAccessor(Builder builder) { + private ZkBaseDataAccessor(Builder<T> builder) { switch (builder.realmMode) { case MULTI_REALM: try { @@ -397,16 +399,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { result._pathCreated.addAll(res._pathCreated); RetCode rc = res._retCode; switch (rc) { - case OK: - // not set stat if node is created (instead of set) - break; - case NODE_EXISTS: - retry = true; - break; - default: - LOG.error("Fail to set path by creating: " + path); - result._retCode = RetCode.ERROR; - return result; + case OK: + // not set stat if node is created (instead of set) + break; + case NODE_EXISTS: + retry = true; + break; + default: + LOG.error("Fail to set path by creating: " + path); + result._retCode = RetCode.ERROR; + return result; } } catch (Exception e1) { LOG.error("Exception while setting path by creating: " + path, e); @@ -477,16 +479,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { rc = RetCode.OK; } switch (rc) { - case OK: - updatedData = newData; - break; - case NODE_EXISTS: - retry = true; - break; - default: - LOG.error("Fail to update path by creating: " + path); - result._retCode = RetCode.ERROR; - return result; + case OK: + updatedData = newData; + break; + case NODE_EXISTS: + retry = true; + break; + default: + LOG.error("Fail to update path by creating: " + path); + result._retCode = RetCode.ERROR; + return result; } } catch (Exception e1) { LOG.error("Exception while updating path by creating: " + path, e1); @@ -553,17 +555,19 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { // init stats if (stats != null) { stats.clear(); - stats.addAll(Collections.<Stat> nCopies(paths.size(), null)); + stats.addAll(Collections.<Stat>nCopies(paths.size(), null)); } long startT = System.nanoTime(); try { // issue asyn get requests - ZkAsyncCallbacks.GetDataCallbackHandler[] cbList = new ZkAsyncCallbacks.GetDataCallbackHandler[paths.size()]; + ZkAsyncCallbacks.GetDataCallbackHandler[] cbList = + new ZkAsyncCallbacks.GetDataCallbackHandler[paths.size()]; for (int i = 0; i < paths.size(); i++) { - if (!needRead[i]) + if (!needRead[i]) { continue; + } String path = paths.get(i); cbList[i] = new ZkAsyncCallbacks.GetDataCallbackHandler(); @@ -572,19 +576,21 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { // wait for completion for (int i = 0; i < cbList.length; i++) { - if (!needRead[i]) + if (!needRead[i]) { continue; + } ZkAsyncCallbacks.GetDataCallbackHandler cb = cbList[i]; cb.waitForSuccess(); } // construct return results - List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null)); + List<T> records = new ArrayList<T>(Collections.<T>nCopies(paths.size(), null)); Map<String, Integer> pathFailToRead = new HashMap<>(); for (int i = 0; i < paths.size(); i++) { - if (!needRead[i]) + if (!needRead[i]) { continue; + } ZkAsyncCallbacks.GetDataCallbackHandler cb = cbList[i]; if (Code.get(cb.getRc()) == Code.OK) { @@ -610,8 +616,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { } finally { long endT = System.nanoTime(); if (LOG.isTraceEnabled()) { - LOG.trace("getData_async, size: " + paths.size() + ", paths: " + paths.get(0) - + ",... time: " + (endT - startT) + " ns"); + LOG.trace( + "getData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + ( + endT - startT) + " ns"); } } } @@ -752,15 +759,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { /** * async create. give up on error other than NONODE */ - ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records, boolean[] needCreate, - List<List<String>> pathsCreated, int options) { + ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records, + boolean[] needCreate, List<List<String>> pathsCreated, int options) { if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size() || (pathsCreated != null && pathsCreated.size() != paths.size())) { throw new IllegalArgumentException( "paths, records, needCreate, and pathsCreated should be of same size"); } - ZkAsyncCallbacks.CreateCallbackHandler[] cbList = new ZkAsyncCallbacks.CreateCallbackHandler[paths.size()]; + ZkAsyncCallbacks.CreateCallbackHandler[] cbList = + new ZkAsyncCallbacks.CreateCallbackHandler[paths.size()]; CreateMode mode = AccessOption.getMode(options); if (mode == null) { @@ -773,8 +781,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { retry = false; for (int i = 0; i < paths.size(); i++) { - if (!needCreate[i]) + if (!needCreate[i]) { continue; + } String path = paths.get(i); T record = records == null ? null : records.get(i); @@ -782,12 +791,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { _zkClient.asyncCreate(path, record, mode, cbList[i]); } - List<String> parentPaths = new ArrayList<>(Collections.<String> nCopies(paths.size(), null)); + List<String> parentPaths = new ArrayList<>(Collections.<String>nCopies(paths.size(), null)); boolean failOnNoNode = false; for (int i = 0; i < paths.size(); i++) { - if (!needCreate[i]) + if (!needCreate[i]) { continue; + } ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i]; cb.waitForSuccess(); @@ -819,8 +829,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT); for (int i = 0; i < parentCbList.length; i++) { ZkAsyncCallbacks.CreateCallbackHandler parentCb = parentCbList[i]; - if (parentCb == null) + if (parentCb == null) { continue; + } Code rc = Code.get(parentCb.getRc()); @@ -853,12 +864,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { boolean[] needCreate = new boolean[paths.size()]; Arrays.fill(needCreate, true); List<List<String>> pathsCreated = - new ArrayList<>(Collections.<List<String>> nCopies(paths.size(), null)); + new ArrayList<>(Collections.<List<String>>nCopies(paths.size(), null)); long startT = System.nanoTime(); try { - ZkAsyncCallbacks.CreateCallbackHandler[] cbList = create(paths, records, needCreate, pathsCreated, options); + ZkAsyncCallbacks.CreateCallbackHandler[] cbList = + create(paths, records, needCreate, pathsCreated, options); for (int i = 0; i < cbList.length; i++) { ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i]; @@ -866,12 +878,12 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { } return success; - } finally { long endT = System.nanoTime(); if (LOG.isTraceEnabled()) { - LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " - + (endT - startT) + " ns"); + LOG.trace( + "create_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + ( + endT - startT) + " ns"); } } } @@ -894,8 +906,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { return new boolean[0]; } - if ((records != null && records.size() != paths.size()) - || (pathsCreated != null && pathsCreated.size() != paths.size())) { + if ((records != null && records.size() != paths.size()) || (pathsCreated != null + && pathsCreated.size() != paths.size())) { throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size"); } @@ -907,8 +919,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { return success; } - List<Stat> setStats = new ArrayList<>(Collections.<Stat> nCopies(paths.size(), null)); - ZkAsyncCallbacks.SetDataCallbackHandler[] cbList = new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()]; + List<Stat> setStats = new ArrayList<>(Collections.<Stat>nCopies(paths.size(), null)); + ZkAsyncCallbacks.SetDataCallbackHandler[] cbList = + new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()]; ZkAsyncCallbacks.CreateCallbackHandler[] createCbList = null; boolean[] needSet = new boolean[paths.size()]; Arrays.fill(needSet, true); @@ -921,14 +934,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { retry = false; for (int i = 0; i < paths.size(); i++) { - if (!needSet[i]) + if (!needSet[i]) { continue; + } String path = paths.get(i); T record = records.get(i); cbList[i] = new ZkAsyncCallbacks.SetDataCallbackHandler(); _zkClient.asyncSetData(path, record, -1, cbList[i]); - } boolean failOnNoNode = false; @@ -938,18 +951,18 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { cb.waitForSuccess(); Code rc = Code.get(cb.getRc()); switch (rc) { - case OK: - setStats.set(i, cb.getStat()); - needSet[i] = false; - break; - case NONODE: - // if fail on NoNode, try create the node - failOnNoNode = true; - break; - default: - // if fail on error other than NoNode, give up - needSet[i] = false; - break; + case OK: + setStats.set(i, cb.getStat()); + needSet[i] = false; + break; + case NONODE: + // if fail on NoNode, try create the node + failOnNoNode = true; + break; + default: + // if fail on error other than NoNode, give up + needSet[i] = false; + break; } } @@ -965,18 +978,18 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { Code rc = Code.get(createCb.getRc()); switch (rc) { - case OK: - setStats.set(i, ZNode.ZERO_STAT); - needSet[i] = false; - break; - case NODEEXISTS: - retry = true; - break; - default: - // if creation fails on error other than NodeExists - // no need to retry set - needSet[i] = false; - break; + case OK: + setStats.set(i, ZNode.ZERO_STAT); + needSet[i] = false; + break; + case NODEEXISTS: + retry = true; + break; + default: + // if creation fails on error other than NodeExists + // no need to retry set + needSet[i] = false; + break; } } } @@ -1006,13 +1019,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { } finally { long endT = System.nanoTime(); if (LOG.isTraceEnabled()) { - LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0) - + ",... time: " + (endT - startT) + " ns"); + LOG.trace( + "setData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + ( + endT - startT) + " ns"); } } } // TODO: rename to update + /** * async update */ @@ -1039,14 +1054,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { return Collections.emptyList(); } - if (updaters.size() != paths.size() - || (pathsCreated != null && pathsCreated.size() != paths.size())) { + if (updaters.size() != paths.size() || (pathsCreated != null && pathsCreated.size() != paths + .size())) { throw new IllegalArgumentException( "paths, updaters, and pathsCreated should be of same size"); } - List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null)); - List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null)); + List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat>nCopies(paths.size(), null)); + List<T> updateData = new ArrayList<T>(Collections.<T>nCopies(paths.size(), null)); CreateMode mode = AccessOption.getMode(options); if (mode == null) { @@ -1054,7 +1069,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { return updateData; } - ZkAsyncCallbacks.SetDataCallbackHandler[] cbList = new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()]; + ZkAsyncCallbacks.SetDataCallbackHandler[] cbList = + new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()]; ZkAsyncCallbacks.CreateCallbackHandler[] createCbList = null; boolean[] needUpdate = new boolean[paths.size()]; Arrays.fill(needUpdate, true); @@ -1104,29 +1120,30 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { for (int i = 0; i < paths.size(); i++) { ZkAsyncCallbacks.SetDataCallbackHandler cb = cbList[i]; - if (cb == null) + if (cb == null) { continue; + } cb.waitForSuccess(); switch (Code.get(cb.getRc())) { - case OK: - updateData.set(i, newDataList.get(i)); - setStats.set(i, cb.getStat()); - needUpdate[i] = false; - break; - case NONODE: - failOnNoNode = true; - needCreate[i] = true; - break; - case BADVERSION: - failOnBadVersion = true; - break; - default: - // if fail on error other than NoNode or BadVersion - // will not retry - needUpdate[i] = false; - break; + case OK: + updateData.set(i, newDataList.get(i)); + setStats.set(i, cb.getStat()); + needUpdate[i] = false; + break; + case NONODE: + failOnNoNode = true; + needCreate[i] = true; + break; + case BADVERSION: + failOnBadVersion = true; + break; + default: + // if fail on error other than NoNode or BadVersion + // will not retry + needUpdate[i] = false; + break; } } @@ -1140,19 +1157,19 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { } switch (Code.get(createCb.getRc())) { - case OK: - needUpdate[i] = false; - updateData.set(i, newDataList.get(i)); - setStats.set(i, ZNode.ZERO_STAT); - break; - case NODEEXISTS: - retry = true; - break; - default: - // if fail on error other than NodeExists - // will not retry - needUpdate[i] = false; - break; + case OK: + needUpdate[i] = false; + updateData.set(i, newDataList.get(i)); + setStats.set(i, ZNode.ZERO_STAT); + break; + case NODEEXISTS: + retry = true; + break; + default: + // if fail on error other than NodeExists + // will not retry + needUpdate[i] = false; + break; } } } @@ -1172,11 +1189,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { } finally { long endT = System.nanoTime(); if (LOG.isTraceEnabled()) { - LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0) - + ",... time: " + (endT - startT) + " ns"); + LOG.trace( + "setData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + ( + endT - startT) + " ns"); } } - } /** @@ -1209,7 +1226,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { long startT = System.nanoTime(); try { - ZkAsyncCallbacks.ExistsCallbackHandler[] cbList = new ZkAsyncCallbacks.ExistsCallbackHandler[paths.size()]; + ZkAsyncCallbacks.ExistsCallbackHandler[] cbList = + new ZkAsyncCallbacks.ExistsCallbackHandler[paths.size()]; for (int i = 0; i < paths.size(); i++) { String path = paths.get(i); cbList[i] = new ZkAsyncCallbacks.ExistsCallbackHandler(); @@ -1226,8 +1244,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { } finally { long endT = System.nanoTime(); if (LOG.isTraceEnabled()) { - LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " - + (endT - startT) + " ns"); + LOG.trace( + "exists_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + ( + endT - startT) + " ns"); } } } @@ -1243,7 +1262,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { boolean[] success = new boolean[paths.size()]; - ZkAsyncCallbacks.DeleteCallbackHandler[] cbList = new ZkAsyncCallbacks.DeleteCallbackHandler[paths.size()]; + ZkAsyncCallbacks.DeleteCallbackHandler[] cbList = + new ZkAsyncCallbacks.DeleteCallbackHandler[paths.size()]; long startT = System.nanoTime(); @@ -1264,8 +1284,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { } finally { long endT = System.nanoTime(); if (LOG.isTraceEnabled()) { - LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " - + (endT - startT) + " ns"); + LOG.trace( + "delete_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + ( + endT - startT) + " ns"); } } } @@ -1321,39 +1342,38 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { } // TODO: refactor Builder class to remove duplicate code with other Helix Java APIs - public static class Builder { + public static class Builder<T> { private String zkAddress; - private RealmAwareZkClient.RealmMode realmMode; private ZkBaseDataAccessor.ZkClientType zkClientType; + private RealmAwareZkClient.RealmMode realmMode; private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig; private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig; public Builder() { } - public ZkBaseDataAccessor.Builder setZkAddress(String zkAddress) { + public Builder<T> setZkAddress(String zkAddress) { this.zkAddress = zkAddress; return this; } - public ZkBaseDataAccessor.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) { + public Builder<T> setRealmMode(RealmAwareZkClient.RealmMode realmMode) { this.realmMode = realmMode; return this; } - public ZkBaseDataAccessor.Builder setZkClientType( - ZkBaseDataAccessor.ZkClientType zkClientType) { + public Builder<T> setZkClientType(ZkClientType zkClientType) { this.zkClientType = zkClientType; return this; } - public ZkBaseDataAccessor.Builder setRealmAwareZkConnectionConfig( + public Builder<T> setRealmAwareZkConnectionConfig( RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) { this.realmAwareZkConnectionConfig = realmAwareZkConnectionConfig; return this; } - public ZkBaseDataAccessor.Builder setRealmAwareZkClientConfig( + public Builder<T> setRealmAwareZkClientConfig( RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) { this.realmAwareZkClientConfig = realmAwareZkClientConfig; return this; @@ -1362,11 +1382,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { /** * Returns a <code>ZkBaseDataAccessor</code> instance. * <p> - * Note: in multi-realm mode, if and only if ZK client type is set to <code>FEDERATED</code>, - * <code>ZkBaseDataAccessor</code> can access to multi-realm. Otherwise, it can only access to - * single-ream. + * Note: ZK client type must be set to <code>FEDERATED</code> in order for + * <code>ZkBaseDataAccessor</code> can access multiple ZKs. Otherwise, it can only access + * single-ZK. */ - public ZkBaseDataAccessor<?> build() { + public ZkBaseDataAccessor<T> build() { validate(); return new ZkBaseDataAccessor<>(this); } @@ -1381,28 +1401,24 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { // If ZkClientType is set, RealmMode must either be single-realm or not set. if (isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) { - throw new HelixException( - "ZkClientType cannot be set on multi-realm mode!"); + throw new HelixException("ZkClientType cannot be set on multi-realm mode!"); } - // If ZkClientType is not set, default to SHARED - if (!isZkClientTypeSet) { - zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED; + // If ZkClientType is not set and realmMode is single-realm, default to SHARED + if (!isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM) { + zkClientType = ZkClientType.SHARED; } if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) { - throw new HelixException( - "RealmMode cannot be single-realm without a valid ZkAddress set!"); + throw new HelixException("RealmMode cannot be single-realm without a valid ZkAddress set!"); } if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) { - throw new HelixException( - "ZkAddress cannot be set on multi-realm mode!"); + throw new HelixException("ZkAddress cannot be set on multi-realm mode!"); } if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && zkClientType == ZkClientType.FEDERATED) { - throw new HelixException( - "FederatedZkClient cannot be set on single-realm mode!"); + throw new HelixException("FederatedZkClient cannot be set on single-realm mode!"); } if (realmMode == null) { @@ -1412,7 +1428,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { // Resolve RealmAwareZkClientConfig if (realmAwareZkClientConfig == null) { - realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig(); + realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig() + .setZkSerializer(new ZNRecordSerializer()); } // Resolve RealmAwareZkConnectionConfig
