[ https://issues.apache.org/jira/browse/HDFS-16848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637527#comment-17637527 ]

ASF GitHub Bot commented on HDFS-16848:
---------------------------------------

ZanderXu commented on code in PR #5147:
URL: https://github.com/apache/hadoop/pull/5147#discussion_r1029955572


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -84,6 +102,20 @@ public boolean initDriver() {
     baseZNode = conf.get(
         FEDERATION_STORE_ZK_PARENT_PATH,
         FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
+    this.enableConcurrent = conf.getBoolean(
+        FEDERATION_STORE_ZK_CLIENT_CONCURRENT,
+        FEDERATION_STORE_ZK_CLIENT_CONCURRENT_DEFAULT
+    );
+    if(enableConcurrent) {

Review Comment:
   `if (enableConcurrent)`



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -137,34 +177,26 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
     String znode = getZNodeForClass(clazz);
     try {
       List<String> children = zkManager.getChildren(znode);
-      for (String child : children) {
-        try {
-          String path = getNodePath(znode, child);
-          Stat stat = new Stat();
-          String data = zkManager.getStringData(path, stat);
-          boolean corrupted = false;
-          if (data == null || data.equals("")) {
-            // All records should have data, otherwise this is corrupted
-            corrupted = true;
-          } else {
-            try {
-              T record = createRecord(data, stat, clazz);
-              ret.add(record);
-            } catch (IOException e) {
-              LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
-                  clazz.getSimpleName(), data, e.getMessage());
-              corrupted = true;
-            }
-          }
-
-          if (corrupted) {
-            LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
-                child, path);
-            zkManager.delete(path);
+      List<Callable<T>> callables = new ArrayList<>();
+      if(enableConcurrent) {
+        children.forEach(child ->
+            callables.add(
+                () -> getRecord(clazz, znode, child)
+            )
+        );
+        List<Future<T>> futures = executorService.invokeAll(callables);
+        for (Future<T> future : futures) {
+          if (future.get() != null) {
+            ret.add(future.get());
           }
-        } catch (Exception e) {
-          LOG.error("Cannot get data for {}: {}", child, e.getMessage());
         }
+      } else {
+        children.forEach(child -> {
+          T record = getRecord(clazz, znode, child);
+          if(record != null) {

Review Comment:
   `if (record != null) {`



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -63,8 +72,17 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
       RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
   public static final String FEDERATION_STORE_ZK_PARENT_PATH =
       FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
+  public static final String FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.size";
+  public static final int FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE_DEFAULT = 10;
+  public static final String FEDERATION_STORE_ZK_CLIENT_CONCURRENT =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.concurrent";
+  public static final boolean FEDERATION_STORE_ZK_CLIENT_CONCURRENT_DEFAULT = false;

Review Comment:
   Can you use a single configuration to control whether this async mode is used? For example, the number of threads, with a default value of -1 or 0; users can enable this feature by setting it to a positive integer.
   
   We should also add a detailed description of this configuration to hdfs-rbf-default.xml.
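
A minimal sketch of the single-setting approach suggested above, assuming a hypothetical key name (not a confirmed Hadoop key) and using `java.util.Properties` in place of Hadoop's `Configuration`: any value <= 0 keeps the sequential path, and a positive value both enables async mode and sets the pool size.

```java
import java.util.Properties;

/** Sketch: one integer setting decides both "async on/off" and the pool size. */
final class ZkAsyncConfigSketch {
  // Hypothetical key name, for illustration only.
  static final String ZK_ASYNC_THREADS_KEY =
      "dfs.federation.router.store.driver.zk.async.threads";
  // A default <= 0 keeps the original sequential behaviour.
  static final int ZK_ASYNC_THREADS_DEFAULT = -1;

  /** Returns the configured thread count, or the default if the key is unset. */
  static int asyncThreads(Properties conf) {
    String raw = conf.getProperty(ZK_ASYNC_THREADS_KEY);
    return raw == null ? ZK_ASYNC_THREADS_DEFAULT : Integer.parseInt(raw.trim());
  }

  /** Async mode is enabled only when a positive thread count is configured. */
  static boolean isAsyncEnabled(Properties conf) {
    return asyncThreads(conf) > 0;
  }
}
```

With Hadoop's `Configuration`, the same check would be `conf.getInt(KEY, -1) > 0`, which could replace both `FEDERATION_STORE_ZK_CLIENT_CONCURRENT` and `FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE`.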



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -84,6 +102,20 @@ public boolean initDriver() {
     baseZNode = conf.get(
         FEDERATION_STORE_ZK_PARENT_PATH,
         FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
+    this.enableConcurrent = conf.getBoolean(
+        FEDERATION_STORE_ZK_CLIENT_CONCURRENT,
+        FEDERATION_STORE_ZK_CLIENT_CONCURRENT_DEFAULT
+    );
+    if(enableConcurrent) {
+      int numThreads = conf.getInt(
+          FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE,
+          FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE_DEFAULT);
+      ThreadFactory threadFactory = new ThreadFactoryBuilder()

Review Comment:
   Check that `numThreads` is valid.
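
One way to act on this, sketched with JDK classes only: the diff uses `ThreadFactoryBuilder`, which is replaced here by a plain `ThreadFactory`, and the thread-name prefix is illustrative. The idea is to reject a non-positive `numThreads` with a clear message before creating the pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: validate the configured thread count before building the pool. */
final class ZkExecutorSketch {

  /** Rejects non-positive counts with an explicit, searchable message. */
  static int validateThreads(int numThreads) {
    if (numThreads <= 0) {
      throw new IllegalArgumentException(
          "ZK client thread count must be positive, got " + numThreads);
    }
    return numThreads;
  }

  /** Fixed-size pool of named daemon threads (JDK stand-in for ThreadFactoryBuilder). */
  static ExecutorService newZkExecutor(int numThreads) {
    int threads = validateThreads(numThreads);
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = r -> {
      // Illustrative name prefix; numbered per created thread.
      Thread t = new Thread(r, "zk-state-store-client-" + counter.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
    return Executors.newFixedThreadPool(threads, factory);
  }
}
```

`Executors.newFixedThreadPool` would also reject 0 or a negative size, but only with a generic `IllegalArgumentException`; checking up front makes the misconfiguration obvious in the Router log. Falling back to the sequential path instead of failing is an alternative design.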



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -137,34 +177,26 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
     String znode = getZNodeForClass(clazz);
     try {
       List<String> children = zkManager.getChildren(znode);
-      for (String child : children) {
-        try {
-          String path = getNodePath(znode, child);
-          Stat stat = new Stat();
-          String data = zkManager.getStringData(path, stat);
-          boolean corrupted = false;
-          if (data == null || data.equals("")) {
-            // All records should have data, otherwise this is corrupted
-            corrupted = true;
-          } else {
-            try {
-              T record = createRecord(data, stat, clazz);
-              ret.add(record);
-            } catch (IOException e) {
-              LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
-                  clazz.getSimpleName(), data, e.getMessage());
-              corrupted = true;
-            }
-          }
-
-          if (corrupted) {
-            LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
-                child, path);
-            zkManager.delete(path);
+      List<Callable<T>> callables = new ArrayList<>();
+      if(enableConcurrent) {
+        children.forEach(child ->
+            callables.add(
+                () -> getRecord(clazz, znode, child)
+            )
+        );

Review Comment:
   single line.
   `children.forEach(child -> callables.add(() -> getRecord(clazz, znode, child)));`
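
A self-contained sketch of the concurrent read path from the diff, with `getRecord` abstracted as a hypothetical `Function<String, T>`. It calls `future.get()` once per future; the diff calls it twice, which is redundant but harmless. `invokeAll` returns futures in task order, so the results keep the order of `children`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Sketch: fan out one read per child znode, then collect non-null results in order. */
final class ConcurrentGetSketch {

  static <T> List<T> getAll(ExecutorService executor, List<String> children,
      Function<String, T> getRecord) throws InterruptedException, ExecutionException {
    List<Callable<T>> callables = new ArrayList<>();
    children.forEach(child -> callables.add(() -> getRecord.apply(child)));
    List<T> ret = new ArrayList<>();
    // invokeAll blocks until every task has completed, normally or exceptionally.
    for (Future<T> future : executor.invokeAll(callables)) {
      T record = future.get(); // single get(); rethrows a task failure as ExecutionException
      if (record != null) {    // null means the child was missing or corrupted
        ret.add(record);
      }
    }
    return ret;
  }
}
```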



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -178,6 +210,45 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
     return new QueryResult<T>(ret, getTime());
   }
 
+  /**
+   * Get one data record in the StateStore or delete it if it's corrupted.
+   *
+   * @param clazz Record class to evaluate.
+   * @param znode The ZNode for the class.
+   * @param child The child for znode to get.
+   * @return The record to get.
+   */
+  private <T extends BaseRecord> T getRecord(Class<T> clazz, String znode, String child) {
+    T record = null;
+    try {
+      String path = getNodePath(znode, child);
+      Stat stat = new Stat();
+      String data = zkManager.getStringData(path, stat);
+      boolean corrupted = false;
+      if (data == null || data.equals("")) {
+        // All records should have data, otherwise this is corrupted
+        corrupted = true;
+      } else {
+        try {
+          record = createRecord(data, stat, clazz);
+        } catch (IOException e) {
+          LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
+              clazz.getSimpleName(), data, e.getMessage());
+          corrupted = true;
+        }
+      }
+
+      if (corrupted) {
+        LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
+            child, path);

Review Comment:
   single line.
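
The `getRecord` helper's "read the record, or delete it if it is corrupted" logic can be sketched against a hypothetical in-memory map standing in for ZooKeeper. Here a parser that throws `IllegalArgumentException` plays the role of `createRecord` throwing `IOException`; the logging in the diff is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Sketch of "read one record, or delete it if it is corrupted". */
final class GetOrDeleteSketch {
  // Hypothetical in-memory stand-in for the znodes of one record class.
  final Map<String, String> store = new ConcurrentHashMap<>();

  <T> T getRecord(String path, Function<String, T> parser) {
    String data = store.get(path);
    T record = null;
    boolean corrupted = false;
    if (data == null || data.isEmpty()) {
      corrupted = true; // every record must carry data
    } else {
      try {
        record = parser.apply(data);
      } catch (IllegalArgumentException e) {
        corrupted = true; // payload could not be turned into a record
      }
    }
    if (corrupted) {
      store.remove(path); // mirrors zkManager.delete(path) in the diff
    }
    return record;
  }
}
```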



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -192,22 +263,45 @@ public <T extends BaseRecord> boolean putAll(
     String znode = getZNodeForClass(recordClass);
 
     long start = monotonicNow();
-    boolean status = true;
-    for (T record : records) {
-      String primaryKey = getPrimaryKey(record);
-      String recordZNode = getNodePath(znode, primaryKey);
-      byte[] data = serialize(record);
-      if (!writeNode(recordZNode, data, update, error)){
-        status = false;
+    final AtomicBoolean status = new AtomicBoolean(true);
+    if(enableConcurrent){
+      List<Callable<Void>> callables = new ArrayList<>();
+      records.forEach(record ->
+          callables.add(
+              () -> {
+                String primaryKey = getPrimaryKey(record);
+                String recordZNode = getNodePath(znode, primaryKey);
+                byte[] data = serialize(record);
+                if(!writeNode(recordZNode, data, update, error)) {

Review Comment:
   `if (!writeNode(recordZNode, data, update, error))`



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -192,22 +263,45 @@ public <T extends BaseRecord> boolean putAll(
     String znode = getZNodeForClass(recordClass);
 
     long start = monotonicNow();
-    boolean status = true;
-    for (T record : records) {
-      String primaryKey = getPrimaryKey(record);
-      String recordZNode = getNodePath(znode, primaryKey);
-      byte[] data = serialize(record);
-      if (!writeNode(recordZNode, data, update, error)){
-        status = false;
+    final AtomicBoolean status = new AtomicBoolean(true);
+    if(enableConcurrent){
+      List<Callable<Void>> callables = new ArrayList<>();
+      records.forEach(record ->
+          callables.add(
+              () -> {
+                String primaryKey = getPrimaryKey(record);
+                String recordZNode = getNodePath(znode, primaryKey);
+                byte[] data = serialize(record);
+                if(!writeNode(recordZNode, data, update, error)) {
+                  status.set(false);
+                }
+                return null;
+              }
+          )
+      );
+      try {
+        executorService.invokeAll(callables);
+      } catch (Exception e) {
+        LOG.error("Write record failed : {}", e.getMessage(), e);
+        throw new IOException(e);
       }
+    } else {
+      records.forEach(record -> {
+        String primaryKey = getPrimaryKey(record);
+        String recordZNode = getNodePath(znode, primaryKey);
+        byte[] data = serialize(record);
+        if(!writeNode(recordZNode, data, update, error)) {

Review Comment:
   `if (!writeNode(recordZNode, data, update, error)) {`
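
A sketch of the concurrent `putAll` branch, assuming `writeNode(...)` is abstracted as a hypothetical `Predicate<T>`. One detail worth checking in the diff: `invokeAll` does not rethrow exceptions thrown inside a task; they are only visible through each `Future.get()` as an `ExecutionException`. The sketch therefore inspects every future and treats a thrown task as a failed write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

/** Sketch: write records in parallel and report whether every write succeeded. */
final class ConcurrentPutSketch {

  static <T> boolean putAll(ExecutorService executor, List<T> records,
      Predicate<T> writeNode) throws InterruptedException {
    AtomicBoolean status = new AtomicBoolean(true);
    List<Callable<Void>> callables = new ArrayList<>();
    records.forEach(record -> callables.add(() -> {
      if (!writeNode.test(record)) {
        status.set(false); // AtomicBoolean is safe to update from pool threads
      }
      return null;
    }));
    for (Future<Void> future : executor.invokeAll(callables)) {
      try {
        future.get(); // surfaces an exception thrown inside the task
      } catch (ExecutionException e) {
        status.set(false);
      }
    }
    return status.get();
  }
}
```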



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -192,22 +263,45 @@ public <T extends BaseRecord> boolean putAll(
     String znode = getZNodeForClass(recordClass);
 
     long start = monotonicNow();
-    boolean status = true;
-    for (T record : records) {
-      String primaryKey = getPrimaryKey(record);
-      String recordZNode = getNodePath(znode, primaryKey);
-      byte[] data = serialize(record);
-      if (!writeNode(recordZNode, data, update, error)){
-        status = false;
+    final AtomicBoolean status = new AtomicBoolean(true);
+    if(enableConcurrent){

Review Comment:
   `if (enableConcurrent) {`



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -137,34 +177,26 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
     String znode = getZNodeForClass(clazz);
     try {
       List<String> children = zkManager.getChildren(znode);
-      for (String child : children) {
-        try {
-          String path = getNodePath(znode, child);
-          Stat stat = new Stat();
-          String data = zkManager.getStringData(path, stat);
-          boolean corrupted = false;
-          if (data == null || data.equals("")) {
-            // All records should have data, otherwise this is corrupted
-            corrupted = true;
-          } else {
-            try {
-              T record = createRecord(data, stat, clazz);
-              ret.add(record);
-            } catch (IOException e) {
-              LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
-                  clazz.getSimpleName(), data, e.getMessage());
-              corrupted = true;
-            }
-          }
-
-          if (corrupted) {
-            LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
-                child, path);
-            zkManager.delete(path);
+      List<Callable<T>> callables = new ArrayList<>();
+      if(enableConcurrent) {

Review Comment:
   `if (enableConcurrent) {`



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +133,71 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    System.out.printf("Sync mode total running time is %d ms, and async mode total running time is %d ms",

Review Comment:
   This UT is not valid as written; can you change it to use `assertXXX`?
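
A sketch of an assertion-based alternative to printing timings: write the same records in sequential and concurrent mode and assert that both produce identical contents. A hypothetical `ConcurrentHashMap` stands in for the State Store, and plain `AssertionError`s stand in for JUnit's `assertEquals`. Wall-clock comparisons between the two modes tend to be flaky on shared CI machines, so asserting correctness is usually the more reliable check. Separately, the quoted test calls `removeAll(MembershipState.class)` after inserting `MountTable` records, which appears not to clear the records it just wrote.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: assert that sequential and concurrent writes leave identical contents. */
final class SyncAsyncEquivalenceSketch {

  static Map<String, String> writeSequential(List<String> keys) {
    Map<String, String> store = new ConcurrentHashMap<>();
    keys.forEach(k -> store.put(k, "value-" + k));
    return store;
  }

  static Map<String, String> writeConcurrent(List<String> keys, int threads)
      throws InterruptedException {
    Map<String, String> store = new ConcurrentHashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    try {
      List<Callable<Void>> tasks = new ArrayList<>();
      keys.forEach(k -> tasks.add(() -> {
        store.put(k, "value-" + k);
        return null;
      }));
      executor.invokeAll(tasks); // returns only after every task has finished
    } finally {
      executor.shutdown();
    }
    return store;
  }

  /** Throws AssertionError unless both modes stored every record identically. */
  static void assertSameContents(List<String> keys, int threads)
      throws InterruptedException {
    Map<String, String> sync = writeSequential(keys);
    Map<String, String> async = writeConcurrent(keys, threads);
    if (sync.size() != keys.size()) {
      throw new AssertionError("sync mode lost records: " + sync.size());
    }
    if (!sync.equals(async)) {
      throw new AssertionError("async mode differs from sync mode");
    }
  }
}
```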





> RBF: Improve StateStoreZookeeperImpl 
> -------------------------------------
>
>                 Key: HDFS-16848
>                 URL: https://issues.apache.org/jira/browse/HDFS-16848
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>          Components: rbf
>            Reporter: Sun Hao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>
> Currently, the Router gets and updates state from ZK sequentially. This
> slows down loading and updating the Router's state cache, especially for a
> large cluster or a multi-region cluster.
> We propose adding a thread pool to handle ZK state synchronization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
