[ https://issues.apache.org/jira/browse/HDFS-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848029#comment-17848029 ]
ASF GitHub Bot commented on HDFS-17529: --------------------------------------- ZanderXu commented on code in PR #6833: URL: https://github.com/apache/hadoop/pull/6833#discussion_r1607499422 ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java: ########## @@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException { Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz); return remove(recordClass, query) == 1; } + + @Override + public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException { + assert !records.isEmpty(); + // Fall back to iterative remove() calls if all records don't share 1 class + Class<? extends BaseRecord> expectedClazz = records.get(0).getClass(); + if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) { + Map<T, Boolean> result = new HashMap<>(); + for (T record : records) { + result.put(record, remove(record)); + } + return result; + } + + final List<Query<T>> queries = new ArrayList<>(); + for (T record: records) { + queries.add(new Query<>(record)); + } + @SuppressWarnings("unchecked") + Class<T> recordClass = (Class<T>) StateStoreUtils.getRecordClass(expectedClazz); + Map<Query<T>, Integer> result = remove(recordClass, queries); + return result.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().getPartial(), e -> e.getValue() > 0)); Review Comment: `remove(T record)` returns true if `remove(recordClass, query)` is 1. But here it is `e.getValue() > 0`. So how about making them consistent? Here, how about using `e.getValue() == 1`? 
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java: ########## @@ -565,7 +568,7 @@ public void testRegistrationExpiredRaceCondition() // Load cache MembershipStore memStoreSpy = spy(membershipStore); DelayAnswer delayer = new DelayAnswer(LOG); - doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any()); + doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any(), anyBoolean()); Review Comment: remove this `anyBoolean()` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult putAll( } @Override - public <T extends BaseRecord> int remove( - Class<T> clazz, Query<T> query) throws IOException { + public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz, + List<Query<T>> queries) throws IOException { verifyDriverReady(); - if (query == null) { - return 0; + // Track how many entries are deleted by each query + Map<Query<T>, Integer> ret = new HashMap<>(); + final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>()); + if (queries.isEmpty()) { + return ret; } // Read the current data long start = monotonicNow(); - List<T> records = null; + List<T> records; try { QueryResult<T> result = get(clazz); records = result.getRecords(); } catch (IOException ex) { LOG.error("Cannot get existing records", ex); getMetrics().addFailure(monotonicNow() - start); - return 0; + return ret; } // Check the records to remove String znode = getZNodeForClass(clazz); - List<T> recordsToRemove = filterMultiple(query, records); + Set<T> recordsToRemove = new HashSet<>(); + Map<Query<T>, List<T>> queryToRecords = new HashMap<>(); + for (Query<T> query : queries) { + List<T> filtered = filterMultiple(query, records); + queryToRecords.put(query, filtered); + 
recordsToRemove.addAll(filtered); + } // Remove the records - int removed = 0; - for (T existingRecord : recordsToRemove) { + List<Callable<Void>> callables = new ArrayList<>(); + recordsToRemove.forEach(existingRecord -> callables.add(() -> { LOG.info("Removing \"{}\"", existingRecord); try { String primaryKey = getPrimaryKey(existingRecord); String path = getNodePath(znode, primaryKey); if (zkManager.delete(path)) { - removed++; + trueRemoved.add(existingRecord); } else { LOG.error("Did not remove \"{}\"", existingRecord); } } catch (Exception e) { LOG.error("Cannot remove \"{}\"", existingRecord, e); getMetrics().addFailure(monotonicNow() - start); } + return null; + })); + try { + if (enableConcurrent) { + executorService.invokeAll(callables); + } else { + for (Callable<Void> callable : callables) { + callable.call(); + } + } + } catch (Exception e) { + LOG.error("Record removal failed : {}", e.getMessage(), e); + throw new IOException(e); Review Comment: `throw new IOException(e);` should be removed since `remove(Class<T> clazz, Query<T> query)` does not throw any exceptions except `StateStoreUnavailableException` in `verifyDriverReady`. 
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult putAll( } @Override - public <T extends BaseRecord> int remove( - Class<T> clazz, Query<T> query) throws IOException { + public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz, + List<Query<T>> queries) throws IOException { verifyDriverReady(); - if (query == null) { - return 0; + // Track how many entries are deleted by each query + Map<Query<T>, Integer> ret = new HashMap<>(); + final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>()); + if (queries.isEmpty()) { + return ret; } // Read the current data long start = monotonicNow(); - List<T> records = null; + List<T> records; try { QueryResult<T> result = get(clazz); records = result.getRecords(); } catch (IOException ex) { LOG.error("Cannot get existing records", ex); getMetrics().addFailure(monotonicNow() - start); - return 0; + return ret; } // Check the records to remove String znode = getZNodeForClass(clazz); - List<T> recordsToRemove = filterMultiple(query, records); + Set<T> recordsToRemove = new HashSet<>(); + Map<Query<T>, List<T>> queryToRecords = new HashMap<>(); + for (Query<T> query : queries) { + List<T> filtered = filterMultiple(query, records); + queryToRecords.put(query, filtered); + recordsToRemove.addAll(filtered); + } // Remove the records - int removed = 0; - for (T existingRecord : recordsToRemove) { + List<Callable<Void>> callables = new ArrayList<>(); + recordsToRemove.forEach(existingRecord -> callables.add(() -> { LOG.info("Removing \"{}\"", existingRecord); try { String primaryKey = getPrimaryKey(existingRecord); String path = getNodePath(znode, primaryKey); if (zkManager.delete(path)) { - removed++; + trueRemoved.add(existingRecord); } else { LOG.error("Did not remove \"{}\"", existingRecord); } } catch (Exception 
e) { LOG.error("Cannot remove \"{}\"", existingRecord, e); getMetrics().addFailure(monotonicNow() - start); } + return null; + })); + try { + if (enableConcurrent) { + executorService.invokeAll(callables); + } else { + for (Callable<Void> callable : callables) { + callable.call(); + } + } + } catch (Exception e) { + LOG.error("Record removal failed : {}", e.getMessage(), e); + throw new IOException(e); } long end = monotonicNow(); - if (removed > 0) { + if (!trueRemoved.isEmpty()) { getMetrics().addRemove(end - start); } - return removed; + // Generate return map + for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) { + for (T record : entry.getValue()) { + if (trueRemoved.contains(record)) { + ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1); + break; Review Comment: This `break` should be removed? ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java: ########## @@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException { Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz); return remove(recordClass, query) == 1; } + + @Override + public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException { + assert !records.isEmpty(); + // Fall back to iterative remove() calls if all records don't share 1 class + Class<? 
extends BaseRecord> expectedClazz = records.get(0).getClass(); + if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) { + Map<T, Boolean> result = new HashMap<>(); + for (T record : records) { + result.put(record, remove(record)); + } + return result; + } + + final List<Query<T>> queries = new ArrayList<>(); + for (T record: records) { Review Comment: `for (T record : records) {` > Improve router state store cache entry deletion > ----------------------------------------------- > > Key: HDFS-17529 > URL: https://issues.apache.org/jira/browse/HDFS-17529 > Project: Hadoop HDFS > Issue Type: Improvement > Components: hdfs, rbf > Reporter: Felix N > Assignee: Felix N > Priority: Major > Labels: pull-request-available > > Current implementation for router state store update is quite inefficient, so > much so that when routers are removed and a lot of NameNodeMembership records > are deleted in a short burst, the deletions triggered a router safemode in > our cluster and caused a lot of troubles. > This ticket aims to improve the deletion process for the ZK state store > implementation. > See HDFS-17532 for the other half of this improvement -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscribe@hadoop.apache.org For additional commands, e-mail: hdfs-issues-help@hadoop.apache.org