[
https://issues.apache.org/jira/browse/GOBBLIN-1970?focusedWorklogId=894173&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-894173
]
ASF GitHub Bot logged work on GOBBLIN-1970:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Dec/23 22:29
Start Date: 05/Dec/23 22:29
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3841:
URL: https://github.com/apache/gobblin/pull/3841#discussion_r1416344671
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -192,9 +230,16 @@ protected void processMessage(DecodeableKafkaRecord message) {
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
-  protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) {
+  /**
+   * Used to forward a launch flow action event from the DagActionStore. Because it may be a completely new DAG not
+   * contained in the dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors
+   * that may result in the process.
+   */
+  protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction, boolean isLoadedOnStartup) {
Review Comment:
Given that the only thing `isLoadedOnStartup` determines is which meter to
increment, I recommend codifying that:
```
@Data
private class SubmissionMetricProxy {
  private final ContextAwareMeter successMeter;
  private final ContextAwareMeter failureMeter;

  public static SubmissionMetricProxy ON_STARTUP = new SubmissionMetricProxy(
      DagActionStoreChangeMonitor.this.successfulLaunchSubmissionsOnStartup,
      DagActionStoreChangeMonitor.this.failedLaunchSubmissionsOnStartup);
  public static SubmissionMetricProxy POST_STARTUP = ...;

  public void markSuccess() {
    // call the Lombok-generated getter; the original omitted the parentheses
    getSuccessMeter().mark();
  }

  public void markFailure() {
    getFailureMeter().mark();
  }
}
```
Then create an overloaded version:
```
protected void submitFlow(DagAction dagAction, boolean isStartup) {
  submitFlow(dagAction, isStartup ? SubmissionMetricProxy.ON_STARTUP : SubmissionMetricProxy.POST_STARTUP);
}

protected void submitFlow(DagAction dagAction, SubmissionMetricProxy smp) {
  ...
}
```
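For illustration, here is a minimal, self-contained sketch of the same idea that compiles on its own. It is not the Gobblin implementation: `SimpleMeter` is a hypothetical stand-in for `ContextAwareMeter`, `MonitorSketch` stands in for the monitor, the dag action is reduced to a `String`, and the empty-string check merely simulates a failed submission. The proxies are held as instance fields here on the assumption that the meters belong to a monitor instance, since a static field cannot refer to the enclosing instance.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for ContextAwareMeter; only mark() and a count are modeled. */
class SimpleMeter {
  private final AtomicLong count = new AtomicLong();

  void mark() {
    count.incrementAndGet();
  }

  long getCount() {
    return count.get();
  }
}

/** Hypothetical, stripped-down monitor; only the meter selection is illustrated. */
class MonitorSketch {
  final SimpleMeter successOnStartup = new SimpleMeter();
  final SimpleMeter failureOnStartup = new SimpleMeter();
  final SimpleMeter successPostStartup = new SimpleMeter();
  final SimpleMeter failurePostStartup = new SimpleMeter();

  /** Pairs the success and failure meters for one submission phase. */
  static final class SubmissionMetricProxy {
    private final SimpleMeter successMeter;
    private final SimpleMeter failureMeter;

    SubmissionMetricProxy(SimpleMeter successMeter, SimpleMeter failureMeter) {
      this.successMeter = successMeter;
      this.failureMeter = failureMeter;
    }

    void markSuccess() {
      successMeter.mark();
    }

    void markFailure() {
      failureMeter.mark();
    }
  }

  // Instance fields (assumption): declared after the meters so they are initialized first.
  final SubmissionMetricProxy onStartup =
      new SubmissionMetricProxy(successOnStartup, failureOnStartup);
  final SubmissionMetricProxy postStartup =
      new SubmissionMetricProxy(successPostStartup, failurePostStartup);

  /** Boolean overload for existing callers; its only job is to pick the proxy. */
  void submitFlow(String dagAction, boolean isStartup) {
    submitFlow(dagAction, isStartup ? onStartup : postStartup);
  }

  /** Core path: no startup flag in sight, just "mark success" or "mark failure". */
  void submitFlow(String dagAction, SubmissionMetricProxy smp) {
    try {
      if (dagAction.isEmpty()) {
        // Simulated failure; real flow compilation and addDag() would go here.
        throw new IllegalArgumentException("empty dag action");
      }
      smp.markSuccess();
    } catch (RuntimeException e) {
      smp.markFailure();
    }
  }
}
```

With this shape, the submission logic never branches on the startup flag; the flag is translated into a proxy exactly once, at the boundary.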
Issue Time Tracking
-------------------
Worklog Id: (was: 894173)
Time Spent: 1h 40m (was: 1.5h)
> Consolidate Processing Dag Actions to Single Code Path
> ------------------------------------------------------
>
> Key: GOBBLIN-1970
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1970
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Similar code in the DagActionStoreChangeMonitor and the DagManager processes
> dag actions from the DagActionStore. Small discrepancies between the two have
> led to unexpected bugs in how the actions are processed. To fix forward and
> keep the code maintainable in one place, we consolidate all dag action
> processing logic in the DagActionStoreChangeMonitor. The processing in the
> change monitor works correctly, while the handling of launch events in the
> DagManager fails compilation, most likely due to an error loading the job
> templates when initializing the compiler, but we have not been able to
> identify the exact issue.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)