[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=907420&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-907420
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Feb/24 20:25
            Start Date: 28/Feb/24 20:25
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1506558550


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+
+
+/**
+ * An interface to provide abstractions for managing operations on Dag.
+ * It accepts FlowSpec not a DagAction, so if a flow config is updated between retries

Review Comment:
   Looks like this sentence was cut off. It also says it accepts FlowSpec, not 
a dagAction, but I see dagAction below. 
   
   If FlowSpec is taken, can we guarantee that the caller always provides the 
latest FlowSpec? If not, we should accept dagAction and then retrieve the 
flowSpec from the store. Note, though, that we may need to add the 
flowExecutionId to the flowSpec if it doesn't already exist. 
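   
   A minimal sketch of the second option, for illustration only. Every type 
here (`DagAction`, `FlowSpec`, `FlowSpecStore`) is a simplified, hypothetical 
stand-in and not the real Gobblin class; the point is only that the spec is 
looked up at processing time and the execution id is copied over when missing.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class DagActionLookupSketch {

  /** Hypothetical stand-in for a dag action: identifies one flow execution. */
  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;

    DagAction(String flowGroup, String flowName, String flowExecutionId) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
    }
  }

  /** Hypothetical stand-in for a flow spec; flowExecutionId may be null. */
  static final class FlowSpec {
    final String flowGroup;
    final String flowName;
    final int version;
    final String flowExecutionId;

    FlowSpec(String flowGroup, String flowName, int version, String flowExecutionId) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.version = version;
      this.flowExecutionId = flowExecutionId;
    }
  }

  /** Hypothetical in-memory store that keeps only the latest spec per flow. */
  static final class FlowSpecStore {
    private final Map<String, FlowSpec> latestByFlow = new ConcurrentHashMap<>();

    void put(FlowSpec spec) {
      latestByFlow.put(spec.flowGroup + "/" + spec.flowName, spec);
    }

    Optional<FlowSpec> get(String flowGroup, String flowName) {
      return Optional.ofNullable(latestByFlow.get(flowGroup + "/" + flowName));
    }
  }

  /**
   * Resolves the spec when the action is processed, so an update made between
   * retries is picked up. Copies the execution id from the action if the
   * stored spec does not carry one.
   */
  static Optional<FlowSpec> resolve(FlowSpecStore store, DagAction action) {
    return store.get(action.flowGroup, action.flowName)
        .map(spec -> spec.flowExecutionId != null
            ? spec
            : new FlowSpec(spec.flowGroup, spec.flowName, spec.version, action.flowExecutionId));
  }

  public static void main(String[] args) {
    FlowSpecStore store = new FlowSpecStore();
    store.put(new FlowSpec("group", "flow", 1, null));
    store.put(new FlowSpec("group", "flow", 2, null)); // config updated before a retry
    FlowSpec spec = resolve(store, new DagAction("group", "flow", "12345")).get();
    System.out.println(spec.version + " " + spec.flowExecutionId);
  }
}
```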



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -54,21 +59,50 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
   // dagToJobs holds a map of dagId to running jobs of that dag
   private final Map<DagManager.DagId, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new ConcurrentHashMap<>();
   private final Map<DagManager.DagId, Long> dagToDeadline = new ConcurrentHashMap<>();
-  private final DagStateStore dagStateStore;
-  private final DagStateStore failedDagStateStore;
+  private DagStateStore dagStateStore;
+  private DagStateStore failedDagStateStore;
+  private boolean dagStoresInitialized = false;
   private final UserQuotaManager quotaManager;
+  Map<URI, TopologySpec> topologySpecMap;
+  private final Config config;
   private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
   public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+  FlowCatalog flowCatalog;
 
-  public MostlyMySqlDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
-    this.dagStateStore = createDagStateStore(config, topologySpecMap);
-    this.failedDagStateStore = createDagStateStore(
-        ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+  @Inject
+  public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog) throws IOException {
     this.quotaManager = new MysqlUserQuotaManager(config);
-    this.quotaManager.init(getDags());
+    this.config = config;
+    this.flowCatalog = flowCatalog;
+   }
+
+  @Override
+  // It should be called after topology spec map is set
+  public synchronized void start() throws IOException {

Review Comment:
   where is start called?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitor.java:
##########
@@ -0,0 +1,76 @@
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+
+
+/**
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class DagProcEngineEnabledDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
+  private final DagManagement dagManagement;
+
+  // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
+  // client itself to determine all Kafka related information dynamically rather than through the config.
+  public DagProcEngineEnabledDagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore dagActionStore,
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+    // Differentiate group id for each host
+    super(topic, config, dagManager, numThreads, flowCatalog, orchestrator, dagActionStore, isMultiActiveSchedulerEnabled);
+    this.dagManagement = dagManagement;
+  }
+
+  @Override

Review Comment:
   what is changed in the overridden method? 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -54,18 +56,30 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
   // dagToJobs holds a map of dagId to running jobs of that dag
   private final Map<DagManager.DagId, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new ConcurrentHashMap<>();
   private final Map<DagManager.DagId, Long> dagToDeadline = new ConcurrentHashMap<>();
-  private final DagStateStore dagStateStore;
-  private final DagStateStore failedDagStateStore;
+  private DagStateStore dagStateStore;
+  private DagStateStore failedDagStateStore;
+  private boolean dagStoresInitialized = false;

Review Comment:
   can you add the last part in a comment? it's helpful to document the 
decision behind not initializing the stores in constructor and needing this 
var. 
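   
   Something along these lines, as a rough sketch. It assumes the reason for 
the deferred initialization is that the topology spec map is not yet available 
when the constructor runs (the diff only hints at this via the comment on 
`start()`). The class name, the `String` maps, and the helper methods are 
hypothetical stand-ins, not the real store.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyStoreInitSketch {
  // Hypothetical stand-in for Map<URI, TopologySpec>; set after construction.
  private Map<String, String> topologySpecMap;
  // Hypothetical stand-in for the DagStateStore fields.
  private Map<String, String> dagStateStore;

  // The stores need the topology spec map, which is assumed to be unavailable
  // when the constructor runs, so they are created in start() instead.
  // This flag records whether that has happened and makes start() idempotent.
  private boolean dagStoresInitialized = false;

  public synchronized void setTopologySpecMap(Map<String, String> topologySpecMap) {
    this.topologySpecMap = topologySpecMap;
  }

  /** Must be called after the topology spec map is set. */
  public synchronized void start() {
    if (dagStoresInitialized) {
      return; // already initialized, nothing to do
    }
    if (topologySpecMap == null) {
      throw new IllegalStateException("topologySpecMap must be set before start()");
    }
    dagStateStore = new HashMap<>(topologySpecMap);
    dagStoresInitialized = true;
  }

  public synchronized boolean isStarted() {
    return dagStoresInitialized;
  }

  public synchronized int storeSize() {
    return dagStateStore == null ? 0 : dagStateStore.size();
  }
}
```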



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,174 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) manages {@link Dag}s through {@link DagManagementStateStore}.
+ * b) accept adhoc new dag launch requests from Orchestrator.
+ * c) provides dag actions to {@link DagProcessingEngine#DagProcEngineThread}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
+  public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+  private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
+
+  private final Config config;
+  @Inject private FlowCatalog flowCatalog;
+  private final boolean dagProcessingEngineEnabled;
+  private Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+  private volatile boolean isActive = false;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  @Getter DagManagementStateStore dagManagementStateStore;
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+
+  @Inject
+  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore, DagManagementStateStore dagManagementStateStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    this.dagProcessingEngineEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);

Review Comment:
   should there be a reference to the `dagProcessingEngine` here?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,174 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) manages {@link Dag}s through {@link DagManagementStateStore}.
+ * b) accept adhoc new dag launch requests from Orchestrator.
+ * c) provides dag actions to {@link DagProcessingEngine#DagProcEngineThread}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
+  public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+  private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
+
+  private final Config config;
+  @Inject private FlowCatalog flowCatalog;
+  private final boolean dagProcessingEngineEnabled;
+  private Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+  private volatile boolean isActive = false;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  @Getter DagManagementStateStore dagManagementStateStore;
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+
+  @Inject
+  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore, DagManagementStateStore dagManagementStateStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    this.dagProcessingEngineEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
+    this.dagManagementStateStore = dagManagementStateStore;
+    MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+  }
+
+  public void setActive(boolean active) {
+    if (this.isActive == active) {
+      log.info("DagManager already {}, skipping further actions.", (!active) ? "inactive" : "active");
+    }
+    this.isActive = active;
+    try {
+      if (this.isActive) {
+        log.info("Activating NewDagManager.");
+        //Initializing state store for persisting Dags.
+        this.dagManagementStateStore.start();
+        dagManagerMetrics.activate();
+      } else { //Mark the DagManager inactive.
+        log.info("Inactivating the DagManager. Shutting down all DagManager threads");
+        dagManagerMetrics.cleanup();
+      }
+    } catch (IOException e) {
+        log.error("Exception encountered when activating the new DagManager", e);

Review Comment:
   Same here, for example. Create a variable holding the class name and pass it 
as a parameter to all logs.
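   
   A rough sketch of the idea. It uses `java.util.logging` and 
`MessageFormat` only to stay self-contained (the PR itself uses Lombok's 
`@Slf4j`), and the class name, the `CLASS_NAME` constant, and the 
`alreadyInStateMessage` helper are hypothetical. The early return is a choice 
made in this sketch, not something taken from the diff.

```java
import java.text.MessageFormat;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ConsistentLogNameSketch {
  private static final Logger LOG = Logger.getLogger(ConsistentLogNameSketch.class.getName());

  // Single source of truth for the name that appears in every log message.
  static final String CLASS_NAME = ConsistentLogNameSketch.class.getSimpleName();

  private volatile boolean isActive = false;

  /** Builds the "already in this state" message with the shared class name. */
  static String alreadyInStateMessage(boolean active) {
    return MessageFormat.format("{0} already {1}, skipping further actions.",
        CLASS_NAME, active ? "active" : "inactive");
  }

  public void setActive(boolean active) {
    if (this.isActive == active) {
      LOG.info(alreadyInStateMessage(active));
      return; // nothing to change
    }
    this.isActive = active;
    // The class name is passed as a parameter instead of being hard-coded.
    LOG.log(Level.INFO, "{0} is now {1}.",
        new Object[] {CLASS_NAME, active ? "active" : "inactive"});
  }

  public boolean isActive() {
    return isActive;
  }
}
```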



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,174 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) manages {@link Dag}s through {@link DagManagementStateStore}.
+ * b) accept adhoc new dag launch requests from Orchestrator.
+ * c) provides dag actions to {@link DagProcessingEngine#DagProcEngineThread}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
+  public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+  private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;

Review Comment:
   let's group these configs with `housekeeping_thread_delay` below. Perhaps 
even move to `ConfigurationKeys` like you have the others



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -0,0 +1,96 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.junit.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+public class DagProcessingEngineTest {
+  private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  DagProcFactory dagProcFactory;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl())
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER)
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD)
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagManagementStateStore = new MostlyMySqlDagManagementStateStore(config, null);
+    this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementStateStore.start();
+    this.dagManagementTaskStream =
+        new DagManagementTaskStreamImpl(config, Optional.empty(), this.dagManagementStateStore);
+    this.dagManagementTaskStream.setActive(true);
+    this.dagProcFactory = new DagProcFactory();
+    this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+        this.dagManagementTaskStream, this.dagProcFactory, this.dagManagementStateStore);
+  }
+
+  @Test

Review Comment:
   add a comment above explaining that this tests simple addition of an action 
to the stream and its removal (later, if we have more validations, it would be 
nice to have a quick comment to summarize)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,174 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) manages {@link Dag}s through {@link DagManagementStateStore}.
+ * b) accept adhoc new dag launch requests from Orchestrator.
+ * c) provides dag actions to {@link DagProcessingEngine#DagProcEngineThread}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
+  public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+  private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
+
+  private final Config config;
+  @Inject private FlowCatalog flowCatalog;
+  private final boolean dagProcessingEngineEnabled;
+  private Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+  private volatile boolean isActive = false;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  @Getter DagManagementStateStore dagManagementStateStore;
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+
+  @Inject
+  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore, DagManagementStateStore dagManagementStateStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    this.dagProcessingEngineEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
+    this.dagManagementStateStore = dagManagementStateStore;
+    MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+  }
+
+  public void setActive(boolean active) {
+    if (this.isActive == active) {
+      log.info("DagManager already {}, skipping further actions.", (!active) ? "inactive" : "active");

Review Comment:
   let's keep all the logs consistent in whether they say `DagManager`, 
`NewDagManager`, or this class name



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,174 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) manages {@link Dag}s through {@link DagManagementStateStore}.
+ * b) accept adhoc new dag launch requests from Orchestrator.
+ * c) provides dag actions to {@link DagProcessingEngine#DagProcEngineThread}

Review Comment:
   outdated java doc. Duties look like 
   
   - interact with dagManagementStore to update/retrieve dags, checkpoint
   - add dagActions to the DagTaskStream



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,60 @@
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task on a Dag.
+ * Upon successful completion of the corresponding {@link DagProc#process(DagManagementStateStore)},
+ * {@link DagTask#conclude()} must be called.
+ */
+
+@Alpha
+public abstract class DagTask<T> {
+  @Getter public DagActionStore.DagAction dagAction;
+  private MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus;

Review Comment:
   This should be of type `LeaseObtainedStatus` so that the fields of that 
class can be used.
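
   A minimal sketch of the point, using stand-in types rather than the real 
Gobblin classes. It assumes `LeaseObtainedStatus` is a subtype of 
`LeaseAttemptStatus` that carries extra lease details; the field name 
`leaseAcquisitionTimestamp` and the `DagTaskSketch` class are hypothetical. 
Declaring the field with the subtype makes those details reachable without a 
cast:

```java
// Stand-in types for illustration only; the real ones are nested in MultiActiveLeaseArbiter.
class LeaseTypingSketch {
  /** Hypothetical base type: carries no lease details. */
  static class LeaseAttemptStatus {}

  /** Hypothetical subtype: assumed to carry details of an acquired lease. */
  static class LeaseObtainedStatus extends LeaseAttemptStatus {
    private final long leaseAcquisitionTimestamp; // assumed field name

    LeaseObtainedStatus(long leaseAcquisitionTimestamp) {
      this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
    }

    long getLeaseAcquisitionTimestamp() {
      return leaseAcquisitionTimestamp;
    }
  }

  /** Simplified DagTask: the field is declared with the subtype, so no cast is needed. */
  static class DagTaskSketch {
    private final LeaseObtainedStatus leaseObtainedStatus;

    DagTaskSketch(LeaseObtainedStatus leaseObtainedStatus) {
      this.leaseObtainedStatus = leaseObtainedStatus;
    }

    long leaseTimestamp() {
      // Would not compile if the field were declared as LeaseAttemptStatus.
      return leaseObtainedStatus.getLeaseAcquisitionTimestamp();
    }
  }
}
```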



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import javax.naming.OperationNotSupportedException;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore)}
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
+  public static final String NUM_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+
+  @Getter private final DagTaskStream dagTaskStream;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore, Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter) {
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool =
+        Executors.newScheduledThreadPool(numThreads, new NamedThreadFactory("DagProcessingEngineThread"));
+    this.dagTaskStream = dagTaskStream;
+    this.dagManagementStateStore = dagManagementStateStore;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+      scheduledExecutorPool.submit(dagProcEngineThread);
+    }
+  }
+
+  public void addDagNodeToRetry(Dag.DagNode<JobExecutionPlan> dagNode) throws OperationNotSupportedException {
+    // todo - how to add dag action for a dag node? should we create a dag node action? right now dag action is synonymous to flow action
+    // this.dagTaskStream.addDagTask(new RetryDagTask(dagNode));
+    throw new OperationNotSupportedException();
+  }
+
+  @AllArgsConstructor
+  private static class DagProcEngineThread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+
+    @Override
+    public void run() {
+      while (true) {
+        DagTask dagTask = dagTaskStream.next(); // blocking call
+        if (dagTask == null) {
+          continue;
+        }
+        DagProc dagProc = dagTask.host(dagProcFactory);
+        try {
+          // todo - add retries
+          dagProc.process(dagManagementStateStore);
+        } catch (Throwable t) {
+          log.error("DagProcEngineThread encountered error ", t);
+        }
+        // todo mark lease success and releases it
+        //dagTaskStream.complete(dagTask);
+      }

Review Comment:
   It seems a bit excessive to set reminders even for the host that obtained 
the lease before we know that its attempt to conclude the lease has failed. I'm 
worried about growing the reminder queue too quickly and being slow to process 
the actions in it.
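
   A rough sketch of the alternative, not tied to the actual Gobblin API: the 
host that holds the lease enqueues a reminder only when the work throws or the 
lease cannot be concluded. `ReminderSketch`, `processWithLease` and the 
in-memory `reminders` queue are all hypothetical names for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;

class ReminderSketch {
  // Hypothetical stand-in for the reminder queue; holds ids of dag actions to revisit.
  final Queue<String> reminders = new ArrayDeque<>();

  /**
   * Runs the work, then tries to conclude the lease.
   * A reminder is enqueued only if the work throws or the lease is not concluded.
   */
  void processWithLease(String dagActionId, Runnable work, BooleanSupplier concludeLease) {
    boolean concluded = false;
    try {
      work.run();
      concluded = concludeLease.getAsBoolean();
    } catch (RuntimeException e) {
      // Treat a failed attempt the same as an unconcluded lease.
    }
    if (!concluded) {
      reminders.add(dagActionId);
    }
  }
}
```

   With this shape the queue grows only with failed attempts, not with every 
lease that is obtained.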



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected final EventSubmitter eventSubmitter;
+
+  protected final MetricContext metricContext;
+
+  public DagProc() {
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+  }

Review Comment:
   I agree that using the same MetricContext (MC) and eventSubmitter for each 
DagProc makes sense. In that case we want another constructor here that takes 
the MC and eventSubmitter as args.
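
   Roughly what that could look like, sketched with stand-in types so it 
compiles on its own. The nested `MetricContext` and `EventSubmitter` here are 
simplified placeholders, not the Gobblin classes, and `DagProcSketch` and 
`LaunchDagProcSketch` are hypothetical names:

```java
class DagProcCtorSketch {
  /** Placeholder for the real MetricContext. */
  static class MetricContext {
    final String name;

    MetricContext(String name) {
      this.name = name;
    }
  }

  /** Placeholder for the real EventSubmitter. */
  static class EventSubmitter {
    final MetricContext metricContext;
    final String namespace;

    EventSubmitter(MetricContext metricContext, String namespace) {
      this.metricContext = metricContext;
      this.namespace = namespace;
    }
  }

  /** Simplified DagProc: collaborators are injected instead of built per instance. */
  abstract static class DagProcSketch {
    protected final MetricContext metricContext;
    protected final EventSubmitter eventSubmitter;

    protected DagProcSketch(MetricContext metricContext, EventSubmitter eventSubmitter) {
      this.metricContext = metricContext;
      this.eventSubmitter = eventSubmitter;
    }
  }

  /** Hypothetical concrete proc that just forwards the shared collaborators. */
  static class LaunchDagProcSketch extends DagProcSketch {
    LaunchDagProcSketch(MetricContext metricContext, EventSubmitter eventSubmitter) {
      super(metricContext, eventSubmitter);
    }
  }
}
```

   Whoever creates the procs (a factory, for instance) would build the MC and 
eventSubmitter once and pass the same instances to every DagProc.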





Issue Time Tracking
-------------------

    Worklog Id:     (was: 907420)
    Time Spent: 26h 20m  (was: 26h 10m)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 26h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

