ZanderXu commented on code in PR #6833:
URL: https://github.com/apache/hadoop/pull/6833#discussion_r1607499422


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java:
##########
@@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException {
     Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
     return remove(recordClass, query) == 1;
   }
+
+  @Override
+  public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException {
+    assert !records.isEmpty();
+    // Fall back to iterative remove() calls if all records don't share 1 class
+    Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+    if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+      Map<T, Boolean> result = new HashMap<>();
+      for (T record : records) {
+        result.put(record, remove(record));
+      }
+      return result;
+    }
+
+    final List<Query<T>> queries = new ArrayList<>();
+    for (T record: records) {
+      queries.add(new Query<>(record));
+    }
+    @SuppressWarnings("unchecked")
+    Class<T> recordClass = (Class<T>) StateStoreUtils.getRecordClass(expectedClazz);
+    Map<Query<T>, Integer> result = remove(recordClass, queries);
+    return result.entrySet().stream()
+        .collect(Collectors.toMap(e -> e.getKey().getPartial(), e -> e.getValue() > 0));

Review Comment:
   `remove(T record)` returns true only when `remove(recordClass, query)` is 1, but here the check is `e.getValue() > 0`. How about making them consistent?
   
   Here, how about using `e.getValue() == 1`?
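   
   For illustration, a minimal sketch of the collector with the suggested comparison, mirroring the single-record `remove(T record)` contract shown above (a fragment of `removeMultiple`, not a drop-in patch):
   
   ```java
   // Sketch only: treat exactly one deleted entry as success,
   // matching remove(T record)'s "== 1" semantics.
   return result.entrySet().stream()
       .collect(Collectors.toMap(
           e -> e.getKey().getPartial(),
           e -> e.getValue() == 1));
   ```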



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java:
##########
@@ -565,7 +568,7 @@ public void testRegistrationExpiredRaceCondition()
     // Load cache
     MembershipStore memStoreSpy = spy(membershipStore);
     DelayAnswer delayer = new DelayAnswer(LOG);
-    doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any());
+    doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any(), anyBoolean());

Review Comment:
   remove this `anyBoolean()`
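   
   For reference, a sketch of the stubbing as it was before this change, assuming the single-argument `overrideExpiredRecords(...)` call can be kept:
   
   ```java
   // Sketch: original single-argument stubbing, without the extra boolean matcher.
   doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any());
   ```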



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
   }
 
   @Override
-  public <T extends BaseRecord> int remove(
-      Class<T> clazz, Query<T> query) throws IOException {
+  public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+      List<Query<T>> queries) throws IOException {
     verifyDriverReady();
-    if (query == null) {
-      return 0;
+    // Track how many entries are deleted by each query
+    Map<Query<T>, Integer> ret = new HashMap<>();
+    final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>());
+    if (queries.isEmpty()) {
+      return ret;
     }
 
     // Read the current data
     long start = monotonicNow();
-    List<T> records = null;
+    List<T> records;
     try {
       QueryResult<T> result = get(clazz);
       records = result.getRecords();
     } catch (IOException ex) {
       LOG.error("Cannot get existing records", ex);
       getMetrics().addFailure(monotonicNow() - start);
-      return 0;
+      return ret;
     }
 
     // Check the records to remove
     String znode = getZNodeForClass(clazz);
-    List<T> recordsToRemove = filterMultiple(query, records);
+    Set<T> recordsToRemove = new HashSet<>();
+    Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+    for (Query<T> query : queries) {
+      List<T> filtered = filterMultiple(query, records);
+      queryToRecords.put(query, filtered);
+      recordsToRemove.addAll(filtered);
+    }
 
     // Remove the records
-    int removed = 0;
-    for (T existingRecord : recordsToRemove) {
+    List<Callable<Void>> callables = new ArrayList<>();
+    recordsToRemove.forEach(existingRecord -> callables.add(() -> {
       LOG.info("Removing \"{}\"", existingRecord);
       try {
         String primaryKey = getPrimaryKey(existingRecord);
         String path = getNodePath(znode, primaryKey);
         if (zkManager.delete(path)) {
-          removed++;
+          trueRemoved.add(existingRecord);
         } else {
           LOG.error("Did not remove \"{}\"", existingRecord);
         }
       } catch (Exception e) {
         LOG.error("Cannot remove \"{}\"", existingRecord, e);
         getMetrics().addFailure(monotonicNow() - start);
       }
+      return null;
+    }));
+    try {
+      if (enableConcurrent) {
+        executorService.invokeAll(callables);
+      } else {
+        for (Callable<Void> callable : callables) {
+          callable.call();
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Record removal failed : {}", e.getMessage(), e);
+      throw new IOException(e);

Review Comment:
   `throw new IOException(e);` should be removed, since `remove(Class<T> clazz, Query<T> query)` does not throw any exception other than the `StateStoreUnavailableException` from `verifyDriverReady()`.
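   
   As a sketch of that suggestion, the catch block could log and fall through instead of rethrowing, so callers see the same non-throwing behaviour as the old single-query `remove` (fragment of the method above):
   
   ```java
   } catch (Exception e) {
     // Sketch: log and continue; per-record failures are already logged above
     // and reflected in trueRemoved, matching the previous behaviour where
     // removal errors were not propagated to the caller.
     LOG.error("Record removal failed : {}", e.getMessage(), e);
   }
   ```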



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
   }
 
   @Override
-  public <T extends BaseRecord> int remove(
-      Class<T> clazz, Query<T> query) throws IOException {
+  public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+      List<Query<T>> queries) throws IOException {
     verifyDriverReady();
-    if (query == null) {
-      return 0;
+    // Track how many entries are deleted by each query
+    Map<Query<T>, Integer> ret = new HashMap<>();
+    final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>());
+    if (queries.isEmpty()) {
+      return ret;
     }
 
     // Read the current data
     long start = monotonicNow();
-    List<T> records = null;
+    List<T> records;
     try {
       QueryResult<T> result = get(clazz);
       records = result.getRecords();
     } catch (IOException ex) {
       LOG.error("Cannot get existing records", ex);
       getMetrics().addFailure(monotonicNow() - start);
-      return 0;
+      return ret;
     }
 
     // Check the records to remove
     String znode = getZNodeForClass(clazz);
-    List<T> recordsToRemove = filterMultiple(query, records);
+    Set<T> recordsToRemove = new HashSet<>();
+    Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+    for (Query<T> query : queries) {
+      List<T> filtered = filterMultiple(query, records);
+      queryToRecords.put(query, filtered);
+      recordsToRemove.addAll(filtered);
+    }
 
     // Remove the records
-    int removed = 0;
-    for (T existingRecord : recordsToRemove) {
+    List<Callable<Void>> callables = new ArrayList<>();
+    recordsToRemove.forEach(existingRecord -> callables.add(() -> {
       LOG.info("Removing \"{}\"", existingRecord);
       try {
         String primaryKey = getPrimaryKey(existingRecord);
         String path = getNodePath(znode, primaryKey);
         if (zkManager.delete(path)) {
-          removed++;
+          trueRemoved.add(existingRecord);
         } else {
           LOG.error("Did not remove \"{}\"", existingRecord);
         }
       } catch (Exception e) {
         LOG.error("Cannot remove \"{}\"", existingRecord, e);
         getMetrics().addFailure(monotonicNow() - start);
       }
+      return null;
+    }));
+    try {
+      if (enableConcurrent) {
+        executorService.invokeAll(callables);
+      } else {
+        for (Callable<Void> callable : callables) {
+          callable.call();
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Record removal failed : {}", e.getMessage(), e);
+      throw new IOException(e);
     }
     long end = monotonicNow();
-    if (removed > 0) {
+    if (!trueRemoved.isEmpty()) {
       getMetrics().addRemove(end - start);
     }
-    return removed;
+    // Generate return map
+    for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
+      for (T record : entry.getValue()) {
+        if (trueRemoved.contains(record)) {
+          ret.compute(entry.getKey(), (k, v) ->  (v == null) ? 1 : v + 1);
+          break;

Review Comment:
   Should this `break` be removed? With it, `ret` counts at most one removed record per query, even though the comment above says it tracks how many entries each query deleted.
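   
   For illustration, a sketch of the counting loop without the `break`, so each query's count reflects every record it actually removed (fragment of the method above):
   
   ```java
   // Sketch: count every removed record per query instead of stopping
   // at the first match.
   for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
     for (T record : entry.getValue()) {
       if (trueRemoved.contains(record)) {
         ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1);
       }
     }
   }
   ```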



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java:
##########
@@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException {
     Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
     return remove(recordClass, query) == 1;
   }
+
+  @Override
+  public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException {
+    assert !records.isEmpty();
+    // Fall back to iterative remove() calls if all records don't share 1 class
+    Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+    if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+      Map<T, Boolean> result = new HashMap<>();
+      for (T record : records) {
+        result.put(record, remove(record));
+      }
+      return result;
+    }
+
+    final List<Query<T>> queries = new ArrayList<>();
+    for (T record: records) {

Review Comment:
   `for (T record : records) {`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
