This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch zooscalability in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0ab3a1ead15e3dabb3d61e005ed36106689cfed3 Author: Hunter Lee <[email protected]> AuthorDate: Thu Feb 13 17:27:10 2020 -0800 Add RealmAwareZkClient and RealmAwareZkClientFactory interfaces (#745) RealmAwareZkClient and RealmAwareZkClientFactory are going to be the top-level interfaces for other realm-aware ZkClient APIs (think FederatedZkClient, DedicatedZkClient, etc.). RealmAwareZkClient will support all the existing interface methods that HelixZkClient supports. RealmAwareZkClientFactory will be the interface implemented by SharedZkClientFactory and DedicatedZkClientFactory. --- .../apache/helix/manager/zk/ZKHelixManager.java | 4 +- .../java/org/apache/helix/common/ZkTestBase.java | 3 +- .../helix/zookeeper/api/client/HelixZkClient.java | 173 +++++++++++++++++++++ .../zookeeper/api/client/RealmAwareZkClient.java | 94 ++++++----- .../api/factory/RealmAwareZkClientFactory.java | 22 +++ .../impl/factory/DedicatedZkClientFactory.java | 17 ++ .../impl/factory/HelixZkClientFactory.java | 3 +- .../impl/factory/SharedZkClientFactory.java | 17 ++ 8 files changed, 292 insertions(+), 41 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index b4368f9..b452b4c 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -1144,11 +1144,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { @Override public void handleNewSession(String sessionId) throws Exception { /* - * TODO: after removing I0ItecIZkStateListenerHelixImpl, null session should be checked and + * TODO: after removing I0ItecIZkStateListenerImpl, null session should be checked and * discarded. * Null session is still a special case here, which is treated as non-session aware operation. * This special case could still potentially cause race condition, so null session should NOT - * be acceptable, once I0ItecIZkStateListenerHelixImpl is removed. Currently this special case + * be acceptable, once I0ItecIZkStateListenerImpl is removed. Currently this special case * is kept for backward compatibility. */ diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index f1fdf53..88e9ea4 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -149,8 +149,9 @@ public class ZkTestBase { } catch (Exception e) { Assert.fail("Failed to parse the number of ZKs from config!"); } + } else { + Assert.fail("multiZk config is set but numZk config is missing!"); } - Assert.fail("multiZk config is set but numZk config is missing!"); } // Start "numZkFromConfigInt" ZooKeepers diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java index bbb8a98..9a1a69d 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java @@ -19,8 +19,181 @@ package org.apache.helix.zookeeper.api.client; * under the License. */ +import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; +import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; +import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer; +import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; + + /** + * Deprecated - please use RealmAwareZkClient instead. + * * HelixZkClient interface that follows the supported API structure of RealmAwareZkClient. */ +@Deprecated public interface HelixZkClient extends RealmAwareZkClient { + + /** + * Deprecated - please use RealmAwareZkClient and RealmAwareZkConnectionConfig instead. + * + * Configuration for creating a new ZkConnection. + */ + @Deprecated + class ZkConnectionConfig { + // Connection configs + private final String _zkServers; + private int _sessionTimeout = HelixZkClient.DEFAULT_SESSION_TIMEOUT; + + public ZkConnectionConfig(String zkServers) { + _zkServers = zkServers; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof HelixZkClient.ZkConnectionConfig)) { + return false; + } + HelixZkClient.ZkConnectionConfig configObj = (HelixZkClient.ZkConnectionConfig) obj; + return (_zkServers == null && configObj._zkServers == null || _zkServers != null && _zkServers + .equals(configObj._zkServers)) && _sessionTimeout == configObj._sessionTimeout; + } + + @Override + public int hashCode() { + return _sessionTimeout * 31 + _zkServers.hashCode(); + } + + @Override + public String toString() { + return (_zkServers + "_" + _sessionTimeout).replaceAll("[\\W]", "_"); + } + + public HelixZkClient.ZkConnectionConfig setSessionTimeout(Integer sessionTimeout) { + this._sessionTimeout = sessionTimeout; + return this; + } + + public String getZkServers() { + return _zkServers; + } + + public int getSessionTimeout() { + return _sessionTimeout; + } + } + + /** + * Deprecated - please use RealmAwareZkClient and RealmAwareZkClientConfig instead. + * + * Configuration for creating a new HelixZkClient with serializer and monitor. + * + * TODO: If possible, try to merge with RealmAwareZkClient's RealmAwareZkClientConfig to reduce duplicate logic/code (without breaking backward-compatibility). + * Simply making this a subclass of RealmAwareZkClientConfig will break backward-compatiblity. + */ + @Deprecated + class ZkClientConfig { + // For client to init the connection + private long _connectInitTimeout = DEFAULT_CONNECTION_TIMEOUT; + + // Data access configs + private long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT; + + // Others + private PathBasedZkSerializer _zkSerializer; + + // Monitoring + private String _monitorType; + private String _monitorKey; + private String _monitorInstanceName = null; + private boolean _monitorRootPathOnly = true; + + public ZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) { + this._zkSerializer = zkSerializer; + return this; + } + + public ZkClientConfig setZkSerializer(ZkSerializer zkSerializer) { + this._zkSerializer = new BasicZkSerializer(zkSerializer); + return this; + } + + /** + * Used as part of the MBean ObjectName. This item is required for enabling monitoring. + * + * @param monitorType + */ + public ZkClientConfig setMonitorType(String monitorType) { + this._monitorType = monitorType; + return this; + } + + /** + * Used as part of the MBean ObjectName. This item is required for enabling monitoring. + * + * @param monitorKey + */ + public ZkClientConfig setMonitorKey(String monitorKey) { + this._monitorKey = monitorKey; + return this; + } + + /** + * Used as part of the MBean ObjectName. This item is optional. + * + * @param instanceName + */ + public ZkClientConfig setMonitorInstanceName(String instanceName) { + this._monitorInstanceName = instanceName; + return this; + } + + public ZkClientConfig setMonitorRootPathOnly(Boolean monitorRootPathOnly) { + this._monitorRootPathOnly = monitorRootPathOnly; + return this; + } + + public ZkClientConfig setOperationRetryTimeout(Long operationRetryTimeout) { + this._operationRetryTimeout = operationRetryTimeout; + return this; + } + + public ZkClientConfig setConnectInitTimeout(long _connectInitTimeout) { + this._connectInitTimeout = _connectInitTimeout; + return this; + } + + public PathBasedZkSerializer getZkSerializer() { + if (_zkSerializer == null) { + _zkSerializer = new BasicZkSerializer(new SerializableSerializer()); + } + return _zkSerializer; + } + + public long getOperationRetryTimeout() { + return _operationRetryTimeout; + } + + public String getMonitorType() { + return _monitorType; + } + + public String getMonitorKey() { + return _monitorKey; + } + + public String getMonitorInstanceName() { + return _monitorInstanceName; + } + + public boolean isMonitorRootPathOnly() { + return _monitorRootPathOnly; + } + + public long getConnectInitTimeout() { + return _connectInitTimeout; + } + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index f2345f6..aa8bf7e 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -40,7 +40,25 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +/** + * The Realm-aware ZkClient interface. + * NOTE: "Realm-aware" does not necessarily mean that the RealmAwareZkClient instance will be connecting to multiple ZK realms. + * On single-realm mode, RealmAwareZkClient will reject requests going out to other ZK realms than the one set at initialization. + * On multi-realm mode, RealmAwareZkClient will connect to multiple ZK realms but will reject EPHEMERAL AccessMode operations. + */ public interface RealmAwareZkClient { + + /** + * Specifies which mode to run this RealmAwareZkClient on. + * + * SINGLE_REALM: CRUD, change subscription, and EPHEMERAL CreateMode are supported. + * MULTI_REALM: CRUD and change subscription are supported. Operations involving EPHEMERAL CreateMode will throw an UnsupportedOperationException. + */ + enum MODE { + SINGLE_REALM, + MULTI_REALM + } + int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE; int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000; int DEFAULT_SESSION_TIMEOUT = 30 * 1000; @@ -60,7 +78,7 @@ public interface RealmAwareZkClient { * TODO: remove below default implementation when getting rid of I0Itec in the new zk client. */ default void subscribeStateChanges(final IZkStateListener listener) { - subscribeStateChanges(new HelixZkClient.I0ItecIZkStateListenerHelixImpl(listener)); + subscribeStateChanges(new I0ItecIZkStateListenerImpl(listener)); } /* @@ -69,7 +87,7 @@ public interface RealmAwareZkClient { * TODO: remove below default implementation when getting rid of I0Itec in the new zk client. */ default void unsubscribeStateChanges(IZkStateListener listener) { - unsubscribeStateChanges(new HelixZkClient.I0ItecIZkStateListenerHelixImpl(listener)); + unsubscribeStateChanges(new I0ItecIZkStateListenerImpl(listener)); } /** @@ -250,11 +268,10 @@ public interface RealmAwareZkClient { * This is for backward compatibility and to avoid breaking the original implementation of * {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}. */ - class I0ItecIZkStateListenerHelixImpl - implements org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener { + class I0ItecIZkStateListenerImpl implements org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener { private IZkStateListener _listener; - I0ItecIZkStateListenerHelixImpl(IZkStateListener listener) { + I0ItecIZkStateListenerImpl(IZkStateListener listener) { _listener = listener; } @@ -282,15 +299,14 @@ public interface RealmAwareZkClient { if (obj == this) { return true; } - if (!(obj instanceof HelixZkClient.I0ItecIZkStateListenerHelixImpl)) { + if (!(obj instanceof I0ItecIZkStateListenerImpl)) { return false; } if (_listener == null) { return false; } - HelixZkClient.I0ItecIZkStateListenerHelixImpl - defaultListener = (HelixZkClient.I0ItecIZkStateListenerHelixImpl) obj; + I0ItecIZkStateListenerImpl defaultListener = (I0ItecIZkStateListenerImpl) obj; return _listener.equals(defaultListener._listener); } @@ -306,15 +322,19 @@ public interface RealmAwareZkClient { } /** - * Configuration for creating a new ZkConnection. + * ZkConnection-related configs for creating an instance of RealmAwareZkClient. */ - class ZkConnectionConfig { - // Connection configs - private final String _zkServers; - private int _sessionTimeout = HelixZkClient.DEFAULT_SESSION_TIMEOUT; + class RealmAwareZkConnectionConfig { + + /** + * zkRealmShardingKey: used to deduce which ZK realm this RealmAwareZkClientConfig should connect to. + * NOTE: this field will be ignored if MODE is MULTI_REALM! + */ + private final String _zkRealmShardingKey; + private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT; - public ZkConnectionConfig(String zkServers) { - _zkServers = zkServers; + public RealmAwareZkConnectionConfig(String zkRealmShardingKey) { + _zkRealmShardingKey = zkRealmShardingKey; } @Override @@ -322,32 +342,32 @@ public interface RealmAwareZkClient { if (obj == this) { return true; } - if (!(obj instanceof HelixZkClient.ZkConnectionConfig)) { + if (!(obj instanceof RealmAwareZkConnectionConfig)) { return false; } - HelixZkClient.ZkConnectionConfig configObj = (HelixZkClient.ZkConnectionConfig) obj; - return (_zkServers == null && configObj._zkServers == null || - _zkServers != null && _zkServers.equals(configObj._zkServers)) && - _sessionTimeout == configObj._sessionTimeout; + RealmAwareZkConnectionConfig configObj = (RealmAwareZkConnectionConfig) obj; + return (_zkRealmShardingKey == null && configObj._zkRealmShardingKey == null + || _zkRealmShardingKey != null && _zkRealmShardingKey + .equals(configObj._zkRealmShardingKey)) && _sessionTimeout == configObj._sessionTimeout; } @Override public int hashCode() { - return _sessionTimeout * 31 + _zkServers.hashCode(); + return _sessionTimeout * 31 + _zkRealmShardingKey.hashCode(); } @Override public String toString() { - return (_zkServers + "_" + _sessionTimeout).replaceAll("[\\W]", "_"); + return (_zkRealmShardingKey + "_" + _sessionTimeout).replaceAll("[\\W]", "_"); } - public HelixZkClient.ZkConnectionConfig setSessionTimeout(Integer sessionTimeout) { + public RealmAwareZkConnectionConfig setSessionTimeout(int sessionTimeout) { this._sessionTimeout = sessionTimeout; return this; } - public String getZkServers() { - return _zkServers; + public String getZkRealmShardingKey() { + return _zkRealmShardingKey; } public int getSessionTimeout() { @@ -356,14 +376,14 @@ public interface RealmAwareZkClient { } /** - * Configuration for creating a new RealmAwareZkClient with serializer and monitor. + * ZkClient-related configs for creating an instance of RealmAwareZkClient. */ - class ZkClientConfig { + class RealmAwareZkClientConfig { // For client to init the connection - private long _connectInitTimeout = HelixZkClient.DEFAULT_CONNECTION_TIMEOUT; + private long _connectInitTimeout = DEFAULT_CONNECTION_TIMEOUT; // Data access configs - private long _operationRetryTimeout = HelixZkClient.DEFAULT_OPERATION_TIMEOUT; + private long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT; // Others private PathBasedZkSerializer _zkSerializer; @@ -374,12 +394,12 @@ public interface RealmAwareZkClient { private String _monitorInstanceName = null; private boolean _monitorRootPathOnly = true; - public HelixZkClient.ZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) { + public RealmAwareZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) { this._zkSerializer = zkSerializer; return this; } - public HelixZkClient.ZkClientConfig setZkSerializer(ZkSerializer zkSerializer) { + public RealmAwareZkClientConfig setZkSerializer(ZkSerializer zkSerializer) { this._zkSerializer = new BasicZkSerializer(zkSerializer); return this; } @@ -389,7 +409,7 @@ public interface RealmAwareZkClient { * * @param monitorType */ - public HelixZkClient.ZkClientConfig setMonitorType(String monitorType) { + public RealmAwareZkClientConfig setMonitorType(String monitorType) { this._monitorType = monitorType; return this; } @@ -399,7 +419,7 @@ public interface RealmAwareZkClient { * * @param monitorKey */ - public HelixZkClient.ZkClientConfig setMonitorKey(String monitorKey) { + public RealmAwareZkClientConfig setMonitorKey(String monitorKey) { this._monitorKey = monitorKey; return this; } @@ -409,22 +429,22 @@ public interface RealmAwareZkClient { * * @param instanceName */ - public HelixZkClient.ZkClientConfig setMonitorInstanceName(String instanceName) { + public RealmAwareZkClientConfig setMonitorInstanceName(String instanceName) { this._monitorInstanceName = instanceName; return this; } - public HelixZkClient.ZkClientConfig setMonitorRootPathOnly(Boolean monitorRootPathOnly) { + public RealmAwareZkClientConfig setMonitorRootPathOnly(Boolean monitorRootPathOnly) { this._monitorRootPathOnly = monitorRootPathOnly; return this; } - public HelixZkClient.ZkClientConfig setOperationRetryTimeout(Long operationRetryTimeout) { + public RealmAwareZkClientConfig setOperationRetryTimeout(Long operationRetryTimeout) { this._operationRetryTimeout = operationRetryTimeout; return this; } - public HelixZkClient.ZkClientConfig setConnectInitTimeout(long _connectInitTimeout) { + public RealmAwareZkClientConfig setConnectInitTimeout(long _connectInitTimeout) { this._connectInitTimeout = _connectInitTimeout; return this; } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java index 9fbf259..f68ffe4 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java @@ -19,5 +19,27 @@ package org.apache.helix.zookeeper.api.factory; * under the License. */ +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; + + +/** + * Creates an instance of RealmAwareZkClient. + */ public interface RealmAwareZkClientFactory { + /** + * Build a RealmAwareZkClient using specified connection config and client config. + * @param connectionConfig + * @param clientConfig + * @return HelixZkClient + */ + RealmAwareZkClient buildZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig); + + /** + * Builds a RealmAwareZkClient using specified connection config and default client config. + * @param connectionConfig + * @return RealmAwareZkClient + */ + RealmAwareZkClient buildZkClient( + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig); } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java index 90ecf9b..2695a5d 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java @@ -20,6 +20,7 @@ package org.apache.helix.zookeeper.impl.factory; */ import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.impl.client.ZkClient; @@ -31,6 +32,22 @@ public class DedicatedZkClientFactory extends HelixZkClientFactory { protected DedicatedZkClientFactory() { } + @Override + public RealmAwareZkClient buildZkClient( + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) { + // TODO: Implement the logic + // Return an instance of DedicatedZkClient + return null; + } + + @Override + public RealmAwareZkClient buildZkClient( + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) { + // TODO: Implement the logic + return null; + } + private static class SingletonHelper { private static final DedicatedZkClientFactory INSTANCE = new DedicatedZkClientFactory(); } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java index 9e1ca6e..9584c57 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java @@ -20,6 +20,7 @@ package org.apache.helix.zookeeper.impl.factory; */ import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.factory.RealmAwareZkClientFactory; import org.apache.helix.zookeeper.exception.ZkClientException; import org.apache.helix.zookeeper.zkclient.IZkConnection; import org.apache.helix.zookeeper.zkclient.ZkConnection; @@ -28,7 +29,7 @@ import org.apache.helix.zookeeper.zkclient.ZkConnection; /** * Abstract class of the ZkClient factory. */ -abstract class HelixZkClientFactory { +abstract class HelixZkClientFactory implements RealmAwareZkClientFactory { /** * Build a ZkClient using specified connection config and client config diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java index bf9d9a1..a9b8e33 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java @@ -22,6 +22,7 @@ package org.apache.helix.zookeeper.impl.factory; import java.util.HashMap; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.exception.ZkClientException; import org.apache.helix.zookeeper.impl.client.SharedZkClient; import org.slf4j.Logger; @@ -40,6 +41,22 @@ public class SharedZkClientFactory extends HelixZkClientFactory { protected SharedZkClientFactory() { } + @Override + public RealmAwareZkClient buildZkClient( + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) { + // TODO: Implement the logic + // Return an instance of SharedZkClient + return null; + } + + @Override + public RealmAwareZkClient buildZkClient( + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) { + // TODO: Implement the logic + return null; + } + private static class SingletonHelper { private static final SharedZkClientFactory INSTANCE = new SharedZkClientFactory(); }
