HDFS-10631. Federation State Store ZooKeeper implementation. Contributed by Jason Kace and Inigo Goiri.
(cherry picked from commit 23c4ddee11ab1300325a6361124ee8ad6f68d7a4)
(cherry picked from commit 7cb6bdf09ed361e067ebf234230babd1391a7d4b)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b71a7530
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b71a7530
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b71a7530

Branch: refs/heads/branch-2
Commit: b71a75301448dade2ef051244d3972fc59389511
Parents: 368a7d3
Author: Inigo Goiri <inigo...@apache.org>
Authored: Mon Aug 21 11:40:41 2017 -0700
Committer: vrushali <vrush...@apache.org>
Committed: Fri Oct 20 11:22:32 2017 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   9 +
 .../driver/impl/StateStoreSerializableImpl.java |  19 ++
 .../driver/impl/StateStoreZooKeeperImpl.java    | 298 +++++++++++++++++++
 .../store/driver/TestStateStoreDriverBase.java  |   2 +-
 .../store/driver/TestStateStoreZK.java          | 105 +++++++
 5 files changed, 432 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 453c919..64ad6fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -220,6 +220,15 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
index e9b3fdf..e2038fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
@@ -30,6 +30,11 @@ import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
  */
 public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
 
+  /** Mark for slashes in path names. */
+  protected static final String SLASH_MARK = "0SLASH0";
+  /** Mark for colon in path names. */
+  protected static final String COLON_MARK = "_";
+
   /** Default serializer for this driver. */
   private StateStoreSerializer serializer;
 
@@ -74,4 +79,18 @@ public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
       String data, Class<T> clazz, boolean includeDates) throws IOException {
     return serializer.deserialize(data, clazz);
   }
+
+  /**
+   * Get the primary key for a record. If we don't want to store in folders, we
+   * need to remove / from the name.
+   *
+   * @param record Record to get the primary key for.
+   * @return Primary key for the record.
+ */ + protected static String getPrimaryKey(BaseRecord record) { + String primaryKey = record.getPrimaryKey(); + primaryKey = primaryKey.replaceAll("/", SLASH_MARK); + primaryKey = primaryKey.replaceAll(":", COLON_MARK); + return primaryKey; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java new file mode 100644 index 0000000..ddcd537 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName; +import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link StateStoreDriver} driver implementation that uses ZooKeeper as a + * backend. + * <p> + * The structure of the znodes in the ensemble is: + * PARENT_PATH + * |--- MOUNT + * |--- MEMBERSHIP + * |--- REBALANCER + * |--- ROUTERS + */ +public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreZooKeeperImpl.class); + + + /** Configuration keys. */ + public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX = + DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk."; + public static final String FEDERATION_STORE_ZK_PARENT_PATH = + FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path"; + public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT = + "/hdfs-federation"; + + + /** Directory to store the state store data. */ + private String baseZNode; + + /** Interface to ZooKeeper. 
*/ + private ZKCuratorManager zkManager; + + + @Override + public boolean initDriver() { + LOG.info("Initializing ZooKeeper connection"); + + Configuration conf = getConf(); + baseZNode = conf.get( + FEDERATION_STORE_ZK_PARENT_PATH, + FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT); + try { + this.zkManager = new ZKCuratorManager(conf); + this.zkManager.start(); + } catch (IOException e) { + LOG.error("Cannot initialize the ZK connection", e); + return false; + } + return true; + } + + @Override + public <T extends BaseRecord> boolean initRecordStorage( + String className, Class<T> clazz) { + try { + String checkPath = getNodePath(baseZNode, className); + zkManager.createRootDirRecursively(checkPath); + return true; + } catch (Exception e) { + LOG.error("Cannot initialize ZK node for {}: {}", + className, e.getMessage()); + return false; + } + } + + @Override + public void close() throws Exception { + if (zkManager != null) { + zkManager.close(); + } + } + + @Override + public boolean isDriverReady() { + return zkManager != null; + } + + @Override + public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) + throws IOException { + return get(clazz, (String)null); + } + + @Override + public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub) + throws IOException { + verifyDriverReady(); + List<T> ret = new ArrayList<>(); + String znode = getZNodeForClass(clazz); + try { + List<String> children = zkManager.getChildren(znode); + for (String child : children) { + try { + String path = getNodePath(znode, child); + Stat stat = new Stat(); + String data = zkManager.getStringData(path, stat); + boolean corrupted = false; + if (data == null || data.equals("")) { + // All records should have data, otherwise this is corrupted + corrupted = true; + } else { + try { + T record = createRecord(data, stat, clazz); + ret.add(record); + } catch (IOException e) { + LOG.error("Cannot create record type \"{}\" from \"{}\": {}", + clazz.getSimpleName(), data, 
e.getMessage()); + corrupted = true; + } + } + + if (corrupted) { + LOG.error("Cannot get data for {} at {}, cleaning corrupted data", + child, path); + zkManager.delete(path); + } + } catch (Exception e) { + LOG.error("Cannot get data for {}: {}", child, e.getMessage()); + } + } + } catch (Exception e) { + String msg = "Cannot get children for \"" + znode + "\": " + + e.getMessage(); + LOG.error(msg); + throw new IOException(msg); + } + return new QueryResult<T>(ret, getTime()); + } + + @Override + public <T extends BaseRecord> boolean putAll( + List<T> records, boolean update, boolean error) throws IOException { + verifyDriverReady(); + if (records.isEmpty()) { + return true; + } + + // All records should be the same + T record0 = records.get(0); + Class<? extends BaseRecord> recordClass = record0.getClass(); + String znode = getZNodeForClass(recordClass); + + boolean status = true; + for (T record : records) { + String primaryKey = getPrimaryKey(record); + String recordZNode = getNodePath(znode, primaryKey); + byte[] data = serialize(record); + if (!writeNode(recordZNode, data, update, error)){ + status = false; + } + } + return status; + } + + @Override + public <T extends BaseRecord> int remove( + Class<T> clazz, Query<T> query) throws IOException { + verifyDriverReady(); + if (query == null) { + return 0; + } + + // Read the current data + List<T> records = null; + try { + QueryResult<T> result = get(clazz); + records = result.getRecords(); + } catch (IOException ex) { + LOG.error("Cannot get existing records", ex); + return 0; + } + + // Check the records to remove + String znode = getZNodeForClass(clazz); + List<T> recordsToRemove = filterMultiple(query, records); + + // Remove the records + int removed = 0; + for (T existingRecord : recordsToRemove) { + LOG.info("Removing \"{}\"", existingRecord); + try { + String primaryKey = getPrimaryKey(existingRecord); + String path = getNodePath(znode, primaryKey); + if (zkManager.delete(path)) { + removed++; + } 
else { + LOG.error("Did not remove \"{}\"", existingRecord); + } + } catch (Exception e) { + LOG.error("Cannot remove \"{}\"", existingRecord, e); + } + } + return removed; + } + + @Override + public <T extends BaseRecord> boolean removeAll(Class<T> clazz) + throws IOException { + boolean status = true; + String znode = getZNodeForClass(clazz); + LOG.info("Deleting all children under {}", znode); + try { + List<String> children = zkManager.getChildren(znode); + for (String child : children) { + String path = getNodePath(znode, child); + LOG.info("Deleting {}", path); + zkManager.delete(path); + } + } catch (Exception e) { + LOG.error("Cannot remove {}: {}", znode, e.getMessage()); + status = false; + } + return status; + } + + private boolean writeNode( + String znode, byte[] bytes, boolean update, boolean error) { + try { + boolean created = zkManager.create(znode); + if (!update && !created && error) { + LOG.info("Cannot write record \"{}\", it already exists", znode); + return false; + } + + // Write data + zkManager.setData(znode, bytes, -1); + return true; + } catch (Exception e) { + LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage()); + } + return false; + } + + /** + * Get the ZNode for a class. + * + * @param clazz Record class to evaluate. + * @return The ZNode for the class. + */ + private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) { + String className = getRecordName(clazz); + return getNodePath(baseZNode, className); + } + + /** + * Creates a record from a string returned by ZooKeeper. + * + * @param source Object from ZooKeeper. + * @param clazz The data record type to create. + * @return The created record. 
+ * @throws IOException + */ + private <T extends BaseRecord> T createRecord( + String data, Stat stat, Class<T> clazz) throws IOException { + T record = newRecord(data, clazz, false); + record.setDateCreated(stat.getCtime()); + record.setDateModified(stat.getMtime()); + return record; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index 8239fb1..65e763b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -89,7 +89,7 @@ public class TestStateStoreDriverBase { } private String generateRandomString() { - String randomString = "/randomString-" + RANDOM.nextInt(); + String randomString = "randomString-" + RANDOM.nextInt(); return randomString; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java new file mode 100644 index 0000000..36353ff --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the ZooKeeper implementation of the State Store driver. 
+ */ +public class TestStateStoreZK extends TestStateStoreDriverBase { + + private static TestingServer curatorTestingServer; + private static CuratorFramework curatorFramework; + + @BeforeClass + public static void setupCluster() throws Exception { + curatorTestingServer = new TestingServer(); + curatorTestingServer.start(); + String connectString = curatorTestingServer.getConnectString(); + curatorFramework = CuratorFrameworkFactory.builder() + .connectString(connectString) + .retryPolicy(new RetryNTimes(100, 100)) + .build(); + curatorFramework.start(); + + // Create the ZK State Store + Configuration conf = + getStateStoreConfiguration(StateStoreZooKeeperImpl.class); + conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString); + // Disable auto-repair of connection + conf.setLong(DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, + TimeUnit.HOURS.toMillis(1)); + getStateStore(conf); + } + + @AfterClass + public static void tearDownCluster() { + curatorFramework.close(); + try { + curatorTestingServer.stop(); + } catch (IOException e) { + } + } + + @Before + public void startup() throws IOException { + removeAll(getStateStoreDriver()); + } + + @Test + public void testInsert() + throws IllegalArgumentException, IllegalAccessException, IOException { + testInsert(getStateStoreDriver()); + } + + @Test + public void testUpdate() + throws IllegalArgumentException, ReflectiveOperationException, + IOException, SecurityException { + testPut(getStateStoreDriver()); + } + + @Test + public void testDelete() + throws IllegalArgumentException, IllegalAccessException, IOException { + testRemove(getStateStoreDriver()); + } + + @Test + public void testFetchErrors() + throws IllegalArgumentException, IllegalAccessException, IOException { + testFetchErrors(getStateStoreDriver()); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, 
e-mail: common-commits-h...@hadoop.apache.org