[
https://issues.apache.org/jira/browse/GOBBLIN-1691?focusedWorklogId=803754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-803754
]
ASF GitHub Bot logged work on GOBBLIN-1691:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Aug/22 19:22
Start Date: 25/Aug/22 19:22
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3545:
URL: https://github.com/apache/gobblin/pull/3545#discussion_r955301288
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
Review Comment:
looks like this docstring was unfinished
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+public class MysqlUserQuotaManagerTest {
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "quotas";
+ private static final String TEST_NAME = "abora";
+ private MysqlUserQuotaManager quotaManager;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+ Config config = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+
+ this.quotaManager = new MysqlUserQuotaManager(config);
+ }
+
+ @Test
+ public void testIncreaseCount() throws Exception {
+ int prevCount = this.quotaManager.incrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(prevCount, 0);
+
+ prevCount = this.quotaManager.incrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(prevCount, 1);
+ Assert.assertEquals(this.quotaManager.getCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT), 2);
+
+ prevCount = this.quotaManager.incrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ Assert.assertEquals(prevCount, 0);
+
+ prevCount = this.quotaManager.incrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ Assert.assertEquals(prevCount, 1);
+ }
+
+ @Test(dependsOnMethods = "testIncreaseCount")
+ public void testDecreaseCount() throws Exception {
+ this.quotaManager.decrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(this.quotaManager.getCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT), 1);
+
+ this.quotaManager.decrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(this.quotaManager.getCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT), 0);
+
+ this.quotaManager.decrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(this.quotaManager.getCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT), 0);
+
+ this.quotaManager.decrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ Assert.assertEquals(this.quotaManager.getCount(TEST_NAME,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), 1);
+ this.quotaManager.decrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ Assert.assertEquals(this.quotaManager.getCount(TEST_NAME,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), -1);
+ }
+
+ class ChangeCountRunnable implements Runnable {
+ boolean increaseOrDecrease;
+
+ public ChangeCountRunnable(boolean increaseOrDecrease) {
+ this.increaseOrDecrease = increaseOrDecrease;
+ }
+
+ @Override
+ public void run() {
+ int i = 0;
+ while (i++ < 1000) {
+ try {
+ if (increaseOrDecrease) {
+
MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ } else {
+
MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ }
+ } catch (IOException e) {
+ Assert.fail("Thread got an exception.", e);
+ }
+ }
+ }
+ }
+
+ @Test(dependsOnMethods = "testDecreaseCount")
+ public void testConcurrentChanges() throws IOException, InterruptedException
{
+ Runnable increaseCountRunnable = new ChangeCountRunnable(true);
+ Runnable decreaseCountRunnable = new ChangeCountRunnable(false);
+ Thread thread1 = new Thread(increaseCountRunnable);
+ Thread thread2 = new Thread(increaseCountRunnable);
+ Thread thread3 = new Thread(increaseCountRunnable);
+ Thread thread4 = new Thread(decreaseCountRunnable);
+ Thread thread5 = new Thread(decreaseCountRunnable);
+ Thread thread6 = new Thread(decreaseCountRunnable);
+
+ thread1.start();
+ thread2.start();
+ thread3.start();
+ thread1.join();
+ thread2.join();
+ thread3.join();
+ Assert.assertEquals(this.quotaManager.getCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT), 3000);
+ thread4.start();
+ thread5.start();
+ thread6.start();
+ thread4.join();
+ thread5.join();
+ thread6.join();
+ Assert.assertEquals(this.quotaManager.getCount(TEST_NAME,
AbstractUserQuotaManager.CountType.USER_COUNT), -1);
Review Comment:
Why does this become -1? Should it decrease by 3?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+ public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+ public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+ public static final String QUOTA_SEPERATOR = ":";
+ public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+ private final Map<String, Integer> perUserQuota;
+ private final Map<String, Integer> perFlowGroupQuota;
+ Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+ private final int defaultQuota;
+
+ public AbstractUserQuotaManager(Config config) {
+ this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
+ ImmutableMap.Builder<String, Integer> userMapBuilder =
ImmutableMap.builder();
+ ImmutableMap.Builder<String, Integer> flowGroupMapBuilder =
ImmutableMap.builder();
+ // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+ for (String flowGroupQuota : ConfigUtils.getStringList(config,
PER_FLOWGROUP_QUOTA)) {
+ flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+ // Keep quotas per user as well in form user:<Quota> which apply for all
flowgroups
+ for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA))
{
+ userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+ this.perUserQuota = userMapBuilder.build();
+ this.perFlowGroupQuota = flowGroupMapBuilder.build();
+ }
+
+ abstract int incrementJobCount(String key, CountType countType) throws
IOException;
+
+ abstract void decrementJobCount(String user, CountType countType) throws
IOException;
+
+ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean
onInit) throws IOException {
+ // Dag is already being tracked, no need to double increment for retries
and multihop flows
+ if (isDagCurrentlyRunning(dagNode)) {
+ return;
+ }
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+ boolean proxyUserCheck = true;
+ Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for
which quota is increased
+ StringBuilder requesterMessage = new StringBuilder();
+ runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+ proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
+ if (!proxyUserCheck) {
+ // add 1 to proxyUserIncrement since count starts at 0, and is
negative if quota is exceeded
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+ }
+ }
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ boolean requesterCheck = true;
+
+ if (serializedRequesters != null &&
dagNode.getValue().getCurrentAttempts() <= 1) {
+ List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(requester, dagNode),
getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+ boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
+ usersQuotaIncrement.add(requester);
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for requester %s on executor %s : quota=%s,
requests above quota=%d%n",
+ requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
+ }
+ }
+ }
+
+ boolean flowGroupCheck = true;
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
getQuotaForFlowGroup(flowGroup), CountType.FLOW_COUNT);
+ flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format("Quota exceeded for flowgroup %s
on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+ Math.abs(flowGroupQuotaIncrement) + 1 -
getQuotaForFlowGroup(flowGroup)));
+ }
+ }
+
+ // Throw errors for reach quota at the end to avoid inconsistent job counts
Review Comment:
nice! this is a good catch
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+ public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+ public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+ public static final String QUOTA_SEPERATOR = ":";
+ public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+ private final Map<String, Integer> perUserQuota;
+ private final Map<String, Integer> perFlowGroupQuota;
+ Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+ private final int defaultQuota;
+
+ public AbstractUserQuotaManager(Config config) {
+ this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
Review Comment:
Where is this being used?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An implementation of {@link UserQuotaManager} that stores quota usage in
memory.
+ */
+@Slf4j
+@Singleton
+public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
+ private final Map<String, Integer> proxyUserToJobCount = new
ConcurrentHashMap<>();
+ private final Map<String, Integer> flowGroupToJobCount = new
ConcurrentHashMap<>();
+ private final Map<String, Integer> requesterToJobCount = new
ConcurrentHashMap<>();
+
+ @Inject
+ public InMemoryUserQuotaManager(Config config) {
+ super(config);
+ }
+
+ private int incrementJobCount(String key, Map<String, Integer> quotaMap) {
+ Integer currentCount;
+ // Modifications must be thread safe since DAGs on DagManagerThreads may
update the quota for the same user
+ do {
+ currentCount = quotaMap.get(key);
+ } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null :
!quotaMap.replace(key, currentCount, currentCount + 1));
+
+ if (currentCount == null) {
+ currentCount = 0;
+ }
+
+ return currentCount;
+ }
+
+ private void decrementJobCount(String key, Map<String, Integer> quotaMap) {
+ Integer currentCount;
+ if (key == null) {
+ return;
+ }
+ do {
+ currentCount = quotaMap.get(key);
+ } while (currentCount != null && currentCount > 0 &&
!quotaMap.replace(key, currentCount, currentCount - 1));
Review Comment:
Let's throw an exception or log an ERROR whenever decrement job count is
called while `currentCount` is 0 or negative. We have seen some unexpected
negative flow counts after a host restart, so we want to be made aware of
this kind of odd behavior.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+ public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+ public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+ public static final String QUOTA_SEPERATOR = ":";
+ public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+ private final Map<String, Integer> perUserQuota;
+ private final Map<String, Integer> perFlowGroupQuota;
+ Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+ private final int defaultQuota;
+
+ public AbstractUserQuotaManager(Config config) {
+ this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
+ ImmutableMap.Builder<String, Integer> userMapBuilder =
ImmutableMap.builder();
+ ImmutableMap.Builder<String, Integer> flowGroupMapBuilder =
ImmutableMap.builder();
+ // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+ for (String flowGroupQuota : ConfigUtils.getStringList(config,
PER_FLOWGROUP_QUOTA)) {
+ flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+ // Keep quotas per user as well in form user:<Quota> which apply for all
flowgroups
+ for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA))
{
+ userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+ this.perUserQuota = userMapBuilder.build();
+ this.perFlowGroupQuota = flowGroupMapBuilder.build();
+ }
+
+ abstract int incrementJobCount(String key, CountType countType) throws
IOException;
+
+ abstract void decrementJobCount(String user, CountType countType) throws
IOException;
+
+ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean
onInit) throws IOException {
+ // Dag is already being tracked, no need to double increment for retries
and multihop flows
+ if (isDagCurrentlyRunning(dagNode)) {
+ return;
+ }
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+ boolean proxyUserCheck = true;
+ Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for
which quota is increased
+ StringBuilder requesterMessage = new StringBuilder();
+ runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+ proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
+ if (!proxyUserCheck) {
+ // add 1 to proxyUserIncrement since count starts at 0, and is
negative if quota is exceeded
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+ }
+ }
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ boolean requesterCheck = true;
+
+ if (serializedRequesters != null &&
dagNode.getValue().getCurrentAttempts() <= 1) {
+ List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(requester, dagNode),
getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+ boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
+ usersQuotaIncrement.add(requester);
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for requester %s on executor %s : quota=%s,
requests above quota=%d%n",
Review Comment:
For the requester check, shouldn't we also add 1?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+public class MysqlUserQuotaManagerTest {
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "quotas";
+ private static final String TEST_NAME = "abora";
Review Comment:
Rename this variable to make it clearer whether this is the flow's name or
the user/requester/flowGroup name.
Issue Time Tracking
-------------------
Worklog Id: (was: 803754)
Time Spent: 0.5h (was: 20m)
> add a mysql based user quota manager
> ------------------------------------
>
> Key: GOBBLIN-1691
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1691
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> A MySQL-based user quota manager can share the usage data with other instances
> of Gobblin Service.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)