phet commented on code in PR #3841:
URL: https://github.com/apache/gobblin/pull/3841#discussion_r1416344671
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -192,9 +230,16 @@ protected void processMessage(DecodeableKafkaRecord message) {
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}
- protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) {
+ /**
+ * Used to forward a launch flow action event from the DagActionStore. Because it may be a completely new DAG not
+ * contained in the dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors
+ * that may result in the process.
+ */
+ protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction, boolean isLoadedOnStartup) {
Review Comment:
Given that the only thing `isLoadedOnStartup` determines is which meter to
increment, I recommend codifying that:
```
@Data
private static class SubmissionMetricProxy {
  private final ContextAwareMeter successMeter;
  private final ContextAwareMeter failureMeter;

  public void markSuccess() {
    getSuccessMeter().mark();
  }

  public void markFailure() {
    getFailureMeter().mark();
  }
}

// Instance fields rather than static constants: a static initializer cannot
// reference DagActionStoreChangeMonitor.this.
private SubmissionMetricProxy onStartupMetricProxy;
private SubmissionMetricProxy postStartupMetricProxy;

// e.g. right after the meters themselves are created:
this.onStartupMetricProxy = new SubmissionMetricProxy(
    this.successfulLaunchSubmissionsOnStartup, this.failedLaunchSubmissionsOnStartup);
this.postStartupMetricProxy = ...;
```
Then create an overloaded version:
```
protected void submitFlow(DagAction dagAction, boolean isStartup) {
  submitFlow(dagAction, isStartup ? this.onStartupMetricProxy : this.postStartupMetricProxy);
}

protected void submitFlow(DagAction dagAction, SubmissionMetricProxy smp) {
  ...
}
```
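A self-contained, runnable sketch of the same idea, under stated assumptions: `java.util.concurrent.atomic.LongAdder` stands in for `ContextAwareMeter`, `MonitorSketch` is a hypothetical stand-in for `DagActionStoreChangeMonitor`, the post-startup meter names are made up, and `submitFlow` only simulates success or failure rather than compiling the flow. The proxies are held as instance fields, so no static context ever needs `DagActionStoreChangeMonitor.this`.

```java
import java.util.concurrent.atomic.LongAdder;

public class SubmissionMetricProxyDemo {

  // Pairs the success/failure meters for one submission context (startup vs. post-startup).
  static final class SubmissionMetricProxy {
    private final LongAdder successMeter;
    private final LongAdder failureMeter;

    SubmissionMetricProxy(LongAdder successMeter, LongAdder failureMeter) {
      this.successMeter = successMeter;
      this.failureMeter = failureMeter;
    }

    void markSuccess() { successMeter.increment(); }

    void markFailure() { failureMeter.increment(); }
  }

  // Hypothetical stand-in for DagActionStoreChangeMonitor; LongAdder replaces ContextAwareMeter.
  static final class MonitorSketch {
    final LongAdder successfulLaunchSubmissionsOnStartup = new LongAdder();
    final LongAdder failedLaunchSubmissionsOnStartup = new LongAdder();
    final LongAdder successfulLaunchSubmissionsPostStartup = new LongAdder(); // hypothetical name
    final LongAdder failedLaunchSubmissionsPostStartup = new LongAdder();     // hypothetical name

    // Field initializers run in textual order, so the meters above already exist here.
    final SubmissionMetricProxy onStartupMetrics = new SubmissionMetricProxy(
        successfulLaunchSubmissionsOnStartup, failedLaunchSubmissionsOnStartup);
    final SubmissionMetricProxy postStartupMetrics = new SubmissionMetricProxy(
        successfulLaunchSubmissionsPostStartup, failedLaunchSubmissionsPostStartup);

    // The boolean overload only chooses which proxy to use.
    void submitFlow(String flowName, boolean isStartup) {
      submitFlow(flowName, isStartup ? onStartupMetrics : postStartupMetrics);
    }

    // All submission logic lives here; an empty flow name simulates a compile/addDag failure.
    void submitFlow(String flowName, SubmissionMetricProxy metrics) {
      if (flowName == null || flowName.isEmpty()) {
        metrics.markFailure();
      } else {
        metrics.markSuccess();
      }
    }
  }

  public static void main(String[] args) {
    MonitorSketch monitor = new MonitorSketch();
    monitor.submitFlow("flowA", true);
    monitor.submitFlow("", true);
    monitor.submitFlow("flowB", false);
    // prints "1 1 1 0"
    System.out.println(monitor.successfulLaunchSubmissionsOnStartup.sum() + " "
        + monitor.failedLaunchSubmissionsOnStartup.sum() + " "
        + monitor.successfulLaunchSubmissionsPostStartup.sum() + " "
        + monitor.failedLaunchSubmissionsPostStartup.sum());
  }
}
```

The payoff is that the body of the proxy-based `submitFlow` never branches on startup vs. post-startup; only the thin boolean overload does.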
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -85,19 +89,24 @@ public String load(String key) throws Exception {
@Getter
@VisibleForTesting
protected FlowCatalog flowCatalog;
+ protected DagActionStore dagActionStore;
// Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically rather than through the config.
public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads,
- FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) {
+ FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore dagActionStore,
+ boolean isMultiActiveSchedulerEnabled) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
numThreads);
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
+ this.dagActionStore = dagActionStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+
+ initializeMonitor();
Review Comment:
It's often not best practice to put extensive processing within a
constructor. Is there a reason to do that here? E.g., are you trying to
prevent calls to `processMessage` until all of this has completed?
Much more common would be to define an "initialize" method that is meant
to be called on the new instance immediately following construction.
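A minimal sketch of the construct-then-initialize pattern, under stated assumptions: `ActionStore`, `ChangeMonitorSketch`, and `create` are hypothetical names, and the loop only records actions instead of dispatching them to a `DagManager`. The constructor just assigns dependencies; the store read happens in `initialize()`, and an optional static factory guarantees it runs before the instance is handed out.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class InitializeAfterConstructionDemo {

  // Hypothetical stand-in for DagActionStore, reduced to the one call the sketch needs.
  interface ActionStore {
    Collection<String> getDagActions() throws IOException;
  }

  static final class ChangeMonitorSketch {
    private final ActionStore dagActionStore;
    private final List<String> replayedActions = new ArrayList<>();
    private boolean initialized = false;

    // The constructor only wires dependencies: no I/O happens here.
    ChangeMonitorSketch(ActionStore dagActionStore) {
      this.dagActionStore = dagActionStore;
    }

    // Meant to be called once, right after construction and before message consumption starts.
    // Not thread-safe; a repeated call from the same thread is a no-op.
    void initialize() throws IOException {
      if (initialized) {
        return;
      }
      for (String action : dagActionStore.getDagActions()) {
        replayedActions.add(action); // stand-in for dispatching the action to the DagManager
      }
      initialized = true;
    }

    boolean isInitialized() {
      return initialized;
    }

    List<String> getReplayedActions() {
      return replayedActions;
    }

    // Optional factory so callers cannot forget to call initialize().
    static ChangeMonitorSketch create(ActionStore store) throws IOException {
      ChangeMonitorSketch monitor = new ChangeMonitorSketch(store);
      monitor.initialize();
      return monitor;
    }
  }

  public static void main(String[] args) throws IOException {
    ActionStore store = () -> List.of("KILL", "RESUME");
    ChangeMonitorSketch monitor = ChangeMonitorSketch.create(store);
    // prints "true [KILL, RESUME]"
    System.out.println(monitor.isInitialized() + " " + monitor.getReplayedActions());
  }
}
```

A side benefit is that tests can construct the monitor without any store read happening, and the owner decides when, and on which thread, the replay runs.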
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -107,6 +116,35 @@ protected void assignTopicPartitions() {
return;
}
+ /**
+ * Load all actions from the DagActionStore to process any missed actions during service startup
+ */
+ private void initializeMonitor() {
+ Collection<DagActionStore.DagAction> dagActions = null;
+ try {
+ dagActions = dagActionStore.getDagActions();
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("Unable to retrieve dagActions from the dagActionStore while "
+ + "initializing the %s", DagActionStoreChangeMonitor.class.getCanonicalName()), e);
+ }
+ // TODO: make this multi-threaded to add parallelism
+ for (DagActionStore.DagAction action : dagActions) {
+ switch (action.getFlowActionType()) {
+ case KILL:
+ dagManager.handleKillFlowRequest(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId()));
+ break;
+ case RESUME:
+ dagManager.handleResumeFlowRequest(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId()));
+ break;
Review Comment:
This feels like duplication. Could this code and `processMessage` each call
a common method that handles the action-specific dispatch?
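One possible shape for that shared method, as a hedged sketch: `FlowActionType`, `DagAction`, `DagManagerLike`, and `Dispatcher` are hypothetical stand-ins for the Gobblin types, `handleDagAction` is an invented name, and the `LAUNCH` branch only records the flow instead of calling `submitFlowToDagManagerHelper`. Both the startup replay loop and the Kafka path call the same method, passing `isStartup` so launch submissions can still pick the right meters.

```java
import java.util.ArrayList;
import java.util.List;

public class DagActionDispatchDemo {

  // Hypothetical stand-ins for DagActionStore.FlowActionType, DagActionStore.DagAction and DagManager.
  enum FlowActionType { KILL, RESUME, LAUNCH }

  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;
    final FlowActionType flowActionType;

    DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.flowActionType = flowActionType;
    }
  }

  interface DagManagerLike {
    void handleKillFlowRequest(String flowGroup, String flowName, long flowExecutionId);

    void handleResumeFlowRequest(String flowGroup, String flowName, long flowExecutionId);
  }

  static final class Dispatcher {
    private final DagManagerLike dagManager;
    private final List<String> launchedFlows = new ArrayList<>();

    Dispatcher(DagManagerLike dagManager) {
      this.dagManager = dagManager;
    }

    // The single place that maps an action type to a DagManager call.
    void handleDagAction(DagAction action, boolean isStartup) {
      switch (action.flowActionType) {
        case KILL:
          dagManager.handleKillFlowRequest(action.flowGroup, action.flowName, Long.parseLong(action.flowExecutionId));
          break;
        case RESUME:
          dagManager.handleResumeFlowRequest(action.flowGroup, action.flowName, Long.parseLong(action.flowExecutionId));
          break;
        case LAUNCH:
          // Stand-in for submitFlowToDagManagerHelper(action, isStartup).
          launchedFlows.add(action.flowName + (isStartup ? " (startup)" : ""));
          break;
        default:
          throw new IllegalArgumentException("Unsupported flow action type: " + action.flowActionType);
      }
    }

    List<String> getLaunchedFlows() {
      return launchedFlows;
    }
  }

  public static void main(String[] args) {
    List<String> calls = new ArrayList<>();
    DagManagerLike recordingDagManager = new DagManagerLike() {
      @Override
      public void handleKillFlowRequest(String flowGroup, String flowName, long flowExecutionId) {
        calls.add("kill " + flowGroup + "." + flowName + "." + flowExecutionId);
      }

      @Override
      public void handleResumeFlowRequest(String flowGroup, String flowName, long flowExecutionId) {
        calls.add("resume " + flowGroup + "." + flowName + "." + flowExecutionId);
      }
    };
    Dispatcher dispatcher = new Dispatcher(recordingDagManager);

    // Startup path: replay everything loaded from the store.
    for (DagAction action : List.of(
        new DagAction("g", "f", "1", FlowActionType.KILL),
        new DagAction("g", "f", "2", FlowActionType.LAUNCH))) {
      dispatcher.handleDagAction(action, true);
    }
    // Kafka path: processMessage would make the same call for each decoded change event.
    dispatcher.handleDagAction(new DagAction("g", "f", "3", FlowActionType.RESUME), false);

    // prints "[kill g.f.1, resume g.f.3] [f (startup)]"
    System.out.println(calls + " " + dispatcher.getLaunchedFlows());
  }
}
```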