umustafi commented on code in PR #3841:
URL: https://github.com/apache/gobblin/pull/3841#discussion_r1416401050
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -85,19 +89,24 @@ public String load(String key) throws Exception {
@Getter
@VisibleForTesting
protected FlowCatalog flowCatalog;
+ protected DagActionStore dagActionStore;
// Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically rather than through the config.
public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads,
- FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) {
+ FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore dagActionStore,
+ boolean isMultiActiveSchedulerEnabled) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
numThreads);
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
+ this.dagActionStore = dagActionStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+
+ initializeMonitor();
Review Comment:
Good point, I didn't intend to block on processMessage calls while the
initialization is in progress. Moved.
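
For readers following the thread, here is a minimal sketch of the general pattern under discussion: the constructor only wires dependencies, and initialization is triggered explicitly afterwards. This is not the Gobblin implementation, and it does not show where the call was actually moved to in the PR. `SketchMonitor`, `MonitorInitSketch`, and the list used as a stand-in for a `DagActionStore` are all hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a change monitor; NOT the Gobblin class.
class SketchMonitor {
  private final List<String> store;                      // assumed stand-in for a DagActionStore
  private final List<String> handled = new ArrayList<>();
  private boolean initialized = false;

  SketchMonitor(List<String> store) {
    // The constructor only assigns fields, so it returns without doing
    // any potentially slow work such as reading existing actions.
    this.store = store;
  }

  // Called explicitly by the owner once construction is complete.
  void initializeMonitor() {
    for (String action : store) {
      handled.add(action);
    }
    initialized = true;
  }

  boolean isInitialized() {
    return initialized;
  }

  List<String> getHandled() {
    return handled;
  }
}

class MonitorInitSketch {
  public static void main(String[] args) {
    List<String> store = new ArrayList<>();
    store.add("KILL flow-1");

    SketchMonitor monitor = new SketchMonitor(store);
    System.out.println(monitor.isInitialized());

    // The caller decides when initialization runs.
    monitor.initializeMonitor();
    System.out.println(monitor.getHandled());
  }
}
```

Keeping the constructor free of initialization work lets the caller choose when (and on which thread) that work happens, which is the concern raised in the review.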