[ https://issues.apache.org/jira/browse/GOBBLIN-2002?focusedWorklogId=905403&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-905403 ]
ASF GitHub Bot logged work on GOBBLIN-2002: ------------------------------------------- Author: ASF GitHub Bot Created on: 16/Feb/24 18:41 Start Date: 16/Feb/24 18:41 Worklog Time Spent: 10m Work Description: phet commented on code in PR #3878: URL: https://github.com/apache/gobblin/pull/3878#discussion_r1492843795 ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyInMemoryDagManagementStateStore.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DagNodeId; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states. + * This store maintains and utilizes in-memory references about dags and their job states and is used + * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions need to be + * taken next, such as marking it as complete, failed or SLA-breached, or simply cleaning up after completion. + * This also encapsulates persistent stores that are MySQL-based by default: i) dagStateStore, ii) failedDagStateStore, + * iii) quotaManager (a {@link UserQuotaManager}). + * They are used here to provide complete access to dag-related information in one place. 
+ */ +@Slf4j +public class MostlyInMemoryDagManagementStateStore implements DagManagementStateStore { + private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); + private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new HashMap<>(); + // dagToJobs holds a map of dagId to running jobs of that dag + final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>(); + final Map<String, Long> dagToDeadline = new HashMap<>(); + private final DagStateStore dagStateStore; + private final DagStateStore failedDagStateStore; + private final UserQuotaManager quotaManager; + private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore"; + public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass"; + + public MostlyInMemoryDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException { + this.dagStateStore = createDagStateStore(config, topologySpecMap); + this.failedDagStateStore = createDagStateStore( + ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap); + this.quotaManager = new MysqlUserQuotaManager(config); + this.quotaManager.init(getDags()); + } + + DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) { + try { + Class<?> dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, MysqlDagStateStore.class.getName())); + return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + @Override + public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException { + this.dagStateStore.writeCheckpoint(dag); + } + + @Override + public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException { + 
this.failedDagStateStore.writeCheckpoint(dag); + } + + @Override + public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException { + this.dagStateStore.cleanUp(dag); + } + + @Override + public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException { + this.failedDagStateStore.cleanUp(dag); + } + + @Override + public void deleteDag(DagManager.DagId dagId) throws IOException { + this.dagStateStore.cleanUp(dagId.toString()); + } + + @Override + public void deleteFailedDag(DagManager.DagId dagId) throws IOException { + this.failedDagStateStore.cleanUp(dagId.toString()); + } + + @Override + public List<Dag<JobExecutionPlan>> getDags() throws IOException { + return this.dagStateStore.getDags(); + } + + @Override + public Dag<JobExecutionPlan> getFailedDag(DagManager.DagId dagId) throws IOException { + return this.failedDagStateStore.getDag(dagId.toString()); + } + + @Override + public Set<String> getFailedDagIds() throws IOException { + return this.failedDagStateStore.getDagIds(); + } + + @Override + public synchronized void deleteDagNodeState(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) { Review Comment: instance-level synchronization is probably not what we want, but rather `dagId`-level synchronization. totally reasonable to leave this as a TODO Issue Time Tracking ------------------- Worklog Id: (was: 905403) Time Spent: 7h (was: 6h 50m) > create MostlyInMemoryDagManagementStateStore to merge UserQuotaManager, > DagStateStore and in-memory dag maps used in DagManager > ------------------------------------------------------------------------------------------------------------------------------- > > Key: GOBBLIN-2002 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2002 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Arjun Singh Bora > Priority: Major > Time Spent: 7h > Remaining Estimate: 0h > > this will help in refactoring DagManager -- This message was sent by Atlassian Jira (v8.20.10#820010)