[ 
https://issues.apache.org/jira/browse/GOBBLIN-1703?focusedWorklogId=813155&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-813155
 ]

ASF GitHub Bot logged work on GOBBLIN-1703:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Sep/22 22:43
            Start Date: 28/Sep/22 22:43
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r982919303


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -47,55 +49,259 @@
 @Slf4j
 @Singleton
 public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
-  private final MysqlQuotaStore mysqlStore;
+  public final MysqlQuotaStore quotaStore;
+  public final RunningDagIdsStore runningDagIds;
+
 
   @Inject
   public MysqlUserQuotaManager(Config config) throws IOException {
     super(config);
-    this.mysqlStore = createQuotaStore(config);
+    this.quotaStore = createQuotaStore(config);
+    this.runningDagIds = createRunningDagStore(config);
+  }
+
+  void addDagId(Connection connection, String dagId) throws IOException {
+    this.runningDagIds.add(connection, dagId);
+  }
+
+  @Override
+  boolean containsDagId(String dagId) throws IOException {
+    return this.runningDagIds.contains(dagId);
+  }
+
+  boolean removeDagId(Connection connection, String dagId) throws IOException {
+    return this.runningDagIds.remove(connection, dagId);
   }
 
   // This implementation does not need to update quota usage when the service 
restarts or it's leadership status changes
   public void init(Collection<Dag<JobExecutionPlan>> dags) {
   }
 
+  int incrementJobCount(Connection connection, String user, CountType 
countType) throws IOException, SQLException {
+    return this.quotaStore.increaseCount(connection, user, countType);
+  }
+
+  void decrementJobCount(Connection connection,String user, CountType 
countType) throws IOException, SQLException {
+      this.quotaStore.decreaseCount(user, countType);
+  }
+
   @Override
-  int incrementJobCount(String user, CountType countType) throws IOException {
+  protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> 
dagNode) throws IOException {
+    QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+    Connection connection;
     try {
-      return this.mysqlStore.increaseCount(user, countType);
+      connection = this.quotaStore.dataSource.getConnection();
+      connection.setAutoCommit(false);
     } catch (SQLException e) {
       throw new IOException(e);
     }
+    StringBuilder requesterMessage = new StringBuilder();
+
+    // Dag is already being tracked, no need to double increment for retries 
and multihop flows
+    try {
+      if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+        return quotaCheck;
+      } else {
+        addDagId(connection, 
DagManagerUtils.generateDagId(dagNode).toString());
+      }
+
+      String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
+      String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+          ConfigurationKeys.FLOW_GROUP_KEY, "");
+      String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+      boolean proxyUserCheck;
+
+      if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+        int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+            DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+        proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
+        quotaCheck.setProxyUserCheck(proxyUserCheck);
+        if (!proxyUserCheck) {
+          // add 1 to proxyUserIncrement since proxyQuotaIncrement is the 
count before the increment
+          requesterMessage.append(String.format(
+              "Quota exceeded for proxy user %s on executor %s : quota=%s, 
requests above quota=%d%n",
+              proxyUser, specExecutorUri, getQuotaForUser(proxyUser), 
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+        }
+      }
+
+      String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
+      boolean requesterCheck = true;
+
+      if (dagNode.getValue().getCurrentAttempts() <= 1) {
+        List<String> uniqueRequesters = 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+        for (String requester : uniqueRequesters) {
+          int userQuotaIncrement = incrementJobCountAndCheckQuota(connection, 
DagManagerUtils.getUserQuotaKey(requester, dagNode),
+              getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+          boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota 
check succeeds
+          requesterCheck = requesterCheck && thisRequesterCheck;
+          quotaCheck.setRequesterCheck(requesterCheck);
+          if (!thisRequesterCheck) {
+            requesterMessage.append(String.format("Quota exceeded for 
requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
+                requester, specExecutorUri, getQuotaForUser(requester), 
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+          }
+        }
+      }
+
+    boolean flowGroupCheck;
+
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+          DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+      flowGroupCheck = flowGroupQuotaIncrement >= 0;
+      quotaCheck.setFlowGroupCheck(flowGroupCheck);
+      if (!flowGroupCheck) {

Review Comment:
   I don't mean that. We still process everything to the end, but before 
committing the change, we check whether all three checks succeeded. If any 
failed, we should roll back the change at that point instead of committing it. 
We still return all of the check results to the user.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 813155)
    Time Spent: 1h 20m  (was: 1h 10m)

> avoid double quota usage increment for ad hoc flows
> ---------------------------------------------------
>
>                 Key: GOBBLIN-1703
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1703
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> When a GaaS flow request comes in, the resource handler checks the quota right away.
> However, if the flow has runImmediately=true, the quota is checked again 
> when the first job starts, so quota usage is incremented twice. This should be avoided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to