[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=908988&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-908988
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Mar/24 19:58
            Start Date: 08/Mar/24 19:58
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1518201064


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type 
of {@link DagTask}.
+ * Each {@link DagTask} returned from the {@link DagTaskStream} comes with a 
time-limited lease conferring the exclusive
+ * right to perform the work of the task.
+ * The {@link DagProcFactory} transforms each {@link DagTask} into a specific, 
concrete {@link DagProc}, which
+ * encapsulates all processing inside {@link 
DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+
+  @Getter private final DagTaskStream dagTaskStream;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore) {
+    Integer numThreads = ConfigUtils.getInt
+        (config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY, 
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
+    ScheduledExecutorService scheduledExecutorPool =
+        Executors.newScheduledThreadPool(numThreads,
+            ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("DagProcessingEngineThread")));
+    this.dagTaskStream = dagTaskStream;
+    this.dagManagementStateStore = dagManagementStateStore;
+
+    for (int i=0; i < numThreads; i++) {
+      // todo - set metrics for count of active DagProcEngineThread
+      DagProcEngineThread dagProcEngineThread = new 
DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+      scheduledExecutorPool.submit(dagProcEngineThread);
+    }
+  }
+
+  @AllArgsConstructor
+  @VisibleForTesting
+  static class DagProcEngineThread implements Runnable {
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+
+    @Override
+    public void run() {
+      while (true) {
+        DagTask<DagProc> dagTask = dagTaskStream.next(); // blocking call
+        if (dagTask == null) {
+          log.warn("Received a null dag task, ignoring.");

Review Comment:
   nit: add TODO to remind us to add metric



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * DagManagementTaskStreamImpl has these functionalities :
+ * a) interact with {@link DagManagementStateStore} to update/retrieve dags, 
checkpoint
+ * b) add {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} to 
the {@link DagTaskStream}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, 
DagTaskStream {
+  private final Config config;
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new 
DagManagerMetrics();
+  private volatile boolean isActive = false;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  @Getter DagManagementStateStore dagManagementStateStore;
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+
+  @Inject
+  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> 
dagActionStore, DagManagementStateStore dagManagementStateStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
+  }
+
+  public void setActive(boolean active) {
+    if (this.isActive == active) {
+      log.info("DagManagementTaskStreamImpl already {}, skipping further 
actions.", (!active) ? "inactive" : "active");
+    }
+    this.isActive = active;
+    if (this.isActive) {
+      log.info("Activating DagManagementTaskStreamImpl.");
+      dagManagerMetrics.activate();
+    } else { //Mark the DagManager inactive.
+      log.info("Inactivating the DagManagementTaskStreamImpl. Shutting down 
all DagManager threads");
+      dagManagerMetrics.cleanup();
+    }
+  }
+
+  @Override
+  public synchronized void addDagAction(DagActionStore.DagAction dagAction) 
throws IOException {
+    // TODO: Used to track missing dag issue, remove later as needed
+    log.info("Add dagAction{}", dagAction);
+    if (!isActive) {
+      log.warn("Skipping add dagAction because this instance of 
DagManagementTaskStreamImpl is not active for dag: {}",
+          dagAction);
+      return;

Review Comment:
   1. whitespace needed on line 92
   2. this is really confusing to log "Add dagAction" and then log skipping.  
since they're both at the beginning and both print the `dagAction`, let's log 
once and decide between either "adding" or "skipping because inactive"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+
+
+/**
+ * A {@link DagTask} responsible to handle launch tasks.
+ */
+
+public class LaunchDagTask extends DagTask<LaunchDagProc> {
+  public LaunchDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    super(dagAction, leaseObtainedStatus);
+  }
+
+
+  @Override
+  public LaunchDagProc host(DagTaskVisitor<LaunchDagProc> visitor) {

Review Comment:
   this should still be generic:
   ```
   public <T> T host(DagTaskVisitor<T> visitor);
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+
+
+/**
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} 
schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests 
to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ * {@link org.apache.gobblin.service.modules.orchestration.DagManager} which 
was only needed in the `handleDagAction` method
+ * of its parent class is not needed by this class, so it passes a null value 
for DagManager to its parent in super().
+ */
+@Slf4j
+public class DagManagementDagActionStoreChangeMonitor extends 
DagActionStoreChangeMonitor {
+  private final DagManagement dagManagement;
+  protected ContextAwareMeter unexpectedLaunchEventErrors;
+
+  // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
+  // client itself to determine all Kafka related information dynamically 
rather than through the config.
+  public DagManagementDagActionStoreChangeMonitor(Config config, int 
numThreads,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore 
dagActionStore,
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+    // Differentiate group id for each host
+    super("", config, null, numThreads, flowCatalog, orchestrator, 
dagActionStore, isMultiActiveSchedulerEnabled);
+    this.dagManagement = dagManagement;
+  }
+
+  /**
+   * This implementation passes on the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the
+   * {@link DagManagement} instead of finding a {@link 
org.apache.gobblin.runtime.api.FlowSpec} passing the spec to {@link 
Orchestrator}.

Review Comment:
   nit: missing "and" before "passing the..."



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Singleton;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * {@link DagTaskVisitor} for transforming a specific {@link DagTask} derived 
class to its companion {@link DagProc} derived class.
+ * Each {@link DagTask} needs it own {@link DagProcFactory#meet} method 
overload to create {@link DagProc} that is
+ * supposed to process that {@link DagTask}.
+ */
+
+@Alpha
+@Singleton
+public class DagProcFactory implements DagTaskVisitor {

Review Comment:
   shouldn't this:
   ```
   implements DagTaskVisitor<DagProc>
   ```
   ?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -36,6 +40,18 @@
  */
 @Alpha
 public interface DagManagementStateStore {
+  /**
+   * Returns a {@link FlowSpec} for the given URI.
+   * @throws SpecNotFoundException if the spec is not found
+   */
+  FlowSpec getFlowSpec(URI uri) throws SpecNotFoundException;
+
+  /**
+   * Removes a {@link FlowSpec} with the given URI and pass the deletion to 
listeners if `triggerListener` is true
+   * No-op if the flow spec was not present in the store.
+   */
+  void remove(URI uri, Properties headers, boolean triggerListener);

Review Comment:
   I like the name `getFlowSpec`. should this be `removeFlowSpec` to parallel 
that?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task on a Dag.
+ * Upon successful completion of the corresponding {@link 
DagProc#process(DagManagementStateStore)},
+ * {@link DagTask#conclude()} must be called.
+ */
+
+@Alpha
+public abstract class DagTask<T> {
+  @Getter public final DagActionStore.DagAction dagAction;
+  private final MultiActiveLeaseArbiter.LeaseObtainedStatus 
leaseObtainedStatus;
+  @Getter protected final DagManager.DagId dagId;
+
+  public DagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    this.dagAction = dagAction;
+    this.leaseObtainedStatus = leaseObtainedStatus;
+    this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId());
+  }
+
+  public abstract T host(DagTaskVisitor<T> visitor);

Review Comment:
   the generic param doesn't belong on the class, but on this method, so:
   ```
   public abstract class DagTask {
     //...
     public abstract <T> T host(DagTaskVisitor<T> visitor);
     //...
   }
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask} by 
first initializing its state, performing
+ * actions based on the type of {@link DagTask} and finally submitting an 
event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected static final MetricContext metricContext = 
Instrumented.getMetricContext(new State(), DagProc.class);
+  protected static final EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(
+      metricContext, "org.apache.gobblin.service").build();
+
+  public final void process(DagManagementStateStore dagManagementStateStore) 
throws IOException {
+    S state = initialize(dagManagementStateStore);   // todo - retry
+    T result = act(dagManagementStateStore, state);   // todo - retry
+    commit(dagManagementStateStore, result);   // todo - retry
+  }
+
+  protected abstract S initialize(DagManagementStateStore 
dagManagementStateStore) throws IOException;
+
+  protected abstract T act(DagManagementStateStore dagManagementStateStore, S 
state) throws IOException;
+
+  protected abstract void sendNotification(T result, EventSubmitter 
eventSubmitter) throws IOException;

Review Comment:
   glad to see this method, please add an invocation to `process`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * DagManagementTaskStreamImpl has these functionalities :
+ * a) interact with {@link DagManagementStateStore} to update/retrieve dags, 
checkpoint
+ * b) add {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} to 
the {@link DagTaskStream}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, 
DagTaskStream {
+  private final Config config;
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new 
DagManagerMetrics();
+  private volatile boolean isActive = false;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  @Getter DagManagementStateStore dagManagementStateStore;

Review Comment:
   I saw you mention the DMSS in the javadoc, but I don't actually see where 
it's being used below.  can we remove this member?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * DagManagementTaskStreamImpl has these functionalities :
+ * a) interact with {@link DagManagementStateStore} to update/retrieve dags, 
checkpoint
+ * b) add {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} to 
the {@link DagTaskStream}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, 
DagTaskStream {
+  private final Config config;
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new 
DagManagerMetrics();
+  private volatile boolean isActive = false;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  @Getter DagManagementStateStore dagManagementStateStore;
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+
+  @Inject
+  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> 
dagActionStore, DagManagementStateStore dagManagementStateStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
+  }
+
+  public void setActive(boolean active) {

Review Comment:
   I wonder: when we're running in full multi-active mode, won't this always be 
active? 
   
   or even when it's always active is there a need to turn on metrics reporting 
only once we've initialized during startup?
   
   either way, I suggest clarifying in method javadoc, e.g. that this is either 
interim functionality we'll remove once we fully arrive in MA or what the 
reason is to always retain this impl



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+
+
+/**
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} 
schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests 
to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ * {@link org.apache.gobblin.service.modules.orchestration.DagManager} which 
was only needed in the `handleDagAction` method
+ * of its parent class is not needed by this class, so it passes a null value 
for DagManager to its parent in super().
+ */
+@Slf4j
+public class DagManagementDagActionStoreChangeMonitor extends 
DagActionStoreChangeMonitor {
+  private final DagManagement dagManagement;
+  protected ContextAwareMeter unexpectedLaunchEventErrors;
+
+  // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
+  // client itself to determine all Kafka related information dynamically 
rather than through the config.
+  public DagManagementDagActionStoreChangeMonitor(Config config, int 
numThreads,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore 
dagActionStore,
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+    // Differentiate group id for each host
+    super("", config, null, numThreads, flowCatalog, orchestrator, 
dagActionStore, isMultiActiveSchedulerEnabled);
+    this.dagManagement = dagManagement;
+  }
+
+  /**
+   * This implementation passes on the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the
+   * {@link DagManagement} instead of finding a {@link 
org.apache.gobblin.runtime.api.FlowSpec} passing the spec to {@link 
Orchestrator}.
+   */
+  @Override
+  protected void handleDagAction(DagActionStore.DagAction dagAction, boolean 
isStartup) {
+    log.info("(" + (isStartup ? "on-startup" : "post-startup") + ") DagAction 
change ({}) received for flow: {}",
+        dagAction.getFlowActionType(), dagAction);
+    LaunchSubmissionMetricProxy launchSubmissionMetricProxy = isStartup ? 
ON_STARTUP : POST_STARTUP;
+    try {
+      // todo - add actions for other other type of dag actions
+      if 
(dagAction.getFlowActionType().equals(DagActionStore.FlowActionType.LAUNCH)) {
+        // If multi-active scheduler is NOT turned on we should not receive 
these type of events
+        if (!this.isMultiActiveSchedulerEnabled) {
+          this.unexpectedLaunchEventErrors.mark();
+          throw new RuntimeException(String.format("Received LAUNCH dagAction 
while not in multi-active scheduler "
+              + "mode for flowAction: %s", dagAction));
+        }
+        dagManagement.addDagAction(dagAction);
+      } else {
+        log.warn("Received unsupported dagAction {}. Expected to be a KILL, 
RESUME, or LAUNCH", dagAction.getFlowActionType());
+        this.unexpectedErrors.mark();
+      }
+    } catch (IOException e) {
+      log.warn("Failed to add Job Execution Plan for flowId {} due to 
exception {}", dagAction.getFlowId(), e.getMessage());

Review Comment:
   more accurate to say, "failed to addDagAction..."



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+
+
+/**
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} 
schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests 
to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ * {@link org.apache.gobblin.service.modules.orchestration.DagManager} which 
was only needed in the `handleDagAction` method
+ * of its parent class is not needed by this class, so it passes a null value 
for DagManager to its parent in super().

Review Comment:
   I really appreciate you documenting this, but given it's an impl detail, 
let's leave it off the class javadoc and instead put it as javadoc on the ctor 
where this takes place.  the other comment there about the topic name being 
`""` could be changed to javadoc and included there too



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask} by 
first initializing its state, performing
+ * actions based on the type of {@link DagTask} and finally submitting an 
event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected static final MetricContext metricContext = 
Instrumented.getMetricContext(new State(), DagProc.class);
+  protected static final EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(
+      metricContext, "org.apache.gobblin.service").build();
+
+  public final void process(DagManagementStateStore dagManagementStateStore) 
throws IOException {
+    S state = initialize(dagManagementStateStore);   // todo - retry
+    T result = act(dagManagementStateStore, state);   // todo - retry
+    commit(dagManagementStateStore, result);   // todo - retry
+  }
+
+  protected abstract S initialize(DagManagementStateStore 
dagManagementStateStore) throws IOException;
+
+  protected abstract T act(DagManagementStateStore dagManagementStateStore, S 
state) throws IOException;
+
+  protected abstract void sendNotification(T result, EventSubmitter 
eventSubmitter) throws IOException;
+
+  // todo - commit the modified dags to the persistent store, maybe not 
required for InMem dagManagementStateStore
+  protected abstract void commit(DagManagementStateStore 
dagManagementStateStore, T result);

Review Comment:
   even for the non-in-memory store, I believe this can go away, with everything 
handled completely within `act`.
   
   if you want to leave in for now to wait and see, that's ok, but let's make 
it non-`abstract` and provide an empty impl, so we don't require each derived 
class to implement a method that may be unnecessary



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+import static org.mockito.Mockito.spy;
+
+@Slf4j
+public class DagProcessingEngineTest {
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  DagProcessingEngine dagProcessingEngine;
+  DagTaskStream dagTaskStream;
+  DagProcFactory dagProcFactory;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementTaskStream =
+        new DagManagementTaskStreamImpl(config, Optional.empty(), 
dagManagementStateStore);
+    this.dagManagementTaskStream.setActive(true);
+    this.dagProcFactory = new DagProcFactory();
+    this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+        this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+    this.dagTaskStream = spy(new MockedDagTaskStream());
+    this.dagProcessingEngine =
+        new DagProcessingEngine(config, dagTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+  }
+
+  static class MockedDagTaskStream implements DagTaskStream {
+    public static final int MAX_NUM_OF_TASKS = 10;
+    volatile int i=0;
+
+    @Override
+    public boolean hasNext() {
+      return true;
+    }
+
+    @Override
+    public synchronized DagTask next() {
+      i++;
+      if (i <= MAX_NUM_OF_TASKS) {
+        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), null);
+      } else {
+        throw new RuntimeException("Max num of tasks reached");
+      }
+    }
+  }
+
+  static class MockedDagTask extends DagTask<MockedDagProc> {
+
+    public MockedDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+      super(dagAction, leaseObtainedStatus);
+    }
+
+    @Override
+    public MockedDagProc host(DagTaskVisitor<MockedDagProc> visitor) {
+      return new MockedDagProc();
+    }
+
+    @Override
+    public boolean conclude() throws IOException {
+      return false;
+    }
+  }
+
+  static class MockedDagProc extends DagProc<Void, Void> {
+
+    @Override
+    protected Void initialize(DagManagementStateStore dagManagementStateStore) 
{
+      return null;
+    }
+
+    @Override
+    protected Void act(DagManagementStateStore dagManagementStateStore, Void 
state) {
+      return null;
+    }
+
+    @Override
+    protected void sendNotification(Void result, EventSubmitter 
eventSubmitter) {
+    }
+
+    @Override
+    protected void commit(DagManagementStateStore dagManagementStateStore, 
Void result) {
+    }
+  }
+
+  // This tests verifies that

Review Comment:
   comment looks incomplete



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.Objects;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Provider;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A factory implementation that returns a {@link 
DagManagementDagActionStoreChangeMonitor} instance.
+ */
+@Slf4j
+public class DagProcEngineEnabledDagActionStoreChangeMonitorFactory implements 
Provider<DagActionStoreChangeMonitor> {

Review Comment:
   let's bring this factory name into alignment w/ the underlying class it 
creates instances of 



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+import static org.mockito.Mockito.spy;
+
+@Slf4j
+public class DagProcessingEngineTest {
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  DagProcessingEngine dagProcessingEngine;
+  DagTaskStream dagTaskStream;
+  DagProcFactory dagProcFactory;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementTaskStream =
+        new DagManagementTaskStreamImpl(config, Optional.empty(), 
dagManagementStateStore);
+    this.dagManagementTaskStream.setActive(true);
+    this.dagProcFactory = new DagProcFactory();
+    this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+        this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+    this.dagTaskStream = spy(new MockedDagTaskStream());
+    this.dagProcessingEngine =
+        new DagProcessingEngine(config, dagTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+  }
+
+  static class MockedDagTaskStream implements DagTaskStream {
+    public static final int MAX_NUM_OF_TASKS = 10;
+    volatile int i=0;
+
+    @Override
+    public boolean hasNext() {
+      return true;
+    }
+
+    @Override
+    public synchronized DagTask next() {
+      i++;
+      if (i <= MAX_NUM_OF_TASKS) {
+        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), null);
+      } else {
+        throw new RuntimeException("Max num of tasks reached");
+      }
+    }
+  }
+
+  static class MockedDagTask extends DagTask<MockedDagProc> {
+
+    public MockedDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+      super(dagAction, leaseObtainedStatus);
+    }
+
+    @Override
+    public MockedDagProc host(DagTaskVisitor<MockedDagProc> visitor) {
+      return new MockedDagProc();
+    }
+
+    @Override
+    public boolean conclude() throws IOException {
+      return false;
+    }
+  }
+
+  static class MockedDagProc extends DagProc<Void, Void> {
+
+    @Override
+    protected Void initialize(DagManagementStateStore dagManagementStateStore) 
{
+      return null;
+    }
+
+    @Override
+    protected Void act(DagManagementStateStore dagManagementStateStore, Void 
state) {
+      return null;
+    }
+
+    @Override
+    protected void sendNotification(Void result, EventSubmitter 
eventSubmitter) {
+    }
+
+    @Override
+    protected void commit(DagManagementStateStore dagManagementStateStore, 
Void result) {
+    }
+  }
+
+  // This tests verifies that
+  @Test
+  public void dagProcessingTest() throws InterruptedException, 
TimeoutException {
+    int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS + 
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;

Review Comment:
   is it the sum because numTasks tasks are returned and then each thread 
additionally blocks (indefinitely) waiting for more?  if so, that's unlikely to 
be immediately clear to a maintainer, so it deserves a comment



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.junit.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+public class DagManagementTaskStreamImplTest {
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  DagProcFactory dagProcFactory;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementTaskStream =
+        new DagManagementTaskStreamImpl(config, Optional.empty(), 
dagManagementStateStore);
+    this.dagManagementTaskStream.setActive(true);
+    this.dagProcFactory = new DagProcFactory();
+    this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+        this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+  }
+
+  // This tests adding and removal of dag actions from dag task stream
+  // when we have different dag procs in future, we can test dag processing 
and exception handling
+  @Test
+  public void addRemoveDagActions() throws IOException {
+    dagManagementTaskStream.addDagAction(
+        new DagActionStore.DagAction("fg", "fn", "12345", 
DagActionStore.FlowActionType.LAUNCH));
+    DagTask dagTask = dagManagementTaskStream.next();
+    Assert.assertTrue(dagTask instanceof LaunchDagTask);
+    Object dagProc = dagTask.host(this.dagProcFactory);

Review Comment:
   once `DagProcFactory implements DagTaskVisitor<DagProc>`, you'll be able to 
use `DagProc dagProc = ...` rather than `Object`





Issue Time Tracking
-------------------

    Worklog Id:     (was: 908988)
    Time Spent: 29h 10m  (was: 29h)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 29h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to