dinoocch commented on code in PR #14237:
URL: https://github.com/apache/pinot/pull/14237#discussion_r1802191661
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -120,39 +121,46 @@ public IdealState commit(HelixManager helixManager,
String resourceName,
// All pending entries have been processed, the updatedIdealState
should be set.
return entry._updatedIdealState;
}
- // remove from queue
- Entry first = queue._pending.poll();
- processed.add(first);
- String mergedResourceName = first._resourceName;
- HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
- PropertyKey idealStateKey =
dataAccessor.keyBuilder().idealStates(resourceName);
- IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
- // Make a copy of the idealState above to pass it to the updater
- // NOTE: new IdealState(idealState.getRecord()) does not work
because it's shallow copy for map fields and
- // list fields
- IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
-
- /**
- * If the local cache does not contain a value, need to check if
there is a
- * value in ZK; use it as initial value if exists
- */
- IdealState updatedIdealState = first._updater.apply(idealStateCopy);
- first._updatedIdealState = updatedIdealState;
- Iterator<Entry> it = queue._pending.iterator();
- while (it.hasNext()) {
- Entry ent = it.next();
- if (!ent._resourceName.equals(mergedResourceName)) {
- continue;
+ IdealState response = updateIdealState(helixManager, resourceName,
idealState -> {
Review Comment:
My only concern with handling _only_ the resource for the current thread is
that it breaks the fifo behavior of the queue. So I wonder if some requests
would be unfairly starved if they get unlucky?
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -120,39 +121,46 @@ public IdealState commit(HelixManager helixManager,
String resourceName,
// All pending entries have been processed, the updatedIdealState
should be set.
return entry._updatedIdealState;
}
- // remove from queue
- Entry first = queue._pending.poll();
- processed.add(first);
- String mergedResourceName = first._resourceName;
- HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
- PropertyKey idealStateKey =
dataAccessor.keyBuilder().idealStates(resourceName);
- IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
- // Make a copy of the idealState above to pass it to the updater
- // NOTE: new IdealState(idealState.getRecord()) does not work
because it's shallow copy for map fields and
- // list fields
- IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
-
- /**
- * If the local cache does not contain a value, need to check if
there is a
- * value in ZK; use it as initial value if exists
- */
- IdealState updatedIdealState = first._updater.apply(idealStateCopy);
- first._updatedIdealState = updatedIdealState;
- Iterator<Entry> it = queue._pending.iterator();
- while (it.hasNext()) {
- Entry ent = it.next();
- if (!ent._resourceName.equals(mergedResourceName)) {
- continue;
+ IdealState response = updateIdealState(helixManager, resourceName,
idealState -> {
+ IdealState updatedIdealState = idealState;
+ if (!processed.isEmpty()) {
+ queue._pending.addAll(processed);
+ processed.clear();
+ }
+ Iterator<Entry> it = queue._pending.iterator();
+ boolean noChange = true;
+ while (it.hasNext()) {
+ Entry ent = it.next();
+ if (!ent._resourceName.equals(resourceName)) {
+ continue;
+ }
+ processed.add(ent);
+ it.remove();
+ updatedIdealState = ent._updater.apply(updatedIdealState);
+ noChange = false;
+ ent._updatedIdealState = updatedIdealState;
+ ent._exception = null;
+ }
+ if (noChange) {
+ return null;
Review Comment:
I'm not quite sure what this `return null` would imply here? I'd expect this
case should be the same `return entry._updatedIdealState;`? But might be
missing something?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]