[
https://issues.apache.org/jira/browse/GOBBLIN-1703?focusedWorklogId=813088&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-813088
]
ASF GitHub Bot logged work on GOBBLIN-1703:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 28/Sep/22 18:12
Start Date: 28/Sep/22 18:12
Worklog Time Spent: 10m
Work Description: arjun4084346 commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r982722025
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -47,55 +49,259 @@
@Slf4j
@Singleton
public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
- private final MysqlQuotaStore mysqlStore;
+ public final MysqlQuotaStore quotaStore;
+ public final RunningDagIdsStore runningDagIds;
+
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
- this.mysqlStore = createQuotaStore(config);
+ this.quotaStore = createQuotaStore(config);
+ this.runningDagIds = createRunningDagStore(config);
+ }
+
+ void addDagId(Connection connection, String dagId) throws IOException {
+ this.runningDagIds.add(connection, dagId);
+ }
+
+ @Override
+ boolean containsDagId(String dagId) throws IOException {
+ return this.runningDagIds.contains(dagId);
+ }
+
+ boolean removeDagId(Connection connection, String dagId) throws IOException {
+ return this.runningDagIds.remove(connection, dagId);
}
// This implementation does not need to update quota usage when the service
restarts or it's leadership status changes
public void init(Collection<Dag<JobExecutionPlan>> dags) {
}
+ int incrementJobCount(Connection connection, String user, CountType
countType) throws IOException, SQLException {
+ return this.quotaStore.increaseCount(connection, user, countType);
+ }
+
+ void decrementJobCount(Connection connection,String user, CountType
countType) throws IOException, SQLException {
+ this.quotaStore.decreaseCount(user, countType);
+ }
+
@Override
- int incrementJobCount(String user, CountType countType) throws IOException {
+ protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan>
dagNode) throws IOException {
+ QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+ Connection connection;
try {
- return this.mysqlStore.increaseCount(user, countType);
+ connection = this.quotaStore.dataSource.getConnection();
+ connection.setAutoCommit(false);
} catch (SQLException e) {
throw new IOException(e);
}
+ StringBuilder requesterMessage = new StringBuilder();
+
+ // Dag is already being tracked, no need to double increment for retries
and multihop flows
+ try {
+ if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+ return quotaCheck;
+ } else {
+ addDagId(connection,
DagManagerUtils.generateDagId(dagNode).toString());
+ }
+
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+ boolean proxyUserCheck;
+
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+ proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
+ quotaCheck.setProxyUserCheck(proxyUserCheck);
+ if (!proxyUserCheck) {
+ // add 1 to proxyUserIncrement since proxyQuotaIncrement is the
count before the increment
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+ }
+ }
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ boolean requesterCheck = true;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(connection,
DagManagerUtils.getUserQuotaKey(requester, dagNode),
+ getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+ boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ quotaCheck.setRequesterCheck(requesterCheck);
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format("Quota exceeded for
requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
+ requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+ }
+ }
+ }
+
+ boolean flowGroupCheck;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+ flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ quotaCheck.setFlowGroupCheck(flowGroupCheck);
+ if (!flowGroupCheck) {
Review Comment:
Do you mean that if the user check fails, we should stop immediately without
checking the proxy/requesterService quota? I think we could, but the existing code
checks all the quotas anyway so that it can form a complete error message
(in `requesterMessage`), so I chose not to disturb that functionality.
Issue Time Tracking
-------------------
Worklog Id: (was: 813088)
Time Spent: 1h 10m (was: 1h)
> avoid double quota usage increment for ad hoc flows
> ---------------------------------------------------
>
> Key: GOBBLIN-1703
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1703
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> When a GaaS flow request arrives, the resource handler checks the quota right there.
> However, if the flow has runImmediately=true, the quota is checked again
> when the first job starts, so the quota usage is incremented twice. This should be avoided.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)