dinoocch commented on code in PR #14237:
URL: https://github.com/apache/pinot/pull/14237#discussion_r1801597267
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -124,35 +125,44 @@ public IdealState commit(HelixManager helixManager, String resourceName,
Entry first = queue._pending.poll();
processed.add(first);
String mergedResourceName = first._resourceName;
- HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
- PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
- IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
- // Make a copy of the idealState above to pass it to the updater
- // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
- // list fields
- IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
-
- /**
- * If the local cache does not contain a value, need to check if there is a
- * value in ZK; use it as initial value if exists
- */
- IdealState updatedIdealState = first._updater.apply(idealStateCopy);
- first._updatedIdealState = updatedIdealState;
- Iterator<Entry> it = queue._pending.iterator();
- while (it.hasNext()) {
- Entry ent = it.next();
- if (!ent._resourceName.equals(mergedResourceName)) {
- continue;
+ IdealState response = updateIdealState(helixManager, mergedResourceName, idealState -> {
+ IdealState updatedIdealState = first._updater.apply(idealState);
+ first._updatedIdealState = updatedIdealState;
+ first._exception = null;
+ if (processed.size() > 1) {
+ queue._pending.addAll(processed.subList(1, processed.size()));
+ processed.clear();
+ processed.add(first);
+ }
+ Iterator<Entry> it = queue._pending.iterator();
+ while (it.hasNext()) {
+ Entry ent = it.next();
+ if (!ent._resourceName.equals(mergedResourceName)) {
+ continue;
+ }
+ processed.add(ent);
+ updatedIdealState = ent._updater.apply(updatedIdealState);
+ ent._updatedIdealState = updatedIdealState;
+ ent._exception = null;
+ it.remove();
}
- processed.add(ent);
- updatedIdealState = ent._updater.apply(idealStateCopy);
- ent._updatedIdealState = updatedIdealState;
- it.remove();
+ return updatedIdealState;
+ }, retryPolicy, noChangeOk);
+ if (response == null) {
+ RuntimeException ex = new RuntimeException("Failed to update IdealState");
+ for (Entry ent : processed) {
+ ent._exception = ex;
+ ent._updatedIdealState = null;
+ }
+ throw ex;
+ }
+ } catch (Throwable e) {
+ // If the update failed, we should re-add all entries to the queue
+ for (Entry ent : processed) {
+ ent._exception = e;
+ ent._updatedIdealState = null;
}
- IdealState finalUpdatedIdealState = updatedIdealState;
- updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
- retryPolicy, noChangeOk);
+ throw e;
Review Comment:
I think we should perhaps only throw if `mergedResourceName` matches the
resource name that was originally requested.
Otherwise a single "bad" znode update would cause failures in totally
unrelated callers. In addition, the thread that owns the `Entry` would have
returned a failure, yet the `Entry` might still be processed eventually.
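
To make the suggestion concrete, here is a minimal sketch of the idea, not the actual `IdealStateGroupCommit` code. `ThrowOnlyForOwnResource`, `processBatch`, and the simplified `Entry` (with a `String` standing in for `IdealState`) are all hypothetical names used for illustration only. A failed batch marks every entry as failed, but the exception is rethrown only when the batch belongs to the resource the calling thread asked for:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class ThrowOnlyForOwnResource {
  // Hypothetical stand-in for the real Entry; a String replaces IdealState.
  static final class Entry {
    final String _resourceName;
    final UnaryOperator<String> _updater;
    String _updatedIdealState;
    Throwable _exception;

    Entry(String resourceName, UnaryOperator<String> updater) {
      _resourceName = resourceName;
      _updater = updater;
    }
  }

  /**
   * Applies every updater in the batch. On failure, records the exception on
   * each entry and rethrows only if the batch is for the requested resource.
   * Returns true when the batch was applied successfully.
   */
  static boolean processBatch(String requestedResourceName, String mergedResourceName,
      List<Entry> processed) {
    try {
      String state = "";
      for (Entry ent : processed) {
        state = ent._updater.apply(state);
        ent._updatedIdealState = state;
        ent._exception = null;
      }
      return true;
    } catch (RuntimeException e) {
      // The owners of these entries learn about the failure from the entry itself.
      for (Entry ent : processed) {
        ent._exception = e;
        ent._updatedIdealState = null;
      }
      // Only the caller that asked for this resource sees the exception.
      if (mergedResourceName.equals(requestedResourceName)) {
        throw e;
      }
      return false;
    }
  }

  public static void main(String[] args) {
    List<Entry> batch = new ArrayList<>();
    batch.add(new Entry("tableB", s -> {
      throw new IllegalStateException("bad znode");
    }));
    // The caller asked for tableA, so a failure in the tableB batch is not thrown here.
    boolean ok = processBatch("tableA", "tableB", batch);
    System.out.println("batch applied: " + ok + ", entry failed: " + (batch.get(0)._exception != null));
  }
}
```

With this shape, a thread requesting one resource can keep draining the queue after another resource's batch fails, while the owner of the failed entry still observes the exception through its own `Entry`.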
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -124,35 +125,44 @@ public IdealState commit(HelixManager helixManager, String resourceName,
Entry first = queue._pending.poll();
processed.add(first);
String mergedResourceName = first._resourceName;
- HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
- PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
- IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
- // Make a copy of the idealState above to pass it to the updater
- // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
- // list fields
- IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
-
- /**
- * If the local cache does not contain a value, need to check if there is a
- * value in ZK; use it as initial value if exists
- */
- IdealState updatedIdealState = first._updater.apply(idealStateCopy);
- first._updatedIdealState = updatedIdealState;
- Iterator<Entry> it = queue._pending.iterator();
- while (it.hasNext()) {
- Entry ent = it.next();
- if (!ent._resourceName.equals(mergedResourceName)) {
- continue;
+ IdealState response = updateIdealState(helixManager, mergedResourceName, idealState -> {
+ IdealState updatedIdealState = first._updater.apply(idealState);
+ first._updatedIdealState = updatedIdealState;
+ first._exception = null;
+ if (processed.size() > 1) {
+ queue._pending.addAll(processed.subList(1, processed.size()));
+ processed.clear();
+ processed.add(first);
+ }
+ Iterator<Entry> it = queue._pending.iterator();
+ while (it.hasNext()) {
+ Entry ent = it.next();
+ if (!ent._resourceName.equals(mergedResourceName)) {
+ continue;
+ }
+ processed.add(ent);
+ updatedIdealState = ent._updater.apply(updatedIdealState);
+ ent._updatedIdealState = updatedIdealState;
+ ent._exception = null;
+ it.remove();
}
- processed.add(ent);
- updatedIdealState = ent._updater.apply(idealStateCopy);
- ent._updatedIdealState = updatedIdealState;
- it.remove();
+ return updatedIdealState;
+ }, retryPolicy, noChangeOk);
+ if (response == null) {
+ RuntimeException ex = new RuntimeException("Failed to update IdealState");
+ for (Entry ent : processed) {
+ ent._exception = ex;
+ ent._updatedIdealState = null;
+ }
+ throw ex;
+ }
+ } catch (Throwable e) {
+ // If the update failed, we should re-add all entries to the queue
Review Comment:
I'm not sure what the intent of this comment is. The code doesn't seem to
re-add these entries to the queue (and I feel it shouldn't).
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -124,35 +125,44 @@ public IdealState commit(HelixManager helixManager, String resourceName,
Entry first = queue._pending.poll();
processed.add(first);
String mergedResourceName = first._resourceName;
- HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
- PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
- IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
- // Make a copy of the idealState above to pass it to the updater
- // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
- // list fields
- IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
-
- /**
- * If the local cache does not contain a value, need to check if there is a
- * value in ZK; use it as initial value if exists
- */
- IdealState updatedIdealState = first._updater.apply(idealStateCopy);
- first._updatedIdealState = updatedIdealState;
- Iterator<Entry> it = queue._pending.iterator();
- while (it.hasNext()) {
- Entry ent = it.next();
- if (!ent._resourceName.equals(mergedResourceName)) {
- continue;
+ IdealState response = updateIdealState(helixManager, mergedResourceName, idealState -> {
+ IdealState updatedIdealState = first._updater.apply(idealState);
+ first._updatedIdealState = updatedIdealState;
+ first._exception = null;
Review Comment:
I think `_exception` should never be non-null at this line. For example,
when picking `first` we might want to skip (or remove) any entries that
already have an exception (if that case is even possible).
Namely, I think we should avoid a case where the requesting thread gets an
exception but some other thread still picks up the `Entry`.
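
As a rough illustration of skipping failed entries when choosing `first`, the sketch below drops any entry that already carries an exception instead of resetting it. This is not the PR's code: `SkipFailedEntries`, `pollFirstHealthy`, and the trimmed-down `Entry` are hypothetical, and the sketch assumes the pending queue is a `java.util.Queue`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SkipFailedEntries {
  // Hypothetical stand-in for the real Entry; only the fields needed here.
  static final class Entry {
    final String _resourceName;
    volatile Throwable _exception;

    Entry(String resourceName) {
      _resourceName = resourceName;
    }
  }

  /**
   * Polls the queue until it finds an entry with no recorded exception.
   * Entries whose owner has already been told about a failure are dropped,
   * so no other thread can pick them up later. Returns null if none is left.
   */
  static Entry pollFirstHealthy(Queue<Entry> pending) {
    Entry candidate;
    while ((candidate = pending.poll()) != null) {
      if (candidate._exception == null) {
        return candidate;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Queue<Entry> pending = new ConcurrentLinkedQueue<>();
    Entry failed = new Entry("tableA");
    failed._exception = new RuntimeException("Failed to update IdealState");
    pending.add(failed);
    pending.add(new Entry("tableB"));

    Entry first = pollFirstHealthy(pending);
    System.out.println("picked: " + (first == null ? "none" : first._resourceName));
  }
}
```

With a guard like this, the `first._exception = null` assignment would become unnecessary, because `first` could never have a recorded exception at that point.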