[ 
https://issues.apache.org/jira/browse/HDFS-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848029#comment-17848029
 ] 

ASF GitHub Bot commented on HDFS-17529:
---------------------------------------

ZanderXu commented on code in PR #6833:
URL: https://github.com/apache/hadoop/pull/6833#discussion_r1607499422


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java:
##########
@@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) 
throws IOException {
     Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
     return remove(recordClass, query) == 1;
   }
+
+  @Override
+  public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> 
records) throws IOException {
+    assert !records.isEmpty();
+    // Fall back to iterative remove() calls if all records don't share 1 class
+    Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+    if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+      Map<T, Boolean> result = new HashMap<>();
+      for (T record : records) {
+        result.put(record, remove(record));
+      }
+      return result;
+    }
+
+    final List<Query<T>> queries = new ArrayList<>();
+    for (T record: records) {
+      queries.add(new Query<>(record));
+    }
+    @SuppressWarnings("unchecked")
+    Class<T> recordClass = (Class<T>) 
StateStoreUtils.getRecordClass(expectedClazz);
+    Map<Query<T>, Integer> result = remove(recordClass, queries);
+    return result.entrySet().stream()
+        .collect(Collectors.toMap(e -> e.getKey().getPartial(), e -> 
e.getValue() > 0));

Review Comment:
   `remove(T record)` returns true only when `remove(recordClass, query)` is exactly 1, but 
here the check is `e.getValue() > 0`. How about making the two consistent? 
   
   Here, how about using `e.getValue() == 1`?



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java:
##########
@@ -565,7 +568,7 @@ public void testRegistrationExpiredRaceCondition()
     // Load cache
     MembershipStore memStoreSpy = spy(membershipStore);
     DelayAnswer delayer = new DelayAnswer(LOG);
-    doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any());
+    doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any(), 
anyBoolean());

Review Comment:
   remove this `anyBoolean()`



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult 
putAll(
   }
 
   @Override
-  public <T extends BaseRecord> int remove(
-      Class<T> clazz, Query<T> query) throws IOException {
+  public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+      List<Query<T>> queries) throws IOException {
     verifyDriverReady();
-    if (query == null) {
-      return 0;
+    // Track how many entries are deleted by each query
+    Map<Query<T>, Integer> ret = new HashMap<>();
+    final List<T> trueRemoved = Collections.synchronizedList(new 
ArrayList<>());
+    if (queries.isEmpty()) {
+      return ret;
     }
 
     // Read the current data
     long start = monotonicNow();
-    List<T> records = null;
+    List<T> records;
     try {
       QueryResult<T> result = get(clazz);
       records = result.getRecords();
     } catch (IOException ex) {
       LOG.error("Cannot get existing records", ex);
       getMetrics().addFailure(monotonicNow() - start);
-      return 0;
+      return ret;
     }
 
     // Check the records to remove
     String znode = getZNodeForClass(clazz);
-    List<T> recordsToRemove = filterMultiple(query, records);
+    Set<T> recordsToRemove = new HashSet<>();
+    Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+    for (Query<T> query : queries) {
+      List<T> filtered = filterMultiple(query, records);
+      queryToRecords.put(query, filtered);
+      recordsToRemove.addAll(filtered);
+    }
 
     // Remove the records
-    int removed = 0;
-    for (T existingRecord : recordsToRemove) {
+    List<Callable<Void>> callables = new ArrayList<>();
+    recordsToRemove.forEach(existingRecord -> callables.add(() -> {
       LOG.info("Removing \"{}\"", existingRecord);
       try {
         String primaryKey = getPrimaryKey(existingRecord);
         String path = getNodePath(znode, primaryKey);
         if (zkManager.delete(path)) {
-          removed++;
+          trueRemoved.add(existingRecord);
         } else {
           LOG.error("Did not remove \"{}\"", existingRecord);
         }
       } catch (Exception e) {
         LOG.error("Cannot remove \"{}\"", existingRecord, e);
         getMetrics().addFailure(monotonicNow() - start);
       }
+      return null;
+    }));
+    try {
+      if (enableConcurrent) {
+        executorService.invokeAll(callables);
+      } else {
+        for (Callable<Void> callable : callables) {
+          callable.call();
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Record removal failed : {}", e.getMessage(), e);
+      throw new IOException(e);

Review Comment:
   `throw new IOException(e);` should be removed since `remove(Class<T> clazz, 
Query<T> query)` does not throw any exceptions except 
`StateStoreUnavailableException` in `verifyDriverReady`.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult 
putAll(
   }
 
   @Override
-  public <T extends BaseRecord> int remove(
-      Class<T> clazz, Query<T> query) throws IOException {
+  public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+      List<Query<T>> queries) throws IOException {
     verifyDriverReady();
-    if (query == null) {
-      return 0;
+    // Track how many entries are deleted by each query
+    Map<Query<T>, Integer> ret = new HashMap<>();
+    final List<T> trueRemoved = Collections.synchronizedList(new 
ArrayList<>());
+    if (queries.isEmpty()) {
+      return ret;
     }
 
     // Read the current data
     long start = monotonicNow();
-    List<T> records = null;
+    List<T> records;
     try {
       QueryResult<T> result = get(clazz);
       records = result.getRecords();
     } catch (IOException ex) {
       LOG.error("Cannot get existing records", ex);
       getMetrics().addFailure(monotonicNow() - start);
-      return 0;
+      return ret;
     }
 
     // Check the records to remove
     String znode = getZNodeForClass(clazz);
-    List<T> recordsToRemove = filterMultiple(query, records);
+    Set<T> recordsToRemove = new HashSet<>();
+    Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+    for (Query<T> query : queries) {
+      List<T> filtered = filterMultiple(query, records);
+      queryToRecords.put(query, filtered);
+      recordsToRemove.addAll(filtered);
+    }
 
     // Remove the records
-    int removed = 0;
-    for (T existingRecord : recordsToRemove) {
+    List<Callable<Void>> callables = new ArrayList<>();
+    recordsToRemove.forEach(existingRecord -> callables.add(() -> {
       LOG.info("Removing \"{}\"", existingRecord);
       try {
         String primaryKey = getPrimaryKey(existingRecord);
         String path = getNodePath(znode, primaryKey);
         if (zkManager.delete(path)) {
-          removed++;
+          trueRemoved.add(existingRecord);
         } else {
           LOG.error("Did not remove \"{}\"", existingRecord);
         }
       } catch (Exception e) {
         LOG.error("Cannot remove \"{}\"", existingRecord, e);
         getMetrics().addFailure(monotonicNow() - start);
       }
+      return null;
+    }));
+    try {
+      if (enableConcurrent) {
+        executorService.invokeAll(callables);
+      } else {
+        for (Callable<Void> callable : callables) {
+          callable.call();
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Record removal failed : {}", e.getMessage(), e);
+      throw new IOException(e);
     }
     long end = monotonicNow();
-    if (removed > 0) {
+    if (!trueRemoved.isEmpty()) {
       getMetrics().addRemove(end - start);
     }
-    return removed;
+    // Generate return map
+    for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
+      for (T record : entry.getValue()) {
+        if (trueRemoved.contains(record)) {
+          ret.compute(entry.getKey(), (k, v) ->  (v == null) ? 1 : v + 1);
+          break;

Review Comment:
   Should this `break` be removed? With the `break`, each query's count is capped 
at 1 even when the query matched and removed multiple records.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java:
##########
@@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) 
throws IOException {
     Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
     return remove(recordClass, query) == 1;
   }
+
+  @Override
+  public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> 
records) throws IOException {
+    assert !records.isEmpty();
+    // Fall back to iterative remove() calls if all records don't share 1 class
+    Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+    if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+      Map<T, Boolean> result = new HashMap<>();
+      for (T record : records) {
+        result.put(record, remove(record));
+      }
+      return result;
+    }
+
+    final List<Query<T>> queries = new ArrayList<>();
+    for (T record: records) {

Review Comment:
   `for (T record : records) {`





> Improve router state store cache entry deletion
> -----------------------------------------------
>
>                 Key: HDFS-17529
>                 URL: https://issues.apache.org/jira/browse/HDFS-17529
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>          Components: hdfs, rbf
>            Reporter: Felix N
>            Assignee: Felix N
>            Priority: Major
>              Labels: pull-request-available
>
> The current implementation of router state store updates is quite inefficient — so 
> much so that when routers are removed and a lot of NameNodeMembership records 
> are deleted in a short burst, the deletions triggered a router safemode in 
> our cluster and caused a lot of trouble.
> This ticket aims to improve the deletion process for ZK state store 
> implementation.
> See HDFS-17532 for the other half of this improvement



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to