[ 
https://issues.apache.org/jira/browse/GOBBLIN-1689?focusedWorklogId=805046&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-805046
 ]

ASF GitHub Bot logged work on GOBBLIN-1689:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Aug/22 23:49
            Start Date: 30/Aug/22 23:49
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959022111


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
     return;
   }
 
-  @Override
-  public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
-    TopologySpec spec = (TopologySpec) addedSpec;
-    log.info ("Loading topology {}", spec.toLongString());
-    for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
-      log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+  private synchronized  AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+    log.info("Loading topology {}", spec.toLongString());
+    for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+      log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
     }
 
-    topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+    topologySpecMap.put(spec.getUri(), spec);
+    return new AddSpecResponse(null);
+  }
+
+  private  AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+    Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+    if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && org.apache.commons.lang.StringUtils.isNotBlank(
+        flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+      try {
+        new CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+      } catch (Exception e) {
+        log.error("invalid cron schedule: {}", flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+        flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid cron schedule: " + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + e.getMessage()));
+        return null;
+      }
+    }
+    String response = null;
+
+    // always try to compile the flow to verify if it is compilable
+    Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+    // If dag is null then a compilation error has occurred
+    if (dag != null && !dag.isEmpty()) {
+      response = dag.toString();
+    }
+    // todo: should we check quota here?
+    return new AddSpecResponse<>(response);

Review Comment:
   I think the only open question here is this: if a user has a scheduled
flow, should we persist their changes when the flow cannot be run immediately
because of quota?
   Currently, we reject all changes if the flow is set to run immediately and
the quota is overrun. If the flow is not set to run immediately, the change
should pass.
   
   In other systems such as ADF, a flow's schedule and its configuration are
separate. So it can be argued that we should accept the changes but refuse to
schedule the flow while its quota is overrun.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 805046)
    Time Spent: 2h 10m  (was: 2h)

> Decouple compiler from scheduler in warm standby mode 
> ------------------------------------------------------
>
>                 Key: GOBBLIN-1689
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1689
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Currently, when adding or updating a flow spec, we rely on the scheduler to
> compile the flow and return the status. But in warm standby mode, where every
> host can accept and process requests but does not run the flow, we should be
> able to compile the flow without the scheduler.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
