This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch zooscalability in repository https://gitbox.apache.org/repos/asf/helix.git
commit 1085e7d06e281c1b9e5c23f330494a580b7effe5 Author: Huizhi Lu <[email protected]> AuthorDate: Wed Mar 11 23:38:52 2020 -0700 Make ZkBaseDataAccessor realm-aware (#855) This commit makes ZkBaseDataAccessor realm-aware by building according realm-aware ZkClients in the constructor. A Builder is provided to set realm-aware client config and connection config. --- .../helix/manager/zk/ZkBaseDataAccessor.java | 261 ++++++++++++++++++--- 1 file changed, 233 insertions(+), 28 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java index f287c22..1d60c7b 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java @@ -19,6 +19,7 @@ package org.apache.helix.manager.zk; * under the License. */ +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -26,16 +27,20 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixException; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.api.exceptions.HelixMetaDataAccessException; +import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.store.zk.ZNode; import org.apache.helix.util.HelixUtil; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.impl.client.FederatedZkClient; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; import org.apache.helix.zookeeper.zkclient.DataUpdater; @@ -60,21 +65,23 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { // created on SHARED mode. // TODO: move this to RealmAwareZkClient public enum ZkClientType { - /* + /** * When ZkBaseDataAccessor is created with the DEDICATED type, it supports ephemeral node * creation, callback functionality, and session management. But note that this is more * resource-heavy since it creates a dedicated ZK connection so should be used sparingly only * when the aforementioned features are needed. */ DEDICATED, - /* + + /** * When ZkBaseDataAccessor is created with the SHARED type, it only supports CRUD * functionalities. This will be the default mode of creation. */ SHARED, - /* + + /** * Uses FederatedZkClient (applicable on multi-realm mode only) that queries Metadata Store - * Directory Service for routing data + * Directory Service for routing data. */ FEDERATED } @@ -116,6 +123,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { // ZkClient private final boolean _usesExternalZkClient; + /** + * @deprecated it is recommended to use the builder constructor {@link Builder} + * instead to avoid having to manually create and maintain a RealmAwareZkClient + * outside of ZkBaseDataAccessor. + * + * @param zkClient A created RealmAwareZkClient + */ @Deprecated public ZkBaseDataAccessor(RealmAwareZkClient zkClient) { if (zkClient == null) { @@ -125,13 +139,63 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { _usesExternalZkClient = true; } + private ZkBaseDataAccessor(Builder builder) { + switch (builder.realmMode) { + case MULTI_REALM: + try { + if (builder.zkClientType == ZkClientType.DEDICATED) { + // Use a realm-aware dedicated zk client + _zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(builder.realmAwareZkConnectionConfig, + builder.realmAwareZkClientConfig); + } else if (builder.zkClientType == ZkClientType.SHARED) { + // Use a realm-aware shared zk client + _zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(builder.realmAwareZkConnectionConfig, + builder.realmAwareZkClientConfig); + } else { + _zkClient = new FederatedZkClient(builder.realmAwareZkConnectionConfig, + builder.realmAwareZkClientConfig); + } + } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + throw new HelixException("Not able to connect on multi-realm mode.", e); + } + break; + + case SINGLE_REALM: + // Create a HelixZkClient: Use a SharedZkClient because ZkBaseDataAccessor does not need to + // do ephemeral operations. + if (builder.zkClientType == ZkClientType.DEDICATED) { + // If DEDICATED, then we use a dedicated HelixZkClient because we must support ephemeral + // operations + _zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder.zkAddress), + builder.realmAwareZkClientConfig.createHelixZkClientConfig()); + } else { + _zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder.zkAddress), + builder.realmAwareZkClientConfig.createHelixZkClientConfig()); + _zkClient + .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); + } + break; + default: + throw new HelixException("Invalid RealmMode given: " + builder.realmMode); + } + + _usesExternalZkClient = false; + } + /** * The ZkBaseDataAccessor with custom serializer support of ZkSerializer type. * Note: This constructor will use a shared ZkConnection. * Do NOT use this for ephemeral node creation/callbacks/session management. * Do use this for simple CRUD operations to ZooKeeper. * @param zkAddress The zookeeper address + * + * @deprecated it is recommended to use the builder constructor {@link Builder} */ + @Deprecated public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer) { this(zkAddress, zkSerializer, ZkClientType.SHARED); } @@ -142,7 +206,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { * Do NOT use this for ephemeral node creation/callbacks/session management. * Do use this for simple CRUD operations to ZooKeeper. * @param zkAddress The zookeeper address + * + * @deprecated it is recommended to use the builder constructor {@link Builder} */ + @Deprecated public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer) { this(zkAddress, pathBasedZkSerializer, ZkClientType.SHARED); } @@ -153,7 +220,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { * Does NOT support ephemeral node creation, callbacks, or session management. * Uses {@link ZNRecordSerializer} serializer * @param zkAddress The zookeeper address + * + * @deprecated it is recommended to use the builder constructor {@link Builder} */ + @Deprecated public ZkBaseDataAccessor(String zkAddress) { this(zkAddress, new ZNRecordSerializer()); } @@ -166,7 +236,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { * CRUD operations to ZooKeeper. * @param zkAddress * @param zkClientType + * + * @deprecated it is recommended to use the builder constructor {@link Builder} */ + @Deprecated public ZkBaseDataAccessor(String zkAddress, ZkClientType zkClientType) { this(zkAddress, new ZNRecordSerializer(), zkClientType); } @@ -179,21 +252,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { * CRUD operations to ZooKeeper. * @param zkAddress * @param zkSerializer + * + * @deprecated it is recommended to use the builder constructor {@link Builder} */ + @Deprecated public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer, ZkClientType zkClientType) { - switch (zkClientType) { - case DEDICATED: - _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient( - new HelixZkClient.ZkConnectionConfig(zkAddress), - new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer)); - break; - case SHARED: - default: - _zkClient = SharedZkClientFactory.getInstance().buildZkClient( - new HelixZkClient.ZkConnectionConfig(zkAddress), - new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer)); - } + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = + new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(zkSerializer); + + _zkClient = buildRealmAwareZkClient(clientConfig, zkAddress, zkClientType); _usesExternalZkClient = false; } @@ -206,21 +274,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { * @param zkAddress * @param pathBasedZkSerializer * @param zkClientType + * + * @deprecated it is recommended to use the builder constructor {@link Builder} */ + @Deprecated public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer, ZkClientType zkClientType) { - switch (zkClientType) { - case DEDICATED: - _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient( - new HelixZkClient.ZkConnectionConfig(zkAddress), - new HelixZkClient.ZkClientConfig().setZkSerializer(pathBasedZkSerializer)); - break; - case SHARED: - default: - _zkClient = SharedZkClientFactory.getInstance().buildZkClient( - new HelixZkClient.ZkConnectionConfig(zkAddress), - new HelixZkClient.ZkClientConfig().setZkSerializer(pathBasedZkSerializer)); - } + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = + new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(pathBasedZkSerializer); + + _zkClient = buildRealmAwareZkClient(clientConfig, zkAddress, zkClientType); _usesExternalZkClient = false; } @@ -1256,4 +1319,146 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { _zkClient.close(); } } + + // TODO: refactor Builder class to remove duplicate code with other Helix Java APIs + public static class Builder { + private String zkAddress; + private RealmAwareZkClient.RealmMode realmMode; + private ZkBaseDataAccessor.ZkClientType zkClientType; + private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig; + private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig; + + public Builder() { + } + + public ZkBaseDataAccessor.Builder setZkAddress(String zkAddress) { + this.zkAddress = zkAddress; + return this; + } + + public ZkBaseDataAccessor.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) { + this.realmMode = realmMode; + return this; + } + + public ZkBaseDataAccessor.Builder setZkClientType( + ZkBaseDataAccessor.ZkClientType zkClientType) { + this.zkClientType = zkClientType; + return this; + } + + public ZkBaseDataAccessor.Builder setRealmAwareZkConnectionConfig( + RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) { + this.realmAwareZkConnectionConfig = realmAwareZkConnectionConfig; + return this; + } + + public ZkBaseDataAccessor.Builder setRealmAwareZkClientConfig( + RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) { + this.realmAwareZkClientConfig = realmAwareZkClientConfig; + return this; + } + + /** + * Returns a <code>ZkBaseDataAccessor</code> instance. + * <p> + * Note: in multi-realm mode, if and only if ZK client type is set to <code>FEDERATED</code>, + * <code>ZkBaseDataAccessor</code> can access to multi-realm. Otherwise, it can only access to + * single-ream. + */ + public ZkBaseDataAccessor<?> build() { + validate(); + return new ZkBaseDataAccessor<>(this); + } + + /* + * Validates the given parameters before building an instance of ZkBaseDataAccessor. + */ + private void validate() { + // Resolve RealmMode based on other parameters + boolean isZkAddressSet = zkAddress != null && !zkAddress.isEmpty(); + boolean isZkClientTypeSet = zkClientType != null; + + // If ZkClientType is set, RealmMode must either be single-realm or not set. + if (isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) { + throw new HelixException( + "ZkClientType cannot be set on multi-realm mode!"); + } + // If ZkClientType is not set, default to SHARED + if (!isZkClientTypeSet) { + zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED; + } + + if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) { + throw new HelixException( + "RealmMode cannot be single-realm without a valid ZkAddress set!"); + } + + if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) { + throw new HelixException( + "ZkAddress cannot be set on multi-realm mode!"); + } + + if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM + && zkClientType == ZkClientType.FEDERATED) { + throw new HelixException( + "FederatedZkClient cannot be set on single-realm mode!"); + } + + if (realmMode == null) { + realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM + : RealmAwareZkClient.RealmMode.MULTI_REALM; + } + + // Resolve RealmAwareZkClientConfig + if (realmAwareZkClientConfig == null) { + realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig(); + } + + // Resolve RealmAwareZkConnectionConfig + if (realmAwareZkConnectionConfig == null) { + // If not set, create a default one + realmAwareZkConnectionConfig = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(); + } + } + } + + /* + * This is used for constructors that do not take a Builder in as a parameter because of + * keeping backward-compatibility. + */ + private RealmAwareZkClient buildRealmAwareZkClient( + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, String zkAddress, + ZkClientType zkClientType) { + if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) { + try { + return new FederatedZkClient( + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig); + } catch (IllegalStateException | IOException | InvalidRoutingDataException e) { + throw new HelixException("Not able to connect on multi-realm mode.", e); + } + } + + RealmAwareZkClient zkClient; + + switch (zkClientType) { + case DEDICATED: + zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), + clientConfig.createHelixZkClientConfig()); + break; + case SHARED: + default: + zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), + clientConfig.createHelixZkClientConfig()); + + zkClient + .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); + break; + } + + return zkClient; + } }
