jasperjiaguo commented on code in PR #14214:
URL: https://github.com/apache/pinot/pull/14214#discussion_r1797515746
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -124,34 +124,27 @@ public IdealState commit(HelixManager helixManager,
String resourceName,
Entry first = queue._pending.poll();
processed.add(first);
String mergedResourceName = first._resourceName;
Review Comment:
Not quite related to the fix, but given
`Queue queue = getQueue(resourceName);`
shouldn't the queue already contain only entries for resourceName?
Why do we pull mergedResourceName from the first entry again?
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -124,34 +124,27 @@ public IdealState commit(HelixManager helixManager,
String resourceName,
Entry first = queue._pending.poll();
processed.add(first);
String mergedResourceName = first._resourceName;
- HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
- PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
- IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
- // Make a copy of the idealState above to pass it to the updater
- // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
- // list fields
- IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
- /**
- * If the local cache does not contain a value, need to check if there is a
- * value in ZK; use it as initial value if exists
- */
- IdealState updatedIdealState = first._updater.apply(idealStateCopy);
- first._updatedIdealState = updatedIdealState;
- Iterator<Entry> it = queue._pending.iterator();
- while (it.hasNext()) {
- Entry ent = it.next();
- if (!ent._resourceName.equals(mergedResourceName)) {
- continue;
- }
- processed.add(ent);
- updatedIdealState = ent._updater.apply(idealStateCopy);
- ent._updatedIdealState = updatedIdealState;
- it.remove();
- }
- IdealState finalUpdatedIdealState = updatedIdealState;
- updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
+ updateIdealState(helixManager, mergedResourceName, idealState -> {
+ // Make a copy of the idealState above to pass it to the updater
+ // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields
+ // and list fields
+ IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
+ IdealState updatedIdealState = first._updater.apply(idealStateCopy);
+ first._updatedIdealState = updatedIdealState;
+ Iterator<Entry> it = queue._pending.iterator();
+ while (it.hasNext()) {
+ Entry ent = it.next();
+ if (!ent._resourceName.equals(mergedResourceName)) {
+ continue;
+ }
+ processed.add(ent);
+ updatedIdealState = ent._updater.apply(updatedIdealState);
+ ent._updatedIdealState = updatedIdealState;
+ it.remove();
+ }
+ return updatedIdealState;
+ },
retryPolicy, noChangeOk);
Review Comment:
Not quite related to the fix, but if the update fails after the max number of
attempts, would we lose track of the series of IdealState (IS) updates that
were already removed from the queue?
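
To make the concern concrete, here is a minimal sketch of one way to avoid losing those updates. It is not the PR's code: `GroupCommitSketch`, its simplified `Entry`, the `String` state, and the `Predicate` writer are all hypothetical stand-ins for the IdealState, the updater, and the ZK write. The assumption is that entries are drained into `processed` before the retry loop, so every attempt re-applies the same updaters and a final failure can be reported back to each caller.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical sketch; none of these names are Pinot or Helix APIs.
final class GroupCommitSketch {

  // One pending update; plays the role of Entry in the diff, with a String as the state.
  static final class Entry {
    final UnaryOperator<String> updater;
    String result;      // set when the merged write succeeds
    Exception failure;  // set when every attempt fails

    Entry(UnaryOperator<String> updater) {
      this.updater = updater;
    }
  }

  // Drains the queue once, then tries the merged write up to maxAttempts times.
  static List<Entry> commit(Queue<Entry> pending, String initial,
      Predicate<String> writer, int maxAttempts) {
    // Drain before retrying, so the entries stay reachable even if the write never succeeds.
    List<Entry> processed = new ArrayList<>();
    Entry next;
    while ((next = pending.poll()) != null) {
      processed.add(next);
    }

    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      // Re-apply every updater on each attempt, starting from the initial state.
      String merged = initial;
      for (Entry entry : processed) {
        merged = entry.updater.apply(merged);
      }
      if (writer.test(merged)) {
        for (Entry entry : processed) {
          entry.result = merged;
        }
        return processed;
      }
    }

    // All attempts failed: record the failure on every drained entry instead of dropping it.
    Exception failure = new RuntimeException("write failed after " + maxAttempts + " attempts");
    for (Entry entry : processed) {
      entry.failure = failure;
    }
    return processed;
  }

  public static void main(String[] args) {
    Queue<Entry> pending = new ArrayDeque<>();
    pending.add(new Entry(s -> s + "a"));
    pending.add(new Entry(s -> s + "b"));
    // A writer that always rejects the write simulates exhausting the retry policy.
    List<Entry> done = commit(pending, "", merged -> false, 3);
    System.out.println("queue empty: " + pending.isEmpty()
        + ", first entry failed: " + (done.get(0).failure != null));
  }
}
```

In this sketch the queue is empty after a failed commit, but each caller can still see the failure through its entry. Whether the PR's surrounding code already does something equivalent with `processed` is not visible in this hunk.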