arjun4084346 commented on code in PR #3545: URL: https://github.com/apache/gobblin/pull/3545#discussion_r955460218
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.RequesterService; +import org.apache.gobblin.service.ServiceRequester; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * An abstract implementation of {@link UserQuotaManager} that + */ +@Slf4j +abstract public class AbstractUserQuotaManager implements UserQuotaManager { + public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota"; + public static final 
String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota"; + public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota"; + public static final String QUOTA_SEPERATOR = ":"; + public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE; + private final Map<String, Integer> perUserQuota; + private final Map<String, Integer> perFlowGroupQuota; + Set<String> runningDagIds = ConcurrentHashMap.newKeySet(); + private final int defaultQuota; + + public AbstractUserQuotaManager(Config config) { + this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA); + ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder(); + ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder(); + // Quotas will take form of user:<Quota> and flowGroup:<Quota> + for (String flowGroupQuota : ConfigUtils.getStringList(config, PER_FLOWGROUP_QUOTA)) { + flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1])); + } + // Keep quotas per user as well in form user:<Quota> which apply for all flowgroups + for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) { + userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1])); + } + this.perUserQuota = userMapBuilder.build(); + this.perFlowGroupQuota = flowGroupMapBuilder.build(); + } + + abstract int incrementJobCount(String key, CountType countType) throws IOException; + + abstract void decrementJobCount(String user, CountType countType) throws IOException; + + public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws IOException { + // Dag is already being tracked, no need to double increment for retries and multihop flows + if (isDagCurrentlyRunning(dagNode)) { + return; + } + String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); + String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), + ConfigurationKeys.FLOW_GROUP_KEY, ""); + String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode); + boolean proxyUserCheck = true; + Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for which quota is increased + StringBuilder requesterMessage = new StringBuilder(); + runningDagIds.add(DagManagerUtils.generateDagId(dagNode)); + if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) { + int proxyQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT); + proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds + if (!proxyUserCheck) { + // add 1 to proxyUserIncrement since count starts at 0, and is negative if quota is exceeded + requesterMessage.append(String.format( + "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n", + proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser))); + } + } + + String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode); + boolean requesterCheck = true; + + if (serializedRequesters != null && dagNode.getValue().getCurrentAttempts() <= 1) { + List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters); + for (String requester : uniqueRequesters) { + int userQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT); + boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds + usersQuotaIncrement.add(requester); + requesterCheck = requesterCheck && thisRequesterCheck; + if (!thisRequesterCheck) { + 
requesterMessage.append(String.format( + "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n", + requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement)-getQuotaForUser(requester))); + } + } + } + + boolean flowGroupCheck = true; + if (dagNode.getValue().getCurrentAttempts() <= 1) { + int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOW_COUNT); + flowGroupCheck = flowGroupQuotaIncrement >= 0; + if (!flowGroupCheck) { + requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n", + flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), + Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup))); + } + } + + // Throw errors for reach quota at the end to avoid inconsistent job counts Review Comment: Thanks, though it was probably originally added by Jack (not sure) before a couple of refactorings. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
