[ 
https://issues.apache.org/jira/browse/GOBBLIN-1689?focusedWorklogId=805323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-805323
 ]

ASF GitHub Bot logged work on GOBBLIN-1689:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Aug/22 17:44
            Start Date: 31/Aug/22 17:44
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959858522


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
     return;
   }
 
-  @Override
-  public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
-    TopologySpec spec = (TopologySpec) addedSpec;
-    log.info ("Loading topology {}", spec.toLongString());
-    for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
-      log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+  private synchronized  AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+    log.info("Loading topology {}", spec.toLongString());
+    for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+      log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
     }
 
-    topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+    topologySpecMap.put(spec.getUri(), spec);
+    return new AddSpecResponse(null);
+  }
+
+  private  AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+    Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+    if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && 
org.apache.commons.lang.StringUtils.isNotBlank(
+        flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+      try {
+        new 
CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+      } catch (Exception e) {
+        log.error("invalid cron schedule: {}", 
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+        flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, 
"invalid cron schedule: " + 
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + 
e.getMessage()));
+        return null;
+      }
+    }
+    String response = null;
+
+    // always try to compile the flow to verify if it is compilable
+    Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+    // If dag is null then a compilation error has occurred
+    if (dag != null && !dag.isEmpty()) {
+      response = dag.toString();
+    }
+    // todo: should we check quota here?
+    return new AddSpecResponse<>(response);

Review Comment:
   Yup, I leave the todo hint here for the next PR after mysql quota is in 
place. 
   I think it's easy for us to not check quota for run immediate flow here and 
rely on the leader to check quota and skip it. That way we can achieve the goal 
you described. But I'm not sure whether we want to change the behavior of the 
service. I would prefer to disable run immediate function and only support 
adhoc when we migrate adhoc flow into dag action store. 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 805323)
    Time Spent: 2h 20m  (was: 2h 10m)

> Decouple compiler from scheduler in warm standby mode 
> ------------------------------------------------------
>
>                 Key: GOBBLIN-1689
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1689
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Now, when add/update flow spec, we rely on scheduler to compile the flow and 
> return status, but in warm standby mode, where every host can accept and 
> process request but not running the flow, we should be able to compile flow 
> without scheduler. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to