Will-Lo commented on code in PR #3841:
URL: https://github.com/apache/gobblin/pull/3841#discussion_r1414702983


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -85,19 +88,24 @@ public String load(String key) throws Exception {
   @Getter
   @VisibleForTesting
   protected FlowCatalog flowCatalog;
+  protected DagActionStore dagActionStore;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically rather than through the config.
   public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads,
-      FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) {
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore dagActionStore,
+      boolean isMultiActiveSchedulerEnabled) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
         ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
         numThreads);
     this.dagManager = dagManager;
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
+    this.dagActionStore = dagActionStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+
+    initializeMonitor();

Review Comment:
   What happens when the non-leaders run this?


