This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch zooscalability in repository https://gitbox.apache.org/repos/asf/helix.git
commit 06450c3a2785891771d46898b0824b80b499d7ac Author: Huizhi Lu <[email protected]> AuthorDate: Sat Feb 29 09:45:23 2020 -0800 Add FederatedZkClient (#789) As part of ZkClient API enhancement, we wish to add FederatedZkClient, which is a wrapper of the raw ZkClient, that provides realm-aware access to ZooKeeper. FederatedZkClient will internally maintain multiple ZooKeeper sessions connecting to different ZooKeeper realms on an as-needed basis and route requests to the appropriate ZooKeeper based on the ZK path sharding key. Ephemeral node creation is not supported. --- .../zookeeper/impl/client/FederatedZkClient.java | 322 ++++++++++++++++----- .../apache/helix/zookeeper/impl/ZkTestBase.java | 13 +- .../impl/client/TestFederatedZkClient.java | 312 ++++++++++++++++++++ 3 files changed, 573 insertions(+), 74 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java index 3925a6d..5f63408 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java @@ -19,345 +19,533 @@ package org.apache.helix.zookeeper.impl.client; * under the License. */ +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; -import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Op; import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Implements and supports all ZK operations defined in interface {@link RealmAwareZkClient}, + * except for session-aware operations such as creating ephemeral nodes, for which + * an {@link UnsupportedOperationException} will be thrown. + * <p> + * It acts as a single ZK client but will automatically route read/write/change subscription + * requests to the corresponding ZkClient with the help of metadata store directory service. + * It could connect to multiple ZK addresses and maintain a {@link ZkClient} for each ZK address. + * <p> + * Note: each Zk realm has its own event queue to handle listeners. So listeners from different ZK + * realms could be handled concurrently because listeners of a ZK realm are handled in its own + * queue. The concurrency of listeners should be aware of when implementing listeners for different + * ZK realms. The users should use thread-safe data structures if they wish to handle change + * callbacks. + */ +public class FederatedZkClient implements RealmAwareZkClient { + private static final Logger LOG = LoggerFactory.getLogger(FederatedZkClient.class); + private static final String FEDERATED_ZK_CLIENT = FederatedZkClient.class.getSimpleName(); + private static final String DEDICATED_ZK_CLIENT_FACTORY = + DedicatedZkClientFactory.class.getSimpleName(); + + private final MetadataStoreRoutingData _metadataStoreRoutingData; + private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig; + + // ZK realm -> ZkClient + private final Map<String, ZkClient> _zkRealmToZkClientMap; + + private volatile boolean _isClosed; + private PathBasedZkSerializer _pathBasedZkSerializer; + + // TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection. + public FederatedZkClient(RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, + MetadataStoreRoutingData metadataStoreRoutingData) { + if (metadataStoreRoutingData == null) { + throw new IllegalArgumentException("MetadataStoreRoutingData cannot be null!"); + } + if (clientConfig == null) { + throw new IllegalArgumentException("Client config cannot be null!"); + } + + _isClosed = false; + _clientConfig = clientConfig; + _pathBasedZkSerializer = clientConfig.getZkSerializer(); + _metadataStoreRoutingData = metadataStoreRoutingData; + _zkRealmToZkClientMap = new ConcurrentHashMap<>(); + } -public class FederatedZkClient implements RealmAwareZkClient { @Override public List<String> subscribeChildChanges(String path, IZkChildListener listener) { - return null; + return getZkClient(path).subscribeChildChanges(path, listener); } @Override public void unsubscribeChildChanges(String path, IZkChildListener listener) { - + getZkClient(path).unsubscribeChildChanges(path, listener); } @Override public void subscribeDataChanges(String path, IZkDataListener listener) { - + getZkClient(path).subscribeDataChanges(path, listener); } @Override public void unsubscribeDataChanges(String path, IZkDataListener listener) { - + getZkClient(path).unsubscribeDataChanges(path, listener); } @Override public void subscribeStateChanges(IZkStateListener listener) { - + throwUnsupportedOperationException(); } @Override public void unsubscribeStateChanges(IZkStateListener listener) { + throwUnsupportedOperationException(); + } + @Override + public void subscribeStateChanges( + org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) { + throwUnsupportedOperationException(); } @Override - public void unsubscribeAll() { + public void unsubscribeStateChanges( + org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) { + throwUnsupportedOperationException(); + } + @Override + public void unsubscribeAll() { + _zkRealmToZkClientMap.values().forEach(ZkClient::unsubscribeAll); } @Override public void createPersistent(String path) { - + createPersistent(path, false); } @Override public void createPersistent(String path, boolean createParents) { - + createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE); } @Override public void createPersistent(String path, boolean createParents, List<ACL> acl) { - + getZkClient(path).createPersistent(path, createParents, acl); } @Override public void createPersistent(String path, Object data) { - + create(path, data, CreateMode.PERSISTENT); } @Override public void createPersistent(String path, Object data, List<ACL> acl) { - + create(path, data, acl, CreateMode.PERSISTENT); } @Override public String createPersistentSequential(String path, Object data) { - return null; + return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL); } @Override public String createPersistentSequential(String path, Object data, List<ACL> acl) { - return null; + return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL); } @Override public void createEphemeral(String path) { - + create(path, null, CreateMode.EPHEMERAL); } @Override public void createEphemeral(String path, String sessionId) { - + createEphemeral(path, null, sessionId); } @Override public void createEphemeral(String path, List<ACL> acl) { - + create(path, null, acl, CreateMode.EPHEMERAL); } @Override public void createEphemeral(String path, List<ACL> acl, String sessionId) { - + create(path, null, acl, CreateMode.EPHEMERAL, sessionId); } @Override public String create(String path, Object data, CreateMode mode) { - return null; + return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode); } @Override - public String create(String path, Object datat, List<ACL> acl, CreateMode mode) { - return null; + public String create(String path, Object data, List<ACL> acl, CreateMode mode) { + return create(path, data, acl, mode, null); } @Override public void createEphemeral(String path, Object data) { - + create(path, data, CreateMode.EPHEMERAL); } @Override public void createEphemeral(String path, Object data, String sessionId) { - + create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sessionId); } @Override public void createEphemeral(String path, Object data, List<ACL> acl) { - + create(path, data, acl, CreateMode.EPHEMERAL); } @Override public void createEphemeral(String path, Object data, List<ACL> acl, String sessionId) { - + create(path, data, acl, CreateMode.EPHEMERAL, sessionId); } @Override public String createEphemeralSequential(String path, Object data) { - return null; + return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL); } @Override public String createEphemeralSequential(String path, Object data, List<ACL> acl) { - return null; + return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL); } @Override public String createEphemeralSequential(String path, Object data, String sessionId) { - return null; + return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, + sessionId); } @Override public String createEphemeralSequential(String path, Object data, List<ACL> acl, String sessionId) { - return null; + return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId); } @Override public List<String> getChildren(String path) { - return null; + return getZkClient(path).getChildren(path); } @Override public int countChildren(String path) { - return 0; + return getZkClient(path).countChildren(path); } @Override public boolean exists(String path) { - return false; + return getZkClient(path).exists(path); } @Override public Stat getStat(String path) { - return null; + return getZkClient(path).getStat(path); } @Override public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) { - return false; + return getZkClient(path).waitUntilExists(path, timeUnit, time); } @Override public void deleteRecursively(String path) { - + getZkClient(path).deleteRecursively(path); } @Override public boolean delete(String path) { - return false; + return getZkClient(path).delete(path); } @Override + @SuppressWarnings("unchecked") public <T> T readData(String path) { - return null; + return (T) readData(path, false); } @Override public <T> T readData(String path, boolean returnNullIfPathNotExists) { - return null; + return getZkClient(path).readData(path, returnNullIfPathNotExists); } @Override public <T> T readData(String path, Stat stat) { - return null; + return getZkClient(path).readData(path, stat); } @Override public <T> T readData(String path, Stat stat, boolean watch) { - return null; + return getZkClient(path).readData(path, stat, watch); } @Override public <T> T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists) { - return null; + return getZkClient(path).readData(path, stat, returnNullIfPathNotExists); } @Override public void writeData(String path, Object object) { - + writeData(path, object, -1); } @Override public <T> void updateDataSerialized(String path, DataUpdater<T> updater) { - + getZkClient(path).updateDataSerialized(path, updater); } @Override - public void writeData(String path, Object datat, int expectedVersion) { - + public void writeData(String path, Object data, int expectedVersion) { + writeDataReturnStat(path, data, expectedVersion); } @Override - public Stat writeDataReturnStat(String path, Object datat, int expectedVersion) { - return null; + public Stat writeDataReturnStat(String path, Object data, int expectedVersion) { + return getZkClient(path).writeDataReturnStat(path, data, expectedVersion); } @Override - public Stat writeDataGetStat(String path, Object datat, int expectedVersion) { - return null; + public Stat writeDataGetStat(String path, Object data, int expectedVersion) { + return writeDataReturnStat(path, data, expectedVersion); } @Override - public void asyncCreate(String path, Object datat, CreateMode mode, + public void asyncCreate(String path, Object data, CreateMode mode, ZkAsyncCallbacks.CreateCallbackHandler cb) { - + getZkClient(path).asyncCreate(path, data, mode, cb); } @Override - public void asyncSetData(String path, Object datat, int version, + public void asyncSetData(String path, Object data, int version, ZkAsyncCallbacks.SetDataCallbackHandler cb) { - + getZkClient(path).asyncSetData(path, data, version, cb); } @Override public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) { - + getZkClient(path).asyncGetData(path, cb); } @Override public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) { - + getZkClient(path).asyncExists(path, cb); } @Override public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) { - + getZkClient(path).asyncDelete(path, cb); } @Override public void watchForData(String path) { - + getZkClient(path).watchForData(path); } @Override public List<String> watchForChilds(String path) { - return null; + return getZkClient(path).watchForChilds(path); } @Override public long getCreationTime(String path) { - return 0; + return getZkClient(path).getCreationTime(path); } @Override public List<OpResult> multi(Iterable<Op> ops) { + throwUnsupportedOperationException(); return null; } @Override public boolean waitUntilConnected(long time, TimeUnit timeUnit) { + throwUnsupportedOperationException(); return false; } @Override public String getServers() { + throwUnsupportedOperationException(); return null; } @Override public long getSessionId() { - return 0; + // Session-aware is unsupported. + throwUnsupportedOperationException(); + return 0L; } @Override public void close() { + if (isClosed()) { + return; + } + _isClosed = true; + + synchronized (_zkRealmToZkClientMap) { + Iterator<Map.Entry<String, ZkClient>> iterator = _zkRealmToZkClientMap.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry<String, ZkClient> entry = iterator.next(); + String zkRealm = entry.getKey(); + ZkClient zkClient = entry.getValue(); + + // Catch any exception from ZkClient's close() to avoid that there is leakage of + // remaining unclosed ZkClient. + try { + zkClient.close(); + } catch (Exception e) { + LOG.error("Exception thrown when closing ZkClient for ZkRealm: {}!", zkRealm, e); + } + iterator.remove(); + } + } + + LOG.info("{} is successfully closed.", FEDERATED_ZK_CLIENT); } @Override public boolean isClosed() { - return false; + return _isClosed; } @Override public byte[] serialize(Object data, String path) { - return new byte[0]; + return getZkClient(path).serialize(data, path); } @Override public <T> T deserialize(byte[] data, String path) { - return null; + return getZkClient(path).deserialize(data, path); } @Override public void setZkSerializer(ZkSerializer zkSerializer) { - + _pathBasedZkSerializer = new BasicZkSerializer(zkSerializer); + _zkRealmToZkClientMap.values() + .forEach(zkClient -> zkClient.setZkSerializer(_pathBasedZkSerializer)); } @Override public void setZkSerializer(PathBasedZkSerializer zkSerializer) { - + _pathBasedZkSerializer = zkSerializer; + _zkRealmToZkClientMap.values().forEach(zkClient -> zkClient.setZkSerializer(zkSerializer)); } @Override public PathBasedZkSerializer getZkSerializer() { - return null; + return _pathBasedZkSerializer; + } + + private String create(final String path, final Object dataObject, final List<ACL> acl, + final CreateMode mode, final String expectedSessionId) { + if (mode.isEphemeral()) { + throwUnsupportedOperationException(); + } + + // Create mode is not session-aware, so the node does not have to be created + // by the expectedSessionId. + return getZkClient(path).create(path, dataObject, acl, mode); + } + + private ZkClient getZkClient(String path) { + // If FederatedZkClient is closed, should not return ZkClient. + checkClosedState(); + + String zkRealm = getZkRealm(path); + + // Use this zkClient reference to protect the returning zkClient from being null because of + // race condition. Once we get the reference, even _zkRealmToZkClientMap is cleared by closed(), + // this zkClient is not null which guarantees the returned value not null. + ZkClient zkClient = _zkRealmToZkClientMap.get(zkRealm); + + if (zkClient == null) { + // 1. Synchronized to avoid creating duplicate ZkClient for the same ZkRealm. + // 2. Synchronized with close() to avoid creating new ZkClient when all ZkClients are + // being closed and _zkRealmToZkClientMap is being cleared. + synchronized (_zkRealmToZkClientMap) { + // Because of potential race condition: thread B to get ZkClient could be blocked by this + // synchronized, while thread A is executing closed() in its synchronized block. So thread B + // could still enter this synchronized block once A completes executing closed() and + // releases the synchronized lock. + // Check closed state again to avoid creating a new ZkClient after FederatedZkClient + // is already closed. + checkClosedState(); + + if (!_zkRealmToZkClientMap.containsKey(zkRealm)) { + zkClient = createZkClient(zkRealm); + _zkRealmToZkClientMap.put(zkRealm, zkClient); + } else { + zkClient = _zkRealmToZkClientMap.get(zkRealm); + } + } + } + + return zkClient; + } + + private String getZkRealm(String path) { + String zkRealm; + try { + zkRealm = _metadataStoreRoutingData.getMetadataStoreRealm(path); + } catch (NoSuchElementException ex) { + throw new NoSuchElementException("Cannot find ZK realm for the path: " + path); + } + + if (zkRealm == null || zkRealm.isEmpty()) { + throw new NoSuchElementException("Cannot find ZK realm for the path: " + path); + } + + return zkRealm; + } + + private ZkClient createZkClient(String zkAddress) { + LOG.debug("Creating ZkClient for realm: {}.", zkAddress); + return new ZkClient(new ZkConnection(zkAddress), (int) _clientConfig.getConnectInitTimeout(), + _clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer, + _clientConfig.getMonitorType(), _clientConfig.getMonitorKey(), + _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly()); + } + + private void checkClosedState() { + if (isClosed()) { + throw new IllegalStateException(FEDERATED_ZK_CLIENT + " is closed!"); + } + } + + private void throwUnsupportedOperationException() { + throw new UnsupportedOperationException( + "Session-aware operation is not supported by " + FEDERATED_ZK_CLIENT + + ". Instead, please use " + DEDICATED_ZK_CLIENT_FACTORY + + " to create a dedicated RealmAwareZkClient for this operation."); } } diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java index 10edaf4..7e59652 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java @@ -57,12 +57,11 @@ public class ZkTestBase { * Multiple ZK references */ // The following maps hold ZK connect string as keys - protected Map<String, ZkServer> _zkServerMap = new HashMap<>(); - protected int _numZk = 1; // Initial value + protected final Map<String, ZkServer> _zkServerMap = new HashMap<>(); + protected static int _numZk = 1; // Initial value @BeforeSuite - public void beforeSuite() - throws IOException { + public void beforeSuite() throws IOException { // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends System.setProperty("zookeeper.4lw.commands.whitelist", "*"); @@ -80,8 +79,7 @@ public class ZkTestBase { } @AfterSuite - public void afterSuite() - throws IOException { + public void afterSuite() throws IOException { // Clean up all JMX objects for (ObjectName mbean : MBEAN_SERVER.queryNames(null, null)) { try { @@ -124,7 +122,7 @@ public class ZkTestBase { * @param zkAddress * @return */ - private ZkServer startZkServer(final String zkAddress) { + protected ZkServer startZkServer(final String zkAddress) { String zkDir = zkAddress.replace(':', '_'); final String logDir = "/tmp/" + zkDir + "/logs"; final String dataDir = "/tmp/" + zkDir + "/dataDir"; @@ -142,6 +140,7 @@ public class ZkTestBase { int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1)); ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port); + System.out.println("Starting ZK server at " + zkAddress); zkServer.start(); return zkServer; } diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java new file mode 100644 index 0000000..5801690 --- /dev/null +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java @@ -0,0 +1,312 @@ +package org.apache.helix.zookeeper.impl.client; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.msdcommon.datamodel.TrieRoutingData; +import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.ZkTestBase; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.ZkServer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestFederatedZkClient extends ZkTestBase { + private static final String TEST_SHARDING_KEY_PREFIX = "/test_sharding_key_"; + private static final String TEST_REALM_ONE_VALID_PATH = TEST_SHARDING_KEY_PREFIX + "1/a/b/c"; + private static final String TEST_REALM_TWO_VALID_PATH = TEST_SHARDING_KEY_PREFIX + "2/x/y/z"; + private static final String TEST_INVALID_PATH = TEST_SHARDING_KEY_PREFIX + "invalid/a/b/c"; + private static final String UNSUPPORTED_OPERATION_MESSAGE = + "Session-aware operation is not supported by FederatedZkClient."; + + private RealmAwareZkClient _realmAwareZkClient; + // Need to start an extra ZK server for multi-realm test, if only one ZK server is running. + private String _extraZkRealm; + private ZkServer _extraZkServer; + + @BeforeClass + public void beforeClass() throws InvalidRoutingDataException { + System.out.println("Starting " + TestFederatedZkClient.class.getSimpleName()); + + // Populate rawRoutingData + // <Realm, List of sharding keys> Mapping + Map<String, List<String>> rawRoutingData = new HashMap<>(); + for (int i = 0; i < _numZk; i++) { + List<String> shardingKeyList = Collections.singletonList(TEST_SHARDING_KEY_PREFIX + (i + 1)); + String realmName = ZK_PREFIX + (ZK_START_PORT + i); + rawRoutingData.put(realmName, shardingKeyList); + } + + if (rawRoutingData.size() < 2) { + System.out.println("There is only one ZK realm. Starting one more ZK to test multi-realm."); + _extraZkRealm = ZK_PREFIX + (ZK_START_PORT + 1); + _extraZkServer = startZkServer(_extraZkRealm); + // RealmTwo's sharding key: /test_sharding_key_2 + List<String> shardingKeyList = Collections.singletonList(TEST_SHARDING_KEY_PREFIX + "2"); + rawRoutingData.put(_extraZkRealm, shardingKeyList); + } + + // Feed the raw routing data into TrieRoutingData to construct an in-memory representation + // of routing information. + _realmAwareZkClient = new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkClientConfig(), + new TrieRoutingData(rawRoutingData)); + } + + @AfterClass + public void afterClass() { + // Close it as it is created in before class. + _realmAwareZkClient.close(); + + // Close the extra zk server. + if (_extraZkServer != null) { + _extraZkServer.shutdown(); + } + + System.out.println("Ending " + TestFederatedZkClient.class.getSimpleName()); + } + + /* + * Tests that an unsupported operation should throw an UnsupportedOperationException. + */ + @Test + public void testUnsupportedOperations() { + // Test creating ephemeral. + try { + _realmAwareZkClient.create(TEST_REALM_ONE_VALID_PATH, "Hello", CreateMode.EPHEMERAL); + Assert.fail("Ephemeral node should not be created."); + } catch (UnsupportedOperationException ex) { + Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE)); + } + + // Test creating ephemeral sequential. + try { + _realmAwareZkClient + .create(TEST_REALM_ONE_VALID_PATH, "Hello", CreateMode.EPHEMERAL_SEQUENTIAL); + Assert.fail("Ephemeral node should not be created."); + } catch (UnsupportedOperationException ex) { + Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE)); + } + + List<Op> ops = Arrays.asList( + Op.create(TEST_REALM_ONE_VALID_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT), Op.delete(TEST_REALM_ONE_VALID_PATH, -1)); + try { + _realmAwareZkClient.multi(ops); + Assert.fail("multi() should not be supported."); + } catch (UnsupportedOperationException ex) { + Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE)); + } + + try { + _realmAwareZkClient.getSessionId(); + Assert.fail("getSessionId() should not be supported."); + } catch (UnsupportedOperationException ex) { + Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE)); + } + + try { + _realmAwareZkClient.getServers(); + Assert.fail("getServers() should not be supported."); + } catch (UnsupportedOperationException ex) { + Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE)); + } + + try { + _realmAwareZkClient.waitUntilConnected(5L, TimeUnit.SECONDS); + Assert.fail("getServers() should not be supported."); + } catch (UnsupportedOperationException ex) { + Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE)); + } + + // Test state change subscription. + IZkStateListener listener = new IZkStateListener() { + @Override + public void handleStateChanged(Watcher.Event.KeeperState state) { + System.out.println("Handle new state: " + state); + } + + @Override + public void handleNewSession(String sessionId) { + System.out.println("Handle new session: " + sessionId); + } + + @Override + public void handleSessionEstablishmentError(Throwable error) { + System.out.println("Handle session establishment error: " + error); + } + }; + + try { + _realmAwareZkClient.subscribeStateChanges(listener); + Assert.fail("getServers() should not be supported."); + } catch (UnsupportedOperationException ex) { + Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE)); + } + + try { + _realmAwareZkClient.unsubscribeStateChanges(listener); + Assert.fail("getServers() should not be supported."); + } catch (UnsupportedOperationException ex) { + Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE)); + } + } + + /* + * Tests the persistent create() call against a valid path and an invalid path. + * Valid path is one that belongs to the realm designated by the sharding key. + * Invalid path is one that does not belong to the realm designated by the sharding key. + */ + @Test(dependsOnMethods = "testUnsupportedOperations") + public void testCreatePersistent() { + _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer()); + + // Create a dummy ZNRecord + ZNRecord znRecord = new ZNRecord("DummyRecord"); + znRecord.setSimpleField("Dummy", "Value"); + + // Test writing and reading against the validPath + _realmAwareZkClient.createPersistent(TEST_REALM_ONE_VALID_PATH, true); + _realmAwareZkClient.writeData(TEST_REALM_ONE_VALID_PATH, znRecord); + Assert.assertEquals(_realmAwareZkClient.readData(TEST_REALM_ONE_VALID_PATH), znRecord); + + // Test writing and reading against the invalid path + try { + _realmAwareZkClient.createPersistent(TEST_INVALID_PATH, true); + Assert.fail("Create() should not succeed on an invalid path!"); + } catch (NoSuchElementException ex) { + Assert + .assertEquals(ex.getMessage(), "Cannot find ZK realm for the path: " + TEST_INVALID_PATH); + } + } + + /* + * Tests that exists() works on valid path and fails on invalid path. + */ + @Test(dependsOnMethods = "testCreatePersistent") + public void testExists() { + Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH)); + + try { + _realmAwareZkClient.exists(TEST_INVALID_PATH); + Assert.fail("Exists() should not succeed on an invalid path!"); + } catch (NoSuchElementException ex) { + Assert + .assertEquals(ex.getMessage(), "Cannot find ZK realm for the path: " + TEST_INVALID_PATH); + } + } + + /* + * Tests that delete() works on valid path and fails on invalid path. + */ + @Test(dependsOnMethods = "testExists") + public void testDelete() { + try { + _realmAwareZkClient.delete(TEST_INVALID_PATH); + Assert.fail("Exists() should not succeed on an invalid path!"); + } catch (NoSuchElementException ex) { + Assert + .assertEquals(ex.getMessage(), "Cannot find ZK realm for the path: " + TEST_INVALID_PATH); + } + + Assert.assertTrue(_realmAwareZkClient.delete(TEST_REALM_ONE_VALID_PATH)); + Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH)); + } + + /* + * Tests that multi-realm feature. + */ + @Test(dependsOnMethods = "testDelete") + public void testMultiRealmCRUD() { + ZNRecord realmOneZnRecord = new ZNRecord("realmOne"); + realmOneZnRecord.setSimpleField("realmOne", "Value"); + + ZNRecord realmTwoZnRecord = new ZNRecord("realmTwo"); + realmTwoZnRecord.setSimpleField("realmTwo", "Value"); + + // Writing on realmOne. + _realmAwareZkClient.createPersistent(TEST_REALM_ONE_VALID_PATH, true); + _realmAwareZkClient.writeData(TEST_REALM_ONE_VALID_PATH, realmOneZnRecord); + + // RealmOne path is created but realmTwo path is not. + Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH)); + Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH)); + + // Writing on realmTwo. + _realmAwareZkClient.createPersistent(TEST_REALM_TWO_VALID_PATH, true); + _realmAwareZkClient.writeData(TEST_REALM_TWO_VALID_PATH, realmTwoZnRecord); + + // RealmTwo path is created. + Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH)); + + // Reading on both realms. + Assert.assertEquals(_realmAwareZkClient.readData(TEST_REALM_ONE_VALID_PATH), realmOneZnRecord); + Assert.assertEquals(_realmAwareZkClient.readData(TEST_REALM_TWO_VALID_PATH), realmTwoZnRecord); + + Assert.assertTrue(_realmAwareZkClient.delete(TEST_REALM_ONE_VALID_PATH)); + Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH)); + + // Deleting on realmOne does not delete on realmTwo. + Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH)); + + // Deleting on realmTwo. + Assert.assertTrue(_realmAwareZkClient.delete(TEST_REALM_TWO_VALID_PATH)); + Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH)); + } + + /* + * Tests that close() works. + * TODO: test that all raw zkClients are closed after FederatedZkClient close() is called. This + * could help avoid ZkClient leakage. + */ + @Test(dependsOnMethods = "testMultiRealmCRUD") + public void testClose() { + Assert.assertFalse(_realmAwareZkClient.isClosed()); + + _realmAwareZkClient.close(); + + Assert.assertTrue(_realmAwareZkClient.isClosed()); + + // Client is closed, so operation should not be executed. + try { + _realmAwareZkClient.createPersistent(TEST_REALM_ONE_VALID_PATH); + Assert + .fail("createPersistent() should not be executed because RealmAwareZkClient is closed."); + } catch (IllegalStateException ex) { + Assert.assertEquals(ex.getMessage(), "FederatedZkClient is closed!"); + } + } +}
