[ 
https://issues.apache.org/jira/browse/GOBBLIN-1691?focusedWorklogId=803776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-803776
 ]

ASF GitHub Bot logged work on GOBBLIN-1691:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Aug/22 20:29
            Start Date: 25/Aug/22 20:29
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3545:
URL: https://github.com/apache/gobblin/pull/3545#discussion_r955317349


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import javax.inject.Singleton;
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * An implementation of {@link UserQuotaManager} that stores quota usage in 
mysql.
+ */
+@Slf4j
+@Singleton
+public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
+  private final SimpleMysqlStore mysqlStore;
+
+  @Inject
+  public MysqlUserQuotaManager(Config config) throws IOException {
+    super(config);
+    this.mysqlStore = createQuotaStore(config);
+  }
+
+  @Override
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean 
onInit) throws IOException {
+    // This implementation does not need to update quota usage when the 
service restarts or it's leadership status changes
+    if (!onInit) {
+      super.checkQuota(dagNode, onInit);
+    }
+  }
+  @Override
+  int incrementJobCount(String user, CountType countType) throws IOException {
+    try {
+      return this.mysqlStore.increaseCount(user, countType);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  void decrementJobCount(String user, CountType countType) throws IOException {
+    try {
+      this.mysqlStore.decreaseCount(user, countType);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @VisibleForTesting
+  int getCount(String name, CountType countType) throws IOException {
+    return this.mysqlStore.getCount(name, countType);
+  }
+
+  /**
+   * Creating an instance of StateStore.
+   */
+  protected SimpleMysqlStore createQuotaStore(Config config) throws 
IOException {
+    String stateStoreTableName = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+
+    BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
+
+    return new SimpleMysqlStore(basicDataSource, stateStoreTableName);
+  }
+
+  static class SimpleMysqlStore {

Review Comment:
   QuotaMysqlStore?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java:
##########
@@ -29,16 +29,16 @@
 import org.testng.annotations.Test;
 
 
-public class UserQuotaManagerTest {
+public class InMemoryUserQuotaManagerTest {
 
-  UserQuotaManager _quotaManager;
+  InMemoryUserQuotaManager _quotaManager;
 
   @BeforeClass
   public void setUp() {
     Config quotaConfig = ConfigFactory.empty()
-        .withValue(UserQuotaManager.PER_USER_QUOTA, 
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))
-        .withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA, 
ConfigValueFactory.fromAnyRef("group1:1,group2:2"));
-    this._quotaManager = new UserQuotaManager(quotaConfig);
+        .withValue(InMemoryUserQuotaManager.PER_USER_QUOTA, 
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))

Review Comment:
   what's the difference with AbstractUserQuotaManager.PER_USER_QUOTA?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+  public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  private final Map<String, Integer> perUserQuota;
+  private final Map<String, Integer> perFlowGroupQuota;
+  Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+  private final int defaultQuota;
+
+  public AbstractUserQuotaManager(Config config) {
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
+    ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
+    ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
+    // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+    for (String flowGroupQuota : ConfigUtils.getStringList(config, 
PER_FLOWGROUP_QUOTA)) {
+      flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    // Keep quotas per user as well in form user:<Quota> which apply for all 
flowgroups
+    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) 
{
+      userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    this.perUserQuota = userMapBuilder.build();
+    this.perFlowGroupQuota = flowGroupMapBuilder.build();
+  }
+
+  abstract int incrementJobCount(String key, CountType countType) throws 
IOException;
+
+  abstract void decrementJobCount(String user, CountType countType) throws 
IOException;

Review Comment:
   "user" -> "key" to share the same name as  incrementJobCount?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+  public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  private final Map<String, Integer> perUserQuota;
+  private final Map<String, Integer> perFlowGroupQuota;
+  Set<String> runningDagIds = ConcurrentHashMap.newKeySet();

Review Comment:
   We seems to use this set to avoid increase quota twice for same execution, 
how do you plan to avoid that in mysql quota manager?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 803776)
    Time Spent: 40m  (was: 0.5h)

> add a mysql based user quota manager
> ------------------------------------
>
>                 Key: GOBBLIN-1691
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1691
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> mysql based user quota manager can share the usage data with other instances 
> of Gobblin Service



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to