[ 
https://issues.apache.org/jira/browse/GOBBLIN-1691?focusedWorklogId=803754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-803754
 ]

ASF GitHub Bot logged work on GOBBLIN-1691:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Aug/22 19:22
            Start Date: 25/Aug/22 19:22
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3545:
URL: https://github.com/apache/gobblin/pull/3545#discussion_r955301288


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that

Review Comment:
   looks like this docstring was unfinished



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+public class MysqlUserQuotaManagerTest {
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "quotas";
+  private static final String TEST_NAME = "abora";
+  private MysqlUserQuotaManager quotaManager;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, 
testDb.getJdbcUrl())
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    this.quotaManager = new MysqlUserQuotaManager(config);
+  }
+
+  @Test
+  public void testIncreaseCount() throws Exception {
+    int prevCount = this.quotaManager.incrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(prevCount, 0);
+
+    prevCount = this.quotaManager.incrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(prevCount, 1);
+    Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT), 2);
+
+    prevCount = this.quotaManager.incrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    Assert.assertEquals(prevCount, 0);
+
+    prevCount = this.quotaManager.incrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    Assert.assertEquals(prevCount, 1);
+  }
+
+  @Test(dependsOnMethods = "testIncreaseCount")
+  public void testDecreaseCount() throws Exception {
+    this.quotaManager.decrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT), 1);
+
+    this.quotaManager.decrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT), 0);
+
+    this.quotaManager.decrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT), 0);
+
+    this.quotaManager.decrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), 1);
+    this.quotaManager.decrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), -1);
+  }
+
+  class ChangeCountRunnable implements Runnable {
+    boolean increaseOrDecrease;
+
+    public ChangeCountRunnable(boolean increaseOrDecrease) {
+      this.increaseOrDecrease = increaseOrDecrease;
+    }
+
+    @Override
+    public void run() {
+      int i = 0;
+      while (i++ < 1000) {
+        try {
+          if (increaseOrDecrease) {
+            
MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT);
+          } else {
+            
MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT);
+          }
+        } catch (IOException e) {
+          Assert.fail("Thread got an exception.", e);
+        }
+      }
+    }
+  }
+
+  @Test(dependsOnMethods = "testDecreaseCount")
+  public void testConcurrentChanges() throws IOException, InterruptedException 
{
+    Runnable increaseCountRunnable = new ChangeCountRunnable(true);
+    Runnable decreaseCountRunnable = new ChangeCountRunnable(false);
+    Thread thread1 = new Thread(increaseCountRunnable);
+    Thread thread2 = new Thread(increaseCountRunnable);
+    Thread thread3 = new Thread(increaseCountRunnable);
+    Thread thread4 = new Thread(decreaseCountRunnable);
+    Thread thread5 = new Thread(decreaseCountRunnable);
+    Thread thread6 = new Thread(decreaseCountRunnable);
+
+    thread1.start();
+    thread2.start();
+    thread3.start();
+    thread1.join();
+    thread2.join();
+    thread3.join();
+    Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT), 3000);
+    thread4.start();
+    thread5.start();
+    thread6.start();
+    thread4.join();
+    thread5.join();
+    thread6.join();
+    Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, 
AbstractUserQuotaManager.CountType.USER_COUNT), -1);

Review Comment:
   why does this become -1, should it decrease by 3?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+  public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  private final Map<String, Integer> perUserQuota;
+  private final Map<String, Integer> perFlowGroupQuota;
+  Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+  private final int defaultQuota;
+
+  public AbstractUserQuotaManager(Config config) {
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
+    ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
+    ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
+    // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+    for (String flowGroupQuota : ConfigUtils.getStringList(config, 
PER_FLOWGROUP_QUOTA)) {
+      flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    // Keep quotas per user as well in form user:<Quota> which apply for all 
flowgroups
+    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) 
{
+      userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    this.perUserQuota = userMapBuilder.build();
+    this.perFlowGroupQuota = flowGroupMapBuilder.build();
+  }
+
+  abstract int incrementJobCount(String key, CountType countType) throws 
IOException;
+
+  abstract void decrementJobCount(String user, CountType countType) throws 
IOException;
+
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean 
onInit) throws IOException {
+    // Dag is already being tracked, no need to double increment for retries 
and multihop flows
+    if (isDagCurrentlyRunning(dagNode)) {
+      return;
+    }
+    String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+        ConfigurationKeys.FLOW_GROUP_KEY, "");
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+    boolean proxyUserCheck = true;
+    Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for 
which quota is increased
+    StringBuilder requesterMessage = new StringBuilder();
+    runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
+    if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+      int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
+      if (!proxyUserCheck) {
+        // add 1 to proxyUserIncrement since count starts at 0, and is 
negative if quota is exceeded
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, 
requests above quota=%d%n",
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), 
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+      }
+    }
+
+    String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
+    boolean requesterCheck = true;
+
+    if (serializedRequesters != null && 
dagNode.getValue().getCurrentAttempts() <= 1) {
+      List<String> uniqueRequesters = 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+      for (String requester : uniqueRequesters) {
+        int userQuotaIncrement = incrementJobCountAndCheckQuota(
+            DagManagerUtils.getUserQuotaKey(requester, dagNode), 
getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota 
check succeeds
+        usersQuotaIncrement.add(requester);
+        requesterCheck = requesterCheck && thisRequesterCheck;
+        if (!thisRequesterCheck) {
+          requesterMessage.append(String.format(
+              "Quota exceeded for requester %s on executor %s : quota=%s, 
requests above quota=%d%n",
+              requester, specExecutorUri, getQuotaForUser(requester), 
Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
+        }
+      }
+    }
+
+    boolean flowGroupCheck = true;
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+          DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
getQuotaForFlowGroup(flowGroup), CountType.FLOW_COUNT);
+      flowGroupCheck = flowGroupQuotaIncrement >= 0;
+      if (!flowGroupCheck) {
+        requesterMessage.append(String.format("Quota exceeded for flowgroup %s 
on executor %s : quota=%s, requests above quota=%d%n",
+            flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+            Math.abs(flowGroupQuotaIncrement) + 1 - 
getQuotaForFlowGroup(flowGroup)));
+      }
+    }
+
+    // Throw errors for reach quota at the end to avoid inconsistent job counts

Review Comment:
   nice! this is a good catch



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+  public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  private final Map<String, Integer> perUserQuota;
+  private final Map<String, Integer> perFlowGroupQuota;
+  Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+  private final int defaultQuota;
+
+  public AbstractUserQuotaManager(Config config) {
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);

Review Comment:
   where is this being used



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An implementation of {@link UserQuotaManager} that stores quota usage in 
memory.
+ */
+@Slf4j
+@Singleton
+public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
+  private final Map<String, Integer> proxyUserToJobCount = new 
ConcurrentHashMap<>();
+  private final Map<String, Integer> flowGroupToJobCount = new 
ConcurrentHashMap<>();
+  private final Map<String, Integer> requesterToJobCount = new 
ConcurrentHashMap<>();
+
+  @Inject
+  public InMemoryUserQuotaManager(Config config) {
+    super(config);
+  }
+
+  private int incrementJobCount(String key, Map<String, Integer> quotaMap) {
+    Integer currentCount;
+    // Modifications must be thread safe since DAGs on DagManagerThreads may 
update the quota for the same user
+    do {
+      currentCount = quotaMap.get(key);
+    } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : 
!quotaMap.replace(key, currentCount, currentCount + 1));
+
+    if (currentCount == null) {
+      currentCount = 0;
+    }
+
+    return currentCount;
+  }
+
+  private void decrementJobCount(String key, Map<String, Integer> quotaMap)  {
+    Integer currentCount;
+    if (key == null) {
+      return;
+    }
+    do {
+      currentCount = quotaMap.get(key);
+    } while (currentCount != null && currentCount > 0 && 
!quotaMap.replace(key, currentCount, currentCount - 1));

Review Comment:
   let's add an exception or ERROR log if there's a case decrement job count is 
called and `currentCount` is 0 or negative. We had seem some negative flow 
counts after host restart that were unexpected, so we want to be aware of odd 
behavior



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+  public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  private final Map<String, Integer> perUserQuota;
+  private final Map<String, Integer> perFlowGroupQuota;
+  Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+  private final int defaultQuota;
+
+  public AbstractUserQuotaManager(Config config) {
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
+    ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
+    ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
+    // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+    for (String flowGroupQuota : ConfigUtils.getStringList(config, 
PER_FLOWGROUP_QUOTA)) {
+      flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    // Keep quotas per user as well in form user:<Quota> which apply for all 
flowgroups
+    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) 
{
+      userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    this.perUserQuota = userMapBuilder.build();
+    this.perFlowGroupQuota = flowGroupMapBuilder.build();
+  }
+
+  abstract int incrementJobCount(String key, CountType countType) throws 
IOException;
+
+  abstract void decrementJobCount(String user, CountType countType) throws 
IOException;
+
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean 
onInit) throws IOException {
+    // Dag is already being tracked, no need to double increment for retries 
and multihop flows
+    if (isDagCurrentlyRunning(dagNode)) {
+      return;
+    }
+    String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+        ConfigurationKeys.FLOW_GROUP_KEY, "");
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+    boolean proxyUserCheck = true;
+    Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for 
which quota is increased
+    StringBuilder requesterMessage = new StringBuilder();
+    runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
+    if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+      int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
+      if (!proxyUserCheck) {
+        // add 1 to proxyUserIncrement since count starts at 0, and is 
negative if quota is exceeded
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, 
requests above quota=%d%n",
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), 
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+      }
+    }
+
+    String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
+    boolean requesterCheck = true;
+
+    if (serializedRequesters != null && 
dagNode.getValue().getCurrentAttempts() <= 1) {
+      List<String> uniqueRequesters = 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+      for (String requester : uniqueRequesters) {
+        int userQuotaIncrement = incrementJobCountAndCheckQuota(
+            DagManagerUtils.getUserQuotaKey(requester, dagNode), 
getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota 
check succeeds
+        usersQuotaIncrement.add(requester);
+        requesterCheck = requesterCheck && thisRequesterCheck;
+        if (!thisRequesterCheck) {
+          requesterMessage.append(String.format(
+              "Quota exceeded for requester %s on executor %s : quota=%s, 
requests above quota=%d%n",

Review Comment:
   for requester check, should we also not add 1?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+public class MysqlUserQuotaManagerTest {
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "quotas";
+  private static final String TEST_NAME = "abora";

Review Comment:
   rename this variable to make it more clear if this is the flow's name or 
user/requester/flowGroup name





Issue Time Tracking
-------------------

    Worklog Id:     (was: 803754)
    Time Spent: 0.5h  (was: 20m)

> add a mysql based user quota manager
> ------------------------------------
>
>                 Key: GOBBLIN-1691
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1691
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> mysql based user quota manager can share the usage data with other instances 
> of Gobblin Service



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to