phet commented on code in PR #4026:
URL: https://github.com/apache/gobblin/pull/4026#discussion_r1717966511


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -186,7 +185,8 @@ protected void processMessage(DecodeableKafkaRecord message) {
         return;
       }
     } catch (Exception e) {
-      log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: {}", e);
+      log.warn("Ran into unexpected error processing specUri {} changes. Reexamine scheduler. "

Review Comment:
   makes sense...
   
   nit: no need for "Ran into "



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -171,6 +172,8 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
       // into inconsistent state.
       if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
         sendCancellationEvent(dagNodeToCancel, props);
+        log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(),
+            props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
       } else {
         log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getJobSpec().getUri());

Review Comment:
   NBD, but on success we log `.getId()` and on failure `.getJobSpec().getUri()`. Could it be consistent?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -26,18 +26,26 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link DagProc} that launches a new job.
+ * An implementation for {@link DagProc} that launches the start job of a flow.
+ * If there are multiple start jobs for the flow, {@link ReevaluateDagProc} is created for each of them and that
+ * launches those start jobs.
+ * In a life cycle of a flow, {@link LaunchDagProc} runs only one time, unless it fails and is
+ * retried by the retry-reminders. {@link ReevaluateDagProc} runs multiple times depending upon the number of jobs and
+ * number of parallel jobs.

Review Comment:
   suggest for final sentence:
   > Post-launch, a subsequent {@link ReevaluateDagProc} runs after each job of the DAG completes. This may itself launch further jobs or conclude execution of the overall DAG/flow.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
     jobExecutionPlan.setExecutionStatus(CANCELLED);
   }
 
+  public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
+    if (!dag.isEmpty()) {
+      // Every dag node will contain the same flow metadata
+      Config config = DagManagerUtils.getDagJobConfig(dag);
+      Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(config);
+      dag.setFlowEvent(flowEvent);

Review Comment:
   do we set this, but don't actually persist it in the store?
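The risk behind this question can be shown with a toy store that, like a typical persistent store, hands out copies rather than live references: a field set on a loaded object is invisible to the next read unless the object is written back. `FlowState`, `InMemoryDagStore`, and the `"exampleFlowEvent"` value are hypothetical stand-ins; whether Gobblin's `DagManagementStateStore` returns copies, or whether callers of `setAndEmitFlowEvent` persist the dag afterward, is not established by this diff.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical illustration: mutating an in-memory object does not change what
 * a store returns later unless the mutated object is explicitly written back.
 * None of these classes are Gobblin classes.
 */
public class PersistFlowEventSketch {
  /** Stand-in for a dag's mutable flow-level state. */
  static final class FlowState {
    String flowEvent;

    FlowState(String flowEvent) {
      this.flowEvent = flowEvent;
    }

    FlowState copy() {
      return new FlowState(flowEvent);
    }
  }

  /** Stand-in store that, like a serialized/persistent store, never shares live references. */
  static final class InMemoryDagStore {
    private final Map<String, FlowState> rows = new HashMap<>();

    void put(String dagId, FlowState state) {
      rows.put(dagId, state.copy());
    }

    FlowState get(String dagId) {
      return rows.get(dagId).copy();
    }
  }

  public static void main(String[] args) {
    InMemoryDagStore store = new InMemoryDagStore();
    store.put("flow-1", new FlowState(null));

    FlowState dag = store.get("flow-1");
    dag.flowEvent = "exampleFlowEvent";                 // set only on the loaded copy
    System.out.println(store.get("flow-1").flowEvent); // null: the store never saw the change

    store.put("flow-1", dag);                           // explicit write-back
    System.out.println(store.get("flow-1").flowEvent); // exampleFlowEvent
  }
}
```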



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -199,6 +202,21 @@ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeT
     jobExecutionPlan.setExecutionStatus(CANCELLED);
   }
 
+  public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {

Review Comment:
   needs javadoc


