[ 
https://issues.apache.org/jira/browse/GOBBLIN-2002?focusedWorklogId=905439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-905439
 ]

ASF GitHub Bot logged work on GOBBLIN-2002:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Feb/24 21:34
            Start Date: 16/Feb/24 21:34
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3878:
URL: https://github.com/apache/gobblin/pull/3878#discussion_r1493003984


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions need to be
+ * taken next, e.g. marking it as complete, failed or SLA-breached, or simply 
cleaning up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) 
failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at 
one place.
+ */
+@Slf4j
+public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new ConcurrentHashMap<>();
+  private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
ConcurrentHashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  private final Map<DagManager.DagId, 
LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
ConcurrentHashMap<>();
+  private final Map<DagManager.DagId, Long> dagToDeadline = new 
ConcurrentHashMap<>();
+  private final DagStateStore dagStateStore;
+  private final DagStateStore failedDagStateStore;
+  private final UserQuotaManager quotaManager;
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+
+  public MostlyMySqlDagManagementStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    this.failedDagStateStore = createDagStateStore(
+        ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+    this.quotaManager = new MysqlUserQuotaManager(config);
+    this.quotaManager.init(getDags());
+  }
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+    try {
+      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStore.class.getName()));
+      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+    // todo - update failedDagStateStore iff cleanup returned 1
+    this.failedDagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteDag(DagManager.DagId dagId) throws IOException {
+    this.dagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
+    this.failedDagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    return this.dagStateStore.getDags();
+  }
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) 
throws IOException {
+    return Optional.of(this.failedDagStateStore.getDag(dagId.toString()));
+  }
+
+  @Override
+  public Set<String> getFailedDagIds() throws IOException {
+    return this.failedDagStateStore.getDagIds();
+  }
+
+  @Override
+  // todo - updating different mapps here and in addDagNodeState can result in 
inconsistency between the maps
+  public synchronized void deleteDagNodeState(DagManager.DagId dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
+    this.jobToDag.remove(dagNode);
+    this.dagNodes.remove(dagNode.getValue().getId());
+    this.dagToDeadline.remove(dagId);
+    this.dagToJobs.get(dagId).remove(dagNode);
+    if (this.dagToJobs.get(dagId).isEmpty()) {
+      this.dagToJobs.remove(dagId);
+    }
+  }
+
+  // todo - updating different mapps here and in deleteDagNodeState can result 
in inconsistency between the maps
+  @Override
+  public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> 
dagNode, DagManager.DagId dagId)
+      throws IOException {
+    Optional<Dag<JobExecutionPlan>> dag = getDag(dagId);
+    if (!dag.isPresent()) {
+      throw new RuntimeException("Dag " + dagId + " not found");
+    }
+    this.jobToDag.put(dagNode, dag.get());
+    this.dagNodes.put(dagNode.getValue().getId(), dagNode);
+    if (!this.dagToJobs.containsKey(dagId)) {
+      this.dagToJobs.put(dagId, Lists.newLinkedList());
+    }
+    this.dagToJobs.get(dagId).add(dagNode);
+  }
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws 
IOException {
+    return Optional.of(this.dagStateStore.getDag(dagId.toString()));
+  }
+
+  @Override
+  public boolean containsDag(DagManager.DagId dagId) throws IOException {
+    return this.dagStateStore.existsDag(dagId);
+  }
+
+  @Override
+  public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId 
dagNodeId) {
+    return Optional.of(this.dagNodes.get(dagNodeId));
+  }
+
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> 
getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return Optional.of(this.jobToDag.get(dagNode));
+  }
+
+  @Override
+  public List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId 
dagId) {
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
+    if (dagNodes != null) {
+      return dagNodes;
+    } else {
+      return Lists.newLinkedList();
+    }
+  }
+
+  public void initQuota(Collection<Dag<JobExecutionPlan>> dags) {
+    // This implementation does not need to update quota usage when the 
service restarts or when its leadership status changes

Review Comment:
   mark with todo or unimplemented exception



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -74,16 +78,10 @@ public interface DagManagementStateStore {
    */
   List<Dag<JobExecutionPlan>> getDags() throws IOException;
 
-  /**
-   * Return a list of all dag IDs contained in the dag state store.
-   */
-  Set<String> getDagIds() throws IOException;
-  Set<String> getFailedDagIds() throws IOException;
-
   /**
    * Initialize with the provided set of dags.
    */
-  void initQuotaManageer(Collection<Dag<JobExecutionPlan>> dags) throws 
IOException;
+  void initQuota(Collection<Dag<JobExecutionPlan>> dags) throws IOException;

Review Comment:
   I like this idea where we group together all actions related to 
startup/initialization. I am also okay with doing it in a separate PR. 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions need to be
+ * taken next, e.g. marking it as complete, failed or SLA-breached, or simply 
cleaning up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) 
failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at 
one place.
+ */
+@Slf4j
+public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new ConcurrentHashMap<>();
+  private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
ConcurrentHashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  private final Map<DagManager.DagId, 
LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
ConcurrentHashMap<>();
+  private final Map<DagManager.DagId, Long> dagToDeadline = new 
ConcurrentHashMap<>();
+  private final DagStateStore dagStateStore;
+  private final DagStateStore failedDagStateStore;
+  private final UserQuotaManager quotaManager;
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+
+  public MostlyMySqlDagManagementStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    this.failedDagStateStore = createDagStateStore(
+        ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+    this.quotaManager = new MysqlUserQuotaManager(config);
+    this.quotaManager.init(getDags());
+  }
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+    try {
+      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStore.class.getName()));
+      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+    // todo - update failedDagStateStore iff cleanup returned 1
+    this.failedDagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteDag(DagManager.DagId dagId) throws IOException {
+    this.dagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
+    this.failedDagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    return this.dagStateStore.getDags();
+  }
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) 
throws IOException {
+    return Optional.of(this.failedDagStateStore.getDag(dagId.toString()));
+  }
+
+  @Override
+  public Set<String> getFailedDagIds() throws IOException {
+    return this.failedDagStateStore.getDagIds();
+  }
+
+  @Override
+  // todo - updating different mapps here and in addDagNodeState can result in 
inconsistency between the maps

Review Comment:
   typo in "mapps"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions need to be
+ * taken next, e.g. marking it as complete, failed or SLA-breached, or simply 
cleaning up after completion.
+ * This also encapsulates mysql based tables, i) dagStateStore, ii) 
failedDagStore, iii) userQuotaManager.
+ * They are used here to provide complete access to dag related information at 
one place.
+ */
+@Slf4j
+public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new ConcurrentHashMap<>();
+  private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
ConcurrentHashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  private final Map<DagManager.DagId, 
LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
ConcurrentHashMap<>();
+  private final Map<DagManager.DagId, Long> dagToDeadline = new 
ConcurrentHashMap<>();
+  private final DagStateStore dagStateStore;
+  private final DagStateStore failedDagStateStore;
+  private final UserQuotaManager quotaManager;
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+
+  public MostlyMySqlDagManagementStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    this.failedDagStateStore = createDagStateStore(
+        ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+    this.quotaManager = new MysqlUserQuotaManager(config);
+    this.quotaManager.init(getDags());
+  }
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+    try {
+      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStore.class.getName()));
+      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+    // todo - update failedDagStateStore iff cleanup returned 1
+    this.failedDagStateStore.writeCheckpoint(dag);
+  }
+
+  @Override
+  public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.dagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    this.failedDagStateStore.cleanUp(dag);
+  }
+
+  @Override
+  public void deleteDag(DagManager.DagId dagId) throws IOException {
+    this.dagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
+    this.failedDagStateStore.cleanUp(dagId.toString());
+  }
+
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    return this.dagStateStore.getDags();
+  }
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) 
throws IOException {
+    return Optional.of(this.failedDagStateStore.getDag(dagId.toString()));
+  }
+
+  @Override
+  public Set<String> getFailedDagIds() throws IOException {
+    return this.failedDagStateStore.getDagIds();
+  }
+
+  @Override
+  // todo - updating different mapps here and in addDagNodeState can result in 
inconsistency between the maps

Review Comment:
   change them to concurrent hash map or add synchronized keyword



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+@Alpha
+public interface DagManagementStateStore {
+  /**
+   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
+   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
+   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
+   * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
+   * @param dag The dag to checkpoint
+   */
+  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Returns true if the dag is present in the store.
+   * @param dagId DagId of the dag
+   */
+  boolean containsDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Returns a dag if present, null otherwise.
+   * @param dagId DagId of the dag
+   */
+  Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically called upon 
completion of execution.
+   * @param dag The dag completed/cancelled execution on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void deleteDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * This marks the dag as a failed one.
+   * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDagIds()} later to be retried.
+   * @param dag failing dag
+   * @throws IOException
+   */
+  void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Return a list of all failed dags' IDs contained in the dag state store.
+   */
+  Set<String> getFailedDagIds() throws IOException;
+
+  /**
+   * Returns the failed dag.
+   * If the dag is not found or is not marked as failed through {@link 
DagManagementStateStore#markDagFailed(Dag)}, it returns null.
+   * @param dagId dag id of the failed dag
+   */
+  Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws 
IOException;
+
+  /**
+   * Deletes the failed dag. No-op if dag is not found or is not marked as 
failed.
+   * @param dag
+   * @throws IOException
+   */
+  default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
+    deleteFailedDag(DagManagerUtils.generateDagId(dag));
+  }
+
+  void deleteFailedDag(DagManager.DagId dagId) throws IOException;
+
+  /**
+   * Adds state of a {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
+   * Note that a DagNode is a part of a Dag and may already be present in the 
store through
+   * {@link DagManagementStateStore#checkpointDag}. This call is just an 
additional identifier which may be used
+   * for DagNode level operations. In the future, it may be merged with 
checkpointDag.
+   * @param dagNode dag node to be added
+   * @param dagId dag id of the dag this dag node belongs to
+   * @throws IOException
+   */
+  void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId 
dagId)
+      throws IOException;
+
+  /**
+   * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}
+   * @param dagNodeId id of the dag node
+   */
+  Dag.DagNode<JobExecutionPlan> getDagNode(DagNodeId dagNodeId);
+
+  /**
+   * Returns a list of {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}
+   * @param dagId DagId of the dag for which all DagNodes are requested
+   */
+  List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) 
throws IOException;
+
+  /**
+   * Returns the {@link Dag} the provided {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to,
+   * or null if the dag node is not found.

Review Comment:
   +1 to this; we follow the Optional convention in a method above, so this should return an Optional rather than null.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 905439)
    Time Spent: 8h 20m  (was: 8h 10m)

> create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, 
> DagStateStore and in-memory dag maps used in DagManager
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2002
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2002
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> this will help in refactoring DagManager



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to