[ 
https://issues.apache.org/jira/browse/HDFS-16848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683617#comment-17683617
 ] 

ASF GitHub Bot commented on HDFS-16848:
---------------------------------------

howzi commented on code in PR #5147:
URL: https://github.com/apache/hadoop/pull/5147#discussion_r1095265373


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java:
##########
@@ -239,6 +239,18 @@ public class RBFConfigKeys extends 
CommonConfigurationKeysPublic {
   public static final long
       FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT = -1;
 
+  // HDFS Router-based federation State Store ZK DRIVER
+  public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
+      RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
+  public static final String FEDERATION_STORE_ZK_PARENT_PATH =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
+  public static final String FEDERATION_STORE_ZK_ASYNC_MAX_THREADS =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "async.max.threads";
+  public static final int FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT =
+      -1;
+  public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
+      "/hdfs-federation";

Review Comment:
   fixed



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +131,75 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    assertTrue((endSync - startSync) > (endAsync - startAsync));
+  }
+
   @Test
   public void testGetNullRecord() throws Exception {
-    testGetNullRecord(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testGetNullRecord(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testGetNullRecord(stateStoreDriver);
   }
 
   @Test
   public void testInsert()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testInsert(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testInsert(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testInsert(stateStoreDriver);
   }
 
   @Test
   public void testUpdate()
       throws IllegalArgumentException, ReflectiveOperationException,
       IOException, SecurityException {
-    testPut(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testPut(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testPut(stateStoreDriver);
   }
 
   @Test
   public void testDelete()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testRemove(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testRemove(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testRemove(stateStoreDriver);
   }
 
   @Test
   public void testFetchErrors()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testFetchErrors(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testFetchErrors(stateStoreDriver);
+    // test async mode

Review Comment:
   fixed



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +131,75 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    assertTrue((endSync - startSync) > (endAsync - startAsync));
+  }
+
   @Test
   public void testGetNullRecord() throws Exception {
-    testGetNullRecord(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testGetNullRecord(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testGetNullRecord(stateStoreDriver);
   }
 
   @Test
   public void testInsert()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testInsert(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testInsert(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testInsert(stateStoreDriver);
   }
 
   @Test
   public void testUpdate()
       throws IllegalArgumentException, ReflectiveOperationException,
       IOException, SecurityException {
-    testPut(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testPut(stateStoreDriver);
+    // test async mode

Review Comment:
   fixed



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +131,75 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    assertTrue((endSync - startSync) > (endAsync - startAsync));
+  }
+
   @Test
   public void testGetNullRecord() throws Exception {
-    testGetNullRecord(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testGetNullRecord(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testGetNullRecord(stateStoreDriver);
   }
 
   @Test
   public void testInsert()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testInsert(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testInsert(stateStoreDriver);
+    // test async mode

Review Comment:
   fixed





> RBF: Improve StateStoreZookeeperImpl 
> -------------------------------------
>
>                 Key: HDFS-16848
>                 URL: https://issues.apache.org/jira/browse/HDFS-16848
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>          Components: rbf
>            Reporter: Sun Hao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>
> Currently, the router gets and updates state from zk sequentially. This 
> slows down the router's loading and updating of the state cache, especially 
> for a large cluster or a multi-region cluster.
> We propose adding a thread pool to handle zk state synchronization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to