[ 
https://issues.apache.org/jira/browse/GOBBLIN-1703?focusedWorklogId=813088&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-813088
 ]

ASF GitHub Bot logged work on GOBBLIN-1703:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Sep/22 18:12
            Start Date: 28/Sep/22 18:12
    Worklog Time Spent: 10m 
      Work Description: arjun4084346 commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r982722025


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -47,55 +49,259 @@
 @Slf4j
 @Singleton
 public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
-  private final MysqlQuotaStore mysqlStore;
+  public final MysqlQuotaStore quotaStore;
+  public final RunningDagIdsStore runningDagIds;
+
 
   @Inject
   public MysqlUserQuotaManager(Config config) throws IOException {
     super(config);
-    this.mysqlStore = createQuotaStore(config);
+    this.quotaStore = createQuotaStore(config);
+    this.runningDagIds = createRunningDagStore(config);
+  }
+
+  void addDagId(Connection connection, String dagId) throws IOException {
+    this.runningDagIds.add(connection, dagId);
+  }
+
+  @Override
+  boolean containsDagId(String dagId) throws IOException {
+    return this.runningDagIds.contains(dagId);
+  }
+
+  boolean removeDagId(Connection connection, String dagId) throws IOException {
+    return this.runningDagIds.remove(connection, dagId);
   }
 
   // This implementation does not need to update quota usage when the service 
restarts or it's leadership status changes
   public void init(Collection<Dag<JobExecutionPlan>> dags) {
   }
 
+  int incrementJobCount(Connection connection, String user, CountType 
countType) throws IOException, SQLException {
+    return this.quotaStore.increaseCount(connection, user, countType);
+  }
+
+  void decrementJobCount(Connection connection,String user, CountType 
countType) throws IOException, SQLException {
+      this.quotaStore.decreaseCount(user, countType);
+  }
+
   @Override
-  int incrementJobCount(String user, CountType countType) throws IOException {
+  protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> 
dagNode) throws IOException {
+    QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+    Connection connection;
     try {
-      return this.mysqlStore.increaseCount(user, countType);
+      connection = this.quotaStore.dataSource.getConnection();
+      connection.setAutoCommit(false);
     } catch (SQLException e) {
       throw new IOException(e);
     }
+    StringBuilder requesterMessage = new StringBuilder();
+
+    // Dag is already being tracked, no need to double increment for retries 
and multihop flows
+    try {
+      if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+        return quotaCheck;
+      } else {
+        addDagId(connection, 
DagManagerUtils.generateDagId(dagNode).toString());
+      }
+
+      String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
+      String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+          ConfigurationKeys.FLOW_GROUP_KEY, "");
+      String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+      boolean proxyUserCheck;
+
+      if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+        int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+            DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+        proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
+        quotaCheck.setProxyUserCheck(proxyUserCheck);
+        if (!proxyUserCheck) {
+          // add 1 to proxyUserIncrement since proxyQuotaIncrement is the 
count before the increment
+          requesterMessage.append(String.format(
+              "Quota exceeded for proxy user %s on executor %s : quota=%s, 
requests above quota=%d%n",
+              proxyUser, specExecutorUri, getQuotaForUser(proxyUser), 
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+        }
+      }
+
+      String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
+      boolean requesterCheck = true;
+
+      if (dagNode.getValue().getCurrentAttempts() <= 1) {
+        List<String> uniqueRequesters = 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+        for (String requester : uniqueRequesters) {
+          int userQuotaIncrement = incrementJobCountAndCheckQuota(connection, 
DagManagerUtils.getUserQuotaKey(requester, dagNode),
+              getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+          boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota 
check succeeds
+          requesterCheck = requesterCheck && thisRequesterCheck;
+          quotaCheck.setRequesterCheck(requesterCheck);
+          if (!thisRequesterCheck) {
+            requesterMessage.append(String.format("Quota exceeded for 
requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
+                requester, specExecutorUri, getQuotaForUser(requester), 
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+          }
+        }
+      }
+
+    boolean flowGroupCheck;
+
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+          DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+      flowGroupCheck = flowGroupQuotaIncrement >= 0;
+      quotaCheck.setFlowGroupCheck(flowGroupCheck);
+      if (!flowGroupCheck) {

Review Comment:
   Do you mean that if the user check fails, we should stop immediately without 
   checking the proxy/requesterService quota? I think we can, but the existing 
   code checks all the quotas anyway so that it can form a complete error message 
   (in `requesterMessage`), so I chose not to disturb that functionality.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 813088)
    Time Spent: 1h 10m  (was: 1h)

> avoid double quota usage increment for ad hoc flows
> ---------------------------------------------------
>
>                 Key: GOBBLIN-1703
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1703
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> When a GaaS flow request arrives, the resource handler checks the quota immediately.
> However, if the flow has runImmediately=true, the quota is checked again 
> when the first job starts, so the usage is incremented twice. This should be avoided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to