[ https://issues.apache.org/jira/browse/GOBBLIN-1689?focusedWorklogId=805045&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-805045 ]

ASF GitHub Bot logged work on GOBBLIN-1689:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Aug/22 23:48
            Start Date: 30/Aug/22 23:48
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959018792


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -183,6 +186,7 @@ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properti
     _log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion);
 
     if (topologyCatalog.isPresent()) {
+      //todo: de-risk: flow spec and topology spec share same uri?

Review Comment:
   This shouldn't be possible unless a user explicitly names their flow config with the URI of their spec executor.
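
If the collision ever needs an explicit guard, a minimal sketch could look like the following. It assumes the service can see the set of topology (spec executor) URIs when a flow spec arrives; `SpecUriGuard` and its methods are hypothetical and not part of Gobblin:

```java
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

// Hypothetical guard (not a Gobblin API): remembers topology spec URIs so that a flow spec
// reusing one of them can be rejected before it reaches onAddSpec/onDeleteSpec.
class SpecUriGuard {
  private final Set<URI> topologyUris = new HashSet<>();

  void registerTopology(URI uri) {
    topologyUris.add(uri);
  }

  // Returns true if the flow spec URI does not clash with any registered topology URI.
  boolean isFlowUriAllowed(URI flowUri) {
    return !topologyUris.contains(flowUri);
  }
}
```

Rejecting at add time keeps the deletion path unambiguous, since a URI could then only ever belong to one catalog.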



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
     return;
   }
 
-  @Override
-  public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
-    TopologySpec spec = (TopologySpec) addedSpec;
-    log.info ("Loading topology {}", spec.toLongString());
-    for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
-      log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+  private synchronized  AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+    log.info("Loading topology {}", spec.toLongString());
+    for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+      log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
     }
 
-    topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+    topologySpecMap.put(spec.getUri(), spec);
+    return new AddSpecResponse(null);
+  }
+
+  private  AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+    Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+    if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && org.apache.commons.lang.StringUtils.isNotBlank(
+        flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+      try {
+        new CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+      } catch (Exception e) {
+        log.error("invalid cron schedule: {}", flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+        flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid cron schedule: " + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + e.getMessage()));
+        return null;
+      }
+    }
+    String response = null;
+
+    // always try to compile the flow to verify if it is compilable
+    Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+    // If dag is null then a compilation error has occurred
+    if (dag != null && !dag.isEmpty()) {
+      response = dag.toString();
+    }
+    // todo: should we check quota here?
+    return new AddSpecResponse<>(response);

Review Comment:
   Quota should be checked here if the flow is ad hoc or set to runImmediately.
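
A minimal sketch of that condition, assuming an ad hoc flow is one with a blank schedule and that the run-immediately flag is a boolean property. The key strings `job.schedule` (assumed value of `ConfigurationKeys.JOB_SCHEDULE_KEY`) and `flow.runImmediately` are assumptions, and `QuotaCheckPolicy` is a hypothetical helper, not Gobblin code:

```java
import java.util.Properties;

// Hypothetical helper (not part of Gobblin): decides whether onAddFlowSpec should check quota.
// Both key strings below are assumptions, not confirmed Gobblin constants.
final class QuotaCheckPolicy {
  static final String SCHEDULE_KEY = "job.schedule";
  static final String RUN_IMMEDIATELY_KEY = "flow.runImmediately";

  private QuotaCheckPolicy() {}

  static boolean requiresQuotaCheck(Properties flowProps) {
    String schedule = flowProps.getProperty(SCHEDULE_KEY);
    // An ad hoc flow has no schedule, so it runs as soon as it is accepted.
    boolean adhoc = schedule == null || schedule.trim().isEmpty();
    // Boolean.parseBoolean returns false for null and for anything other than "true" (any case).
    boolean runImmediately = Boolean.parseBoolean(flowProps.getProperty(RUN_IMMEDIATELY_KEY));
    return adhoc || runImmediately;
  }
}
```

A scheduled flow without the run-immediately flag skips the check, because nothing executes at submission time.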



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -393,7 +393,10 @@ private void configureServices(){
     registerServicesInLauncher();
 
     // Register Scheduler to listen to changes in Flows
-    if (configuration.isSchedulerEnabled()) {
+    // In warm standby mode, instead of scheduler we will add orchestrator as listener

Review Comment:
   Should we separate the compiler from the orchestrator? What we really want here is only the compiler, not any of the orchestration capabilities.
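
One way to express that separation, as a sketch under assumptions: a narrow, hypothetical `FlowCompiler` interface that a warm-standby listener depends on, so it gets compilation without orchestration. Neither type exists in Gobblin, and a `String` stands in for the compiled `Dag<JobExecutionPlan>`:

```java
import java.util.Optional;

// Hypothetical narrow interface: compile only, no scheduling or job submission.
interface FlowCompiler {
  // Returns the compiled plan (a String stands in for Dag<JobExecutionPlan>), or empty on failure.
  Optional<String> compile(String flowSpecUri);
}

// Hypothetical warm-standby listener that depends only on the compiler.
final class FlowValidationListener {
  private final FlowCompiler compiler;

  FlowValidationListener(FlowCompiler compiler) {
    this.compiler = compiler;
  }

  // Compiles to verify the flow and reports the result; never orchestrates.
  String onAddFlowSpec(String flowSpecUri) {
    return compiler.compile(flowSpecUri)
        .map(plan -> "compiled: " + plan)
        .orElse("compilation failed: " + flowSpecUri);
  }
}
```

Because the listener only sees `FlowCompiler`, it cannot accidentally trigger orchestration, and the orchestrator could implement or wrap the same interface.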



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -393,7 +393,10 @@ private void configureServices(){
     registerServicesInLauncher();
 
     // Register Scheduler to listen to changes in Flows
-    if (configuration.isSchedulerEnabled()) {
+    // In warm standby mode, instead of scheduler we will add orchestrator as listener

Review Comment:
   I think the only open question here is: if a user has a scheduled flow, should we persist their changes if the flow cannot run immediately because of quota?
   Currently, **we reject all changes if the flow is set to run immediately and its quota is exceeded**.
   If the flow is not set to run immediately, the change should pass.
   
   In other systems, such as ADF, a flow's schedule and its configuration are separate. So one could argue that we should accept the changes but refuse to schedule the flow if its quota is exceeded.
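
The two options can be contrasted in a small decision function. This is a hypothetical sketch, not Gobblin code; the enum names are invented for illustration, and "quota exceeded" is assumed to be computed elsewhere:

```java
// Hypothetical sketch (not Gobblin code) of the current policy versus the ADF-style alternative.
final class FlowUpdateDecider {
  enum QuotaPolicy { REJECT_CHANGES, ACCEPT_WITHOUT_SCHEDULING }

  enum Outcome { ACCEPTED, ACCEPTED_NOT_SCHEDULED, REJECTED }

  private FlowUpdateDecider() {}

  static Outcome decide(QuotaPolicy policy, boolean runImmediately, boolean quotaExceeded) {
    // Quota only blocks a change when the flow would run right away.
    if (!runImmediately || !quotaExceeded) {
      return Outcome.ACCEPTED;
    }
    // Current behavior: reject the whole change.
    if (policy == QuotaPolicy.REJECT_CHANGES) {
      return Outcome.REJECTED;
    }
    // Alternative: persist the configuration but do not schedule it yet.
    return Outcome.ACCEPTED_NOT_SCHEDULED;
  }
}
```

The alternative needs a place to record "accepted but not scheduled" so the flow can be scheduled once quota frees up; the current policy avoids that extra state.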





Issue Time Tracking
-------------------

    Worklog Id:     (was: 805045)
    Time Spent: 2h  (was: 1h 50m)

> Decouple compiler from scheduler in warm standby mode 
> ------------------------------------------------------
>
>                 Key: GOBBLIN-1689
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1689
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently, when a flow spec is added or updated, we rely on the scheduler to compile the flow and 
> return its status. In warm standby mode, where every host can accept and process requests but 
> does not run flows, we should be able to compile the flow without the scheduler.
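
The listener choice described in the issue and in the `GobblinServiceManager` diff can be sketched as below. This is a hypothetical illustration: `ListenerRegistration` and its enum are invented, and the two booleans stand in for whatever configuration flags the service actually reads:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Gobblin code) of choosing the flow-catalog listener by mode.
final class ListenerRegistration {
  enum Listener { SCHEDULER, ORCHESTRATOR }

  private ListenerRegistration() {}

  static List<Listener> listenersFor(boolean warmStandbyEnabled, boolean schedulerEnabled) {
    List<Listener> listeners = new ArrayList<>();
    if (warmStandbyEnabled) {
      // Warm standby: every host compiles incoming flow specs; the scheduler is not registered.
      listeners.add(Listener.ORCHESTRATOR);
    } else if (schedulerEnabled) {
      // Existing path: the scheduler compiles the flow and returns its status.
      listeners.add(Listener.SCHEDULER);
    }
    return listeners;
  }
}
```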



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
