This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch zooscalability in repository https://gitbox.apache.org/repos/asf/helix.git
commit 57ce75ca58d96fac87d1bbcb6329e72325d08f36 Author: Huizhi Lu <[email protected]> AuthorDate: Thu Mar 12 09:44:38 2020 -0700 Make ZKHelixAdmin and ZKHelixManager Realm-aware (#846) To make Helix Java APIs realm-aware, we need to make both ZKHelixAdmin and ZKHelixManager realm-aware. This commit adds a Builder to set client config and connection config for building realm-aware ZkClients underneath. --- .../apache/helix/manager/zk/CallbackHandler.java | 7 +- .../helix/manager/zk/ParticipantManager.java | 5 +- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 202 +++++++++++++++++---- .../apache/helix/manager/zk/ZKHelixManager.java | 102 ++++++++--- .../test/java/org/apache/helix/ZkTestHelper.java | 21 ++- .../integration/TestResourceGroupEndtoEnd.java | 3 +- .../controller/TestControllerLeadershipChange.java | 4 +- .../manager/ClusterControllerManager.java | 3 +- .../manager/ClusterDistributedController.java | 3 +- .../manager/MockParticipantManager.java | 3 +- .../helix/integration/manager/ZkTestManager.java | 5 +- .../helix/manager/zk/TestHandleNewSession.java | 5 +- .../impl/factory/HelixZkClientFactory.java | 2 +- 13 files changed, 281 insertions(+), 84 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index 82c9b29..bbb8788 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -64,6 +64,7 @@ import org.apache.helix.model.Message; import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import 
org.apache.helix.zookeeper.zkclient.IZkDataListener; @@ -107,7 +108,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { private final Set<EventType> _eventTypes; private final HelixDataAccessor _accessor; private final ChangeType _changeType; - private final HelixZkClient _zkClient; + private final RealmAwareZkClient _zkClient; private final AtomicLong _lastNotificationTimeStamp; private final HelixManager _manager; private final PropertyKey _propertyKey; @@ -191,12 +192,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { */ private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE); - public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey, + public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey, Object listener, EventType[] eventTypes, ChangeType changeType) { this(manager, client, propertyKey, listener, eventTypes, changeType, null); } - public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey, + public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey, Object listener, EventType[] eventTypes, ChangeType changeType, HelixCallbackMonitor monitor) { if (listener == null) { diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java index 411d937..5090a86 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java @@ -47,6 +47,7 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import 
org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer; import org.apache.helix.zookeeper.zkclient.DataUpdater; @@ -62,7 +63,7 @@ import org.slf4j.LoggerFactory; public class ParticipantManager { private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class); - final HelixZkClient _zkclient; + final RealmAwareZkClient _zkclient; final HelixManager _manager; final PropertyKey.Builder _keyBuilder; final String _clusterName; @@ -81,7 +82,7 @@ public class ParticipantManager { // session race condition when handling new session for the participant. private final String _sessionId; - public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout, + public ParticipantManager(HelixManager manager, RealmAwareZkClient zkclient, int sessionTimeout, LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks, final String sessionId) { _zkclient = zkclient; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index d7c40ff..ea9b55a 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -47,9 +47,9 @@ import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixException; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; -import org.apache.helix.PropertyKey.Builder; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.PropertyType; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; @@ -74,12 +74,14 @@ import 
org.apache.helix.model.Message.MessageType; import org.apache.helix.model.PauseSignal; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.tools.DefaultIdealStateCalculator; import org.apache.helix.util.HelixUtil; import org.apache.helix.util.RebalanceUtil; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.impl.client.FederatedZkClient; import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.exception.ZkException; @@ -90,18 +92,27 @@ import org.slf4j.LoggerFactory; public class ZKHelixAdmin implements HelixAdmin { + private static final Logger LOG = LoggerFactory.getLogger(ZKHelixAdmin.class); + public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec"; private static final String MAINTENANCE_ZNODE_ID = "maintenance"; private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3; private final RealmAwareZkClient _zkClient; private final ConfigAccessor _configAccessor; - // true if ZKHelixAdmin was instantiated with a HelixZkClient, false otherwise + // true if ZKHelixAdmin was instantiated with a RealmAwareZkClient, false otherwise // This is used for close() to determine how ZKHelixAdmin should close the underlying ZkClient private final boolean _usesExternalZkClient; private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class); + /** + * @deprecated it is recommended to use the builder constructor {@link Builder} + * instead to avoid having to manually create and maintain a RealmAwareZkClient + * outside of ZKHelixAdmin. 
+ * + * @param zkClient A created RealmAwareZkClient + */ @Deprecated public ZKHelixAdmin(RealmAwareZkClient zkClient) { _zkClient = zkClient; @@ -109,14 +120,69 @@ public class ZKHelixAdmin implements HelixAdmin { _usesExternalZkClient = true; } + /** + * There are 2 realm-aware modes to connect to ZK: + * 1. if system property {@link SystemPropertyKeys#MULTI_ZK_ENABLED} is set to <code>"true"</code>, + * it will connect on multi-realm mode; + * 2. otherwise, it will connect on single-realm mode to the <code>zkAddress</code> provided. + * + * @param zkAddress ZK address + * @exception HelixException if not able to connect on multi-realm mode + * + * @deprecated it is recommended to use the builder constructor {@link Builder} + */ + @Deprecated public ZKHelixAdmin(String zkAddress) { int timeOutInSec = Integer.parseInt(System.getProperty(CONNECTION_TIMEOUT, "30")); - HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); - clientConfig.setZkSerializer(new ZNRecordSerializer()) - .setConnectInitTimeout(timeOutInSec * 1000); - _zkClient = SharedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig); - _zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS); + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = + new RealmAwareZkClient.RealmAwareZkClientConfig() + .setConnectInitTimeout(timeOutInSec * 1000L) + .setZkSerializer(new ZNRecordSerializer()); + + RealmAwareZkClient zkClient; + + if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) { + try { + zkClient = new FederatedZkClient( + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig); + } catch (IllegalStateException | IOException | InvalidRoutingDataException e) { + throw new HelixException("Not able to connect on multi-realm mode.", e); + } + } else { + zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), + 
clientConfig.createHelixZkClientConfig()); + zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS); + } + + _zkClient = zkClient; + _configAccessor = new ConfigAccessor(_zkClient); + _usesExternalZkClient = false; + } + + private ZKHelixAdmin(Builder builder) { + RealmAwareZkClient zkClient; + switch (builder.realmMode) { + case MULTI_REALM: + try { + zkClient = new FederatedZkClient(builder.realmAwareZkConnectionConfig, + builder.realmAwareZkClientConfig); + } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + throw new HelixException("Not able to connect on multi-realm mode.", e); + } + break; + case SINGLE_REALM: + // Create a HelixZkClient: Use a SharedZkClient because ZKHelixAdmin does not need to do + // ephemeral operations + zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder.zkAddress), + builder.realmAwareZkClientConfig.createHelixZkClientConfig()); + break; + default: + throw new HelixException("Invalid RealmMode given: " + builder.realmMode); + } + + _zkClient = zkClient; _configAccessor = new ConfigAccessor(_zkClient); _usesExternalZkClient = false; } @@ -206,7 +272,7 @@ public class ZKHelixAdmin implements HelixAdmin { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.instanceConfig(instanceName)); } @@ -382,7 +448,7 @@ public class ZKHelixAdmin implements HelixAdmin { reason == null ? 
"NULL" : reason); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); if (enabled) { accessor.removeProperty(keyBuilder.pause()); @@ -407,7 +473,7 @@ public class ZKHelixAdmin implements HelixAdmin { public boolean isInMaintenanceMode(String clusterName) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getBaseDataAccessor() .exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT); } @@ -448,7 +514,7 @@ public class ZKHelixAdmin implements HelixAdmin { final MaintenanceSignal.TriggeringEntity triggeringEntity) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName, triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? "automatically" : "manually", enabled ? "enters" : "exits", reason == null ? "NULL" : reason); @@ -516,7 +582,7 @@ public class ZKHelixAdmin implements HelixAdmin { instanceName, clusterName); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); // check the instance is alive LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); @@ -634,7 +700,7 @@ public class ZKHelixAdmin implements HelixAdmin { instanceNames == null ? 
"NULL" : HelixUtil.serializeByComma(instanceNames), clusterName); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews()); Set<String> resetInstanceNames = new HashSet<String>(instanceNames); @@ -662,7 +728,7 @@ public class ZKHelixAdmin implements HelixAdmin { resourceNames == null ? "NULL" : HelixUtil.serializeByComma(resourceNames), clusterName); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews()); Set<String> resetResourceNames = new HashSet<String>(resourceNames); @@ -790,7 +856,7 @@ public class ZKHelixAdmin implements HelixAdmin { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); for (String instanceName : instances) { InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); @@ -909,7 +975,7 @@ public class ZKHelixAdmin implements HelixAdmin { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); for (String resourceName : getResourcesInCluster(clusterName)) { IdealState is = accessor.getProperty(keyBuilder.idealStates(resourceName)); @@ -925,7 +991,7 @@ public class ZKHelixAdmin implements HelixAdmin { public IdealState getResourceIdealState(String clusterName, String resourceName) { HelixDataAccessor accessor = new 
ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.idealStates(resourceName)); } @@ -938,7 +1004,7 @@ public class ZKHelixAdmin implements HelixAdmin { clusterName, idealState == null ? "NULL" : idealState.toString()); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.idealStates(resourceName), idealState); } @@ -981,7 +1047,7 @@ public class ZKHelixAdmin implements HelixAdmin { public ExternalView getResourceExternalView(String clusterName, String resourceName) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.externalView(resourceName)); } @@ -1015,7 +1081,7 @@ public class ZKHelixAdmin implements HelixAdmin { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.stateModelDef(stateModelDef), stateModel); } @@ -1024,7 +1090,7 @@ public class ZKHelixAdmin implements HelixAdmin { logger.info("Drop resource {} from cluster {}", resourceName, clusterName); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.removeProperty(keyBuilder.idealStates(resourceName)); accessor.removeProperty(keyBuilder.resourceConfig(resourceName)); @@ -1039,7 
+1105,7 @@ public class ZKHelixAdmin implements HelixAdmin { public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.stateModelDef(stateModelName)); } @@ -1049,7 +1115,7 @@ public class ZKHelixAdmin implements HelixAdmin { logger.info("Deleting cluster {}.", clusterName); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); String root = "/" + clusterName; if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0) { @@ -1093,7 +1159,7 @@ public class ZKHelixAdmin implements HelixAdmin { ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState); } @@ -1288,7 +1354,7 @@ public class ZKHelixAdmin implements HelixAdmin { constraintId, clusterName); BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); - Builder keyBuilder = new Builder(clusterName); + PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); String path = keyBuilder.constraint(constraintType.toString()).getPath(); baseAccessor.update(path, new DataUpdater<ZNRecord>() { @@ -1311,7 +1377,7 @@ public class ZKHelixAdmin implements HelixAdmin { constraintId, clusterName); BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); - Builder keyBuilder = new Builder(clusterName); + PropertyKey.Builder keyBuilder = new 
PropertyKey.Builder(clusterName); String path = keyBuilder.constraint(constraintType.toString()).getPath(); baseAccessor.update(path, new DataUpdater<ZNRecord>() { @@ -1333,7 +1399,7 @@ public class ZKHelixAdmin implements HelixAdmin { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = new Builder(clusterName); + PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); return accessor.getProperty(keyBuilder.constraint(constraintType.toString())); } @@ -1415,7 +1481,7 @@ public class ZKHelixAdmin implements HelixAdmin { } HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); config.addTag(tag); @@ -1436,7 +1502,7 @@ public class ZKHelixAdmin implements HelixAdmin { } ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); config.removeTag(tag); @@ -1457,7 +1523,7 @@ public class ZKHelixAdmin implements HelixAdmin { } HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); config.setZoneId(zoneId); @@ -1649,7 +1715,7 @@ public class ZKHelixAdmin implements HelixAdmin { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder 
keyBuilder = accessor.keyBuilder(); List<IdealState> idealStates = accessor.getChildValues(keyBuilder.idealStates()); List<String> nullIdealStates = new ArrayList<>(); for (int i = 0; i < idealStates.size(); i++) { @@ -1690,7 +1756,7 @@ public class ZKHelixAdmin implements HelixAdmin { // Ensure that all instances are valid HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs()); if (validateInstancesForWagedRebalance(clusterName, instances).containsValue(false)) { throw new HelixException(String @@ -1800,4 +1866,74 @@ public class ZKHelixAdmin implements HelixAdmin { clusterConfig)); return true; } + + // TODO: refactor builder to reduce duplicate code with other Helix Java APIs + public static class Builder { + private String zkAddress; + private RealmAwareZkClient.RealmMode realmMode; + private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig; + private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig; + + public Builder() { + } + + public ZKHelixAdmin.Builder setZkAddress(String zkAddress) { + this.zkAddress = zkAddress; + return this; + } + + public ZKHelixAdmin.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) { + this.realmMode = realmMode; + return this; + } + + public ZKHelixAdmin.Builder setRealmAwareZkConnectionConfig( + RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) { + this.realmAwareZkConnectionConfig = realmAwareZkConnectionConfig; + return this; + } + + public ZKHelixAdmin.Builder setRealmAwareZkClientConfig( + RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) { + this.realmAwareZkClientConfig = realmAwareZkClientConfig; + return this; + } + + public ZKHelixAdmin build() { + validate(); + return new ZKHelixAdmin(this); + }
+ /* + * Validates the given parameters before creating an instance of ZKHelixAdmin. + */ + private void validate() { + // Resolve RealmMode based on other parameters + boolean isZkAddressSet = zkAddress != null && !zkAddress.isEmpty(); + if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) { + throw new HelixException( + "RealmMode cannot be single-realm without a valid ZkAddress set!"); + } + if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) { + throw new HelixException( + "ZkAddress cannot be set on multi-realm mode!"); + } + if (realmMode == null) { + realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM + : RealmAwareZkClient.RealmMode.MULTI_REALM; + } + + // Resolve RealmAwareZkClientConfig + if (realmAwareZkClientConfig == null) { + realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig(); + } + + // Resolve RealmAwareZkConnectionConfig + if (realmAwareZkConnectionConfig == null) { + // If not set, create a default one + realmAwareZkConnectionConfig = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(); + } + } + } } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index b452b4c..f7180b2 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -19,6 +19,7 @@ package org.apache.helix.manager.zk; * under the License. 
*/ +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -73,14 +74,17 @@ import org.apache.helix.model.LiveInstance; import org.apache.helix.monitoring.ZKPathDataDumpTask; import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor; import org.apache.helix.monitoring.mbeans.MonitorLevel; +import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.participant.HelixStateMachineEngine; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.store.zk.AutoFallbackPropertyStore; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.util.HelixUtil; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; +import org.apache.helix.zookeeper.impl.factory.HelixZkClientFactory; import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; @@ -117,7 +121,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { private final String _version; private int _reportLatency; - protected HelixZkClient _zkclient = null; + protected RealmAwareZkClient _zkclient; private final DefaultMessagingService _messagingService; private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors; @@ -652,30 +656,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { } void createClient() throws Exception { - PathBasedZkSerializer zkSerializer = - ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build(); - - HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress); - 
connectionConfig.setSessionTimeout(_sessionTimeout); - HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); - clientConfig - .setZkSerializer(zkSerializer) - .setConnectInitTimeout(_connectionInitTimeout) - .setMonitorType(_instanceType.name()) - .setMonitorKey(_clusterName) - .setMonitorInstanceName(_instanceName) - .setMonitorRootPathOnly(isMonitorRootPathOnly()); - - HelixZkClient newClient; - switch (_instanceType) { - case ADMINISTRATOR: - newClient = SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig); - break; - default: - newClient = DedicatedZkClientFactory - .getInstance().buildZkClient(connectionConfig, clientConfig); - break; - } + final RealmAwareZkClient newClient = createSingleRealmZkClient(); synchronized (this) { if (_zkclient != null) { @@ -1285,4 +1266,75 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { public Long getSessionStartTime() { return _sessionStartTime; } + + /* + * Prepares connection config and client config based on the internal parameters given to + * HelixManager in order to create a ZkClient instance to use. Note that a shared ZkClient + * instance will be created if connecting as an ADMINISTRATOR to minimize the cost of creating + * ZkConnections. 
+ */ + private RealmAwareZkClient createSingleRealmZkClient() { + final String shardingKey = buildShardingKey(); + PathBasedZkSerializer zkSerializer = + ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build(); + + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder() + .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM) + .setZkRealmShardingKey(shardingKey) + .setSessionTimeout(_sessionTimeout).build(); + + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = + new RealmAwareZkClient.RealmAwareZkClientConfig(); + + clientConfig.setZkSerializer(zkSerializer) + .setConnectInitTimeout(_connectionInitTimeout) + .setMonitorType(_instanceType.name()) + .setMonitorKey(_clusterName) + .setMonitorInstanceName(_instanceName) + .setMonitorRootPathOnly(isMonitorRootPathOnly()); + + if (_instanceType == InstanceType.ADMINISTRATOR) { + return resolveZkClient(SharedZkClientFactory.getInstance(), connectionConfig, + clientConfig); + } + + return resolveZkClient(DedicatedZkClientFactory.getInstance(), connectionConfig, + clientConfig); + } + + /* + * Resolves what type of ZkClient this HelixManager should use based on whether MULTI_ZK_ENABLED + * System config is set or not. Two types of ZkClients are available: + * 1) If MULTI_ZK_ENABLED is set to true, we create a dedicated RealmAwareZkClient + * that provides full ZkClient functionalities and connects to the correct ZK by querying + * MetadataStoreDirectoryService. + * 2) Otherwise, we create a dedicated HelixZkClient which plainly connects to + * the ZK address given. + */ + private RealmAwareZkClient resolveZkClient(HelixZkClientFactory zkClientFactory, + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) { + if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) { + try { + // Create realm-aware ZkClient. 
+ return zkClientFactory.buildZkClient(connectionConfig, clientConfig); + } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) { + throw new HelixException("Not able to connect on realm-aware mode for sharding key: " + + connectionConfig.getZkRealmShardingKey(), e); + } + } + + // If multi-zk mode is not enabled, create HelixZkClient with the provided zk address. + HelixZkClient.ZkClientConfig helixZkClientConfig = clientConfig.createHelixZkClientConfig(); + HelixZkClient.ZkConnectionConfig helixZkConnectionConfig = + new HelixZkClient.ZkConnectionConfig(_zkAddress) + .setSessionTimeout(connectionConfig.getSessionTimeout()); + + return zkClientFactory.buildZkClient(helixZkConnectionConfig, helixZkClientConfig); + } + + private String buildShardingKey() { + return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName; + } } diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java index c2b3d35..73dded4 100644 --- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java +++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java @@ -43,6 +43,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.model.ExternalView; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.IZkStateListener; @@ -71,7 +72,7 @@ public class ZkTestHelper { /** * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly */ - public static void simulateZkStateReconnected(HelixZkClient client) { + public static void simulateZkStateReconnected(RealmAwareZkClient client) { ZkClient zkClient = (ZkClient) client; WatchedEvent 
event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null); zkClient.process(event); @@ -84,7 +85,7 @@ public class ZkTestHelper { * @param client * @return */ - public static String getSessionId(HelixZkClient client) { + public static String getSessionId(RealmAwareZkClient client) { ZkConnection connection = (ZkConnection) ((ZkClient) client).getConnection(); ZooKeeper curZookeeper = connection.getZookeeper(); return Long.toHexString(curZookeeper.getSessionId()); @@ -146,7 +147,7 @@ public class ZkTestHelper { LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId())); } - public static void expireSession(HelixZkClient client) throws Exception { + public static void expireSession(RealmAwareZkClient client) throws Exception { final CountDownLatch waitNewSession = new CountDownLatch(1); final ZkClient zkClient = (ZkClient) client; @@ -213,7 +214,7 @@ public class ZkTestHelper { * @param client * @throws Exception */ - public static void asyncExpireSession(HelixZkClient client) throws Exception { + public static void asyncExpireSession(RealmAwareZkClient client) throws Exception { final ZkClient zkClient = (ZkClient) client; ZkConnection connection = ((ZkConnection) zkClient.getConnection()); ZooKeeper curZookeeper = connection.getZookeeper(); @@ -245,7 +246,7 @@ public class ZkTestHelper { /* * stateMap: partition->instance->state */ - public static boolean verifyState(HelixZkClient zkclient, String clusterName, String resourceName, + public static boolean verifyState(RealmAwareZkClient zkclient, String clusterName, String resourceName, Map<String, Map<String, String>> expectStateMap, String op) { boolean result = true; ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient); @@ -391,7 +392,7 @@ public class ZkTestHelper { } } - public static Map<String, List<String>> getZkWatch(HelixZkClient client) throws Exception { + public static Map<String, List<String>> getZkWatch(RealmAwareZkClient 
client) throws Exception { Map<String, List<String>> lists = new HashMap<String, List<String>>(); ZkClient zkClient = (ZkClient) client; @@ -424,7 +425,7 @@ public class ZkTestHelper { return lists; } - public static Map<String, Set<IZkDataListener>> getZkDataListener(HelixZkClient client) + public static Map<String, Set<IZkDataListener>> getZkDataListener(RealmAwareZkClient client) throws Exception { java.lang.reflect.Field field = getField(client.getClass(), "_dataListener"); field.setAccessible(true); @@ -433,7 +434,7 @@ public class ZkTestHelper { return dataListener; } - public static Map<String, Set<IZkChildListener>> getZkChildListener(HelixZkClient client) + public static Map<String, Set<IZkChildListener>> getZkChildListener(RealmAwareZkClient client) throws Exception { java.lang.reflect.Field field = getField(client.getClass(), "_childListener"); field.setAccessible(true); @@ -442,7 +443,7 @@ public class ZkTestHelper { return childListener; } - public static boolean tryWaitZkEventsCleaned(HelixZkClient zkclient) throws Exception { + public static boolean tryWaitZkEventsCleaned(RealmAwareZkClient zkclient) throws Exception { java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread"); field.setAccessible(true); Object eventThread = field.get(zkclient); @@ -468,7 +469,7 @@ public class ZkTestHelper { return false; } - public static void injectExpire(HelixZkClient client) + public static void injectExpire(RealmAwareZkClient client) throws ExecutionException, InterruptedException { final ZkClient zkClient = (ZkClient) client; Future future = _executor.submit(new Runnable() { diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java index 0aa5787..0313a77 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java +++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java @@ -47,6 +47,7 @@ import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.spectator.RoutingTableProvider; import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -427,7 +428,7 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase { } @Override - public HelixZkClient getZkClient() { + public RealmAwareZkClient getZkClient() { return _zkclient; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java index 8a1ff03..5aa9a91 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java @@ -34,12 +34,12 @@ import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.CallbackHandler; -import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.model.IdealState; import org.apache.helix.model.LiveInstance; import org.apache.helix.monitoring.mbeans.MonitorDomainNames; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -148,7 +148,7 @@ public class TestControllerLeadershipChange extends ZkTestBase { 
callbackHandlers.forEach(callbackHandler -> Assert.assertTrue(callbackHandler.isReady())); // check the zk connection is open - HelixZkClient zkClient = controller.getZkClient(); + RealmAwareZkClient zkClient = controller.getZkClient(); Assert.assertFalse(zkClient.isClosed()); Long sessionId = zkClient.getSessionId(); Assert.assertNotNull(sessionId); diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java index 7d810fb..9281e2d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java @@ -27,6 +27,7 @@ import org.apache.helix.InstanceType; import org.apache.helix.manager.zk.CallbackHandler; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,7 +95,7 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable } @Override - public HelixZkClient getZkClient() { + public RealmAwareZkClient getZkClient() { return _zkclient; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java index 8e15928..397fae5 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java @@ -28,6 +28,7 @@ import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.participant.DistClusterControllerStateModelFactory; import 
org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +84,7 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn } @Override - public HelixZkClient getZkClient() { + public RealmAwareZkClient getZkClient() { return _zkclient; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java index 0036e0d..84bc334 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java @@ -33,6 +33,7 @@ import org.apache.helix.mock.participant.MockSchemataModelFactory; import org.apache.helix.mock.participant.MockTransition; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +129,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable, } @Override - public HelixZkClient getZkClient() { + public RealmAwareZkClient getZkClient() { return _zkclient; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java index 1a6903a..93e4b53 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java @@ -22,10 +22,11 @@ package org.apache.helix.integration.manager; import java.util.List; import org.apache.helix.manager.zk.CallbackHandler; -import org.apache.helix.zookeeper.api.client.HelixZkClient; +import 
org.apache.helix.zookeeper.api.client.RealmAwareZkClient; + public interface ZkTestManager { - HelixZkClient getZkClient(); + RealmAwareZkClient getZkClient(); List<CallbackHandler> getHandlers(); diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java index 00bedcf..e4c6695 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java @@ -35,6 +35,7 @@ import org.apache.helix.common.ZkTestBase; import org.apache.helix.controller.GenericHelixController; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.helix.model.LiveInstance; import org.testng.Assert; @@ -492,7 +493,7 @@ public class TestHandleNewSession extends ZkTestBase { return _handlers; } - HelixZkClient getZkClient() { + RealmAwareZkClient getZkClient() { return _zkclient; } @@ -538,7 +539,7 @@ public class TestHandleNewSession extends ZkTestBase { return _handlers; } - HelixZkClient getZkClient() { + RealmAwareZkClient getZkClient() { return _zkclient; } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java index ca13321..1f792ee 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java @@ -29,7 +29,7 @@ import org.apache.helix.zookeeper.zkclient.ZkConnection; /** * Abstract class of the ZkClient factory. 
*/ -abstract class HelixZkClientFactory implements RealmAwareZkClientFactory { +public abstract class HelixZkClientFactory implements RealmAwareZkClientFactory { /** * Build a ZkClient using specified connection config and client config
