Will-Lo commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959018792
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -183,6 +186,7 @@ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properti
_log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion);
if (topologyCatalog.isPresent()) {
+ //todo: de-risk: flow spec and topology spec share same uri?
Review Comment:
This shouldn't be possible unless a user deliberately names their flow
config after the URL of their spec executor.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
return;
}
- @Override
- public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
- TopologySpec spec = (TopologySpec) addedSpec;
- log.info ("Loading topology {}", spec.toLongString());
- for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
- log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+ private synchronized AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+ log.info("Loading topology {}", spec.toLongString());
+ for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+ log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
}
- topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+ topologySpecMap.put(spec.getUri(), spec);
+ return new AddSpecResponse(null);
+ }
+
+ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+ Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+ if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && org.apache.commons.lang.StringUtils.isNotBlank(
+ flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+ try {
+ new CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+ } catch (Exception e) {
+ log.error("invalid cron schedule: {}", flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+ flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid cron schedule: " + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + e.getMessage()));
+ return null;
+ }
+ }
+ String response = null;
+
+ // always try to compile the flow to verify if it is compilable
+ Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+ // If dag is null then a compilation error has occurred
+ if (dag != null && !dag.isEmpty()) {
+ response = dag.toString();
+ }
+ // todo: should we check quota here?
+ return new AddSpecResponse<>(response);
Review Comment:
Quota should be checked here if the flow is ad hoc or set to runImmediately.
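To make the condition in this comment concrete, here is a minimal sketch of the predicate that would gate a quota check. The class name, the method name, and the two property-name strings are assumptions for illustration only; the real keys are defined in ConfigurationKeys, and "ad hoc" is read here as "no schedule set".

```java
import java.util.Properties;

// Hypothetical helper, not part of the PR.
class QuotaCheckSketch {
  // Assumed property names; the real constants live in ConfigurationKeys.
  static final String JOB_SCHEDULE_KEY = "job.schedule";
  static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";

  /** Returns true when the flow would start right away, so quota matters now. */
  static boolean shouldCheckQuota(Properties flowProps) {
    String schedule = flowProps.getProperty(JOB_SCHEDULE_KEY);
    // Assumption: a flow without a schedule is treated as ad hoc.
    boolean adhoc = schedule == null || schedule.trim().isEmpty();
    boolean runImmediately =
        Boolean.parseBoolean(flowProps.getProperty(FLOW_RUN_IMMEDIATELY, "false"));
    return adhoc || runImmediately;
  }
}
```

A scheduled flow without runImmediately would skip the check at add time under this reading.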
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -393,7 +393,10 @@ private void configureServices(){
registerServicesInLauncher();
// Register Scheduler to listen to changes in Flows
- if (configuration.isSchedulerEnabled()) {
+ // In warm standby mode, instead of scheduler we will add orchestrator as listener
Review Comment:
Should we separate the compiler from the orchestrator? What we really want
here is just the compiler, without any of the orchestration
capabilities.
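One way to picture the separation suggested here is a listener that depends only on a compile function and knows nothing about orchestration. Everything in this sketch is hypothetical (the interface, the class, and the use of plain strings in place of FlowSpec and Dag); it only illustrates the dependency direction, not Gobblin's actual types.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical shapes, not Gobblin's real interfaces.
class CompileOnlyListenerSketch {
  interface SpecListener {
    /** Returns the compiled plan, or empty if compilation failed. */
    Optional<String> onAddSpec(String flowSpec);
  }

  /** Validates a spec by compiling it; it never schedules or executes anything. */
  static final class CompileOnlyListener implements SpecListener {
    private final Function<String, String> compiler; // returns null on failure

    CompileOnlyListener(Function<String, String> compiler) {
      this.compiler = compiler;
    }

    @Override
    public Optional<String> onAddSpec(String flowSpec) {
      return Optional.ofNullable(compiler.apply(flowSpec));
    }
  }
}
```

With this shape, warm standby mode could register the compile-only listener, while the orchestrator keeps its own listener for execution.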
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -393,7 +393,10 @@ private void configureServices(){
registerServicesInLauncher();
// Register Scheduler to listen to changes in Flows
- if (configuration.isSchedulerEnabled()) {
+ // In warm standby mode, instead of scheduler we will add orchestrator as listener
Review Comment:
I think the only uncertainty here is this: if a user has a scheduled
flow, should we store their changes permanently when the flow cannot be run
immediately due to quota?
Currently, the behavior is that **we reject all changes if the flow is set
to run immediately and quota is overrun**.
If the flow is not set to run immediately, it should pass.
In other systems like ADF, the schedule of a flow and the configuration of
a flow are separate. So it can be argued that we should accept changes but
refuse to schedule them if the quota is overrun.
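The two policies described in this comment can be compared side by side in a small sketch. The enum, class, and method names are hypothetical, and the second method is only one possible reading of "accept changes, but refuse to schedule"; it is not code from the PR.

```java
// Hypothetical decision table for the two policies discussed in this comment.
class QuotaPolicySketch {
  enum Decision { ACCEPT_AND_SCHEDULE, ACCEPT_WITHOUT_SCHEDULING, REJECT }

  /** Current behavior as described: reject only when run-immediately meets an overrun quota. */
  static Decision currentPolicy(boolean runImmediately, boolean quotaExceeded) {
    if (runImmediately && quotaExceeded) {
      return Decision.REJECT;
    }
    return Decision.ACCEPT_AND_SCHEDULE;
  }

  /** Alternative: always store the change, but do not schedule while quota is overrun. */
  static Decision alternativePolicy(boolean quotaExceeded) {
    return quotaExceeded ? Decision.ACCEPT_WITHOUT_SCHEDULING : Decision.ACCEPT_AND_SCHEDULE;
  }
}
```

The difference shows up only when quota is overrun: the current policy drops the user's edit for run-immediately flows, while the alternative keeps the edit and defers execution.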