[ https://issues.apache.org/jira/browse/GOBBLIN-1703?focusedWorklogId=812706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-812706 ]
ASF GitHub Bot logged work on GOBBLIN-1703:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 28/Sep/22 00:56
Start Date: 28/Sep/22 00:56
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r981844268
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -256,4 +283,70 @@ public void decreaseCount(String name, CountType countType) throws IOException,
}
}
}
+
+ static class RunningDagIdsStore {
+ protected final DataSource dataSource;
+ final String tableName;
+ private final String CONTAINS_DAG_ID;
+ private final String ADD_DAG_ID;
+ private final String REMOVE_DAG_ID;
+
+ public RunningDagIdsStore(BasicDataSource dataSource, String tableName)
+ throws IOException {
+ this.dataSource = dataSource;
+ this.tableName = tableName;
+
+ CONTAINS_DAG_ID = "SELECT EXISTS(SELECT * FROM " + tableName + " WHERE dagId = ?)" ;
+ ADD_DAG_ID = "INSERT INTO " + tableName + " (dagId) VALUES (?) ";
+ REMOVE_DAG_ID = "DELETE FROM " + tableName + " WHERE dagId = ?";
+
+ String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + " (dagId VARCHAR(500) CHARACTER SET latin1 NOT NULL, "
+ + "PRIMARY KEY (dagId), UNIQUE INDEX ind (dagId))";
+ try (Connection connection = dataSource.getConnection(); PreparedStatement createStatement = connection.prepareStatement(createQuotaTable)) {
+ createStatement.executeUpdate();
+ } catch (SQLException e) {
+ throw new IOException("Failure creation table " + tableName, e);
Review Comment:
Can you also log whether the validation query is set, in case the
connection is refused?
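
One possible shape for this, sketched with JDK types only: build the exception message from the table name and the pool's validation query. The class and method names below are hypothetical. The sketch assumes the caller passes in the value of `BasicDataSource.getValidationQuery()`, which is null when no validation query is configured.

```java
import java.io.IOException;
import java.sql.SQLException;

class ValidationQueryLogging {
  // Hypothetical helper: describes a table-creation failure and states
  // whether a validation query was configured on the data source.
  static String creationFailureMessage(String tableName, String validationQuery) {
    String validation = (validationQuery == null || validationQuery.trim().isEmpty())
        ? "validation query is not set"
        : "validation query is set to '" + validationQuery + "'";
    return "Failure creating table " + tableName + " (" + validation + ")";
  }

  // Wraps the SQLException in an IOException, as the constructor in the diff
  // does, but with the validation-query detail included in the message.
  static IOException wrap(String tableName, String validationQuery, SQLException cause) {
    return new IOException(creationFailureMessage(tableName, validationQuery), cause);
  }
}
```

In the constructor above, the catch block could then throw `ValidationQueryLogging.wrap(tableName, dataSource.getValidationQuery(), e)`, assuming the constructor parameter (declared as `BasicDataSource`) is used rather than the `DataSource` field.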
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -181,6 +190,18 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+
+ if (this.warmStandbyEnabled &&
+ (!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+ try {
+ // todo : we should probably check quota for all of the start nodes
+ userQuotaManager.checkQuota(dag.getNodes().get(0));
Review Comment:
We should iterate over and check all start nodes, as we discussed, right?
We should add this dag only if we have capacity for all of its start nodes.
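
A minimal sketch of the all-or-nothing check, not the actual Gobblin API: the `QuotaManager` interface, its `releaseQuota` method, and the in-memory implementation are hypothetical stand-ins, and the sketch assumes `checkQuota` reserves capacity as it checks. If any start node is over quota, the reservations already made for earlier nodes are rolled back before the exception propagates.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StartNodeQuotaCheck {
  /** Hypothetical stand-in for a quota manager; not Gobblin's UserQuotaManager. */
  interface QuotaManager<N> {
    void checkQuota(N node) throws IOException; // reserves quota or throws
    void releaseQuota(N node);                  // undoes one reservation
  }

  /** Reserves quota for every start node, or for none of them. */
  static <N> void checkAllStartNodes(QuotaManager<N> quotaManager, List<N> startNodes)
      throws IOException {
    List<N> reserved = new ArrayList<>();
    try {
      for (N node : startNodes) {
        quotaManager.checkQuota(node);
        reserved.add(node);
      }
    } catch (IOException e) {
      // Roll back the nodes that passed before the failing one.
      for (N node : reserved) {
        quotaManager.releaseQuota(node);
      }
      throw e;
    }
  }

  /** Toy manager used only to exercise the sketch: one counter per user name. */
  static class InMemoryQuotaManager implements QuotaManager<String> {
    private final int limit;
    final Map<String, Integer> counts = new HashMap<>();

    InMemoryQuotaManager(int limit) {
      this.limit = limit;
    }

    @Override
    public void checkQuota(String user) throws IOException {
      int current = counts.getOrDefault(user, 0);
      if (current >= limit) {
        throw new IOException("Quota exceeded for " + user);
      }
      counts.put(user, current + 1);
    }

    @Override
    public void releaseQuota(String user) {
      counts.merge(user, -1, Integer::sum);
    }
  }
}
```

In `onAddFlowSpec`, the single `checkQuota(dag.getNodes().get(0))` call would be replaced by a loop of this shape over the dag's start nodes.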
Issue Time Tracking
-------------------
Worklog Id: (was: 812706)
Time Spent: 50m (was: 40m)
> avoid double quota usage increment for ad hoc flows
> ---------------------------------------------------
>
> Key: GOBBLIN-1703
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1703
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> When a GaaS flow request arrives, the resource handler checks the quota right away.
> However, if the flow has runImmediately=true, the quota is checked again
> when the first job starts. This second check should be avoided.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)