ZihanLi58 commented on code in PR #3545: URL: https://github.com/apache/gobblin/pull/3545#discussion_r955317349
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.commons.dbcp.BasicDataSource; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.inject.Singleton; +import javax.sql.DataSource; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; + +/** + * An implementation of {@link UserQuotaManager} that stores quota usage in mysql. + */ +@Slf4j +@Singleton +public class MysqlUserQuotaManager extends AbstractUserQuotaManager { + private final SimpleMysqlStore mysqlStore; + + @Inject + public MysqlUserQuotaManager(Config config) throws IOException { + super(config); + this.mysqlStore = createQuotaStore(config); + } + + @Override + public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws IOException { + // This implementation does not need to update quota usage when the service restarts or it's leadership status changes + if (!onInit) { + super.checkQuota(dagNode, onInit); + } + } + @Override + int incrementJobCount(String user, CountType countType) throws IOException { + try { + return this.mysqlStore.increaseCount(user, countType); + } catch (SQLException e) { + throw new IOException(e); + } + } + + @Override + void decrementJobCount(String user, CountType countType) throws IOException { + try { + this.mysqlStore.decreaseCount(user, countType); + } catch (SQLException e) { + throw new IOException(e); + } + } + + @VisibleForTesting + int getCount(String name, CountType countType) throws IOException { + return this.mysqlStore.getCount(name, countType); + } + + /** + * Creating an instance of StateStore. + */ + protected SimpleMysqlStore createQuotaStore(Config config) throws IOException { + String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, + ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE); + + BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config); + + return new SimpleMysqlStore(basicDataSource, stateStoreTableName); + } + + static class SimpleMysqlStore { Review Comment: QuotaMysqlStore? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java: ########## @@ -29,16 +29,16 @@ import org.testng.annotations.Test; -public class UserQuotaManagerTest { +public class InMemoryUserQuotaManagerTest { - UserQuotaManager _quotaManager; + InMemoryUserQuotaManager _quotaManager; @BeforeClass public void setUp() { Config quotaConfig = ConfigFactory.empty() - .withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1")) - .withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA, ConfigValueFactory.fromAnyRef("group1:1,group2:2")); - this._quotaManager = new UserQuotaManager(quotaConfig); + .withValue(InMemoryUserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1")) Review Comment: what's the difference with AbstractUserQuotaManager.PER_USER_QUOTA? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.RequesterService; +import org.apache.gobblin.service.ServiceRequester; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * An abstract implementation of {@link UserQuotaManager} that + */ +@Slf4j +abstract public class AbstractUserQuotaManager implements UserQuotaManager { + public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota"; + public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota"; + public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota"; + public static final String QUOTA_SEPERATOR = ":"; + public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE; + private final Map<String, Integer> perUserQuota; + private final Map<String, Integer> perFlowGroupQuota; + Set<String> runningDagIds = ConcurrentHashMap.newKeySet(); + private final int defaultQuota; + + public AbstractUserQuotaManager(Config config) { + this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA); + ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder(); + ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder(); + // Quotas will take form of user:<Quota> and flowGroup:<Quota> + for (String flowGroupQuota : ConfigUtils.getStringList(config, PER_FLOWGROUP_QUOTA)) { + flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1])); + } + // Keep quotas per user as well in form user:<Quota> which apply for all flowgroups + for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) { + userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1])); + } + this.perUserQuota = userMapBuilder.build(); + this.perFlowGroupQuota = flowGroupMapBuilder.build(); + } + + abstract int incrementJobCount(String key, CountType countType) throws IOException; + + abstract void decrementJobCount(String user, CountType countType) throws IOException; Review Comment: "user" -> "key" to share the same name as incrementJobCount? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; + +import lombok.AllArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.RequesterService; +import org.apache.gobblin.service.ServiceRequester; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * An abstract implementation of {@link UserQuotaManager} that + */ +@Slf4j +abstract public class AbstractUserQuotaManager implements UserQuotaManager { + public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota"; + public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota"; + public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota"; + public static final String QUOTA_SEPERATOR = ":"; + public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE; + private final Map<String, Integer> perUserQuota; + private final Map<String, Integer> perFlowGroupQuota; + Set<String> runningDagIds = ConcurrentHashMap.newKeySet(); Review Comment: We seems to use this set to avoid increase quota twice for same execution, how do you plan to avoid that in mysql quota manager? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
