[
https://issues.apache.org/jira/browse/GOBBLIN-1689?focusedWorklogId=805345&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-805345
]
ASF GitHub Bot logged work on GOBBLIN-1689:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 31/Aug/22 19:00
Start Date: 31/Aug/22 19:00
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959929174
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
return;
}
- @Override
- public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
- TopologySpec spec = (TopologySpec) addedSpec;
- log.info ("Loading topology {}", spec.toLongString());
- for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
- log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+ private synchronized AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+ log.info("Loading topology {}", spec.toLongString());
+ for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+ log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
}
- topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+ topologySpecMap.put(spec.getUri(), spec);
+ return new AddSpecResponse(null);
+ }
+
+ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+ Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+ if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && org.apache.commons.lang.StringUtils.isNotBlank(
+ flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+ try {
+ new CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+ } catch (Exception e) {
+ log.error("invalid cron schedule: {}", flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+ flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid cron schedule: " + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + e.getMessage()));
+ return null;
+ }
+ }
+ String response = null;
+
+ // always try to compile the flow to verify if it is compilable
+ Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+ // If dag is null then a compilation error has occurred
+ if (dag != null && !dag.isEmpty()) {
+ response = dag.toString();
+ }
+ // todo: should we check quota here?
+ return new AddSpecResponse<>(response);
Review Comment:
Yeah, when we disable run-immediately, we should be able to support adhoc runs
of scheduled flows once we move adhoc flows to be triggered by the DAG action
store instead of the spec store. For now, though, we probably want to keep the
same behavior as before.
Specifically, for the use case where the schedule is far out and the user
wants to run the flow immediately, I believe we should reject the request and
ask them to resubmit it once quota is available.
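The behavior proposed in the review (reject an immediate run of a scheduled flow when quota is exhausted, rather than deferring it) could look roughly like the sketch below. It is a minimal, hypothetical sketch: `AdhocRunGate`, `Decision`, `setRunningFlows`, and the per-user running-flow limit are illustrative names, not Gobblin APIs, and a real check would consult whatever quota tracking the service already has.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch (not a Gobblin class): a run-immediately request for a
 * scheduled flow is checked against a per-user quota and rejected, not queued,
 * when the quota is exhausted.
 */
final class AdhocRunGate {
  /** Result of a run-immediately request. */
  static final class Decision {
    final boolean accepted;
    final String message;

    Decision(boolean accepted, String message) {
      this.accepted = accepted;
      this.message = message;
    }
  }

  private final int maxRunningFlowsPerUser; // hypothetical quota knob
  private final Map<String, Integer> runningFlowsByUser = new ConcurrentHashMap<>();

  AdhocRunGate(int maxRunningFlowsPerUser) {
    this.maxRunningFlowsPerUser = maxRunningFlowsPerUser;
  }

  /** Records how many flows a user currently has running (assumed to come from an external tracker). */
  void setRunningFlows(String user, int count) {
    runningFlowsByUser.put(user, count);
  }

  Decision onRunImmediately(String user, String flowUri) {
    int running = runningFlowsByUser.getOrDefault(user, 0);
    if (running >= maxRunningFlowsPerUser) {
      // Reject rather than defer: the caller is asked to resubmit later.
      return new Decision(false, "Quota exceeded for " + user + " (" + running + "/"
          + maxRunningFlowsPerUser + "); resubmit " + flowUri + " when quota is available");
    }
    return new Decision(true, "Accepted immediate run of " + flowUri);
  }
}
```

Rejecting outright keeps the spec store free of requests that would otherwise sit waiting for quota, which matches the "keep the same behavior as before" intent until adhoc runs move to the DAG action store.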
Issue Time Tracking
-------------------
Worklog Id: (was: 805345)
Time Spent: 3h 40m (was: 3.5h)
> Decouple compiler from scheduler in warm standby mode
> ------------------------------------------------------
>
> Key: GOBBLIN-1689
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1689
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 3h 40m
> Remaining Estimate: 0h
>
> Currently, when a flow spec is added or updated, we rely on the scheduler to
> compile the flow and return its status. In warm standby mode, however, every
> host can accept and process requests without running the flow, so we should
> be able to compile the flow without the scheduler.
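The compile-without-scheduler path described in the issue, which the diff implements by splitting topology handling and flow handling into `onAddTopologySpec` and `onAddFlowSpec`, can be sketched in isolation as follows. This is a hypothetical, simplified sketch: `SimpleTopologySpec`, `SimpleFlowSpec`, `StandbyCompiler`, and `looksLikeQuartzCron` are illustrative stand-ins, not Gobblin classes. The real code validates the schedule with `CronExpression` (presumably Quartz's) and compiles a `Dag<JobExecutionPlan>`; here the cron check only counts fields (Quartz expressions have 6 or 7) and the compile step is injected as a function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for a topology spec: just a URI. */
final class SimpleTopologySpec {
  final String uri;
  SimpleTopologySpec(String uri) { this.uri = uri; }
}

/** Hypothetical stand-in for a flow spec: URI, optional cron schedule, collected errors. */
final class SimpleFlowSpec {
  final String uri;
  final String schedule; // null or blank for unscheduled (adhoc) flows
  final List<String> compilationErrors = new ArrayList<>();
  SimpleFlowSpec(String uri, String schedule) { this.uri = uri; this.schedule = schedule; }
}

/** Validates and compiles specs without consulting any scheduler (hypothetical sketch). */
final class StandbyCompiler {
  private final Map<String, SimpleTopologySpec> topologies = new ConcurrentHashMap<>();
  // Injected compile step: returns the job names of the compiled DAG, or null/empty on failure.
  private final Function<SimpleFlowSpec, List<String>> compileFlow;

  StandbyCompiler(Function<SimpleFlowSpec, List<String>> compileFlow) {
    this.compileFlow = compileFlow;
  }

  /** Dispatches on spec type, mirroring the onAddTopologySpec/onAddFlowSpec split. */
  String onAddSpec(Object spec) {
    if (spec instanceof SimpleTopologySpec) {
      SimpleTopologySpec topology = (SimpleTopologySpec) spec;
      topologies.put(topology.uri, topology);
      return null;
    }
    if (spec instanceof SimpleFlowSpec) {
      return onAddFlowSpec((SimpleFlowSpec) spec);
    }
    throw new IllegalArgumentException("Unsupported spec type: " + spec.getClass());
  }

  int topologyCount() { return topologies.size(); }

  private String onAddFlowSpec(SimpleFlowSpec flow) {
    if (flow.schedule != null && !flow.schedule.isBlank() && !looksLikeQuartzCron(flow.schedule)) {
      flow.compilationErrors.add("invalid cron schedule: " + flow.schedule);
      return null; // like the diff, an invalid schedule short-circuits compilation
    }
    // Always compile, so any host (including a standby) can report whether the flow is compilable.
    List<String> dag = compileFlow.apply(flow);
    return (dag == null || dag.isEmpty()) ? null : dag.toString();
  }

  /** Crude stand-in for real cron parsing: only checks for 6 or 7 whitespace-separated fields. */
  static boolean looksLikeQuartzCron(String expr) {
    int fields = expr.trim().split("\\s+").length;
    return fields == 6 || fields == 7;
  }
}
```

Injecting the compile step keeps the sketch free of any scheduler dependency, which is the point of the issue: a host can answer "is this flow compilable?" without owning or running the schedule.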
--
This message was sent by Atlassian Jira
(v8.20.10#820010)