Will-Lo commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959905999


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
     return;
   }
 
-  @Override
-  public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
-    TopologySpec spec = (TopologySpec) addedSpec;
-    log.info ("Loading topology {}", spec.toLongString());
-    for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
-      log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+  private synchronized  AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+    log.info("Loading topology {}", spec.toLongString());
+    for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+      log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
     }
 
-    topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+    topologySpecMap.put(spec.getUri(), spec);
+    return new AddSpecResponse(null);
+  }
+
+  private  AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+    Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+    if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && 
org.apache.commons.lang.StringUtils.isNotBlank(
+        flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+      try {
+        new 
CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+      } catch (Exception e) {
+        log.error("invalid cron schedule: {}", 
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+        flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, 
"invalid cron schedule: " + 
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + 
e.getMessage()));
+        return null;
+      }
+    }
+    String response = null;
+
+    // always try to compile the flow to verify if it is compilable
+    Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+    // If dag is null then a compilation error has occurred
+    if (dag != null && !dag.isEmpty()) {
+      response = dag.toString();
+    }
+    // todo: should we check quota here?
+    return new AddSpecResponse<>(response);

Review Comment:
   I think runImmediately is a fairly common user use-case. We had a large 
scale user who would store flows with a schedule very far out, and then invoke 
it in an adhoc way using runImmediately. This would help them run backfills 
with the same configurations if needed, and also avoid concurrency issues as 
scheduled flows validate that they're not already running (but adhoc does not 
do this).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to