[
https://issues.apache.org/jira/browse/GOBBLIN-1703?focusedWorklogId=813781&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-813781
]
ASF GitHub Bot logged work on GOBBLIN-1703:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Sep/22 18:10
Start Date: 30/Sep/22 18:10
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r984836918
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -47,55 +50,219 @@
@Slf4j
@Singleton
public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
- private final MysqlQuotaStore mysqlStore;
+ public final MysqlQuotaStore quotaStore;
+ public final RunningDagIdsStore runningDagIds;
+
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
- this.mysqlStore = createQuotaStore(config);
+ this.quotaStore = createQuotaStore(config);
+ this.runningDagIds = createRunningDagStore(config);
+ }
+
+ void addDagId(Connection connection, String dagId) throws IOException {
+ this.runningDagIds.add(connection, dagId);
+ }
+
+ @Override
+ boolean containsDagId(String dagId) throws IOException {
+ return this.runningDagIds.contains(dagId);
+ }
+
+ boolean removeDagId(Connection connection, String dagId) throws IOException {
+ return this.runningDagIds.remove(connection, dagId);
}
// This implementation does not need to update quota usage when the service
restarts or its leadership status changes
public void init(Collection<Dag<JobExecutionPlan>> dags) {
}
@Override
- int incrementJobCount(String user, CountType countType) throws IOException {
- try {
- return this.mysqlStore.increaseCount(user, countType);
+ public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes)
throws IOException {
+ try (Connection connection = this.quotaStore.dataSource.getConnection()) {
+ connection.setAutoCommit(false);
+
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
+ QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
+ if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck ||
!quotaCheck.flowGroupCheck)) {
+ connection.rollback();
+ throw new QuotaExceededException(quotaCheck.requesterMessage);
+ }
+ }
+ connection.commit();
} catch (SQLException e) {
throw new IOException(e);
}
}
- @Override
- void decrementJobCount(String user, CountType countType) throws IOException {
+ int incrementJobCount(Connection connection, String user, CountType
countType) throws IOException, SQLException {
+ return this.quotaStore.increaseCount(connection, user, countType);
+ }
+
+ void decrementJobCount(Connection connection,String user, CountType
countType) throws IOException, SQLException {
+ this.quotaStore.decreaseCount(connection, user, countType);
+ }
+
+ protected QuotaCheck increaseAndCheckQuota(Connection connection,
Dag.DagNode<JobExecutionPlan> dagNode)
+ throws SQLException, IOException {
+ QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+ StringBuilder requesterMessage = new StringBuilder();
+
+ // Dag is already being tracked, no need to double increment for retries
and multihop flows
+ if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+ return quotaCheck;
+ } else {
+ addDagId(connection, DagManagerUtils.generateDagId(dagNode).toString());
+ }
+
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+ boolean proxyUserCheck;
+
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+ proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
+ quotaCheck.setProxyUserCheck(proxyUserCheck);
+ if (!proxyUserCheck) {
+ // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count
before the increment
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+ }
+ }
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ boolean requesterCheck = true;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(connection,
DagManagerUtils.getUserQuotaKey(requester, dagNode),
+ getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+ boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ quotaCheck.setRequesterCheck(requesterCheck);
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format("Quota exceeded for requester
%s on executor %s : quota=%s, requests above quota=%d%n. ",
+ requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+ }
+ }
+ }
+
+ boolean flowGroupCheck;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+ flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ quotaCheck.setFlowGroupCheck(flowGroupCheck);
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format("Quota exceeded for flowgroup %s
on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+ Math.abs(flowGroupQuotaIncrement) + 1 -
getQuotaForFlowGroup(flowGroup)));
+ }
+ }
+
+ quotaCheck.setRequesterMessage(requesterMessage.toString());
+
+ return quotaCheck;
+ }
+
+ protected int incrementJobCountAndCheckQuota(Connection connection, String
key, int keyQuota, CountType countType)
+ throws IOException, SQLException {
+ int currentCount = incrementJobCount(connection, key, countType);
+ if (currentCount >= keyQuota) {
+ return -currentCount;
+ } else {
+ return currentCount;
+ }
+ }
+
+ /**
+ * Decrement the quota by one for the proxy user and requesters
corresponding to the provided {@link Dag.DagNode}.
+ * Returns true if the dag existed in the set of running dags and was
removed successfully
+ */
+ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException {
+ Connection connection;
try {
- this.mysqlStore.decreaseCount(user, countType);
+ connection = this.quotaStore.dataSource.getConnection();
+ connection.setAutoCommit(false);
} catch (SQLException e) {
throw new IOException(e);
}
+
+ try {
+ boolean val = removeDagId(connection,
DagManagerUtils.generateDagId(dagNode).toString());
+ if (!val) {
+ return false;
+ }
+
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ if (proxyUser != null) {
+ String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser,
dagNode);
+ decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT);
+ }
+
+ String flowGroup =
+ ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
+ decrementJobCount(connection,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
CountType.FLOWGROUP_COUNT);
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ try {
+ for (String requester :
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
+ String requesterKey = DagManagerUtils.getUserQuotaKey(requester,
dagNode);
+ decrementJobCount(connection, requesterKey,
CountType.REQUESTER_COUNT);
+ }
+ } catch (IOException e) {
+ log.error("Failed to release quota for requester list " +
serializedRequesters, e);
+ return false;
+ }
+ } catch (SQLException ex) {
+ throw new IOException(ex);
+ } finally {
+ try {
+ connection.close();
Review Comment:
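Where is the transaction committed? Auto-commit is disabled on this connection above (`connection.setAutoCommit(false)`), but I don't see a `connection.commit()` before the connection is closed here.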
Issue Time Tracking
-------------------
Worklog Id: (was: 813781)
Time Spent: 1h 50m (was: 1h 40m)
> avoid double quota usage increment for ad hoc flows
> ---------------------------------------------------
>
> Key: GOBBLIN-1703
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1703
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> When a GaaS flow request arrives, the resource handler checks the quota immediately.
> However, if the flow has runImmediately=true, the quota is checked again
> when the first job starts. This second check increments quota usage twice and should be avoided.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)