[
https://issues.apache.org/jira/browse/GOBBLIN-1703?focusedWorklogId=813081&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-813081
]
ASF GitHub Bot logged work on GOBBLIN-1703:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 28/Sep/22 17:56
Start Date: 28/Sep/22 17:56
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r982706347
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -181,6 +190,19 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+
+ if (this.warmStandbyEnabled &&
+
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
|| PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+ try {
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getStartNodes()) {
+ userQuotaManager.checkQuota(dagNode);
+
flowSpec.getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW,
"true");
Review Comment:
Why is this part of the for loop?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -181,6 +190,19 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+
+ if (this.warmStandbyEnabled &&
+
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
|| PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+ try {
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getStartNodes()) {
+ userQuotaManager.checkQuota(dagNode);
Review Comment:
I hope we can make the check for all starting nodes atomic as well, so that if
any node fails the check, we roll back the increments already made for the previous ones.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -47,55 +49,259 @@
@Slf4j
@Singleton
public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
- private final MysqlQuotaStore mysqlStore;
+ public final MysqlQuotaStore quotaStore;
+ public final RunningDagIdsStore runningDagIds;
+
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
- this.mysqlStore = createQuotaStore(config);
+ this.quotaStore = createQuotaStore(config);
+ this.runningDagIds = createRunningDagStore(config);
+ }
+
+ void addDagId(Connection connection, String dagId) throws IOException {
+ this.runningDagIds.add(connection, dagId);
+ }
+
+ @Override
+ boolean containsDagId(String dagId) throws IOException {
+ return this.runningDagIds.contains(dagId);
+ }
+
+ boolean removeDagId(Connection connection, String dagId) throws IOException {
+ return this.runningDagIds.remove(connection, dagId);
}
// This implementation does not need to update quota usage when the service
restarts or it's leadership status changes
public void init(Collection<Dag<JobExecutionPlan>> dags) {
}
+ int incrementJobCount(Connection connection, String user, CountType
countType) throws IOException, SQLException {
+ return this.quotaStore.increaseCount(connection, user, countType);
+ }
+
+ void decrementJobCount(Connection connection,String user, CountType
countType) throws IOException, SQLException {
+ this.quotaStore.decreaseCount(user, countType);
+ }
+
@Override
- int incrementJobCount(String user, CountType countType) throws IOException {
+ protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan>
dagNode) throws IOException {
+ QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+ Connection connection;
try {
- return this.mysqlStore.increaseCount(user, countType);
+ connection = this.quotaStore.dataSource.getConnection();
+ connection.setAutoCommit(false);
} catch (SQLException e) {
throw new IOException(e);
}
+ StringBuilder requesterMessage = new StringBuilder();
+
+ // Dag is already being tracked, no need to double increment for retries
and multihop flows
+ try {
+ if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+ return quotaCheck;
+ } else {
+ addDagId(connection,
DagManagerUtils.generateDagId(dagNode).toString());
+ }
+
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+ boolean proxyUserCheck;
+
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+ proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
+ quotaCheck.setProxyUserCheck(proxyUserCheck);
+ if (!proxyUserCheck) {
+ // add 1 to proxyUserIncrement since proxyQuotaIncrement is the
count before the increment
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+ }
+ }
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ boolean requesterCheck = true;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(connection,
DagManagerUtils.getUserQuotaKey(requester, dagNode),
+ getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+ boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ quotaCheck.setRequesterCheck(requesterCheck);
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format("Quota exceeded for
requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
+ requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+ }
+ }
+ }
+
+ boolean flowGroupCheck;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+ flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ quotaCheck.setFlowGroupCheck(flowGroupCheck);
+ if (!flowGroupCheck) {
Review Comment:
For the MySQL quota, if any of the quota-type checks fails, shouldn't we
directly roll back the change in this method?
Issue Time Tracking
-------------------
Worklog Id: (was: 813081)
Time Spent: 1h (was: 50m)
> avoid double quota usage increment for ad hoc flows
> ---------------------------------------------------
>
> Key: GOBBLIN-1703
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1703
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> When a GaaS flow request arrives, the resource handler checks the quota right there.
> However, if the flow has runImmediately=true, the quota will be checked again
> when the first job starts. This double increment should be avoided.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)