[ 
https://issues.apache.org/jira/browse/GOBBLIN-1970?focusedWorklogId=894174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-894174
 ]

ASF GitHub Bot logged work on GOBBLIN-1970:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Dec/23 22:29
            Start Date: 05/Dec/23 22:29
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3841:
URL: https://github.com/apache/gobblin/pull/3841#discussion_r1416344671


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -192,9 +230,16 @@ protected void processMessage(DecodeableKafkaRecord message) {
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
 
-  protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) {
+  /**
+   * Used to forward a launch flow action event from the DagActionStore. Because it may be a completely new DAG not
+   * contained in the dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors
+   * that may result in the process.
+   */
+  protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction, boolean isLoadedOnStartup) {

Review Comment:
   Given that the only thing `isLoadedOnStartup` determines is which meter to
   increment, I recommend codifying that:
   ```
   @Data
   private class SubmissionMetricProxy {
     private final ContextAwareMeter successMeter;
     private final ContextAwareMeter failureMeter;
   
     // Sketch only: a static field cannot reference DagActionStoreChangeMonitor.this,
     // so in real code these instances would have to live at instance level.
     public static SubmissionMetricProxy ON_STARTUP = new SubmissionMetricProxy(
         DagActionStoreChangeMonitor.this.successfulLaunchSubmissionsOnStartup,
         DagActionStoreChangeMonitor.this.failedLaunchSubmissionsOnStartup);
   
     public static SubmissionMetricProxy POST_STARTUP = ...;
   
     public void markSuccess() {
       getSuccessMeter().mark();
     }
     public void markFailure() {
       getFailureMeter().mark();
     }
   }
   ```
   
   Then create an overloaded version:
   ```
   protected void submitFlow(DagAction dagAction, boolean isStartup) {
     submitFlow(dagAction, isStartup ? SubmissionMetricProxy.ON_STARTUP : SubmissionMetricProxy.POST_STARTUP);
   }
   
   protected void submitFlow(DagAction dagAction, SubmissionMetricProxy smp) {
     ...
   }
   ```
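   
   For illustration, a self-contained sketch of the same idea follows. It is not
   the Gobblin code: `Meter` is a hypothetical stand-in for `ContextAwareMeter`,
   `MonitorSketch` stands in for `DagActionStoreChangeMonitor`, the meter field
   names are invented, and `compileAndAddDag` is a placeholder for the real
   compile-then-`addDag()` step. It also assumes the proxies are held as
   instance fields, since they wrap per-instance meters.
   ```java
   import java.util.concurrent.atomic.AtomicLong;

   /** Hypothetical stand-in for ContextAwareMeter; it only counts marks. */
   final class Meter {
     private final AtomicLong count = new AtomicLong();
     void mark() { count.incrementAndGet(); }
     long getCount() { return count.get(); }
   }

   /** Hypothetical stand-in for DagActionStoreChangeMonitor. */
   class MonitorSketch {
     final Meter successOnStartup = new Meter();
     final Meter failedOnStartup = new Meter();
     final Meter successPostStartup = new Meter();
     final Meter failedPostStartup = new Meter();

     /** Pairs the success and failure meters for one submission phase. */
     static final class SubmissionMetricProxy {
       private final Meter successMeter;
       private final Meter failureMeter;

       SubmissionMetricProxy(Meter successMeter, Meter failureMeter) {
         this.successMeter = successMeter;
         this.failureMeter = failureMeter;
       }

       void markSuccess() { successMeter.mark(); }
       void markFailure() { failureMeter.mark(); }
     }

     // Declared after the meters so that the meters are initialized first.
     final SubmissionMetricProxy onStartup =
         new SubmissionMetricProxy(successOnStartup, failedOnStartup);
     final SubmissionMetricProxy postStartup =
         new SubmissionMetricProxy(successPostStartup, failedPostStartup);

     /** Boolean overload: only picks which pair of meters to use. */
     void submitFlow(String dagAction, boolean isStartup) {
       submitFlow(dagAction, isStartup ? onStartup : postStartup);
     }

     /** Core path: has no knowledge of the startup phase, only of the proxy. */
     void submitFlow(String dagAction, SubmissionMetricProxy smp) {
       try {
         compileAndAddDag(dagAction);
         smp.markSuccess();
       } catch (RuntimeException e) {
         smp.markFailure();
       }
     }

     /** Placeholder for flow compilation; rejects an empty action to simulate failure. */
     void compileAndAddDag(String dagAction) {
       if (dagAction.isEmpty()) {
         throw new IllegalArgumentException("empty dag action");
       }
     }
   }
   ```
   
   With this shape the boolean is consumed in exactly one place, and the core
   method can be exercised in a test with any pair of meters.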





Issue Time Tracking
-------------------

    Worklog Id:     (was: 894174)
    Time Spent: 1h 50m  (was: 1h 40m)

> Consolidate Processing Dag Actions to Single Code Path
> ------------------------------------------------------
>
>                 Key: GOBBLIN-1970
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1970
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> The DagActionStoreChangeMonitor and the DagManager contain similar code for 
> processing dag actions from the DagActionStore. Small discrepancies between 
> the two have led to unexpected bugs in how the actions are processed. To fix 
> forward and make this easier to maintain, we consolidate all dag action 
> processing logic into the DagActionStoreChangeMonitor. The processing in the 
> change monitor works correctly, while the handling of launch events in the 
> DagManager fails compilation, most likely due to an error loading the job 
> templates when initializing the compiler, though we have not been able to 
> identify the exact issue. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
