[
https://issues.apache.org/jira/browse/HDFS-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848029#comment-17848029
]
ASF GitHub Bot commented on HDFS-17529:
---------------------------------------
ZanderXu commented on code in PR #6833:
URL: https://github.com/apache/hadoop/pull/6833#discussion_r1607499422
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java:
##########
@@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record)
throws IOException {
Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
return remove(recordClass, query) == 1;
}
+
+ @Override
+ public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T>
records) throws IOException {
+ assert !records.isEmpty();
+ // Fall back to iterative remove() calls if all records don't share 1 class
+ Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+ if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+ Map<T, Boolean> result = new HashMap<>();
+ for (T record : records) {
+ result.put(record, remove(record));
+ }
+ return result;
+ }
+
+ final List<Query<T>> queries = new ArrayList<>();
+ for (T record: records) {
+ queries.add(new Query<>(record));
+ }
+ @SuppressWarnings("unchecked")
+ Class<T> recordClass = (Class<T>)
StateStoreUtils.getRecordClass(expectedClazz);
+ Map<Query<T>, Integer> result = remove(recordClass, queries);
+ return result.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey().getPartial(), e ->
e.getValue() > 0));
Review Comment:
`remove(T record)` returns true only if `remove(recordClass, query)` equals 1, but
here the check is `e.getValue() > 0`. How about making the two consistent by
using `e.getValue() == 1` here?
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java:
##########
@@ -565,7 +568,7 @@ public void testRegistrationExpiredRaceCondition()
// Load cache
MembershipStore memStoreSpy = spy(membershipStore);
DelayAnswer delayer = new DelayAnswer(LOG);
- doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any());
+ doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any(),
anyBoolean());
Review Comment:
remove this `anyBoolean()`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult
putAll(
}
@Override
- public <T extends BaseRecord> int remove(
- Class<T> clazz, Query<T> query) throws IOException {
+ public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+ List<Query<T>> queries) throws IOException {
verifyDriverReady();
- if (query == null) {
- return 0;
+ // Track how many entries are deleted by each query
+ Map<Query<T>, Integer> ret = new HashMap<>();
+ final List<T> trueRemoved = Collections.synchronizedList(new
ArrayList<>());
+ if (queries.isEmpty()) {
+ return ret;
}
// Read the current data
long start = monotonicNow();
- List<T> records = null;
+ List<T> records;
try {
QueryResult<T> result = get(clazz);
records = result.getRecords();
} catch (IOException ex) {
LOG.error("Cannot get existing records", ex);
getMetrics().addFailure(monotonicNow() - start);
- return 0;
+ return ret;
}
// Check the records to remove
String znode = getZNodeForClass(clazz);
- List<T> recordsToRemove = filterMultiple(query, records);
+ Set<T> recordsToRemove = new HashSet<>();
+ Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+ for (Query<T> query : queries) {
+ List<T> filtered = filterMultiple(query, records);
+ queryToRecords.put(query, filtered);
+ recordsToRemove.addAll(filtered);
+ }
// Remove the records
- int removed = 0;
- for (T existingRecord : recordsToRemove) {
+ List<Callable<Void>> callables = new ArrayList<>();
+ recordsToRemove.forEach(existingRecord -> callables.add(() -> {
LOG.info("Removing \"{}\"", existingRecord);
try {
String primaryKey = getPrimaryKey(existingRecord);
String path = getNodePath(znode, primaryKey);
if (zkManager.delete(path)) {
- removed++;
+ trueRemoved.add(existingRecord);
} else {
LOG.error("Did not remove \"{}\"", existingRecord);
}
} catch (Exception e) {
LOG.error("Cannot remove \"{}\"", existingRecord, e);
getMetrics().addFailure(monotonicNow() - start);
}
+ return null;
+ }));
+ try {
+ if (enableConcurrent) {
+ executorService.invokeAll(callables);
+ } else {
+ for (Callable<Void> callable : callables) {
+ callable.call();
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Record removal failed : {}", e.getMessage(), e);
+ throw new IOException(e);
Review Comment:
`throw new IOException(e);` should be removed since `remove(Class<T> clazz,
Query<T> query)` does not throw any exceptions except
`StateStoreUnavailableException` in `verifyDriverReady`.
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult
putAll(
}
@Override
- public <T extends BaseRecord> int remove(
- Class<T> clazz, Query<T> query) throws IOException {
+ public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+ List<Query<T>> queries) throws IOException {
verifyDriverReady();
- if (query == null) {
- return 0;
+ // Track how many entries are deleted by each query
+ Map<Query<T>, Integer> ret = new HashMap<>();
+ final List<T> trueRemoved = Collections.synchronizedList(new
ArrayList<>());
+ if (queries.isEmpty()) {
+ return ret;
}
// Read the current data
long start = monotonicNow();
- List<T> records = null;
+ List<T> records;
try {
QueryResult<T> result = get(clazz);
records = result.getRecords();
} catch (IOException ex) {
LOG.error("Cannot get existing records", ex);
getMetrics().addFailure(monotonicNow() - start);
- return 0;
+ return ret;
}
// Check the records to remove
String znode = getZNodeForClass(clazz);
- List<T> recordsToRemove = filterMultiple(query, records);
+ Set<T> recordsToRemove = new HashSet<>();
+ Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+ for (Query<T> query : queries) {
+ List<T> filtered = filterMultiple(query, records);
+ queryToRecords.put(query, filtered);
+ recordsToRemove.addAll(filtered);
+ }
// Remove the records
- int removed = 0;
- for (T existingRecord : recordsToRemove) {
+ List<Callable<Void>> callables = new ArrayList<>();
+ recordsToRemove.forEach(existingRecord -> callables.add(() -> {
LOG.info("Removing \"{}\"", existingRecord);
try {
String primaryKey = getPrimaryKey(existingRecord);
String path = getNodePath(znode, primaryKey);
if (zkManager.delete(path)) {
- removed++;
+ trueRemoved.add(existingRecord);
} else {
LOG.error("Did not remove \"{}\"", existingRecord);
}
} catch (Exception e) {
LOG.error("Cannot remove \"{}\"", existingRecord, e);
getMetrics().addFailure(monotonicNow() - start);
}
+ return null;
+ }));
+ try {
+ if (enableConcurrent) {
+ executorService.invokeAll(callables);
+ } else {
+ for (Callable<Void> callable : callables) {
+ callable.call();
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Record removal failed : {}", e.getMessage(), e);
+ throw new IOException(e);
}
long end = monotonicNow();
- if (removed > 0) {
+ if (!trueRemoved.isEmpty()) {
getMetrics().addRemove(end - start);
}
- return removed;
+ // Generate return map
+ for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
+ for (T record : entry.getValue()) {
+ if (trueRemoved.contains(record)) {
+ ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1);
+ break;
Review Comment:
Should this `break` be removed? It stops counting after the first matched record,
so a query that removed several records would still report a count of 1.
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java:
##########
@@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record)
throws IOException {
Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
return remove(recordClass, query) == 1;
}
+
+ @Override
+ public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T>
records) throws IOException {
+ assert !records.isEmpty();
+ // Fall back to iterative remove() calls if all records don't share 1 class
+ Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+ if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+ Map<T, Boolean> result = new HashMap<>();
+ for (T record : records) {
+ result.put(record, remove(record));
+ }
+ return result;
+ }
+
+ final List<Query<T>> queries = new ArrayList<>();
+ for (T record: records) {
Review Comment:
`for (T record : records) {`
> Improve router state store cache entry deletion
> -----------------------------------------------
>
> Key: HDFS-17529
> URL: https://issues.apache.org/jira/browse/HDFS-17529
> Project: Hadoop HDFS
> Issue Type: Improvement
> Components: hdfs, rbf
> Reporter: Felix N
> Assignee: Felix N
> Priority: Major
> Labels: pull-request-available
>
> Current implementation for router state store update is quite inefficient, so
> much that when routers are removed and a lot of NameNodeMembership records
> are deleted in a short burst, the deletions triggered a router safemode in
> our cluster and caused a lot of troubles.
> This ticket aims to improve the deletion process for ZK state store
> implementation.
> See HDFS-17532 for the other half of this improvement
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]