Will-Lo commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959018792


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -183,6 +186,7 @@ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properti
     _log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion);
 
     if (topologyCatalog.isPresent()) {
+      //todo: de-risk: flow spec and topology spec share same uri?

Review Comment:
   This shouldn't be possible unless a user explicitly names their flow
   config after the URI of their spec executor.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
     return;
   }
 
-  @Override
-  public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
-    TopologySpec spec = (TopologySpec) addedSpec;
-    log.info ("Loading topology {}", spec.toLongString());
-    for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
-      log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+  private synchronized  AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+    log.info("Loading topology {}", spec.toLongString());
+    for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+      log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
     }
 
-    topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+    topologySpecMap.put(spec.getUri(), spec);
+    return new AddSpecResponse(null);
+  }
+
+  private  AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+    Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+    if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && org.apache.commons.lang.StringUtils.isNotBlank(
+        flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+      try {
+        new CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+      } catch (Exception e) {
+        log.error("invalid cron schedule: {}", flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+        flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid cron schedule: " + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + e.getMessage()));
+        return null;
+      }
+    }
+    String response = null;
+
+    // always try to compile the flow to verify if it is compilable
+    Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+    // If dag is null then a compilation error has occurred
+    if (dag != null && !dag.isEmpty()) {
+      response = dag.toString();
+    }
+    // todo: should we check quota here?
+    return new AddSpecResponse<>(response);

Review Comment:
   Quota should be checked here if the flow is ad hoc or set to runImmediately.
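
   A minimal sketch of that condition, assuming "ad hoc" means the flow has
   no schedule. The class name, method name, and the two property-name
   strings are assumptions for illustration; the real code would use the
   constants in ConfigurationKeys.

```java
import java.util.Properties;

class QuotaCheckSketch {
  // Assumed property names, for illustration only.
  static final String SCHEDULE_KEY = "job.schedule";
  static final String RUN_IMMEDIATELY_KEY = "flow.runImmediately";

  /** Returns true when the flow would start right away, so quota matters at add time. */
  static boolean needsQuotaCheck(Properties props) {
    String schedule = props.getProperty(SCHEDULE_KEY);
    // Treat a missing or blank schedule as an ad hoc flow.
    boolean adHoc = schedule == null || schedule.trim().isEmpty();
    boolean runImmediately =
        Boolean.parseBoolean(props.getProperty(RUN_IMMEDIATELY_KEY, "false"));
    return adHoc || runImmediately;
  }

  public static void main(String[] args) {
    Properties scheduled = new Properties();
    scheduled.setProperty(SCHEDULE_KEY, "0 0 * * * ?");
    System.out.println(needsQuotaCheck(scheduled));        // scheduled only: false
    scheduled.setProperty(RUN_IMMEDIATELY_KEY, "true");
    System.out.println(needsQuotaCheck(scheduled));        // scheduled + runImmediately: true
    System.out.println(needsQuotaCheck(new Properties())); // no schedule (ad hoc): true
  }
}
```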



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -393,7 +393,10 @@ private void configureServices(){
     registerServicesInLauncher();
 
     // Register Scheduler to listen to changes in Flows
-    if (configuration.isSchedulerEnabled()) {
+    // In warm standby mode, instead of scheduler we will add orchestrator as listener

Review Comment:
   Should we separate the compiler from the orchestrator? What we really want
   here is just the compiler, not any of the orchestration capabilities.
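
   Roughly what I have in mind, as a sketch only. `FlowCompiler`,
   `SpecListener`, and `CompilationValidator` are hypothetical stand-ins,
   not the actual Gobblin interfaces; the point is that the listener
   depends on a compiler and nothing else.

```java
import java.util.Optional;

class CompilerOnlyListenerSketch {
  /** Hypothetical compiler: returns a plan description, or empty if compilation fails. */
  interface FlowCompiler {
    Optional<String> compile(String flowConfig);
  }

  /** Hypothetical catalog callback, reduced to the one method that matters here. */
  interface SpecListener {
    String onAddSpec(String flowConfig);
  }

  /** Validates a flow by compiling it; holds no reference to any orchestration component. */
  static final class CompilationValidator implements SpecListener {
    private final FlowCompiler compiler;

    CompilationValidator(FlowCompiler compiler) {
      this.compiler = compiler;
    }

    @Override
    public String onAddSpec(String flowConfig) {
      // A null response signals a compilation failure.
      return compiler.compile(flowConfig).orElse(null);
    }
  }

  public static void main(String[] args) {
    // Toy compiler: an empty config fails, anything else "compiles".
    FlowCompiler toy = cfg -> cfg.isEmpty() ? Optional.empty() : Optional.of("dag(" + cfg + ")");
    SpecListener listener = new CompilationValidator(toy);
    System.out.println(listener.onAddSpec("a->b"));
    System.out.println(listener.onAddSpec(""));
  }
}
```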



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -393,7 +393,10 @@ private void configureServices(){
     registerServicesInLauncher();
 
     // Register Scheduler to listen to changes in Flows
-    if (configuration.isSchedulerEnabled()) {
+    // In warm standby mode, instead of scheduler we will add orchestrator as listener

Review Comment:
   I think the only uncertainty here is this: if a user has a scheduled flow,
   should we store their changes permanently when the flow cannot be run
   immediately due to quota?
   Currently, **we reject all changes if the flow is set to run immediately
   and quota is exceeded**.
   If the flow is not set to run immediately, it should pass.
   
   In other systems like ADF, the schedule of a flow and the configuration of
   a flow are separate. So it can be argued that we should accept changes but
   refuse to schedule them if the quota is exceeded.
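
   To make the two options concrete, here is a small sketch. The enum and
   method names are made up for illustration, and the second method is only
   one reading of the "accept but don't schedule" alternative.

```java
class QuotaPolicySketch {
  enum Outcome { ACCEPTED, ACCEPTED_NOT_SCHEDULED, REJECTED }

  /** Current behavior as described: reject the change when it must run now and quota is exceeded. */
  static Outcome rejectOnQuota(boolean runImmediately, boolean quotaExceeded) {
    return (runImmediately && quotaExceeded) ? Outcome.REJECTED : Outcome.ACCEPTED;
  }

  /** Alternative: always store the change, but skip the run when quota is exceeded. */
  static Outcome storeButDoNotSchedule(boolean runImmediately, boolean quotaExceeded) {
    return (runImmediately && quotaExceeded) ? Outcome.ACCEPTED_NOT_SCHEDULED : Outcome.ACCEPTED;
  }

  public static void main(String[] args) {
    System.out.println(rejectOnQuota(true, true));
    System.out.println(storeButDoNotSchedule(true, true));
    System.out.println(rejectOnQuota(false, true));
  }
}
```

   The two policies differ only in the case where the flow is set to run
   immediately and quota is exceeded; every other combination is accepted
   under both.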



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
