[
https://issues.apache.org/jira/browse/GOBBLIN-1691?focusedWorklogId=803776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-803776
]
ASF GitHub Bot logged work on GOBBLIN-1691:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Aug/22 20:29
Start Date: 25/Aug/22 20:29
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3545:
URL: https://github.com/apache/gobblin/pull/3545#discussion_r955317349
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import javax.inject.Singleton;
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * An implementation of {@link UserQuotaManager} that stores quota usage in
mysql.
+ */
+@Slf4j
+@Singleton
+public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
+ private final SimpleMysqlStore mysqlStore;
+
+ @Inject
+ public MysqlUserQuotaManager(Config config) throws IOException {
+ super(config);
+ this.mysqlStore = createQuotaStore(config);
+ }
+
+ @Override
+ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean
onInit) throws IOException {
+ // This implementation does not need to update quota usage when the
service restarts or it's leadership status changes
+ if (!onInit) {
+ super.checkQuota(dagNode, onInit);
+ }
+ }
+ @Override
+ int incrementJobCount(String user, CountType countType) throws IOException {
+ try {
+ return this.mysqlStore.increaseCount(user, countType);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ void decrementJobCount(String user, CountType countType) throws IOException {
+ try {
+ this.mysqlStore.decreaseCount(user, countType);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @VisibleForTesting
+ int getCount(String name, CountType countType) throws IOException {
+ return this.mysqlStore.getCount(name, countType);
+ }
+
+ /**
+ * Creating an instance of StateStore.
+ */
+ protected SimpleMysqlStore createQuotaStore(Config config) throws
IOException {
+ String stateStoreTableName = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+
+ BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
+
+ return new SimpleMysqlStore(basicDataSource, stateStoreTableName);
+ }
+
+ static class SimpleMysqlStore {
Review Comment:
Rename to QuotaMysqlStore?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java:
##########
@@ -29,16 +29,16 @@
import org.testng.annotations.Test;
-public class UserQuotaManagerTest {
+public class InMemoryUserQuotaManagerTest {
- UserQuotaManager _quotaManager;
+ InMemoryUserQuotaManager _quotaManager;
@BeforeClass
public void setUp() {
Config quotaConfig = ConfigFactory.empty()
- .withValue(UserQuotaManager.PER_USER_QUOTA,
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))
- .withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA,
ConfigValueFactory.fromAnyRef("group1:1,group2:2"));
- this._quotaManager = new UserQuotaManager(quotaConfig);
+ .withValue(InMemoryUserQuotaManager.PER_USER_QUOTA,
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))
Review Comment:
How does this differ from AbstractUserQuotaManager.PER_USER_QUOTA?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+ public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+ public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+ public static final String QUOTA_SEPERATOR = ":";
+ public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+ private final Map<String, Integer> perUserQuota;
+ private final Map<String, Integer> perFlowGroupQuota;
+ Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+ private final int defaultQuota;
+
+ public AbstractUserQuotaManager(Config config) {
+ this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
+ ImmutableMap.Builder<String, Integer> userMapBuilder =
ImmutableMap.builder();
+ ImmutableMap.Builder<String, Integer> flowGroupMapBuilder =
ImmutableMap.builder();
+ // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+ for (String flowGroupQuota : ConfigUtils.getStringList(config,
PER_FLOWGROUP_QUOTA)) {
+ flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+ // Keep quotas per user as well in form user:<Quota> which apply for all
flowgroups
+ for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA))
{
+ userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+ this.perUserQuota = userMapBuilder.build();
+ this.perFlowGroupQuota = flowGroupMapBuilder.build();
+ }
+
+ abstract int incrementJobCount(String key, CountType countType) throws
IOException;
+
+ abstract void decrementJobCount(String user, CountType countType) throws
IOException;
Review Comment:
Rename "user" to "key" so the parameter name matches incrementJobCount?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+ public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+ public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+ public static final String QUOTA_SEPERATOR = ":";
+ public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+ private final Map<String, Integer> perUserQuota;
+ private final Map<String, Integer> perFlowGroupQuota;
+ Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
Review Comment:
We seem to use this set to avoid increasing the quota twice for the same execution;
how do you plan to avoid that in the MySQL quota manager?
Issue Time Tracking
-------------------
Worklog Id: (was: 803776)
Time Spent: 40m (was: 0.5h)
> add a mysql based user quota manager
> ------------------------------------
>
> Key: GOBBLIN-1691
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1691
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> A MySQL-based user quota manager can share usage data with other instances
> of Gobblin Service.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)