ZanderXu commented on code in PR #6833: URL: https://github.com/apache/hadoop/pull/6833#discussion_r1607499422
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java: ########## @@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException { Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz); return remove(recordClass, query) == 1; } + + @Override + public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException { + assert !records.isEmpty(); + // Fall back to iterative remove() calls if all records don't share 1 class + Class<? extends BaseRecord> expectedClazz = records.get(0).getClass(); + if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) { + Map<T, Boolean> result = new HashMap<>(); + for (T record : records) { + result.put(record, remove(record)); + } + return result; + } + + final List<Query<T>> queries = new ArrayList<>(); + for (T record: records) { + queries.add(new Query<>(record)); + } + @SuppressWarnings("unchecked") + Class<T> recordClass = (Class<T>) StateStoreUtils.getRecordClass(expectedClazz); + Map<Query<T>, Integer> result = remove(recordClass, queries); + return result.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().getPartial(), e -> e.getValue() > 0)); Review Comment: `remove(T record)` returns true only if `remove(recordClass, query)` returns 1, but here the check is `e.getValue() > 0`. How about making them consistent by using `e.getValue() == 1` here as well? 
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java: ########## @@ -565,7 +568,7 @@ public void testRegistrationExpiredRaceCondition() // Load cache MembershipStore memStoreSpy = spy(membershipStore); DelayAnswer delayer = new DelayAnswer(LOG); - doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any()); + doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any(), anyBoolean()); Review Comment: remove this `anyBoolean()` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult putAll( } @Override - public <T extends BaseRecord> int remove( - Class<T> clazz, Query<T> query) throws IOException { + public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz, + List<Query<T>> queries) throws IOException { verifyDriverReady(); - if (query == null) { - return 0; + // Track how many entries are deleted by each query + Map<Query<T>, Integer> ret = new HashMap<>(); + final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>()); + if (queries.isEmpty()) { + return ret; } // Read the current data long start = monotonicNow(); - List<T> records = null; + List<T> records; try { QueryResult<T> result = get(clazz); records = result.getRecords(); } catch (IOException ex) { LOG.error("Cannot get existing records", ex); getMetrics().addFailure(monotonicNow() - start); - return 0; + return ret; } // Check the records to remove String znode = getZNodeForClass(clazz); - List<T> recordsToRemove = filterMultiple(query, records); + Set<T> recordsToRemove = new HashSet<>(); + Map<Query<T>, List<T>> queryToRecords = new HashMap<>(); + for (Query<T> query : queries) { + List<T> filtered = filterMultiple(query, records); + queryToRecords.put(query, filtered); + 
recordsToRemove.addAll(filtered); + } // Remove the records - int removed = 0; - for (T existingRecord : recordsToRemove) { + List<Callable<Void>> callables = new ArrayList<>(); + recordsToRemove.forEach(existingRecord -> callables.add(() -> { LOG.info("Removing \"{}\"", existingRecord); try { String primaryKey = getPrimaryKey(existingRecord); String path = getNodePath(znode, primaryKey); if (zkManager.delete(path)) { - removed++; + trueRemoved.add(existingRecord); } else { LOG.error("Did not remove \"{}\"", existingRecord); } } catch (Exception e) { LOG.error("Cannot remove \"{}\"", existingRecord, e); getMetrics().addFailure(monotonicNow() - start); } + return null; + })); + try { + if (enableConcurrent) { + executorService.invokeAll(callables); + } else { + for (Callable<Void> callable : callables) { + callable.call(); + } + } + } catch (Exception e) { + LOG.error("Record removal failed : {}", e.getMessage(), e); + throw new IOException(e); Review Comment: `throw new IOException(e);` should be removed since `remove(Class<T> clazz, Query<T> query)` does not throw any exceptions except `StateStoreUnavailableException` in `verifyDriverReady`. 
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult putAll( } @Override - public <T extends BaseRecord> int remove( - Class<T> clazz, Query<T> query) throws IOException { + public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz, + List<Query<T>> queries) throws IOException { verifyDriverReady(); - if (query == null) { - return 0; + // Track how many entries are deleted by each query + Map<Query<T>, Integer> ret = new HashMap<>(); + final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>()); + if (queries.isEmpty()) { + return ret; } // Read the current data long start = monotonicNow(); - List<T> records = null; + List<T> records; try { QueryResult<T> result = get(clazz); records = result.getRecords(); } catch (IOException ex) { LOG.error("Cannot get existing records", ex); getMetrics().addFailure(monotonicNow() - start); - return 0; + return ret; } // Check the records to remove String znode = getZNodeForClass(clazz); - List<T> recordsToRemove = filterMultiple(query, records); + Set<T> recordsToRemove = new HashSet<>(); + Map<Query<T>, List<T>> queryToRecords = new HashMap<>(); + for (Query<T> query : queries) { + List<T> filtered = filterMultiple(query, records); + queryToRecords.put(query, filtered); + recordsToRemove.addAll(filtered); + } // Remove the records - int removed = 0; - for (T existingRecord : recordsToRemove) { + List<Callable<Void>> callables = new ArrayList<>(); + recordsToRemove.forEach(existingRecord -> callables.add(() -> { LOG.info("Removing \"{}\"", existingRecord); try { String primaryKey = getPrimaryKey(existingRecord); String path = getNodePath(znode, primaryKey); if (zkManager.delete(path)) { - removed++; + trueRemoved.add(existingRecord); } else { LOG.error("Did not remove \"{}\"", existingRecord); } } catch (Exception 
e) { LOG.error("Cannot remove \"{}\"", existingRecord, e); getMetrics().addFailure(monotonicNow() - start); } + return null; + })); + try { + if (enableConcurrent) { + executorService.invokeAll(callables); + } else { + for (Callable<Void> callable : callables) { + callable.call(); + } + } + } catch (Exception e) { + LOG.error("Record removal failed : {}", e.getMessage(), e); + throw new IOException(e); } long end = monotonicNow(); - if (removed > 0) { + if (!trueRemoved.isEmpty()) { getMetrics().addRemove(end - start); } - return removed; + // Generate return map + for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) { + for (T record : entry.getValue()) { + if (trueRemoved.contains(record)) { + ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1); + break; Review Comment: Should this `break` be removed? With it, the inner loop stops at the first removed record, so each query's count is at most 1 instead of the number of records it actually removed. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java: ########## @@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException { Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz); return remove(recordClass, query) == 1; } + + @Override + public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException { + assert !records.isEmpty(); + // Fall back to iterative remove() calls if all records don't share 1 class + Class<? extends BaseRecord> expectedClazz = records.get(0).getClass(); + if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) { + Map<T, Boolean> result = new HashMap<>(); + for (T record : records) { + result.put(record, remove(record)); + } + return result; + } + + final List<Query<T>> queries = new ArrayList<>(); + for (T record: records) { Review Comment: Nit: add a space before the colon, i.e. `for (T record : records) {` -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org