phet commented on a change in pull request #3415:
URL: https://github.com/apache/gobblin/pull/3415#discussion_r743140827
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -1063,26 +1092,41 @@ private void releaseQuota(DagNode<JobExecutionPlan>
dagNode) {
String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
if (proxyUser != null) {
String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser,
dagNode);
- if (proxyUserToJobCount.containsKey(proxyUserKey) &&
proxyUserToJobCount.get(proxyUserKey) > 0) {
- proxyUserToJobCount.put(proxyUserKey,
proxyUserToJobCount.get(proxyUserKey) - 1);
- }
+ decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
}
String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
if (serializedRequesters != null) {
try {
for (ServiceRequester requester :
RequesterService.deserialize(serializedRequesters)) {
String requesterKey =
DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
- if (requesterToJobCount.containsKey(requesterKey) &&
requesterToJobCount.get(requesterKey) > 0) {
- requesterToJobCount.put(requesterKey,
requesterToJobCount.get(requesterKey) - 1);
- }
+ decrementQuotaUsage(requesterToJobCount, requesterKey);
}
} catch (IOException e) {
log.error("Failed to release quota for requester list " +
serializedRequesters, e);
}
}
}
+ private void decrementQuotaUsage(Map<String, Integer> quotaMap, String
user) {
+ Integer currentCount;
+ do {
+ currentCount = quotaMap.get(user);
+ } while (currentCount != null && currentCount > 0 &&
!quotaMap.replace(user, currentCount, currentCount - 1));
+ }
+
+ private void decrementQuotaUsageForUsers(Map<String, Integer>
requesterAndCountToDecreaseMap) {
+ for (Map.Entry<String, Integer> entry :
requesterAndCountToDecreaseMap.entrySet()) {
+ String user = entry.getKey();
+ Integer currentCount;
+ int newCount;
+ do {
+ currentCount = DagManagerThread.requesterToJobCount.get(user);
+ newCount = Math.max(0,
DagManagerThread.requesterToJobCount.get(user) - entry.getValue());
+ } while (currentCount != null && currentCount > 0 &&
!DagManagerThread.requesterToJobCount.replace(user, currentCount, newCount));
Review comment:
the short-circuit condition in both of these loops probably shouldn't be only
`currentCount > 0`; it should (also) stop when `currentCount == newCount`, i.e. check `currentCount != newCount`
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -984,15 +985,16 @@ private void checkQuota(DagNode<JobExecutionPlan>
dagNode) throws IOException {
}
if (!requesterCheck) {
- throw new IOException(requesterMessage);
+ throw new IOException(requesterMessage.toString());
}
}
/**
* Increment quota by one for the given map and key.
+ * We need synchronization on this method because quotaMap is shared among
all the {@link DagManagerThread}s.
* @return true if quota is not reached for this user or user is
whitelisted, false otherwise.
*/
- private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap,
String user, DagNode<JobExecutionPlan> dagNode) {
+ synchronized private boolean incrementMapAndCheckQuota(Map<String,
Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
Review comment:
maybe just add a comment acknowledging the mismatch between naming and
semantics, e.g. the javadoc for `getQuotaForUser` might say "really the quota per
DAG node, per user", while linking to `getUserQuotaKey` as a reminder.
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -960,49 +962,76 @@ private void checkQuota(DagNode<JobExecutionPlan>
dagNode) throws IOException {
String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
boolean proxyUserCheck = true;
+ int proxyQuotaIncrement;
+ Map<String, Integer> usersQuotaIncrement = new HashMap<>();
+ StringBuilder requesterMessage = new StringBuilder();
+
if (proxyUser != null) {
- proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount,
proxyUser, dagNode);
+ proxyQuotaIncrement =
incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+ proxyUserCheck = proxyQuotaIncrement < 0; // proxy user quota check
failed
+ if (!proxyUserCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
runningJobs=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
+
proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode))));
+ }
}
String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;
- String requesterMessage = null;
+
if (serializedRequesters != null) {
for (ServiceRequester requester :
RequesterService.deserialize(serializedRequesters)) {
- requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount,
requester.getName(), dagNode);
- if (!requesterCheck && requesterMessage == null) {
- requesterMessage = "Quota exceeded for requester " +
requester.getName() + " on executor " + specExecutorUri + ": quota="
- + getQuotaForUser(requester.getName()) + ", runningJobs=" +
requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(),
dagNode));
+ int userQuotaIncrement =
incrementJobCountAndCheckUserQuota(requesterToJobCount, requester.getName(),
dagNode);
+ boolean thisRequesterCheck;
+ thisRequesterCheck = userQuotaIncrement < 0; // user quota check
failed
+ usersQuotaIncrement.put(requester.getName(),
Math.max(userQuotaIncrement, 0));
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for requester %s on executor %s : quota=%s,
runningJobs=%d%n",
+ requester.getName(), specExecutorUri,
getQuotaForUser(requester.getName()),
+
requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(),
dagNode))));
}
}
}
// Throw errors for reach quota at the end to avoid inconsistent job
counts
- if (!proxyUserCheck) {
- throw new IOException("Quota exceeded for proxy user " + proxyUser + "
on executor " + specExecutorUri +
- ": quota=" + getQuotaForUser(proxyUser) + ", runningJobs=" +
proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
- }
+ if (!proxyUserCheck || !requesterCheck) {
+ if (!proxyUserCheck) {
+ decrementQuotaUsage(proxyUserToJobCount, proxyUser);
+ }
+
+ if (!requesterCheck) {
+ decrementQuotaUsageForUsers(usersQuotaIncrement);
+ }
- if (!requesterCheck) {
- throw new IOException(requesterMessage);
+ throw new IOException(requesterMessage.toString());
}
}
/**
* Increment quota by one for the given map and key.
- * @return true if quota is not reached for this user or user is
whitelisted, false otherwise.
+ * @return -1 if quota is already reached for this user, 0 if quota does
not need to be increased, +1 if used quota is increased by 1
*/
- private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap,
String user, DagNode<JobExecutionPlan> dagNode) {
+ private int incrementJobCountAndCheckUserQuota(Map<String, Integer>
quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
- int jobCount = quotaMap.getOrDefault(key, 0);
// Only increment job count for first attempt, since job is considered
running between retries
- if (dagNode.getValue().getCurrentAttempts() == 1) {
- jobCount++;
- quotaMap.put(key, jobCount);
+ if (dagNode.getValue().getCurrentAttempts() != 1) {
+ return 0;
}
- return jobCount <= getQuotaForUser(user);
+ int jobCount = quotaMap.getOrDefault(key, 0);
+ jobCount++;
+
+ if (jobCount > getQuotaForUser(user)) {
+ return -1;
+ }
+
+ quotaMap.put(key, jobCount);
Review comment:
the same `ConcurrentHashMap::replace` approach used for the decrement should be
used here too, for the increment.
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -960,49 +962,76 @@ private void checkQuota(DagNode<JobExecutionPlan>
dagNode) throws IOException {
String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
boolean proxyUserCheck = true;
+ int proxyQuotaIncrement;
+ Map<String, Integer> usersQuotaIncrement = new HashMap<>();
+ StringBuilder requesterMessage = new StringBuilder();
+
if (proxyUser != null) {
- proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount,
proxyUser, dagNode);
+ proxyQuotaIncrement =
incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+ proxyUserCheck = proxyQuotaIncrement < 0; // proxy user quota check
failed
+ if (!proxyUserCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
runningJobs=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
+
proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode))));
+ }
}
String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;
- String requesterMessage = null;
+
if (serializedRequesters != null) {
for (ServiceRequester requester :
RequesterService.deserialize(serializedRequesters)) {
- requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount,
requester.getName(), dagNode);
- if (!requesterCheck && requesterMessage == null) {
- requesterMessage = "Quota exceeded for requester " +
requester.getName() + " on executor " + specExecutorUri + ": quota="
- + getQuotaForUser(requester.getName()) + ", runningJobs=" +
requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(),
dagNode));
+ int userQuotaIncrement =
incrementJobCountAndCheckUserQuota(requesterToJobCount, requester.getName(),
dagNode);
+ boolean thisRequesterCheck;
+ thisRequesterCheck = userQuotaIncrement < 0; // user quota check
failed
+ usersQuotaIncrement.put(requester.getName(),
Math.max(userQuotaIncrement, 0));
Review comment:
I'm uncertain, so checking: do you know that the requester sequence contains
distinct names? If not, you could overwrite the key with 0 when the same
requester is seen again later in the loop.
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -960,49 +962,76 @@ private void checkQuota(DagNode<JobExecutionPlan>
dagNode) throws IOException {
String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
boolean proxyUserCheck = true;
+ int proxyQuotaIncrement;
+ Map<String, Integer> usersQuotaIncrement = new HashMap<>();
+ StringBuilder requesterMessage = new StringBuilder();
+
if (proxyUser != null) {
- proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount,
proxyUser, dagNode);
+ proxyQuotaIncrement =
incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+ proxyUserCheck = proxyQuotaIncrement < 0; // proxy user quota check
failed
+ if (!proxyUserCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
runningJobs=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
+
proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode))));
+ }
}
String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;
- String requesterMessage = null;
+
if (serializedRequesters != null) {
for (ServiceRequester requester :
RequesterService.deserialize(serializedRequesters)) {
- requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount,
requester.getName(), dagNode);
- if (!requesterCheck && requesterMessage == null) {
- requesterMessage = "Quota exceeded for requester " +
requester.getName() + " on executor " + specExecutorUri + ": quota="
- + getQuotaForUser(requester.getName()) + ", runningJobs=" +
requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(),
dagNode));
+ int userQuotaIncrement =
incrementJobCountAndCheckUserQuota(requesterToJobCount, requester.getName(),
dagNode);
+ boolean thisRequesterCheck;
+ thisRequesterCheck = userQuotaIncrement < 0; // user quota check
failed
+ usersQuotaIncrement.put(requester.getName(),
Math.max(userQuotaIncrement, 0));
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for requester %s on executor %s : quota=%s,
runningJobs=%d%n",
+ requester.getName(), specExecutorUri,
getQuotaForUser(requester.getName()),
+
requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(),
dagNode))));
}
}
}
// Throw errors for reach quota at the end to avoid inconsistent job
counts
- if (!proxyUserCheck) {
- throw new IOException("Quota exceeded for proxy user " + proxyUser + "
on executor " + specExecutorUri +
- ": quota=" + getQuotaForUser(proxyUser) + ", runningJobs=" +
proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
- }
+ if (!proxyUserCheck || !requesterCheck) {
+ if (!proxyUserCheck) {
+ decrementQuotaUsage(proxyUserToJobCount, proxyUser);
+ }
+
+ if (!requesterCheck) {
+ decrementQuotaUsageForUsers(usersQuotaIncrement);
+ }
- if (!requesterCheck) {
- throw new IOException(requesterMessage);
+ throw new IOException(requesterMessage.toString());
}
}
/**
* Increment quota by one for the given map and key.
- * @return true if quota is not reached for this user or user is
whitelisted, false otherwise.
+ * @return -1 if quota is already reached for this user, 0 if quota does
not need to be increased, +1 if used quota is increased by 1
Review comment:
as above, multiple threads may interleave increments and decrements, so
if you'd like to log the actual count at the time the quota was exhausted,
you'll have to return it from here. You could return the count as a negative
number, with the sign acting as a sentinel of failure.
I'm comfortable with overloading the int return type with complex semantics,
simply because Java lacks the algebraic data types that would be the
direct/natural representation. Although it would be somewhat "clearer" to use (a
full-fledged implementation of) `Either`, as we discussed earlier, this being a
`private` method, I see no pressure to do so, and ultimately the payoff is meager.
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -960,49 +962,76 @@ private void checkQuota(DagNode<JobExecutionPlan>
dagNode) throws IOException {
String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
boolean proxyUserCheck = true;
+ int proxyQuotaIncrement;
+ Map<String, Integer> usersQuotaIncrement = new HashMap<>();
+ StringBuilder requesterMessage = new StringBuilder();
+
if (proxyUser != null) {
- proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount,
proxyUser, dagNode);
+ proxyQuotaIncrement =
incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+ proxyUserCheck = proxyQuotaIncrement < 0; // proxy user quota check
failed
+ if (!proxyUserCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
runningJobs=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
+
proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode))));
+ }
}
String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;
- String requesterMessage = null;
+
if (serializedRequesters != null) {
for (ServiceRequester requester :
RequesterService.deserialize(serializedRequesters)) {
- requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount,
requester.getName(), dagNode);
- if (!requesterCheck && requesterMessage == null) {
- requesterMessage = "Quota exceeded for requester " +
requester.getName() + " on executor " + specExecutorUri + ": quota="
- + getQuotaForUser(requester.getName()) + ", runningJobs=" +
requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(),
dagNode));
+ int userQuotaIncrement =
incrementJobCountAndCheckUserQuota(requesterToJobCount, requester.getName(),
dagNode);
+ boolean thisRequesterCheck;
+ thisRequesterCheck = userQuotaIncrement < 0; // user quota check
failed
+ usersQuotaIncrement.put(requester.getName(),
Math.max(userQuotaIncrement, 0));
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for requester %s on executor %s : quota=%s,
runningJobs=%d%n",
+ requester.getName(), specExecutorUri,
getQuotaForUser(requester.getName()),
Review comment:
given multi-threading, `getQuotaForUser` may differ from what it was
during `incrementJobCountAndCheck...`; you need to return the then-current value
from that method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]