phet commented on code in PR #4026:
URL: https://github.com/apache/gobblin/pull/4026#discussion_r1717966511
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -186,7 +185,8 @@ protected void processMessage(DecodeableKafkaRecord message) {
return;
}
} catch (Exception e) {
- log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: {}", e);
+ log.warn("Ran into unexpected error processing specUri {} changes. Reexamine scheduler. "
Review Comment:
makes sense...
nit: no need for "Ran into "
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -171,6 +172,8 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
// into inconsistent state.
if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
sendCancellationEvent(dagNodeToCancel, props);
+ log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(),
+ props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
} else {
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getJobSpec().getUri());
Review Comment:
NBD, but on success we log `.getId()` and on failure
`.getJobSpec().getUri()` - could it be consistent?
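
One way to read this suggestion, as a minimal sketch: build both messages from the same identifier so the success and failure lines can be correlated. The class and method names below are hypothetical, and picking the node id over the job spec URI is an assumption, not a decision made in the PR.

```java
// Hypothetical helper, not part of Gobblin: both branches log the same identifier.
public class CancelLogSketch {

  /** Returns the log line for either branch, keyed by the same dag node id. */
  static String cancelMessage(String dagNodeId, boolean hasJobFuture, String serializedFuture) {
    if (hasJobFuture) {
      return String.format("Cancelled dag node %s, spec_producer_future %s", dagNodeId, serializedFuture);
    }
    return String.format("No job future when cancelling dag node %s", dagNodeId);
  }

  public static void main(String[] args) {
    System.out.println(cancelMessage("node-1", true, "future-abc"));
    System.out.println(cancelMessage("node-1", false, null));
  }
}
```

Using the URI in both places instead would work equally well; the point is only that the two lines share a key.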
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -26,18 +26,26 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
/**
- * An implementation for {@link DagProc} that launches a new job.
+ * An implementation for {@link DagProc} that launches the start job of a flow.
+ * If there are multiple start jobs for the flow, {@link ReevaluateDagProc} is created for each of them and that
+ * launches those start jobs.
+ * In a life cycle of a flow, {@link LaunchDagProc} runs only one time, unless it fails and is
+ * retried by the retry-reminders. {@link ReevaluateDagProc} runs multiple times depending upon the number of jobs and
+ * number of parallel jobs.
Review Comment:
suggest for final sentence:
> Post-launch, a subsequent {@link ReevaluateDagProc} runs after each job of the DAG completes. This may itself launch further jobs or conclude execution of the overall DAG/flow.
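
To make the suggested wording concrete, here is a toy model of that lifecycle for a flow with a single start job: one launch, then one reevaluation per completed job, each of which either starts newly unblocked jobs or concludes the DAG. Every name in it is hypothetical; it illustrates the described behavior and is not Gobblin's implementation.

```java
import java.util.*;

// Toy model only: these names are hypothetical and are not Gobblin's API.
public class DagLifecycleSketch {
  private final Map<String, List<String>> parents;          // job -> jobs it depends on
  private final Set<String> running = new LinkedHashSet<>();
  private final Set<String> done = new LinkedHashSet<>();
  private int reevaluations = 0;

  DagLifecycleSketch(Map<String, List<String>> parents) {
    this.parents = parents;
  }

  /** Runs once per flow: starts the job(s) with no parents. */
  Set<String> launch() {
    return startReadyJobs();
  }

  /** Runs after each job completes; returns true once every job is done. */
  boolean reevaluate(String finishedJob) {
    reevaluations++;
    running.remove(finishedJob);
    done.add(finishedJob);
    startReadyJobs();                                       // may launch further jobs
    return done.size() == parents.size();                   // or conclude the DAG
  }

  /** Starts every job that is neither done nor running and whose parents are all done. */
  private Set<String> startReadyJobs() {
    Set<String> started = new LinkedHashSet<>();
    for (Map.Entry<String, List<String>> e : parents.entrySet()) {
      String job = e.getKey();
      if (!done.contains(job) && !running.contains(job) && done.containsAll(e.getValue())) {
        running.add(job);
        started.add(job);
      }
    }
    return started;
  }

  /** Drives the DAG a -> {b, c} -> d; returns the reevaluation count, or -1 if it never finished. */
  static int runExample() {
    Map<String, List<String>> parents = new LinkedHashMap<>();
    parents.put("a", Collections.<String>emptyList());
    parents.put("b", Arrays.asList("a"));
    parents.put("c", Arrays.asList("a"));
    parents.put("d", Arrays.asList("b", "c"));
    DagLifecycleSketch dag = new DagLifecycleSketch(parents);
    dag.launch();                                           // starts a
    dag.reevaluate("a");                                    // starts b and c
    dag.reevaluate("b");                                    // d still waits on c
    dag.reevaluate("c");                                    // starts d
    boolean finished = dag.reevaluate("d");                 // concludes the DAG
    return finished ? dag.reevaluations : -1;
  }

  public static void main(String[] args) {
    System.out.println("reevaluations: " + runExample());
  }
}
```

In this model the number of reevaluations equals the number of jobs, which is the relationship the proposed sentence states more directly than "depending upon the number of jobs and number of parallel jobs".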
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
jobExecutionPlan.setExecutionStatus(CANCELLED);
}
+ public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+ if (!dag.isEmpty()) {
+ // Every dag node will contain the same flow metadata
+ Config config = DagManagerUtils.getDagJobConfig(dag);
+ Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(config);
+ dag.setFlowEvent(flowEvent);
Review Comment:
we set the flow event on the dag here, but is it ever actually persisted to the store?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
jobExecutionPlan.setExecutionStatus(CANCELLED);
}
+ public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
Review Comment:
needs javadoc