phet commented on code in PR #3841:
URL: https://github.com/apache/gobblin/pull/3841#discussion_r1416344671


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -192,9 +230,16 @@ protected void processMessage(DecodeableKafkaRecord message) {
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
 
-  protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) {
+  /**
+   * Used to forward a launch flow action event from the DagActionStore. Because it may be a completely new DAG not
+   * contained in the dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors
+   * that may result in the process.
+   */
+  protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction, boolean isLoadedOnStartup) {

Review Comment:
   Given that the only thing `isLoadedOnStartup` determines is which meter to increment, I recommend codifying that:
   ```
   @Data
   private class SubmissionMetricProxy {
     private final ContextAwareMeter successMeter;
     private final ContextAwareMeter failureMeter;
   
     // argument order follows the field order: success meter first, then failure meter
     public static SubmissionMetricProxy ON_STARTUP = new SubmissionMetricProxy(
         DagActionStoreChangeMonitor.this.successfulLaunchSubmissionsOnStartup,
         DagActionStoreChangeMonitor.this.failedLaunchSubmissionsOnStartup);
   
     public static SubmissionMetricProxy POST_STARTUP = ...;
   
     public void markSuccess() {
       getSuccessMeter().mark();
     }
     public void markFailure() {
       getFailureMeter().mark();
     }
   }
   ```
   
   then create an overloaded version:
   ```
   protected void submitFlow(DagAction dagAction, boolean isStartup) {
     submitFlow(dagAction, isStartup ? SubmissionMetricProxy.ON_STARTUP : SubmissionMetricProxy.POST_STARTUP);
   }
   
   protected void submitFlow(DagAction dagAction, SubmissionMetricProxy smp) {
     ...
   }
   ```
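
   A self-contained sketch of the same idea, for illustration only. `SimpleMeter` is a hypothetical stand-in for `ContextAwareMeter`, `MonitorSketch` stands in for the monitor, the post-startup meter names are assumed, and the proxies are instance fields here because the meters they wrap belong to the enclosing instance:

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Hypothetical stand-in for ContextAwareMeter so the sketch compiles on its own.
   final class SimpleMeter {
     private final AtomicLong count = new AtomicLong();
     void mark() { count.incrementAndGet(); }
     long getCount() { return count.get(); }
   }

   // Pairs the success and failure meters for one submission phase.
   final class SubmissionMetricProxy {
     private final SimpleMeter successMeter;
     private final SimpleMeter failureMeter;

     SubmissionMetricProxy(SimpleMeter successMeter, SimpleMeter failureMeter) {
       this.successMeter = successMeter;
       this.failureMeter = failureMeter;
     }

     void markSuccess() { successMeter.mark(); }
     void markFailure() { failureMeter.mark(); }
   }

   // Hypothetical monitor; only the metric wiring is modeled.
   class MonitorSketch {
     final SimpleMeter successfulLaunchSubmissionsOnStartup = new SimpleMeter();
     final SimpleMeter failedLaunchSubmissionsOnStartup = new SimpleMeter();
     // Assumed names for the post-startup meters.
     final SimpleMeter successfulLaunchSubmissionsPostStartup = new SimpleMeter();
     final SimpleMeter failedLaunchSubmissionsPostStartup = new SimpleMeter();

     private final SubmissionMetricProxy onStartup = new SubmissionMetricProxy(
         successfulLaunchSubmissionsOnStartup, failedLaunchSubmissionsOnStartup);
     private final SubmissionMetricProxy postStartup = new SubmissionMetricProxy(
         successfulLaunchSubmissionsPostStartup, failedLaunchSubmissionsPostStartup);

     // Boolean overload kept for existing callers; it only selects the proxy.
     void submitFlow(String dagAction, boolean isStartup) {
       submitFlow(dagAction, isStartup ? onStartup : postStartup);
     }

     void submitFlow(String dagAction, SubmissionMetricProxy smp) {
       try {
         launch(dagAction);
         smp.markSuccess();
       } catch (RuntimeException e) {
         smp.markFailure();
       }
     }

     // Placeholder for compiling the flow and adding the dag; rejects empty input.
     private void launch(String dagAction) {
       if (dagAction.isEmpty()) {
         throw new IllegalArgumentException("empty dag action");
       }
     }
   }
   ```

   With this shape the submission logic never branches on the boolean; it only sees the pair of meters it was handed.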



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -85,19 +89,24 @@ public String load(String key) throws Exception {
   @Getter
   @VisibleForTesting
   protected FlowCatalog flowCatalog;
+  protected DagActionStore dagActionStore;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically rather than through the config.
   public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads,
-      FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) {
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore dagActionStore,
+      boolean isMultiActiveSchedulerEnabled) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
         ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
         numThreads);
     this.dagManager = dagManager;
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
+    this.dagActionStore = dagActionStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+
+    initializeMonitor();

Review Comment:
   Putting extensive processing in a constructor is often not best practice. Is there a reason to do that here? E.g., are you trying to prevent calls to `processMessage` until all of this has completed?
   
   Much more common would be to define an "initialize" method that is meant to be called on the new instance immediately after construction.
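
   A minimal sketch of that pattern, under stated assumptions: `StartupAwareMonitor`, its `initialized` guard, and the string-typed actions are all hypothetical and only illustrate keeping the constructor cheap while still blocking `processMessage` until startup loading has finished:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical monitor: the constructor only assigns fields, initialize() does the work.
   class StartupAwareMonitor {
     private final List<String> storedActions;
     private final List<String> handled = new ArrayList<>();
     private boolean initialized = false;

     StartupAwareMonitor(List<String> storedActions) {
       // No I/O or dispatch here, so construction cannot fail halfway through.
       this.storedActions = new ArrayList<>(storedActions);
     }

     // Called once by the owner right after construction; later calls are no-ops.
     synchronized void initialize() {
       if (initialized) {
         return;
       }
       handled.addAll(storedActions);
       initialized = true;
     }

     // Rejects messages until the stored actions have been replayed.
     synchronized void processMessage(String action) {
       if (!initialized) {
         throw new IllegalStateException("initialize() must be called before processMessage()");
       }
       handled.add(action);
     }

     synchronized List<String> getHandled() {
       return new ArrayList<>(handled);
     }
   }
   ```

   The caller would then write `monitor = new StartupAwareMonitor(actions); monitor.initialize();`, which also lets tests construct the object without triggering the startup load.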



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -107,6 +116,35 @@ protected void assignTopicPartitions() {
     return;
   }
 
+  /**
+   * Load all actions from the DagActionStore to process any missed actions during service startup
+   */
+  private void initializeMonitor() {
+    Collection<DagActionStore.DagAction> dagActions = null;
+    try {
+      dagActions = dagActionStore.getDagActions();
+    } catch (IOException e) {
+      throw new RuntimeException(String.format("Unable to retrieve dagActions from the dagActionStore while "
+          + "initializing the %s", DagActionStoreChangeMonitor.class.getCanonicalName()), e);
+    }
+    // TODO: make this multi-threaded to add parallelism
+    for (DagActionStore.DagAction action : dagActions) {
+        switch (action.getFlowActionType()) {
+          case KILL:
+            dagManager.handleKillFlowRequest(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId()));
+            break;
+          case RESUME:
+            dagManager.handleResumeFlowRequest(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId()));
+            break;

Review Comment:
   This feels like duplication... could this code and `processMessage` each call a common method to handle this action-specific dispatch?
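
   One way this could look, as a rough sketch rather than the actual refactor: `DispatchSketch`, `DagActionSketch`, `handleDagAction`, and the recorded `calls` list are hypothetical names, and the `LAUNCH` branch is assumed from the launch handling discussed above. Both entry points funnel into a single switch:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   enum FlowActionType { KILL, RESUME, LAUNCH }

   // Hypothetical, simplified stand-in for DagActionStore.DagAction.
   final class DagActionSketch {
     final String flowGroup;
     final String flowName;
     final String flowExecutionId;
     final FlowActionType type;

     DagActionSketch(String flowGroup, String flowName, String flowExecutionId, FlowActionType type) {
       this.flowGroup = flowGroup;
       this.flowName = flowName;
       this.flowExecutionId = flowExecutionId;
       this.type = type;
     }
   }

   class DispatchSketch {
     // Records what would have been sent to the DagManager, for illustration.
     final List<String> calls = new ArrayList<>();

     // Startup path: replay every stored action through the shared dispatch.
     void loadOnStartup(List<DagActionSketch> stored) {
       for (DagActionSketch action : stored) {
         handleDagAction(action);
       }
     }

     // Change-stream path: same dispatch, one action at a time.
     void processMessage(DagActionSketch action) {
       handleDagAction(action);
     }

     // Single place that maps an action type to its handler.
     private void handleDagAction(DagActionSketch action) {
       long executionId = Long.parseLong(action.flowExecutionId);
       String target = action.flowGroup + "/" + action.flowName + "/" + executionId;
       switch (action.type) {
         case KILL:
           calls.add("kill " + target);
           break;
         case RESUME:
           calls.add("resume " + target);
           break;
         case LAUNCH:
           calls.add("launch " + target);
           break;
         default:
           throw new IllegalArgumentException("Unsupported action type: " + action.type);
       }
     }
   }
   ```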


