[ https://issues.apache.org/jira/browse/GOBBLIN-1689?focusedWorklogId=805045&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-805045 ]
ASF GitHub Bot logged work on GOBBLIN-1689:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Aug/22 23:48
Start Date: 30/Aug/22 23:48
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959018792
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -183,6 +186,7 @@ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properti
_log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion);
if (topologyCatalog.isPresent()) {
+ //todo: de-risk: flow spec and topology spec share same uri?
Review Comment:
This shouldn't be possible unless a user explicitly names their flow config with the URI of their spec executor.
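A minimal sketch of how such a collision could be ruled out up front. It assumes a hypothetical `SpecUriRegistry`, which is not part of Gobblin, with specs reduced to their URIs. The registry refuses a flow spec whose URI already names a topology spec, so a later delete event for that URI is unambiguous.

```java
import java.net.URI;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry (not a Gobblin class): remembers which URIs belong to
// topology specs (spec executors) so a flow spec cannot reuse one of them.
class SpecUriRegistry {
  private final Set<URI> topologyUris = ConcurrentHashMap.newKeySet();
  private final Set<URI> flowUris = ConcurrentHashMap.newKeySet();

  void addTopology(URI uri) {
    topologyUris.add(uri);
  }

  /** Registers a flow URI; returns false without registering if it already names a topology spec. */
  boolean tryAddFlow(URI uri) {
    if (topologyUris.contains(uri)) {
      return false;
    }
    flowUris.add(uri);
    return true;
  }

  /** Lets a delete handler tell which kind of spec a URI referred to. */
  String kindOf(URI uri) {
    if (topologyUris.contains(uri)) {
      return "topology";
    }
    if (flowUris.contains(uri)) {
      return "flow";
    }
    return "unknown";
  }
}
```

Rejecting the clash at add time keeps the delete path simple, at the cost of one extra lookup per added flow.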
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
return;
}
- @Override
- public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
- TopologySpec spec = (TopologySpec) addedSpec;
- log.info ("Loading topology {}", spec.toLongString());
- for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
- log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+ private synchronized AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+ log.info("Loading topology {}", spec.toLongString());
+ for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+ log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
}
- topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+ topologySpecMap.put(spec.getUri(), spec);
+ return new AddSpecResponse(null);
+ }
+
+ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+ Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+ if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && org.apache.commons.lang.StringUtils.isNotBlank(
+ flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+ try {
+ new CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+ } catch (Exception e) {
+ log.error("invalid cron schedule: {}", flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+ flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0, "invalid cron schedule: " + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) + e.getMessage()));
+ return null;
+ }
+ }
+ String response = null;
+
+ // always try to compile the flow to verify if it is compilable
+ Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+ // If dag is null then a compilation error has occurred
+ if (dag != null && !dag.isEmpty()) {
+ response = dag.toString();
+ }
+ // todo: should we check quota here?
+ return new AddSpecResponse<>(response);
Review Comment:
Quota should be checked here if the flow is ad hoc or set to runImmediately.
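A minimal sketch of the quota gate this comment describes. `QuotaChecker` and `QuotaGate` are hypothetical. The property keys `job.schedule` and `flow.runImmediately` are assumptions standing in for the corresponding `ConfigurationKeys` constants. A flow with no cron schedule (ad hoc) or with run-immediately set executes right away, so only those flows are checked against quota when the spec is added.

```java
import java.util.Properties;

// Hypothetical quota hook; not an actual Gobblin interface.
interface QuotaChecker {
  boolean hasCapacity(Properties flowProps);
}

final class QuotaGate {
  // Assumed keys, standing in for ConfigurationKeys.JOB_SCHEDULE_KEY and a run-immediately flag.
  static final String SCHEDULE_KEY = "job.schedule";
  static final String RUN_IMMEDIATELY_KEY = "flow.runImmediately";

  private QuotaGate() {}

  /** True if the flow executes right away: it has no cron schedule (ad hoc) or asks to run immediately. */
  static boolean runsNow(Properties props) {
    String schedule = props.getProperty(SCHEDULE_KEY);
    boolean adhoc = schedule == null || schedule.trim().isEmpty();
    boolean runImmediately = Boolean.parseBoolean(props.getProperty(RUN_IMMEDIATELY_KEY, "false"));
    return adhoc || runImmediately;
  }

  /** Scheduled-only flows are admitted without a quota check; flows that run now need capacity. */
  static boolean admit(Properties props, QuotaChecker quota) {
    return !runsNow(props) || quota.hasCapacity(props);
  }
}
```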
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -393,7 +393,10 @@ private void configureServices(){
registerServicesInLauncher();
// Register Scheduler to listen to changes in Flows
- if (configuration.isSchedulerEnabled()) {
+ // In warm standby mode, instead of scheduler we will add orchestrator as listener
Review Comment:
Should we separate the compiler from the orchestrator? What we really want here is just the compiler, not any of the orchestration capabilities.
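One way to read this suggestion, as a hypothetical sketch: the listener registered on the flow catalog holds only a compiler, so a warm standby host validates specs without owning any orchestration state. `FlowCompiler` and `CompileOnlyListener` are illustrative names, not existing Gobblin types, and specs are reduced to URI strings.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for a spec compiler.
interface FlowCompiler {
  /** Returns a serialized plan on success; on failure returns empty and appends messages to errors. */
  Optional<String> compile(String flowSpecUri, List<String> errors);
}

/** Catalog listener that validates specs by compiling them, with no orchestration side effects. */
final class CompileOnlyListener {
  private final FlowCompiler compiler;

  CompileOnlyListener(FlowCompiler compiler) {
    this.compiler = compiler;
  }

  /** Mirrors the AddSpecResponse idea: the compiled plan, or null with errors recorded. */
  String onAddFlowSpec(String flowSpecUri, List<String> errors) {
    return compiler.compile(flowSpecUri, errors).orElse(null);
  }
}
```

Because the listener depends only on `FlowCompiler`, it can be constructed on hosts where the DAG manager and other orchestration components are not running.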
Review Comment:
So I think the only open question here is this: if a user has a scheduled flow that cannot run immediately because of quota, should we still store their changes permanently?
Currently, the behavior is that **we reject all changes if the flow is set to run immediately and quota is exceeded**.
If the flow is not set to run immediately, the changes are accepted.
In other systems such as ADF, a flow's schedule and its configuration are separate. So one could argue that we should accept the changes but refuse to schedule them while quota is exceeded.
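The two policies can be contrasted in a small hypothetical model. `Outcome` and `QuotaPolicy` are illustrative names, and quota is passed in as a boolean rather than looked up from any real quota manager.

```java
// Hypothetical model of the two policies discussed above; names are illustrative only.
enum Outcome { ACCEPT, ACCEPT_CONFIG_ONLY, REJECT }

final class QuotaPolicy {
  private QuotaPolicy() {}

  /** Current behavior: reject the whole change if the flow runs immediately and quota is exceeded. */
  static Outcome current(boolean runImmediately, boolean quotaExceeded) {
    if (runImmediately && quotaExceeded) {
      return Outcome.REJECT;
    }
    return Outcome.ACCEPT;
  }

  /** ADF-style alternative: persist the configuration, but do not run it while quota is exceeded. */
  static Outcome decoupled(boolean runImmediately, boolean quotaExceeded) {
    if (runImmediately && quotaExceeded) {
      return Outcome.ACCEPT_CONFIG_ONLY;
    }
    return Outcome.ACCEPT;
  }
}
```

The policies differ in one case only: a run-immediately flow that exceeds quota. The decoupled policy keeps the user's edit instead of discarding it.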
Issue Time Tracking
-------------------
Worklog Id: (was: 805045)
Time Spent: 2h (was: 1h 50m)
> Decouple compiler from scheduler in warm standby mode
> ------------------------------------------------------
>
> Key: GOBBLIN-1689
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1689
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Currently, when a flow spec is added or updated, we rely on the scheduler to
> compile the flow and return its status. In warm standby mode, however, every
> host can accept and process requests without running the flow, so we should
> be able to compile the flow without the scheduler.
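A hypothetical sketch of the wiring this issue calls for, modeled on the `GobblinServiceManager` change in the diff. In warm standby mode the orchestrator (or, per the review suggestion, a compile-only component) is registered as the flow catalog listener in place of the scheduler. `CatalogListener` and `ListenerWiring` are illustrative names, not Gobblin APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener abstraction; not Gobblin's actual catalog listener interface.
interface CatalogListener {
  String name();
}

final class ListenerWiring {
  private ListenerWiring() {}

  /**
   * Chooses the flow catalog listeners. In warm standby mode every host compiles
   * incoming specs itself, so the compiling component replaces the scheduler;
   * otherwise the scheduler is registered only when it is enabled.
   */
  static List<CatalogListener> flowCatalogListeners(boolean warmStandbyEnabled, boolean schedulerEnabled,
      CatalogListener scheduler, CatalogListener compilingListener) {
    List<CatalogListener> listeners = new ArrayList<>();
    if (warmStandbyEnabled) {
      listeners.add(compilingListener);
    } else if (schedulerEnabled) {
      listeners.add(scheduler);
    }
    return listeners;
  }
}
```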
--
This message was sent by Atlassian Jira
(v8.20.10#820010)