This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch zooscalability in repository https://gitbox.apache.org/repos/asf/helix.git
commit ab62683bc91260d19598fc29518616c1fd215923 Author: Hunter Lee <[email protected]> AuthorDate: Mon Feb 10 16:57:58 2020 -0800 Add MetadataStoreRoutingDataWriter with DistributedLeaderElection (#727) We need a separate ZkClient-based writer that could allow users to write routing data to ZK. This diff adds such an interface, an implementation, and a distributed lock implementation that could help users to manipulate the routing data. Changelist: Add ZkRoutingDataWriter (+ interface) Add ZkDistributedLock (+ interface) to guarantee that there's at most one active writer at a time (where there are multiple Helix REST deployables) Add a test for ZkRoutingDataWriter Integrate ZkRoutingDataWriter with ZkMetadataStoreDirectory Add test methods to TestZkMetadataStoreDirectory Add ZkDistributedElection to replace ZkDistributedLock (and move ZkDistributedLock to a separate PR) --- .../metadatastore/ZkMetadataStoreDirectory.java | 62 +++-- .../MetadataStoreRoutingDataReader.java | 3 +- .../accessor/MetadataStoreRoutingDataWriter.java | 74 ++++++ .../{ => accessor}/ZkRoutingDataReader.java | 25 +- .../accessor/ZkRoutingDataWriter.java | 253 +++++++++++++++++++++ .../concurrency/ZkDistributedLeaderElection.java | 142 ++++++++++++ .../constant/MetadataStoreRoutingConstants.java | 3 + .../{ => accessor}/TestZkRoutingDataReader.java | 2 +- .../accessor/TestZkRoutingDataWriter.java | 107 +++++++++ .../helix/rest/server/AbstractTestClass.java | 9 - 10 files changed, 627 insertions(+), 53 deletions(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java index 5a88ca9..536d058 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java @@ -29,16 +29,11 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.I0Itec.zkclient.IZkChildListener; -import org.I0Itec.zkclient.IZkDataListener; -import org.apache.helix.HelixManagerProperties; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.client.DedicatedZkClientFactory; -import org.apache.helix.manager.zk.client.HelixZkClient; -import org.apache.helix.manager.zk.client.HelixZkClient.ZkClientConfig; -import org.apache.helix.manager.zk.zookeeper.IZkStateListener; +import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataReader; +import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataWriter; +import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataReader; +import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataWriter; import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException; -import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +47,7 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing // TODO: enable the line below when implementation is complete // The following maps' keys represent the namespace private final Map<String, MetadataStoreRoutingDataReader> _routingDataReaderMap; + private final Map<String, MetadataStoreRoutingDataWriter> _routingDataWriterMap; private final Map<String, MetadataStoreRoutingData> _routingDataMap; private final Map<String, String> _routingZkAddressMap; // <namespace, <realm, <list of sharding keys>> mappings @@ -68,14 +64,17 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing throw new InvalidRoutingDataException("Routing ZK Addresses given are invalid!"); } _routingDataReaderMap = new HashMap<>(); + _routingDataWriterMap = new HashMap<>(); _routingZkAddressMap = routingZkAddressMap; _realmToShardingKeysMap = new ConcurrentHashMap<>(); _routingDataMap = new ConcurrentHashMap<>(); - // Create RoutingDataReaders + // Create RoutingDataReaders and RoutingDataWriters for (Map.Entry<String, String> routingEntry : _routingZkAddressMap.entrySet()) { _routingDataReaderMap.put(routingEntry.getKey(), new ZkRoutingDataReader(routingEntry.getKey(), routingEntry.getValue(), this)); + _routingDataWriterMap.put(routingEntry.getKey(), + new ZkRoutingDataWriter(routingEntry.getKey(), routingEntry.getValue())); // Populate realmToShardingKeys with ZkRoutingDataReader _realmToShardingKeysMap.put(routingEntry.getKey(), @@ -132,26 +131,38 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing @Override public boolean addMetadataStoreRealm(String namespace, String realm) { - // TODO implement when MetadataStoreRoutingDataWriter is ready - throw new UnsupportedOperationException(); + if (!_routingDataWriterMap.containsKey(namespace)) { + throw new IllegalArgumentException( + "Failed to add metadata store realm: Namespace " + namespace + " is not found!"); + } + return _routingDataWriterMap.get(namespace).addMetadataStoreRealm(realm); } @Override public boolean deleteMetadataStoreRealm(String namespace, String realm) { - // TODO implement when MetadataStoreRoutingDataWriter is ready - throw new UnsupportedOperationException(); + if (!_routingDataWriterMap.containsKey(namespace)) { + throw new IllegalArgumentException( + "Failed to delete metadata store realm: Namespace " + namespace + " is not found!"); + } + return _routingDataWriterMap.get(namespace).deleteMetadataStoreRealm(realm); } @Override public boolean addShardingKey(String namespace, String realm, String shardingKey) { - // TODO implement when MetadataStoreRoutingDataWriter is ready - throw new UnsupportedOperationException(); + if (!_routingDataWriterMap.containsKey(namespace)) { + throw new IllegalArgumentException( + "Failed to add sharding key: Namespace " + namespace + " is not found!"); + } + return _routingDataWriterMap.get(namespace).addShardingKey(realm, shardingKey); } @Override public boolean deleteShardingKey(String namespace, String realm, String shardingKey) { - // TODO implement when MetadataStoreRoutingDataWriter is ready - throw new UnsupportedOperationException(); + if (!_routingDataWriterMap.containsKey(namespace)) { + throw new IllegalArgumentException( + "Failed to delete sharding key: Namespace " + namespace + " is not found!"); + } + return _routingDataWriterMap.get(namespace).deleteShardingKey(realm, shardingKey); } /** @@ -165,20 +176,20 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing */ @Override public void refreshRoutingData(String namespace) { - // Safe to ignore the callback if any of the mapping is null. + // Safe to ignore the callback if any of the maps are null. // If routingDataMap is null, then it will be populated by the constructor anyway // If routingDataMap is not null, then it's safe for the callback function to update it - if (_routingZkAddressMap == null || _routingDataMap == null - || _realmToShardingKeysMap == null) { - LOG.error("Construction is not completed! "); + if (_routingZkAddressMap == null || _routingDataMap == null || _realmToShardingKeysMap == null + || _routingDataReaderMap == null || _routingDataWriterMap == null) { + LOG.warn( + "refreshRoutingData callback called before ZKMetadataStoreDirectory was fully initialized. Skipping refresh!"); return; } // Check if namespace exists; otherwise, return as a NOP and log it if (!_routingZkAddressMap.containsKey(namespace)) { - LOG.error("Failed to refresh internally-cached routing data! Namespace not found: {}", - namespace); - return; + LOG.error( + "Failed to refresh internally-cached routing data! Namespace not found: " + namespace); } try { @@ -197,5 +208,6 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing @Override public synchronized void close() { _routingDataReaderMap.values().forEach(MetadataStoreRoutingDataReader::close); + _routingDataWriterMap.values().forEach(MetadataStoreRoutingDataWriter::close); } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreRoutingDataReader.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataReader.java similarity index 93% rename from helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreRoutingDataReader.java rename to helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataReader.java index 3cc9a06..f19e8ff 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreRoutingDataReader.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataReader.java @@ -1,4 +1,4 @@ -package org.apache.helix.rest.metadatastore; +package org.apache.helix.rest.metadatastore.accessor; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -26,6 +26,7 @@ import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataExceptio /** * An interface for a DAO that fetches routing data from a source and return a key-value mapping * that represent the said routing data. + * Note: Each data reader connects to a single namespace. */ public interface MetadataStoreRoutingDataReader { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataWriter.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataWriter.java new file mode 100644 index 0000000..349bbd0 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataWriter.java @@ -0,0 +1,74 @@ +package org.apache.helix.rest.metadatastore.accessor; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import java.util.Map; + + +/** + * An interface for a DAO that writes to the metadata store that stores routing data. + * Note: Each data writer connects to a single namespace. + */ +public interface MetadataStoreRoutingDataWriter { + + /** + * Creates a realm. If the namespace does not exist, it creates one. + * @param realm + * @return true if successful or if the realm already exists. false otherwise. + */ + boolean addMetadataStoreRealm(String realm); + + /** + * Deletes a realm. + * @param realm + * @return true if successful or the realm or namespace does not exist. false otherwise. + */ + boolean deleteMetadataStoreRealm(String realm); + + /** + * Creates a mapping between the sharding key to the realm. If realm doesn't exist, it will be created (this call is idempotent). + * @param realm + * @param shardingKey + * @return false if failed + */ + boolean addShardingKey(String realm, String shardingKey); + + /** + * Deletes the mapping between the sharding key to the realm. + * @param realm + * @param shardingKey + * @return false if failed; true if the deletion is successful or the key does not exist. + */ + boolean deleteShardingKey(String realm, String shardingKey); + + /** + * Sets (overwrites) the routing data with the given <realm, list of sharding keys> mapping. + * WARNING: This overwrites all existing routing data. Use with care! + * @param routingData + * @return + */ + boolean setRoutingData(Map<String, List<String>> routingData); + + /** + * Closes any stateful resources such as connections or threads. + */ + void close(); +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java similarity index 92% rename from helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java rename to helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java index 453180f..ea8c290 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java @@ -1,4 +1,4 @@ -package org.apache.helix.rest.metadatastore; +package org.apache.helix.rest.metadatastore.accessor; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -31,6 +31,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.client.DedicatedZkClientFactory; import org.apache.helix.manager.zk.client.HelixZkClient; import org.apache.helix.manager.zk.zookeeper.IZkStateListener; +import org.apache.helix.rest.metadatastore.RoutingDataListener; import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants; import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException; import org.apache.zookeeper.Watcher; @@ -42,10 +43,6 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD private final HelixZkClient _zkClient; private final RoutingDataListener _routingDataListener; - public ZkRoutingDataReader(String namespace, String zkAddress) { - this(namespace, zkAddress, null); - } - public ZkRoutingDataReader(String namespace, String zkAddress, RoutingDataListener routingDataListener) { if (namespace == null || namespace.isEmpty()) { @@ -115,8 +112,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD } @Override - public synchronized void handleDataChange(String s, Object o) - throws Exception { + public synchronized void handleDataChange(String s, Object o) { if (_zkClient.isClosed()) { return; } @@ -124,8 +120,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD } @Override - public synchronized void handleDataDeleted(String s) - throws Exception { + public synchronized void handleDataDeleted(String s) { if (_zkClient.isClosed()) { return; } @@ -140,8 +135,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD } @Override - public synchronized void handleChildChange(String s, List<String> list) - throws Exception { + public synchronized void handleChildChange(String s, List<String> list) { if (_zkClient.isClosed()) { return; } @@ -156,8 +150,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD } @Override - public synchronized void handleStateChanged(Watcher.Event.KeeperState state) - throws Exception { + public synchronized void handleStateChanged(Watcher.Event.KeeperState state) { if (_zkClient.isClosed()) { return; } @@ -165,8 +158,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD } @Override - public synchronized void handleNewSession(String sessionId) - throws Exception { + public synchronized void handleNewSession(String sessionId) { if (_zkClient.isClosed()) { return; } @@ -174,8 +166,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD } @Override - public synchronized void handleSessionEstablishmentError(Throwable error) - throws Exception { + public synchronized void handleSessionEstablishmentError(Throwable error) { if (_zkClient.isClosed()) { return; } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java new file mode 100644 index 0000000..3e43202 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java @@ -0,0 +1,253 @@ +package org.apache.helix.rest.metadatastore.accessor; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.I0Itec.zkclient.exception.ZkNodeExistsException; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.client.DedicatedZkClientFactory; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.rest.metadatastore.concurrency.ZkDistributedLeaderElection; +import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter { + private static final Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class); + + private final String _namespace; + private final HelixZkClient _zkClient; + private final ZkDistributedLeaderElection _leaderElection; + + public ZkRoutingDataWriter(String namespace, String zkAddress) { + if (namespace == null || namespace.isEmpty()) { + throw new IllegalArgumentException("namespace cannot be null or empty!"); + } + _namespace = namespace; + if (zkAddress == null || zkAddress.isEmpty()) { + throw new IllegalArgumentException("Zk address cannot be null or empty!"); + } + _zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), + new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())); + + // Ensure that ROUTING_DATA_PATH exists in ZK. If not, create + // create() semantic will fail if it already exists + try { + _zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true); + } catch (ZkNodeExistsException e) { + // This is okay + } + + // Get the hostname (REST endpoint) from System property + // TODO: Fill in when Helix REST implementations are ready + ZNRecord myServerInfo = new ZNRecord("dummy hostname"); + _leaderElection = new ZkDistributedLeaderElection(_zkClient, + MetadataStoreRoutingConstants.LEADER_ELECTION_ZNODE, myServerInfo); + } + + @Override + public synchronized boolean addMetadataStoreRealm(String realm) { + if (_leaderElection.isLeader()) { + if (_zkClient.isClosed()) { + throw new IllegalStateException("ZkClient is closed!"); + } + return createZkRealm(realm); + } + + // TODO: Forward the request to leader + return true; + } + + @Override + public synchronized boolean deleteMetadataStoreRealm(String realm) { + if (_leaderElection.isLeader()) { + if (_zkClient.isClosed()) { + throw new IllegalStateException("ZkClient is closed!"); + } + return _zkClient.delete(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm); + } + + // TODO: Forward the request to leader + return true; + } + + @Override + public synchronized boolean addShardingKey(String realm, String shardingKey) { + if (_leaderElection.isLeader()) { + if (_zkClient.isClosed()) { + throw new IllegalStateException("ZkClient is closed!"); + } + // If the realm does not exist already, then create the realm + String realmPath = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm; + if (!_zkClient.exists(realmPath)) { + // Create the realm + if (!createZkRealm(realm)) { + // Failed to create the realm - log and return false + LOG.error( + "Failed to add sharding key because ZkRealm creation failed! Namespace: {}, Realm: {}, Sharding key: {}", + _namespace, realm, shardingKey); + return false; + } + } + + // Add the sharding key to an empty ZNRecord + ZNRecord znRecord; + try { + znRecord = _zkClient.readData(realmPath); + } catch (Exception e) { + LOG.error( + "Failed to read the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}", + _namespace, realm, shardingKey, e); + return false; + } + znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, + Collections.singletonList(shardingKey)); + try { + _zkClient.writeData(realmPath, znRecord); + } catch (Exception e) { + LOG.error( + "Failed to write the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}", + _namespace, realm, shardingKey, e); + return false; + } + return true; + } + + // TODO: Forward the request to leader + return true; + } + + @Override + public synchronized boolean deleteShardingKey(String realm, String shardingKey) { + if (_leaderElection.isLeader()) { + if (_zkClient.isClosed()) { + throw new IllegalStateException("ZkClient is closed!"); + } + ZNRecord znRecord = + _zkClient.readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, true); + if (znRecord == null || !znRecord + .getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY) + .contains(shardingKey)) { + // This realm does not exist or shardingKey doesn't exist. Return true! + return true; + } + znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY) + .remove(shardingKey); + // Overwrite this ZNRecord with the sharding key removed + try { + _zkClient + .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, znRecord); + } catch (Exception e) { + LOG.error( + "Failed to write the data back in deleteShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}", + _namespace, realm, shardingKey, e); + return false; + } + return true; + } + + // TODO: Forward the request to leader + return true; + } + + @Override + public synchronized boolean setRoutingData(Map<String, List<String>> routingData) { + if (_leaderElection.isLeader()) { + if (_zkClient.isClosed()) { + throw new IllegalStateException("ZkClient is closed!"); + } + if (routingData == null) { + throw new IllegalArgumentException("routingData given is null!"); + } + + // Remove existing routing data + for (String zkRealm : _zkClient + .getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) { + if (!_zkClient.delete(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm)) { + LOG.error( + "Failed to delete existing routing data in setRoutingData()! Namespace: {}, Realm: {}", + _namespace, zkRealm); + return false; + } + } + + // For each ZkRealm, write the given routing data to ZooKeeper + for (Map.Entry<String, List<String>> routingDataEntry : routingData.entrySet()) { + String zkRealm = routingDataEntry.getKey(); + List<String> shardingKeyList = routingDataEntry.getValue(); + + ZNRecord znRecord = new ZNRecord(zkRealm); + znRecord + .setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, shardingKeyList); + + String realmPath = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm; + try { + if (!_zkClient.exists(realmPath)) { + _zkClient.createPersistent(realmPath); + } + _zkClient.writeData(realmPath, znRecord); + } catch (Exception e) { + LOG.error("Failed to write data in setRoutingData()! Namespace: {}, Realm: {}", + _namespace, zkRealm, e); + return false; + } + } + return true; + } + + // TODO: Forward the request to leader + return true; + } + + @Override + public synchronized void close() { + _zkClient.close(); + } + + /** + * Creates a ZK realm ZNode and populates it with an empty ZNRecord if it doesn't exist already. + * @param realm + * @return + */ + private boolean createZkRealm(String realm) { + if (_zkClient.exists(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm)) { + LOG.warn("createZkRealm() called for realm: {}, but this realm already exists! Namespace: {}", + realm, _namespace); + return true; + } + try { + _zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm); + _zkClient.writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, + new ZNRecord(realm)); + } catch (Exception e) { + LOG.error("Failed to create ZkRealm: {}, Namespace: ", realm, _namespace); + return false; + } + + return true; + } +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java new file mode 100644 index 0000000..c9b6bb2 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java @@ -0,0 +1,142 @@ +package org.apache.helix.rest.metadatastore.concurrency; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.List; + +import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.exception.ZkNodeExistsException; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.manager.zk.zookeeper.IZkStateListener; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkDistributedLeaderElection implements IZkDataListener, IZkStateListener { + private static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLeaderElection.class); + private static final String PREFIX = "MSDS_SERVER_"; + + private final HelixZkClient _zkClient; + private final String _basePath; + private final ZNRecord _participantInfo; + private ZNRecord _currentLeaderInfo; + + private String _myEphemeralSequentialPath; + private volatile boolean _isLeader; + + public ZkDistributedLeaderElection(HelixZkClient zkClient, String basePath, + ZNRecord participantInfo) { + synchronized (this) { + if (zkClient == null || zkClient.isClosed()) { + throw new IllegalArgumentException("ZkClient cannot be null or closed!"); + } + _zkClient = zkClient; + _zkClient.setZkSerializer(new ZNRecordSerializer()); + if (basePath == null || basePath.isEmpty()) { + throw new IllegalArgumentException("lockBasePath cannot be null or empty!"); + } + _basePath = basePath; + _participantInfo = participantInfo; + _isLeader = false; + } + init(); + } + + /** + * Create the base path if it doesn't exist and create an ephemeral sequential ZNode. + */ + private void init() { + try { + _zkClient.createPersistent(_basePath, true); + } catch (ZkNodeExistsException e) { + // Okay if it exists already + } + + // Create my ephemeral sequential node with my information + _myEphemeralSequentialPath = _zkClient + .create(_basePath + "/" + PREFIX, _participantInfo, CreateMode.EPHEMERAL_SEQUENTIAL); + if (_myEphemeralSequentialPath == null) { + throw new IllegalStateException( + "Unable to create ephemeral sequential node at path: " + _basePath); + } + tryAcquiringLeadership(); + } + + private void tryAcquiringLeadership() { + List<String> children = _zkClient.getChildren(_basePath); + Collections.sort(children); + String leaderName = children.get(0); + ZNRecord leaderInfo = _zkClient.readData(_basePath + "/" + leaderName, true); + + String[] myNameArray = _myEphemeralSequentialPath.split("/"); + String myName = myNameArray[myNameArray.length - 1]; + + if (leaderName.equals(myName)) { + // My turn for leadership + _isLeader = true; + _currentLeaderInfo = leaderInfo; + LOG.info("{} acquired leadership! Info: {}", myName, leaderInfo); + } else { + // Watch the ephemeral ZNode before me for a deletion event + String beforeMe = children.get(children.indexOf(myName) - 1); + _zkClient.subscribeDataChanges(_basePath + "/" + beforeMe, this); + } + } + + public synchronized boolean isLeader() { + return _isLeader; + } + + public synchronized ZNRecord getCurrentLeaderInfo() { + return _currentLeaderInfo; + } + + @Override + public synchronized void handleStateChanged(Watcher.Event.KeeperState state) { + if (state == Watcher.Event.KeeperState.SyncConnected) { + init(); + } + } + + @Override + public void handleNewSession(String sessionId) { + return; + } + + @Override + public void handleSessionEstablishmentError(Throwable error) { + return; + } + + @Override + public void handleDataChange(String s, Object o) { + return; + } + + @Override + public void handleDataDeleted(String s) { + tryAcquiringLeadership(); + } +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java index fda355b..e4240e7 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java @@ -24,4 +24,7 @@ public class MetadataStoreRoutingConstants { // For ZK only public static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS"; + + // Leader election ZNode for ZkRoutingDataWriter + public static final String LEADER_ELECTION_ZNODE = "/_ZK_ROUTING_DATA_WRITER_LEADER"; } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataReader.java similarity index 99% rename from helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java rename to helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataReader.java index 4479f68..77eb5eb 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataReader.java @@ -1,4 +1,4 @@ -package org.apache.helix.rest.metadatastore; +package org.apache.helix.rest.metadatastore.accessor; /* * Licensed to the Apache Software Foundation (ASF) under one diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java new file mode 100644 index 0000000..441bf65 --- /dev/null +++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java @@ -0,0 +1,107 @@ +package org.apache.helix.rest.metadatastore.accessor; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.AccessOption; +import org.apache.helix.ZNRecord; +import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants; +import org.apache.helix.rest.server.AbstractTestClass; +import org.junit.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestZkRoutingDataWriter extends AbstractTestClass { + private static final String DUMMY_NAMESPACE = "NAMESPACE"; + private static final String DUMMY_REALM = "REALM"; + private static final String DUMMY_SHARDING_KEY = "SHARDING_KEY"; + private MetadataStoreRoutingDataWriter _zkRoutingDataWriter; + + @BeforeClass + public void beforeClass() { + _zkRoutingDataWriter = new ZkRoutingDataWriter(DUMMY_NAMESPACE, ZK_ADDR); + } + + @AfterClass + public void afterClass() { + _baseAccessor.remove(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, AccessOption.PERSISTENT); + _zkRoutingDataWriter.close(); + } + + @Test + public void testAddMetadataStoreRealm() { + _zkRoutingDataWriter.addMetadataStoreRealm(DUMMY_REALM); + ZNRecord znRecord = _baseAccessor + .get(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + DUMMY_REALM, null, + AccessOption.PERSISTENT); + Assert.assertNotNull(znRecord); + } + + @Test(dependsOnMethods = "testAddMetadataStoreRealm") + public void testDeleteMetadataStoreRealm() { + _zkRoutingDataWriter.deleteMetadataStoreRealm(DUMMY_REALM); + Assert.assertFalse(_baseAccessor + .exists(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + DUMMY_REALM, + AccessOption.PERSISTENT)); + } + + @Test(dependsOnMethods = "testDeleteMetadataStoreRealm") + public void testAddShardingKey() { + _zkRoutingDataWriter.addShardingKey(DUMMY_REALM, DUMMY_SHARDING_KEY); + ZNRecord znRecord = _baseAccessor + .get(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + DUMMY_REALM, null, + AccessOption.PERSISTENT); + Assert.assertNotNull(znRecord); + Assert.assertTrue(znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY) + .contains(DUMMY_SHARDING_KEY)); + } + + @Test(dependsOnMethods = "testAddShardingKey") + public void testDeleteShardingKey() { + _zkRoutingDataWriter.deleteShardingKey(DUMMY_REALM, DUMMY_SHARDING_KEY); + ZNRecord znRecord = _baseAccessor + .get(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + DUMMY_REALM, null, + AccessOption.PERSISTENT); + Assert.assertNotNull(znRecord); + Assert.assertFalse(znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY) + .contains(DUMMY_SHARDING_KEY)); + } + + @Test(dependsOnMethods = "testDeleteShardingKey") + public void testSetRoutingData() { + Map<String, List<String>> testRoutingDataMap = + ImmutableMap.of(DUMMY_REALM, Collections.singletonList(DUMMY_SHARDING_KEY)); + _zkRoutingDataWriter.setRoutingData(testRoutingDataMap); + ZNRecord znRecord = _baseAccessor + .get(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + DUMMY_REALM, null, + AccessOption.PERSISTENT); + Assert.assertNotNull(znRecord); + Assert.assertEquals( + znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY).size(), 1); + Assert.assertTrue(znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY) + .contains(DUMMY_SHARDING_KEY)); + } +} diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index e6ecb82..c5ffd41 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -269,19 +269,10 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { _gZkClient = null; } - if (_zkServer != null) { - TestHelper.stopZkServer(_zkServer); - _zkServer = null; - } - if (_gZkClientTestNS != null) { _gZkClientTestNS.close(); _gZkClientTestNS = null; } - if (_zkServerTestNS != null) { - TestHelper.stopZkServer(_zkServerTestNS); - _zkServerTestNS = null; - } if (_helixRestServer != null) { _helixRestServer.shutdown();
