This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch zooscalability in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0bb4150731af81a6f1ebdaa80cb363a4c61993ef Author: Hunter Lee <[email protected]> AuthorDate: Wed Mar 11 19:44:26 2020 -0700 Make ZkCacheBaseDataAccessor and ZkHelixPropertyStore realm-aware (#863) This commit makes both ZkCacheBaseDataAccessor and ZkHelixPropertyStore realm-aware by choosing the appropriate realm-aware ZkClients in the constructor. Also, we add a Builder here to give users options to set Connection config and Client config. Note that ZkHelixPropertyStore extends CacheBaseDataAccessor so there is no change needed. --- .../helix/manager/zk/ZkBaseDataAccessor.java | 11 +- .../helix/manager/zk/ZkCacheBaseDataAccessor.java | 274 ++++++++++++++++++--- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java index f08ba55..f287c22 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java @@ -58,6 +58,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { // Designates which mode ZkBaseDataAccessor should be created in. If not specified, it will be // created on SHARED mode. + // TODO: move this to RealmAwareZkClient public enum ZkClientType { /* * When ZkBaseDataAccessor is created with the DEDICATED type, it supports ephemeral node @@ -70,7 +71,12 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { * When ZkBaseDataAccessor is created with the SHARED type, it only supports CRUD * functionalities. This will be the default mode of creation. */ - SHARED + SHARED, + /* + * Uses FederatedZkClient (applicable on multi-realm mode only) that queries Metadata Store + * Directory Service for routing data + */ + FEDERATED } enum RetCode { @@ -104,7 +110,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { private static Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class); private final RealmAwareZkClient _zkClient; - // true if ZkBaseDataAccessor was instantiated with a HelixZkClient, false otherwise + + // true if ZkBaseDataAccessor was instantiated with a RealmAwareZkClient, false otherwise // This is used for close() to determine how ZkBaseDataAccessor should close the underlying // ZkClient private final boolean _usesExternalZkClient; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java index 6d2c5cf..bd05ea7 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java @@ -19,10 +19,10 @@ package org.apache.helix.manager.zk; * under the License. */ +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -31,12 +31,16 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.helix.AccessOption; import org.apache.helix.HelixException; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode; +import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.store.HelixPropertyListener; import org.apache.helix.store.HelixPropertyStore; import org.apache.helix.store.zk.ZNode; import org.apache.helix.util.PathUtils; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.impl.client.FederatedZkClient; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; import org.apache.helix.zookeeper.zkclient.DataUpdater; @@ -58,7 +62,15 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { protected ZkCallbackCache<T> _zkCache; final ZkBaseDataAccessor<T> _baseAccessor; - final Map<String, Cache<T>> _cacheMap; + + // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths + // TreeMap key is ordered by key string length, so more general (i.e. short) prefix + // comes first + final Map<String, Cache<T>> _cacheMap = new TreeMap<>((o1, o2) -> { + int len1 = o1.split("/").length; + int len2 = o2.split("/").length; + return len1 - len2; + }); final String _chrootPath; final List<String> _wtCachePaths; @@ -70,12 +82,14 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { private final ReentrantLock _eventLock = new ReentrantLock(); private ZkCacheEventThread _eventThread; - private HelixZkClient _zkClient = null; + private RealmAwareZkClient _zkClient; + @Deprecated public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, List<String> wtCachePaths) { this(baseAccessor, null, wtCachePaths, null); } + @Deprecated public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, String chrootPath, List<String> wtCachePaths, List<String> zkCachePaths) { _baseAccessor = baseAccessor; @@ -90,50 +104,62 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { _wtCachePaths = wtCachePaths; _zkCachePaths = zkCachePaths; - // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths - // TreeMap key is ordered by key string length, so more general (i.e. short) prefix - // comes first - _cacheMap = new TreeMap<>(new Comparator<String>() { - @Override - public int compare(String o1, String o2) { - int len1 = o1.split("/").length; - int len2 = o2.split("/").length; - return len1 - len2; - } - }); - start(); } + @Deprecated public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath, List<String> wtCachePaths, List<String> zkCachePaths) { this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, null, null, ZkBaseDataAccessor.ZkClientType.SHARED); } + @Deprecated public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath, List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey) { this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, monitorType, monitorkey, ZkBaseDataAccessor.ZkClientType.SHARED); } + @Deprecated public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath, List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey, ZkBaseDataAccessor.ZkClientType zkClientType) { - HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); - clientConfig.setZkSerializer(serializer).setMonitorType(monitorType).setMonitorKey(monitorkey); - switch (zkClientType) { - case DEDICATED: - _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient( - new HelixZkClient.ZkConnectionConfig(zkAddress), - new HelixZkClient.ZkClientConfig().setZkSerializer(serializer)); - break; - case SHARED: - default: - _zkClient = SharedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig); - } - _zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); + + // If the multi ZK config is enabled, use multi-realm mode with FederatedZkClient + if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) { + try { + RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder(); + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = + new RealmAwareZkClient.RealmAwareZkClientConfig(); + clientConfig.setZkSerializer(serializer).setMonitorType(monitorType) + .setMonitorKey(monitorkey); + // Use a federated zk client + _zkClient = new FederatedZkClient(connectionConfigBuilder.build(), clientConfig); + } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + // Note: IllegalStateException is for HttpRoutingDataReader if MSDS endpoint cannot be + // found + throw new HelixException("Failed to create ZkCacheBaseDataAccessor!", e); + } + } else { + HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); + clientConfig.setZkSerializer(serializer).setMonitorType(monitorType) + .setMonitorKey(monitorkey); + switch (zkClientType) { + case DEDICATED: + _zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), + new HelixZkClient.ZkClientConfig().setZkSerializer(serializer)); + break; + case SHARED: + default: + _zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig); + } + _zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); + } + _baseAccessor = new ZkBaseDataAccessor<>(_zkClient); if (chrootPath == null || chrootPath.equals("/")) { @@ -146,17 +172,67 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { _wtCachePaths = wtCachePaths; _zkCachePaths = zkCachePaths; - // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths - // TreeMap key is ordered by key string length, so more general (i.e. short) prefix - // comes first - _cacheMap = new TreeMap<>(new Comparator<String>() { - @Override - public int compare(String o1, String o2) { - int len1 = o1.split("/").length; - int len2 = o2.split("/").length; - return len1 - len2; - } - }); + start(); + } + + /** + * Constructor using a Builder that allows users to set connection and client configs. + * @param builder + */ + private ZkCacheBaseDataAccessor(Builder builder) { + _chrootPath = builder._chrootPath; + _wtCachePaths = builder._wtCachePaths; + _zkCachePaths = builder._zkCachePaths; + + RealmAwareZkClient zkClient; + switch (builder._realmMode) { + case MULTI_REALM: + try { + if (builder._zkClientType == ZkBaseDataAccessor.ZkClientType.DEDICATED) { + // Use a realm-aware dedicated zk client + zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(builder._realmAwareZkConnectionConfig, + builder._realmAwareZkClientConfig); + } else if (builder._zkClientType == ZkBaseDataAccessor.ZkClientType.SHARED) { + // Use a realm-aware shared zk client + zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(builder._realmAwareZkConnectionConfig, + builder._realmAwareZkClientConfig); + } else { + // Use a federated zk client + zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig, + builder._realmAwareZkClientConfig); + } + break; // Must break out of the switch statement here since zkClient has been created + } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + // Note: IllegalStateException is for HttpRoutingDataReader if MSDS endpoint cannot be + // found + throw new HelixException("Failed to create ZkCacheBaseDataAccessor!", e); + } + case SINGLE_REALM: + switch (builder._zkClientType) { + case DEDICATED: + // If DEDICATED, then we use a dedicated HelixZkClient because we must support ephemeral + // operations + zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress), + builder._realmAwareZkClientConfig.createHelixZkClientConfig()); + break; + case SHARED: + default: + zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress), + builder._realmAwareZkClientConfig.createHelixZkClientConfig()); + zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, + TimeUnit.MILLISECONDS); + break; + } + default: + throw new HelixException("Invalid RealmMode given: " + builder._realmMode); + } + + _zkClient = zkClient; + _baseAccessor = new ZkBaseDataAccessor<>(_zkClient); start(); } @@ -842,4 +918,122 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { _zkClient.close(); } } + + public static class Builder { + private String _zkAddress; + private RealmAwareZkClient.RealmMode _realmMode; + private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig; + private RealmAwareZkClient.RealmAwareZkClientConfig _realmAwareZkClientConfig; + + /** ZkCacheBaseDataAccessor-specific parameters */ + private String _chrootPath; + private List<String> _wtCachePaths; + private List<String> _zkCachePaths; + private ZkBaseDataAccessor.ZkClientType _zkClientType; + + public Builder() { + } + + public Builder setZkAddress(String zkAddress) { + _zkAddress = zkAddress; + return this; + } + + public Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) { + _realmMode = realmMode; + return this; + } + + public Builder setRealmAwareZkConnectionConfig( + RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) { + _realmAwareZkConnectionConfig = realmAwareZkConnectionConfig; + return this; + } + + public Builder setRealmAwareZkClientConfig( + RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) { + _realmAwareZkClientConfig = realmAwareZkClientConfig; + return this; + } + + public Builder setChrootPath(String chrootPath) { + _chrootPath = chrootPath; + return this; + } + + public Builder setWtCachePaths(List<String> wtCachePaths) { + _wtCachePaths = wtCachePaths; + return this; + } + + public Builder setZkCachePaths(List<String> zkCachePaths) { + _zkCachePaths = zkCachePaths; + return this; + } + + /** + * Sets the ZkClientType. If this is set, ZkCacheBaseDataAccessor will be created on + * single-realm mode. + * @param zkClientType + * @return + */ + public Builder setZkClientType(ZkBaseDataAccessor.ZkClientType zkClientType) { + _zkClientType = zkClientType; + return this; + } + + public ZkCacheBaseDataAccessor build() { + validate(); + return new ZkCacheBaseDataAccessor(this); + } + + private void validate() { + // Resolve RealmMode based on other parameters + boolean isZkAddressSet = _zkAddress != null && !_zkAddress.isEmpty(); + boolean isZkClientTypeSet = _zkClientType != null; + + // If ZkClientType is set, RealmMode must either be single-realm or not set. + if (isZkClientTypeSet && _realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) { + throw new HelixException( + "ZkCacheBaseDataAccessor: you cannot set ZkClientType on multi-realm mode!"); + } + // If ZkClientType is not set, default to SHARED + if (!isZkClientTypeSet) { + _zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED; + } + + if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) { + throw new HelixException( + "ZkCacheBaseDataAccessor: RealmMode cannot be single-realm without a valid ZkAddress set!"); + } + + if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) { + throw new HelixException( + "ZkCacheBaseDataAccessor: You cannot set the ZkAddress on multi-realm mode!"); + } + + if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM + && _zkClientType == ZkBaseDataAccessor.ZkClientType.FEDERATED) { + throw new HelixException( + "ZkCacheBaseDataAccessor: You cannot use FederatedZkClient on single-realm mode!"); + } + + if (_realmMode == null) { + _realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM + : RealmAwareZkClient.RealmMode.MULTI_REALM; + } + + // Resolve RealmAwareZkClientConfig + if (_realmAwareZkClientConfig == null) { + _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig(); + } + + // Resolve RealmAwareZkConnectionConfig + if (_realmAwareZkConnectionConfig == null) { + // If not set, create a default one + _realmAwareZkConnectionConfig = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(); + } + } + } }
