[ 
https://issues.apache.org/jira/browse/GOBBLIN-1703?focusedWorklogId=812706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-812706
 ]

ASF GitHub Bot logged work on GOBBLIN-1703:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Sep/22 00:56
            Start Date: 28/Sep/22 00:56
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r981844268


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -256,4 +283,70 @@ public void decreaseCount(String name, CountType 
countType) throws IOException,
       }
     }
   }
+
+  static class RunningDagIdsStore {
+    protected final DataSource dataSource;
+    final String tableName;
+    private final String CONTAINS_DAG_ID;
+    private final String ADD_DAG_ID;
+    private final String REMOVE_DAG_ID;
+
+    public RunningDagIdsStore(BasicDataSource dataSource, String tableName)
+        throws IOException {
+      this.dataSource = dataSource;
+      this.tableName = tableName;
+
+      CONTAINS_DAG_ID = "SELECT EXISTS(SELECT * FROM " + tableName + " WHERE 
dagId = ?)" ;
+      ADD_DAG_ID = "INSERT INTO " + tableName + " (dagId) VALUES (?) ";
+      REMOVE_DAG_ID = "DELETE FROM " + tableName + " WHERE dagId = ?";
+
+      String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + " 
(dagId VARCHAR(500) CHARACTER SET latin1 NOT NULL, "
+          + "PRIMARY KEY (dagId), UNIQUE INDEX ind (dagId))";
+      try (Connection connection = dataSource.getConnection(); 
PreparedStatement createStatement = 
connection.prepareStatement(createQuotaTable)) {
+        createStatement.executeUpdate();
+      } catch (SQLException e) {
+        throw new IOException("Failure creation table " + tableName, e);

Review Comment:
   can you also log if the validation query is set in case connection is 
refused?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -181,6 +190,18 @@ private  AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
 
     // always try to compile the flow to verify if it is compilable
     Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+
+    if (this.warmStandbyEnabled &&
+        
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
 || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+      try {
+        // todo : we should probably check quota for all of the start nodes
+        userQuotaManager.checkQuota(dag.getNodes().get(0));

Review Comment:
   we should iterate over and check all startNodes like we discussed right. 
Only if we have capacity for all start nodes, we add this dag.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 812706)
    Time Spent: 50m  (was: 40m)

> avoid double quota usage increment for ad hoc flows
> ---------------------------------------------------
>
>                 Key: GOBBLIN-1703
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1703
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> When a gaas flow request comes, resource handler checks the quota right there.
> However, if the flow has runImmediately=true, the quota will be checked again 
> when the first job starts. This should be avoided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to