[ 
https://issues.apache.org/jira/browse/GOBBLIN-1691?focusedWorklogId=803798&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-803798
 ]

ASF GitHub Bot logged work on GOBBLIN-1691:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Aug/22 22:07
            Start Date: 25/Aug/22 22:07
    Worklog Time Spent: 10m 
      Work Description: arjun4084346 commented on code in PR #3545:
URL: https://github.com/apache/gobblin/pull/3545#discussion_r955460218


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+  public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  private final Map<String, Integer> perUserQuota;
+  private final Map<String, Integer> perFlowGroupQuota;
+  Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+  private final int defaultQuota;
+
+  public AbstractUserQuotaManager(Config config) {
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
+    ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
+    ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
+    // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+    for (String flowGroupQuota : ConfigUtils.getStringList(config, 
PER_FLOWGROUP_QUOTA)) {
+      flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    // Keep quotas per user as well in form user:<Quota> which apply for all 
flowgroups
+    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) 
{
+      userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    this.perUserQuota = userMapBuilder.build();
+    this.perFlowGroupQuota = flowGroupMapBuilder.build();
+  }
+
+  abstract int incrementJobCount(String key, CountType countType) throws 
IOException;
+
+  abstract void decrementJobCount(String user, CountType countType) throws 
IOException;
+
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean 
onInit) throws IOException {
+    // Dag is already being tracked, no need to double increment for retries 
and multihop flows
+    if (isDagCurrentlyRunning(dagNode)) {
+      return;
+    }
+    String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+        ConfigurationKeys.FLOW_GROUP_KEY, "");
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+    boolean proxyUserCheck = true;
+    Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for 
which quota is increased
+    StringBuilder requesterMessage = new StringBuilder();
+    runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
+    if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+      int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
+      if (!proxyUserCheck) {
+        // add 1 to proxyUserIncrement since count starts at 0, and is 
negative if quota is exceeded
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, 
requests above quota=%d%n",
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), 
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+      }
+    }
+
+    String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
+    boolean requesterCheck = true;
+
+    if (serializedRequesters != null && 
dagNode.getValue().getCurrentAttempts() <= 1) {
+      List<String> uniqueRequesters = 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+      for (String requester : uniqueRequesters) {
+        int userQuotaIncrement = incrementJobCountAndCheckQuota(
+            DagManagerUtils.getUserQuotaKey(requester, dagNode), 
getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota 
check succeeds
+        usersQuotaIncrement.add(requester);
+        requesterCheck = requesterCheck && thisRequesterCheck;
+        if (!thisRequesterCheck) {
+          requesterMessage.append(String.format(
+              "Quota exceeded for requester %s on executor %s : quota=%s, 
requests above quota=%d%n",
+              requester, specExecutorUri, getQuotaForUser(requester), 
Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
+        }
+      }
+    }
+
+    boolean flowGroupCheck = true;
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+          DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
getQuotaForFlowGroup(flowGroup), CountType.FLOW_COUNT);
+      flowGroupCheck = flowGroupQuotaIncrement >= 0;
+      if (!flowGroupCheck) {
+        requesterMessage.append(String.format("Quota exceeded for flowgroup %s 
on executor %s : quota=%s, requests above quota=%d%n",
+            flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+            Math.abs(flowGroupQuotaIncrement) + 1 - 
getQuotaForFlowGroup(flowGroup)));
+      }
+    }
+
+    // Throw errors for reach quota at the end to avoid inconsistent job counts

Review Comment:
   Thanks, though it was probably originally added by Jack (not sure), before a 
couple of refactorings.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 803798)
    Time Spent: 1h  (was: 50m)

> add a mysql based user quota manager
> ------------------------------------
>
>                 Key: GOBBLIN-1691
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1691
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> A MySQL-based user quota manager can share usage data with other instances 
> of Gobblin Service.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to