[ https://issues.apache.org/jira/browse/HDFS-16848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654802#comment-17654802 ]
ASF GitHub Bot commented on HDFS-16848: --------------------------------------- ZanderXu commented on code in PR #5147: URL: https://github.com/apache/hadoop/pull/5147#discussion_r1062141662 ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -109,8 +138,16 @@ public <T extends BaseRecord> boolean initRecordStorage( } } + @VisibleForTesting + public void setEnableConcurrent(boolean enableConcurrent) { + this.enableConcurrent = enableConcurrent; + } + @Override public void close() throws Exception { + if(executorService != null) { Review Comment: `if (executorService != null) {` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -63,8 +72,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk."; public static final String FEDERATION_STORE_ZK_PARENT_PATH = FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path"; + public static final String FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE = + FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.size"; Review Comment: how about changing the name to `FEDERATION_STORE_ZK_DRIVER_PREFIX + "async.max.threads"`? 
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -63,8 +72,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk."; public static final String FEDERATION_STORE_ZK_PARENT_PATH = FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path"; + public static final String FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE = + FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.size"; + public static final int FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE_DEFAULT = -1; Review Comment: This configuration should be moved to `org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys` if you want to add some descriptions in hdfs-rbf-default.xml ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -137,34 +174,22 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) String znode = getZNodeForClass(clazz); try { List<String> children = zkManager.getChildren(znode); - for (String child : children) { - try { - String path = getNodePath(znode, child); - Stat stat = new Stat(); - String data = zkManager.getStringData(path, stat); - boolean corrupted = false; - if (data == null || data.equals("")) { - // All records should have data, otherwise this is corrupted - corrupted = true; - } else { - try { - T record = createRecord(data, stat, clazz); - ret.add(record); - } catch (IOException e) { - LOG.error("Cannot create record type \"{}\" from \"{}\": {}", - clazz.getSimpleName(), data, e.getMessage()); - corrupted = true; - } - } - - if (corrupted) { - LOG.error("Cannot get data for {} at {}, cleaning corrupted data", - child, path); - zkManager.delete(path); + List<Callable<T>> callables = new ArrayList<>(); + if (enableConcurrent) { + children.forEach(child -> callables.add(() -> getRecord(clazz, 
znode, child))); + List<Future<T>> futures = executorService.invokeAll(callables); + for (Future<T> future : futures) { + if (future.get() != null) { + ret.add(future.get()); } - } catch (Exception e) { - LOG.error("Cannot get data for {}: {}", child, e.getMessage()); } + } else { + children.forEach(child -> { + T record = getRecord(clazz, znode, child); + if (record != null) { + ret.add(record); + } + }); Review Comment: ``` List<Callable<T>> callables = new ArrayList<>(); zkManager.getChildren(znode).forEach(c -> callables.add(() -> getRecord(clazz, znode, c))); if (enableConcurrent) { List<Future<T>> futures = executorService.invokeAll(callables); for (Future<T> future : futures) { if (future.get() != null) { ret.add(future.get()); } } } else { for (Callable<T> callable : callables) { T record = callable.call(); if (record != null) { ret.add(record); } } } ``` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java: ########## @@ -126,33 +133,73 @@ private <T extends BaseRecord> void testGetNullRecord( assertNull(curatorFramework.checkExists().forPath(znode)); } + @Test + public void testAsyncPerformance() throws Exception { + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + List<MountTable> insertList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + MountTable newRecord = generateFakeRecord(MountTable.class); + insertList.add(newRecord); + } + // Insert Multiple on sync mode + long startSync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endSync = Time.now(); + stateStoreDriver.removeAll(MembershipState.class); + + stateStoreDriver.setEnableConcurrent(true); + // Insert Multiple on async mode + long startAsync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endAsync = Time.now(); + System.out.printf("Sync mode total running time is %d ms, " Review Comment: change it to 
`LOG.info()` or delete it. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml: ########## @@ -377,6 +377,18 @@ </description> </property> + <property> + <name>dfs.federation.router.store.driver.zk.client.size</name> Review Comment: make this configuration more reasonable. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java: ########## @@ -126,33 +133,73 @@ private <T extends BaseRecord> void testGetNullRecord( assertNull(curatorFramework.checkExists().forPath(znode)); } + @Test + public void testAsyncPerformance() throws Exception { + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + List<MountTable> insertList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + MountTable newRecord = generateFakeRecord(MountTable.class); + insertList.add(newRecord); + } + // Insert Multiple on sync mode + long startSync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endSync = Time.now(); + stateStoreDriver.removeAll(MembershipState.class); + + stateStoreDriver.setEnableConcurrent(true); + // Insert Multiple on async mode + long startAsync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endAsync = Time.now(); + System.out.printf("Sync mode total running time is %d ms, " + + "and async mode total running time is %d ms", + endSync - startSync, endAsync - startAsync); + assertTrue((endSync - startSync) > (endAsync - startAsync) * 2); Review Comment: assertTrue((endSync - startSync) > (endAsync - startAsync)); ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -192,22 +255,45 @@ public <T extends BaseRecord> boolean putAll( String znode = getZNodeForClass(recordClass); long start = monotonicNow(); - boolean status = true; - for (T record : records) { - 
String primaryKey = getPrimaryKey(record); - String recordZNode = getNodePath(znode, primaryKey); - byte[] data = serialize(record); - if (!writeNode(recordZNode, data, update, error)){ - status = false; + final AtomicBoolean status = new AtomicBoolean(true); + if (enableConcurrent) { + List<Callable<Void>> callables = new ArrayList<>(); + records.forEach(record -> + callables.add( + () -> { + String primaryKey = getPrimaryKey(record); + String recordZNode = getNodePath(znode, primaryKey); + byte[] data = serialize(record); + if (!writeNode(recordZNode, data, update, error)) { + status.set(false); + } + return null; + } + ) + ); + try { + executorService.invokeAll(callables); + } catch (Exception e) { + LOG.error("Write record failed : {}", e.getMessage(), e); + throw new IOException(e); } + } else { + records.forEach(record -> { Review Comment: nice suggestion to make the code clearer > RBF: Improve StateStoreZookeeperImpl > ------------------------------------- > > Key: HDFS-16848 > URL: https://issues.apache.org/jira/browse/HDFS-16848 > Project: Hadoop HDFS > Issue Type: Improvement > Components: rbf > Reporter: Sun Hao > Priority: Major > Labels: pull-request-available > Fix For: 3.4.0 > > > Currently, the router gets/updates state from ZooKeeper sequentially. This > slows down the router's loading/updating of the state cache, especially for a large cluster or a > multi-region cluster. > We propose adding a thread pool to handle ZooKeeper state synchronization. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org