[
https://issues.apache.org/jira/browse/GOBBLIN-1689?focusedWorklogId=805046&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-805046
]
ASF GitHub Bot logged work on GOBBLIN-1689:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Aug/22 23:49
Start Date: 30/Aug/22 23:49
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959022111
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
return;
}
- @Override
- public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
- TopologySpec spec = (TopologySpec) addedSpec;
- log.info ("Loading topology {}", spec.toLongString());
- for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
- log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+ private synchronized AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+ log.info("Loading topology {}", spec.toLongString());
+ for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+ log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
}
- topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+ topologySpecMap.put(spec.getUri(), spec);
+ return new AddSpecResponse(null);
+ }
+
+ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+ Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+ if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && org.apache.commons.lang.StringUtils.isNotBlank(
+ flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+ try {
+ new CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+ } catch (Exception e) {
+ log.error("invalid cron schedule: {}", flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+ flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid cron schedule: " + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + e.getMessage()));
+ return null;
+ }
+ }
+ String response = null;
+
+ // always try to compile the flow to verify if it is compilable
+ Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+ // If dag is null then a compilation error has occurred
+ if (dag != null && !dag.isEmpty()) {
+ response = dag.toString();
+ }
+ // todo: should we check quota here?
+ return new AddSpecResponse<>(response);
Review Comment:
So I think the only open question here is: if a user has a scheduled flow, should we store their changes permanently when the flow cannot run immediately because of quota?
Currently, we reject all changes if the flow is set to run immediately and the quota is exceeded.
If the flow is not set to run immediately, the changes should pass.
In other systems such as ADF, a flow's schedule and its configuration are managed separately, so one could argue that we should accept the changes but refuse to schedule them if the quota is exceeded.
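A minimal sketch contrasting the current behavior with the ADF-style alternative described in the review comment, assuming quota is only checked when a flow is set to run immediately. `AdmissionDecision`, `QuotaAdmission`, `currentPolicy`, and `decoupledPolicy` are hypothetical names for illustration, not Gobblin APIs.

```java
/** Hypothetical outcome of submitting a flow-config change. */
enum AdmissionDecision {
  ACCEPT_AND_SCHEDULE, // store the change and schedule/run it normally
  ACCEPT_CONFIG_ONLY,  // store the change but do not schedule it
  REJECT               // store nothing
}

/** Hypothetical helper comparing the two policies discussed above. */
final class QuotaAdmission {
  private QuotaAdmission() {}

  /** Current behavior: reject everything when an immediate run would exceed quota. */
  static AdmissionDecision currentPolicy(boolean runImmediately, boolean quotaExceeded) {
    if (runImmediately && quotaExceeded) {
      return AdmissionDecision.REJECT;
    }
    // Flows that do not run immediately are not quota-checked at submission time.
    return AdmissionDecision.ACCEPT_AND_SCHEDULE;
  }

  /** Alternative: keep the config change, but refuse to schedule it while over quota. */
  static AdmissionDecision decoupledPolicy(boolean runImmediately, boolean quotaExceeded) {
    if (runImmediately && quotaExceeded) {
      return AdmissionDecision.ACCEPT_CONFIG_ONLY;
    }
    return AdmissionDecision.ACCEPT_AND_SCHEDULE;
  }
}
```

The only difference between the two policies is the over-quota, run-immediately case: the alternative persists the user's edits instead of discarding them, at the cost of a flow whose stored config is not currently scheduled.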
Issue Time Tracking
-------------------
Worklog Id: (was: 805046)
Time Spent: 2h 10m (was: 2h)
> Decouple compiler from scheduler in warm standby mode
> ------------------------------------------------------
>
> Key: GOBBLIN-1689
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1689
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Currently, when a flow spec is added or updated, we rely on the scheduler to
> compile the flow and return the status. In warm standby mode, however, every
> host can accept and process requests without running the flow, so we should be
> able to compile the flow without the scheduler.
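A minimal sketch of compiling a flow without the scheduler, loosely following `onAddFlowSpec` in the diff: validate a non-blank schedule first, then compile. `StandaloneFlowValidator` and `ValidationResult` are hypothetical; the schedule check and the compiler are injected as functions (in the PR these roles are played by constructing a `CronExpression` and by `compileFlow`), so the class has no scheduler dependency. Treating an empty compilation result as an error is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical result: a compiled plan (null on failure) plus validation errors. */
final class ValidationResult {
  final String compiledPlan;
  final List<String> errors;

  ValidationResult(String compiledPlan, List<String> errors) {
    this.compiledPlan = compiledPlan;
    this.errors = Collections.unmodifiableList(new ArrayList<>(errors));
  }

  boolean isValid() {
    return errors.isEmpty();
  }
}

/** Hypothetical validator that any host can run; it never consults a scheduler. */
final class StandaloneFlowValidator {
  private final Predicate<String> scheduleValidator;          // e.g. wraps a cron parser
  private final Function<String, Optional<String>> compiler;  // flow config -> plan, empty on failure

  StandaloneFlowValidator(Predicate<String> scheduleValidator,
      Function<String, Optional<String>> compiler) {
    this.scheduleValidator = scheduleValidator;
    this.compiler = compiler;
  }

  ValidationResult validate(String flowConfig, String schedule) {
    List<String> errors = new ArrayList<>();
    // A missing or blank schedule means an ad hoc flow, so only a non-blank one is checked.
    boolean hasSchedule = schedule != null && !schedule.trim().isEmpty();
    if (hasSchedule && !scheduleValidator.test(schedule)) {
      errors.add("invalid cron schedule: " + schedule);
      return new ValidationResult(null, errors);
    }
    // Always try to compile, mirroring "always try to compile the flow" in the diff.
    Optional<String> plan = compiler.apply(flowConfig);
    if (!plan.isPresent()) {
      errors.add("flow could not be compiled");
      return new ValidationResult(null, errors);
    }
    return new ValidationResult(plan.get(), errors);
  }
}
```

Because nothing here depends on scheduler state, every host in a warm standby deployment could run the same check before persisting or forwarding the spec.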
--
This message was sent by Atlassian Jira
(v8.20.10#820010)