phet commented on code in PR #3837:
URL: https://github.com/apache/gobblin/pull/3837#discussion_r1408442638
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -335,22 +343,25 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
-  public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
+  public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
     Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId);
+        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
+            Optional.of(flowAction.getFlowExecutionId()));
     if (optionalJobExecutionPlanDag.isPresent()) {
-      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get(), flowAction);
     } else {
       _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
       Instrumented.markMeter(this.flowOrchestrationFailedMeter);
     }
   }
-  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag)
+  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag,
+      DagActionStore.DagAction launchAction)
       throws IOException {
     try {
-      //Send the dag to the DagManager.
+      // Send the dag to the DagManager and delete the action after persisting it to avoid redundant execution on start up
       this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+      this.dagActionStore.get().deleteDagAction(launchAction);
Review Comment:
What if the DagManager (DM) hasn't successfully handled the dag, e.g. the
service fails after line 364 but before the DM's actions complete? Would the
action/event be lost?
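To make the concern concrete, here is a minimal sketch of one possible ordering, assuming the action is deleted only once the handler has returned normally. `ActionStoreSketch`, `DeleteAfterHandledSketch`, and `submit` are hypothetical stand-ins for illustration, not Gobblin's actual `DagActionStore` or `DagManager` API.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a persistent action store (not Gobblin's DagActionStore).
class ActionStoreSketch {
  private final Set<String> pending = new HashSet<>();

  void add(String actionId) { pending.add(actionId); }

  void delete(String actionId) { pending.remove(actionId); }

  boolean contains(String actionId) { return pending.contains(actionId); }
}

// Hypothetical helper: the action is deleted only after the handler returns normally.
class DeleteAfterHandledSketch {
  static boolean submit(ActionStoreSketch store, String actionId, Consumer<String> handler) {
    try {
      handler.accept(actionId);
    } catch (RuntimeException e) {
      // Handling failed part-way: keep the action so it can be replayed on restart.
      return false;
    }
    store.delete(actionId);
    return true;
  }
}
```

With this ordering a failure between hand-off and completion leaves the action in the store, at the cost that it may be replayed, so the handler would need to tolerate duplicates.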
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -195,15 +194,15 @@ protected void processMessage(DecodeableKafkaRecord message) {
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
-  protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, String flowExecutionId) {
+  protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) {
Review Comment:
Excellent, much clearer method signature!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java:
##########
@@ -47,6 +47,13 @@ static Map<String, String> getFlowMetadata(Config flowConfig) {
     return metadata;
   }
+  /**
+   * Retrieves a flowExecutionId from flowMetadata map and returns dummy value if one is not set
+   */
+  public static String getFlowExecutionIdFromFlowMetadata(Map<String, String> flowMetadata) {
+    return flowMetadata.getOrDefault(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "<<no flowExecutionId>>");
+  }
Review Comment:
I really like the abstraction! As for where this should live: don't we
already have another utils class with static methods for extracting the flow
group and flow name?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -292,7 +296,11 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     // Depending on if DagManager is present, handle execution
     if (this.dagManager.isPresent()) {
-      submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
+      DagActionStore.DagAction launchAction =
+          new DagActionStore.DagAction(flowGroup, flowName,
+              TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata),
+              DagActionStore.FlowActionType.LAUNCH);
Review Comment:
Could we raise the `DagAction` initialization on line 274 above the `if`
block, so it can be shared by the `else`?
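A rough sketch of the suggested shape, assuming both branches can consume the same action object. `HoistSketch`, `LaunchAction`, the `"flowExecutionId"` key, and the returned strings are hypothetical placeholders; the real code would use `DagActionStore.DagAction` and `TimingEventUtils.getFlowExecutionIdFromFlowMetadata`.

```java
import java.util.Map;
import java.util.Optional;

class HoistSketch {
  // Hypothetical, simplified stand-in for DagActionStore.DagAction.
  static final class LaunchAction {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;

    LaunchAction(String flowGroup, String flowName, String flowExecutionId) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
    }
  }

  // The dagManager parameter only models "present or absent"; its type is a placeholder.
  static String orchestrate(Optional<String> dagManager, String flowGroup, String flowName,
      Map<String, String> flowMetadata) {
    // Built once, above the branch, so the if and the else share the same instance.
    // The "flowExecutionId" key is an assumed placeholder for the real metadata constant.
    LaunchAction launchAction = new LaunchAction(flowGroup, flowName,
        flowMetadata.getOrDefault("flowExecutionId", "<<no flowExecutionId>>"));

    if (dagManager.isPresent()) {
      return "dagManager:" + launchAction.flowExecutionId;
    } else {
      return "fallback:" + launchAction.flowExecutionId;
    }
  }
}
```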