HDFS-10631. Federation State Store ZooKeeper implementation. Contributed by 
Jason Kace and Inigo Goiri.

(cherry picked from commit 23c4ddee11ab1300325a6361124ee8ad6f68d7a4)
(cherry picked from commit 7cb6bdf09ed361e067ebf234230babd1391a7d4b)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b71a7530
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b71a7530
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b71a7530

Branch: refs/heads/branch-2
Commit: b71a75301448dade2ef051244d3972fc59389511
Parents: 368a7d3
Author: Inigo Goiri <inigo...@apache.org>
Authored: Mon Aug 21 11:40:41 2017 -0700
Committer: vrushali <vrush...@apache.org>
Committed: Fri Oct 20 11:22:32 2017 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   9 +
 .../driver/impl/StateStoreSerializableImpl.java |  19 ++
 .../driver/impl/StateStoreZooKeeperImpl.java    | 298 +++++++++++++++++++
 .../store/driver/TestStateStoreDriverBase.java  |   2 +-
 .../store/driver/TestStateStoreZK.java          | 105 +++++++
 5 files changed, 432 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 453c919..64ad6fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -220,6 +220,15 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
index e9b3fdf..e2038fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
@@ -30,6 +30,11 @@ import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
  */
 public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
 
+  /** Mark for slashes in path names. */
+  protected static final String SLASH_MARK = "0SLASH0";
+  /** Mark for colon in path names. */
+  protected static final String COLON_MARK = "_";
+
   /** Default serializer for this driver. */
   private StateStoreSerializer serializer;
 
@@ -74,4 +79,18 @@ public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
       String data, Class<T> clazz, boolean includeDates) throws IOException {
     return serializer.deserialize(data, clazz);
   }
+
+  /**
+   * Get the primary key for a record, encoded so it can be used as a single
+   * name: "/" becomes {@link #SLASH_MARK}, ":" becomes {@link #COLON_MARK}.
+   *
+   * @param record Record to get the primary key for.
+   * @return Primary key for the record.
+   */
+  protected static String getPrimaryKey(BaseRecord record) {
+    String primaryKey = record.getPrimaryKey();
+    primaryKey = primaryKey.replace("/", SLASH_MARK);
+    primaryKey = primaryKey.replace(":", COLON_MARK);
+    return primaryKey;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
new file mode 100644
index 0000000..ddcd537
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName;
+import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StateStoreDriver} driver implementation that uses ZooKeeper as a
+ * backend.
+ * <p>
+ * The structure of the znodes in the ensemble is:
+ * PARENT_PATH
+ * |--- MOUNT
+ * |--- MEMBERSHIP
+ * |--- REBALANCER
+ * |--- ROUTERS
+ */
+public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);
+
+
+  /** Configuration keys. */
+  public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
+      DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
+  public static final String FEDERATION_STORE_ZK_PARENT_PATH =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
+  public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
+      "/hdfs-federation";
+
+
+  /** Directory to store the state store data. */
+  private String baseZNode;
+
+  /** Interface to ZooKeeper. */
+  private ZKCuratorManager zkManager;
+
+
+  @Override
+  public boolean initDriver() {
+    LOG.info("Initializing ZooKeeper connection");
+    Configuration conf = getConf();
+    baseZNode = conf.get(FEDERATION_STORE_ZK_PARENT_PATH,
+        FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
+    try {
+      ZKCuratorManager manager = new ZKCuratorManager(conf);
+      manager.start();
+      // Publish only after a successful start so isDriverReady() stays false.
+      this.zkManager = manager;
+    } catch (IOException e) {
+      LOG.error("Cannot initialize the ZK connection", e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean initRecordStorage(
+      String className, Class<T> clazz) {
+    try {
+      String checkPath = getNodePath(baseZNode, className);
+      zkManager.createRootDirRecursively(checkPath);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Cannot initialize ZK node for {}: {}",
+          className, e.getMessage(), e); // trailing e keeps the stack trace
+      return false;
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (zkManager  != null) { // null when initDriver() never created it
+      zkManager.close();
+    }
+  }
+
+  @Override
+  public boolean isDriverReady() {
+    return zkManager != null; // true once initDriver() created it
+  }
+
+  @Override
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
+      throws IOException {
+    return get(clazz, (String)null); // delegates to get(clazz, sub), no sub
+  }
+
+  @Override
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
+      throws IOException {
+    verifyDriverReady();
+    List<T> ret = new ArrayList<>();
+    String znode = getZNodeForClass(clazz);
+    try {
+      List<String> children = zkManager.getChildren(znode);
+      for (String child : children) {
+        try {
+          String path = getNodePath(znode, child);
+          Stat stat = new Stat();
+          String data = zkManager.getStringData(path, stat);
+          boolean corrupted = false;
+          if (data == null || data.isEmpty()) {
+            // All records should have data, otherwise this is corrupted
+            corrupted = true;
+          } else {
+            try {
+              T record = createRecord(data, stat, clazz);
+              ret.add(record);
+            } catch (IOException e) {
+              LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
+                  clazz.getSimpleName(), data, e.getMessage());
+              corrupted = true;
+            }
+          }
+          // Corrupted entries are deleted from ZooKeeper as part of the read
+          if (corrupted) {
+            LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
+                child, path);
+            zkManager.delete(path);
+          }
+        } catch (Exception e) {
+          LOG.error("Cannot get data for {}: {}", child, e.getMessage());
+        }
+      }
+    } catch (Exception e) {
+      String msg = "Cannot get children for \"" + znode + "\": " +
+          e.getMessage();
+      LOG.error(msg);
+      throw new IOException(msg, e); // keep the original failure as the cause
+    }
+    return new QueryResult<>(ret, getTime());
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean putAll(
+      List<T> records, boolean update, boolean error) throws IOException {
+    verifyDriverReady();
+    if (records.isEmpty()) {
+      return true;
+    }
+
+    // The znode is derived from the first record: all of them are expected
+    // to be of the same record class
+    Class<? extends BaseRecord> recordClass = records.get(0).getClass();
+    String classZNode = getZNodeForClass(recordClass);
+
+    // Keep writing after a failure; the result tells if every write worked
+    boolean allWritten = true;
+    for (T entry : records) {
+      String entryZNode = getNodePath(classZNode, getPrimaryKey(entry));
+      byte[] payload = serialize(entry);
+      if (!writeNode(entryZNode, payload, update, error)) {
+        allWritten = false;
+      }
+    }
+    return allWritten;
+  }
+
+  @Override
+  public <T extends BaseRecord> int remove(
+      Class<T> clazz, Query<T> query) throws IOException {
+    verifyDriverReady();
+    if (query == null) {
+      return 0;
+    }
+
+    // Load every stored record of this type; a failed read removes nothing
+    List<T> current;
+    try {
+      QueryResult<T> stored = get(clazz);
+      current = stored.getRecords();
+    } catch (IOException ex) {
+      LOG.error("Cannot get existing records", ex);
+      return 0;
+    }
+
+    // Select the records matching the query
+    String classZNode = getZNodeForClass(clazz);
+    List<T> matches = filterMultiple(query, current);
+
+    // Delete the znode of each match and count the successful deletions
+    int deleted = 0;
+    for (T match : matches) {
+      LOG.info("Removing \"{}\"", match);
+      try {
+        String path = getNodePath(classZNode, getPrimaryKey(match));
+        boolean gone = zkManager.delete(path);
+        if (gone) {
+          deleted++;
+        } else {
+          LOG.error("Did not remove \"{}\"", match);
+        }
+      } catch (Exception e) {
+        LOG.error("Cannot remove \"{}\"", match, e);
+      }
+    }
+    return deleted;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
+      throws IOException {
+    verifyDriverReady(); // same guard as get/putAll/remove
+    String znode = getZNodeForClass(clazz);
+    LOG.info("Deleting all children under {}", znode);
+    try {
+      List<String> children = zkManager.getChildren(znode);
+      for (String child : children) {
+        String path = getNodePath(znode, child);
+        LOG.info("Deleting {}", path);
+        zkManager.delete(path);
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot remove {}: {}", znode, e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  private boolean writeNode(
+      String znode, byte[] bytes, boolean update, boolean error) {
+    try {
+      boolean created = zkManager.create(znode);
+      if (!update && !created) { // exists and updates are not allowed: skip
+        if (error) {
+          LOG.info("Cannot write record \"{}\", it already exists", znode);
+        }
+        return !error;
+      }
+      zkManager.setData(znode, bytes, -1);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * Resolve the znode under which records of a given class are stored.
+   *
+   * @param clazz Record class to evaluate.
+   * @return Path made of the base znode and the record name of the class.
+   */
+  private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) {
+    final String recordName = getRecordName(clazz);
+    return getNodePath(baseZNode, recordName);
+  }
+
+  /**
+   * Creates a record from a string returned by ZooKeeper.
+   * @param data Serialized record as read from ZooKeeper.
+   * @param stat ZNode stat; its ctime/mtime become the record dates.
+   * @param clazz The data record type to create.
+   * @return The created record.
+   * @throws IOException If the record cannot be created from the data.
+   */
+  private <T extends BaseRecord> T createRecord(
+      String data, Stat stat, Class<T> clazz) throws IOException {
+    T record = newRecord(data, clazz, false);
+    record.setDateCreated(stat.getCtime());
+    record.setDateModified(stat.getMtime());
+    return record;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 8239fb1..65e763b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -89,7 +89,7 @@ public class TestStateStoreDriverBase {
   }
 
   private String generateRandomString() {
-    String randomString = "/randomString-" + RANDOM.nextInt();
+    String randomString = "randomString-" + RANDOM.nextInt();
     return randomString;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71a7530/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
new file mode 100644
index 0000000..36353ff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the ZooKeeper implementation of the State Store driver.
+ */
+public class TestStateStoreZK extends TestStateStoreDriverBase {
+
+  private static TestingServer curatorTestingServer;
+  private static CuratorFramework curatorFramework;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    curatorTestingServer = new TestingServer();
+    curatorTestingServer.start();
+    String connectString = curatorTestingServer.getConnectString();
+    curatorFramework = CuratorFrameworkFactory.builder()
+        .connectString(connectString)
+        .retryPolicy(new RetryNTimes(100, 100))
+        .build();
+    curatorFramework.start();
+
+    // Create the ZK State Store, pointed at the test server started above
+    Configuration conf =
+        getStateStoreConfiguration(StateStoreZooKeeperImpl.class);
+    conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
+    // Disable auto-repair of connection: the test interval is set to one hour
+    conf.setLong(DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+        TimeUnit.HOURS.toMillis(1));
+    getStateStore(conf);
+  }
+
+  @AfterClass
+  public static void tearDownCluster() {
+    curatorFramework.close();
+    try {
+      curatorTestingServer.stop();
+    } catch (IOException ignored) { // a failed stop of the test server is ignored
+    }
+  }
+
+  @Before
+  public void startup() throws IOException {
+    removeAll(getStateStoreDriver()); // runs before each test on the driver
+  }
+
+  @Test
+  public void testInsert()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testInsert(getStateStoreDriver()); // inherited scenario, ZK-backed driver
+  }
+
+  @Test
+  public void testUpdate()
+      throws IllegalArgumentException, ReflectiveOperationException,
+      IOException, SecurityException {
+    testPut(getStateStoreDriver()); // note: delegates to testPut, not testUpdate
+  }
+
+  @Test
+  public void testDelete()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testRemove(getStateStoreDriver()); // note: delegates to testRemove
+  }
+
+  @Test
+  public void testFetchErrors()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testFetchErrors(getStateStoreDriver()); // inherited scenario, ZK driver
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to