phet commented on code in PR #4026:
URL: https://github.com/apache/gobblin/pull/4026#discussion_r1717966511

##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -186,7 +185,8 @@ protected void processMessage(DecodeableKafkaRecord message) {
           return;
         }
       } catch (Exception e) {
-        log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: {}", e);
+        log.warn("Ran into unexpected error processing specUri {} changes. Reexamine scheduler. "

Review Comment:
   makes sense... nit: no need for "Ran into "

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -171,6 +172,8 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
     // into inconsistent state.
     if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
       sendCancellationEvent(dagNodeToCancel, props);
+      log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(),
+          props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
     } else {
       log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getJobSpec().getUri());

Review Comment:
   NBD, but on success we log `.getId()` and on failure `.getJobSpec().getUri()` - could it be consistent?

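To make the consistency point on `cancelDagNode` concrete, here is a minimal, self-contained sketch in which both branches report the same identifier. It is not Gobblin code: `CancelLogSketch`, `Node`, and `cancelLogLine` are hypothetical stand-ins, the node is reduced to an id plus a flag for whether a job future is present, and the message is returned as a string rather than passed to a logger so the example runs on its own.

```java
public class CancelLogSketch {
  /** Hypothetical stand-in for a DAG node; NOT the real Dag.DagNode API. */
  static final class Node {
    final String id;
    final boolean hasJobFuture;

    Node(String id, boolean hasJobFuture) {
      this.id = id;
      this.hasJobFuture = hasJobFuture;
    }
  }

  /**
   * Builds the message for either branch. Both branches use node.id, so a
   * success line and a failure line for the same node can be matched up.
   * serializedFuture stands in for the value read from props in the diff.
   */
  static String cancelLogLine(Node node, String serializedFuture) {
    if (node.hasJobFuture) {
      return String.format("Cancelled dag node %s, spec_producer_future %s", node.id, serializedFuture);
    }
    return String.format("No job future when cancelling dag node %s", node.id);
  }

  public static void main(String[] args) {
    System.out.println(cancelLogLine(new Node("node-1", true), "future-abc"));
    System.out.println(cancelLogLine(new Node("node-2", false), null));
  }
}
```

Whether the shared identifier should be the node id or the job spec URI is a choice for the PR author; the sketch only shows one identifier used in both places.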
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -26,18 +26,26 @@
 import lombok.extern.slf4j.Slf4j;

 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;


 /**
- * An implementation for {@link DagProc} that launches a new job.
+ * An implementation for {@link DagProc} that launches the start job of a flow.
+ * If there are multiple start jobs for the flow, {@link ReevaluateDagProc} is created for each of them and that
+ * launches those start jobs.
+ * In a life cycle of a flow, {@link LaunchDagProc} runs only one time, unless it fails and is
+ * retried by the retry-reminders. {@link ReevaluateDagProc} runs multiple times depending upon the number of jobs and
+ * number of parallel jobs.

Review Comment:
   suggest for final sentence:
   > Post-launch, a subsequent {@link ReevaluateDagProc} runs after each job of the DAG completes. This may itself launch further jobs or conclude execution of the overall DAG/flow.

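The lifecycle in the suggested sentence can be modelled with a toy simulation: one launch step per flow, then one reevaluate step after each job completes, which either starts further jobs or lets the flow conclude. This is a sketch under stated assumptions, not Gobblin code. `DagLifecycleSketch`, `simulate`, and the job names are hypothetical, jobs complete instantly in launch order, and the launch step here starts every parentless job directly, whereas the Javadoc says additional start jobs go through `ReevaluateDagProc`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DagLifecycleSketch {
  /**
   * Simulates one flow over a toy DAG and returns a trace of the steps taken.
   * Keys are job names; values list the parents that must complete first.
   */
  static List<String> simulate(Map<String, List<String>> parentsByJob) {
    List<String> trace = new ArrayList<>();
    Set<String> launched = new HashSet<>();
    Set<String> completed = new HashSet<>();
    Deque<String> running = new ArrayDeque<>();

    // Launch step: happens once per flow (assumed to succeed, so no retry).
    trace.add("launch");
    startReadyJobs(parentsByJob, launched, completed, running, trace);

    // Reevaluate step: happens once after each job completes.
    while (!running.isEmpty()) {
      String finished = running.poll();
      completed.add(finished);
      trace.add("reevaluate after " + finished);
      startReadyJobs(parentsByJob, launched, completed, running, trace);
    }
    trace.add("flow concluded");
    return trace;
  }

  /** Starts every job that has not been launched yet and whose parents have all completed. */
  private static void startReadyJobs(Map<String, List<String>> parentsByJob, Set<String> launched,
      Set<String> completed, Deque<String> running, List<String> trace) {
    for (Map.Entry<String, List<String>> entry : parentsByJob.entrySet()) {
      String job = entry.getKey();
      if (!launched.contains(job) && completed.containsAll(entry.getValue())) {
        launched.add(job);
        running.add(job);
        trace.add("start " + job);
      }
    }
  }

  public static void main(String[] args) {
    // Diamond-shaped toy DAG: a -> {b, c} -> d. LinkedHashMap keeps iteration order stable.
    Map<String, List<String>> dag = new LinkedHashMap<>();
    dag.put("a", Arrays.<String>asList());
    dag.put("b", Arrays.asList("a"));
    dag.put("c", Arrays.asList("a"));
    dag.put("d", Arrays.asList("b", "c"));
    simulate(dag).forEach(System.out::println);
  }
}
```

For the diamond DAG in `main`, the trace shows a single launch and four reevaluate steps, one per job, which is the relationship the suggested wording states more directly than "runs multiple times depending upon the number of jobs".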
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
     jobExecutionPlan.setExecutionStatus(CANCELLED);
   }

+  public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+    if (!dag.isEmpty()) {
+      // Every dag node will contain the same flow metadata
+      Config config = DagManagerUtils.getDagJobConfig(dag);
+      Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(config);
+      dag.setFlowEvent(flowEvent);

Review Comment:
   do we set this, but don't actually persist it in the store?

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
     jobExecutionPlan.setExecutionStatus(CANCELLED);
   }

+  public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {

Review Comment:
   needs javadoc
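On the `dag.setFlowEvent(flowEvent)` question, the concern can be shown with a toy model in which the store holds a snapshot taken at save time, so a later change to the in-memory object is invisible to readers of the store until it is saved again. Everything here is hypothetical (`FlowEventPersistSketch`, `ToyDag`, `ToyStore`, and the `"FLOW_RUNNING"` string); it makes no claim about how `DagManagementStateStore` actually behaves, which is exactly what the question asks the author to confirm.

```java
import java.util.HashMap;
import java.util.Map;

public class FlowEventPersistSketch {
  /** Hypothetical in-memory DAG reduced to the one field under discussion. */
  static final class ToyDag {
    final String id;
    String flowEvent;

    ToyDag(String id) {
      this.id = id;
    }
  }

  /** Hypothetical store that copies the flow event at save time, the way a database row would. */
  static final class ToyStore {
    private final Map<String, String> flowEventByDagId = new HashMap<>();

    void save(ToyDag dag) {
      flowEventByDagId.put(dag.id, dag.flowEvent);
    }

    String loadFlowEvent(String dagId) {
      return flowEventByDagId.get(dagId);
    }
  }

  public static void main(String[] args) {
    ToyStore store = new ToyStore();
    ToyDag dag = new ToyDag("dag-1");

    store.save(dag);                // snapshot taken before the event is set
    dag.flowEvent = "FLOW_RUNNING"; // changes the in-memory object only
    System.out.println(store.loadFlowEvent("dag-1")); // prints "null"

    store.save(dag);                // explicit re-save makes the change visible
    System.out.println(store.loadFlowEvent("dag-1")); // prints "FLOW_RUNNING"
  }
}
```

If the real store behaves like this snapshot model, the method needs a save after `setFlowEvent`; if the field is intentionally in-memory only, the requested Javadoc would be a natural place to say so.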