[ https://issues.apache.org/jira/browse/HDFS-16848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637527#comment-17637527 ]
ASF GitHub Bot commented on HDFS-16848: --------------------------------------- ZanderXu commented on code in PR #5147: URL: https://github.com/apache/hadoop/pull/5147#discussion_r1029955572 ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -84,6 +102,20 @@ public boolean initDriver() { baseZNode = conf.get( FEDERATION_STORE_ZK_PARENT_PATH, FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT); + this.enableConcurrent = conf.getBoolean( + FEDERATION_STORE_ZK_CLIENT_CONCURRENT, + FEDERATION_STORE_ZK_CLIENT_CONCURRENT_DEFAULT + ); + if(enableConcurrent) { Review Comment: `if (enableConcurrent)` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -137,34 +177,26 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) String znode = getZNodeForClass(clazz); try { List<String> children = zkManager.getChildren(znode); - for (String child : children) { - try { - String path = getNodePath(znode, child); - Stat stat = new Stat(); - String data = zkManager.getStringData(path, stat); - boolean corrupted = false; - if (data == null || data.equals("")) { - // All records should have data, otherwise this is corrupted - corrupted = true; - } else { - try { - T record = createRecord(data, stat, clazz); - ret.add(record); - } catch (IOException e) { - LOG.error("Cannot create record type \"{}\" from \"{}\": {}", - clazz.getSimpleName(), data, e.getMessage()); - corrupted = true; - } - } - - if (corrupted) { - LOG.error("Cannot get data for {} at {}, cleaning corrupted data", - child, path); - zkManager.delete(path); + List<Callable<T>> callables = new ArrayList<>(); + if(enableConcurrent) { + children.forEach(child -> + callables.add( + () -> getRecord(clazz, znode, child) + ) + ); + List<Future<T>> futures = executorService.invokeAll(callables); + for 
(Future<T> future : futures) { + if (future.get() != null) { + ret.add(future.get()); } - } catch (Exception e) { - LOG.error("Cannot get data for {}: {}", child, e.getMessage()); } + } else { + children.forEach(child -> { + T record = getRecord(clazz, znode, child); + if(record != null) { Review Comment: `if (record != null) {` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -63,8 +72,17 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk."; public static final String FEDERATION_STORE_ZK_PARENT_PATH = FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path"; + public static final String FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE = + FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.size"; + public static final int FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE_DEFAULT = 10; + public static final String FEDERATION_STORE_ZK_CLIENT_CONCURRENT = + FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.concurrent"; + public static final boolean FEDERATION_STORE_ZK_CLIENT_CONCURRENT_DEFAULT = false; Review Comment: Can you use a single configuration to control whether this async mode is enabled? For example, use the number of threads: the default value is -1 or 0, and users can enable this feature by setting it to a positive integer.
We should also add a detailed explanation of this configuration to hdfs-rbf-default.xml. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -84,6 +102,20 @@ public boolean initDriver() { baseZNode = conf.get( FEDERATION_STORE_ZK_PARENT_PATH, FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT); + this.enableConcurrent = conf.getBoolean( + FEDERATION_STORE_ZK_CLIENT_CONCURRENT, + FEDERATION_STORE_ZK_CLIENT_CONCURRENT_DEFAULT + ); + if(enableConcurrent) { + int numThreads = conf.getInt( + FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE, + FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE_DEFAULT); + ThreadFactory threadFactory = new ThreadFactoryBuilder() Review Comment: Please validate the value of `numThreads`. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -137,34 +177,26 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) String znode = getZNodeForClass(clazz); try { List<String> children = zkManager.getChildren(znode); - for (String child : children) { - try { - String path = getNodePath(znode, child); - Stat stat = new Stat(); - String data = zkManager.getStringData(path, stat); - boolean corrupted = false; - if (data == null || data.equals("")) { - // All records should have data, otherwise this is corrupted - corrupted = true; - } else { - try { - T record = createRecord(data, stat, clazz); - ret.add(record); - } catch (IOException e) { - LOG.error("Cannot create record type \"{}\" from \"{}\": {}", - clazz.getSimpleName(), data, e.getMessage()); - corrupted = true; - } - } - - if (corrupted) { - LOG.error("Cannot get data for {} at {}, cleaning corrupted data", - child, path); - zkManager.delete(path); + List<Callable<T>> callables = new ArrayList<>(); + if(enableConcurrent) { + children.forEach(child -> + callables.add( + () -> getRecord(clazz,
znode, child) + ) + ); Review Comment: single line. `children.forEach(child -> callables.add(() -> getRecord(clazz, znode, child)));` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -178,6 +210,45 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) return new QueryResult<T>(ret, getTime()); } + /** + * Get one data record in the StateStore or delete it if it's corrupted. + * + * @param clazz Record class to evaluate. + * @param znode The ZNode for the class. + * @param child The child for znode to get. + * @return The record to get. + */ + private <T extends BaseRecord> T getRecord(Class<T> clazz, String znode, String child) { + T record = null; + try { + String path = getNodePath(znode, child); + Stat stat = new Stat(); + String data = zkManager.getStringData(path, stat); + boolean corrupted = false; + if (data == null || data.equals("")) { + // All records should have data, otherwise this is corrupted + corrupted = true; + } else { + try { + record = createRecord(data, stat, clazz); + } catch (IOException e) { + LOG.error("Cannot create record type \"{}\" from \"{}\": {}", + clazz.getSimpleName(), data, e.getMessage()); + corrupted = true; + } + } + + if (corrupted) { + LOG.error("Cannot get data for {} at {}, cleaning corrupted data", + child, path); Review Comment: single line. 
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -192,22 +263,45 @@ public <T extends BaseRecord> boolean putAll( String znode = getZNodeForClass(recordClass); long start = monotonicNow(); - boolean status = true; - for (T record : records) { - String primaryKey = getPrimaryKey(record); - String recordZNode = getNodePath(znode, primaryKey); - byte[] data = serialize(record); - if (!writeNode(recordZNode, data, update, error)){ - status = false; + final AtomicBoolean status = new AtomicBoolean(true); + if(enableConcurrent){ + List<Callable<Void>> callables = new ArrayList<>(); + records.forEach(record -> + callables.add( + () -> { + String primaryKey = getPrimaryKey(record); + String recordZNode = getNodePath(znode, primaryKey); + byte[] data = serialize(record); + if(!writeNode(recordZNode, data, update, error)) { Review Comment: `if (!writeNode(recordZNode, data, update, error))` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -192,22 +263,45 @@ public <T extends BaseRecord> boolean putAll( String znode = getZNodeForClass(recordClass); long start = monotonicNow(); - boolean status = true; - for (T record : records) { - String primaryKey = getPrimaryKey(record); - String recordZNode = getNodePath(znode, primaryKey); - byte[] data = serialize(record); - if (!writeNode(recordZNode, data, update, error)){ - status = false; + final AtomicBoolean status = new AtomicBoolean(true); + if(enableConcurrent){ + List<Callable<Void>> callables = new ArrayList<>(); + records.forEach(record -> + callables.add( + () -> { + String primaryKey = getPrimaryKey(record); + String recordZNode = getNodePath(znode, primaryKey); + byte[] data = serialize(record); + if(!writeNode(recordZNode, data, update, error)) { + status.set(false); + } + return null; + } + ) + 
); + try { + executorService.invokeAll(callables); + } catch (Exception e) { + LOG.error("Write record failed : {}", e.getMessage(), e); + throw new IOException(e); } + } else { + records.forEach(record -> { + String primaryKey = getPrimaryKey(record); + String recordZNode = getNodePath(znode, primaryKey); + byte[] data = serialize(record); + if(!writeNode(recordZNode, data, update, error)) { Review Comment: `if (!writeNode(recordZNode, data, update, error)) {` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -192,22 +263,45 @@ public <T extends BaseRecord> boolean putAll( String znode = getZNodeForClass(recordClass); long start = monotonicNow(); - boolean status = true; - for (T record : records) { - String primaryKey = getPrimaryKey(record); - String recordZNode = getNodePath(znode, primaryKey); - byte[] data = serialize(record); - if (!writeNode(recordZNode, data, update, error)){ - status = false; + final AtomicBoolean status = new AtomicBoolean(true); + if(enableConcurrent){ Review Comment: ` if (enableConcurrent) {` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -137,34 +177,26 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) String znode = getZNodeForClass(clazz); try { List<String> children = zkManager.getChildren(znode); - for (String child : children) { - try { - String path = getNodePath(znode, child); - Stat stat = new Stat(); - String data = zkManager.getStringData(path, stat); - boolean corrupted = false; - if (data == null || data.equals("")) { - // All records should have data, otherwise this is corrupted - corrupted = true; - } else { - try { - T record = createRecord(data, stat, clazz); - ret.add(record); - } catch (IOException e) { - LOG.error("Cannot create record type \"{}\" from \"{}\": {}", - 
clazz.getSimpleName(), data, e.getMessage()); - corrupted = true; - } - } - - if (corrupted) { - LOG.error("Cannot get data for {} at {}, cleaning corrupted data", - child, path); - zkManager.delete(path); + List<Callable<T>> callables = new ArrayList<>(); + if(enableConcurrent) { Review Comment: `if (enableConcurrent) {` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java: ########## @@ -126,33 +133,71 @@ private <T extends BaseRecord> void testGetNullRecord( assertNull(curatorFramework.checkExists().forPath(znode)); } + @Test + public void testAsyncPerformance() throws Exception { + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + List<MountTable> insertList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + MountTable newRecord = generateFakeRecord(MountTable.class); + insertList.add(newRecord); + } + // Insert Multiple on sync mode + long startSync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endSync = Time.now(); + stateStoreDriver.removeAll(MembershipState.class); + + stateStoreDriver.setEnableConcurrent(true); + // Insert Multiple on async mode + long startAsync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endAsync = Time.now(); + System.out.printf("Sync mode total running time is %d ms, and async mode total running time is %d ms", Review Comment: This unit test only prints timings and does not verify anything; can you change it to use `assertXXX`? > RBF: Improve StateStoreZookeeperImpl > ------------------------------------- > > Key: HDFS-16848 > URL: https://issues.apache.org/jira/browse/HDFS-16848 > Project: Hadoop HDFS > Issue Type: Improvement > Components: rbf > Reporter: Sun Hao > Priority: Major > Labels: pull-request-available > Fix For: 3.4.0 > > > Currently, the router gets/updates state from ZooKeeper sequentially.
This will > slow down how the router loads/updates its state cache, especially for a large cluster or a > multi-region cluster. > We propose adding a thread pool to handle ZooKeeper state synchronization. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org