Technoboy- commented on a change in pull request #1358: URL: https://github.com/apache/shardingsphere-elasticjob/pull/1358#discussion_r471057983
########## File path: elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/dag/DagService.java ########## @@ -0,0 +1,557 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.elasticjob.lite.internal.dag; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.DateFormatUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.framework.recipes.queue.DistributedDelayQueue; +import org.apache.curator.framework.recipes.queue.QueueBuilder; +import org.apache.curator.framework.recipes.queue.QueueSerializer; +import org.apache.shardingsphere.elasticjob.api.JobConfiguration; +import 
org.apache.shardingsphere.elasticjob.api.JobDagConfiguration; +import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils; +import org.apache.shardingsphere.elasticjob.infra.env.IpUtils; +import org.apache.shardingsphere.elasticjob.infra.exception.DagRuntimeException; +import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService; +import org.apache.shardingsphere.elasticjob.lite.internal.state.JobStateEnum; +import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; +import org.apache.shardingsphere.elasticjob.tracing.JobEventBus; +import org.apache.shardingsphere.elasticjob.tracing.event.DagJobExecutionEvent; + +import java.io.UnsupportedEncodingException; +import java.lang.management.ManagementFactory; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Job dag service. + */ +@Slf4j +public class DagService implements CuratorCacheListener { + public static final String ROOT_JOB = "self"; + + private static final String DAG_LATCH_PATH = "/daglatch/"; + + private static final int DEFAULT_RETRY_INTERVAL = 30; + + private static final String RETRY_PATH = "/dagretry/%s/%s"; + + private final DagNodeStorage dagNodeStorage; + + private final JobDagConfiguration jobDagConfig; + + private final String jobName; + + private final String dagName; + + private final InterProcessMutex mutex; + + private final CoordinatorRegistryCenter regCenter; + + private final JobEventBus jobEventBus; + + private CuratorCache jobStateCache; + + private DistributedDelayQueue<String> delayQueue; + + public DagService(final CoordinatorRegistryCenter regCenter, final String jobName, final JobEventBus jobEventBus, final JobDagConfiguration jobDagConfig) { + this.jobDagConfig = jobDagConfig; + this.regCenter = regCenter; + this.jobName = jobName; + this.dagNodeStorage = new DagNodeStorage(regCenter, 
jobDagConfig.getDagName(), jobName); + this.dagName = jobDagConfig.getDagName(); + this.jobEventBus = jobEventBus; + if (StringUtils.equals(jobDagConfig.getDagDependencies(), ROOT_JOB)) { + this.mutex = new InterProcessMutex((CuratorFramework) regCenter.getRawClient(), DAG_LATCH_PATH + dagName); + } else { + this.mutex = null; + } + regDagConfig(); + } + + public DagService(final CoordinatorRegistryCenter regCenter, final String dagName, final DagNodeStorage dagNodeStorage) { + this.regCenter = regCenter; + this.dagName = dagName; + this.dagNodeStorage = dagNodeStorage; + this.jobName = ""; + this.jobDagConfig = null; + this.mutex = null; + this.jobStateCache = null; + this.delayQueue = null; + this.jobEventBus = null; + } + + /** + * Init delay queue for retry jobs. + * + * @return DistributedDelayQueue + */ + private DistributedDelayQueue<String> initDelayQueue() { + String retryPath = String.format(RETRY_PATH, dagName, jobName); + DistributedDelayQueue<String> delayQueue = QueueBuilder.builder((CuratorFramework) regCenter.getRawClient(), new JobRetryTrigger(regCenter, dagName), new QueueSerializer<String>() { + @Override + public byte[] serialize(final String item) { + try { + return item.getBytes("utf-8"); + } catch (UnsupportedEncodingException e) { + log.error("Dag-{}[{}] Init delay queue exception.", dagName, jobName, e); + } + return null; + } + + @Override + public String deserialize(final byte[] bytes) { + return new String(bytes); + } + }, retryPath).buildDelayQueue(); + + try { + delayQueue.start(); + log.info("Dag-{}[{}] start delay queue, path={}", dagName, jobName, retryPath); + //CHECKSTYLE:OFF + } catch (Exception e) { + //CHECKSTYLE:ON + log.error("Dag-{}[{}] start delay queue Exception, path={}", dagName, jobName, retryPath, e); + } + + return delayQueue; + } + + private void startJobStateListener() { + jobStateCache.listenable().addListener(this); + try { + jobStateCache.start(); + postEvent(DagJobStates.REG.getValue(), "Job register success"); 
+ //CHECKSTYLE:OFF + } catch (Exception exp) { + //CHECKSTYLE:ON + log.error("Start dag-{} job-{} state path listener Exception.", dagName, jobName, exp); + // ignore + postEvent(DagJobStates.REG.getValue(), "Job register Error:" + exp.getMessage()); + } + log.info("Dag-{} job-{} state path listener has started success.", dagName, jobName); + } + + private void stopJobStateListener() { + jobStateCache.close(); + } + + /** + * Is dag root job. + * + * @return boolean is dag root job + */ + public boolean isDagRootJob() { + return StringUtils.equals(jobDagConfig.getDagDependencies(), "self"); + } + + /** + * current dag status. + * + * @return DagStates + */ + public DagStates getDagStates() { + return DagStates.of(this.dagNodeStorage.currentDagStates()); + } + + /** + * Persist Dag config into zk. + * always overwrite. + */ + private void regDagConfig() { + this.dagNodeStorage.persistDagConfig(genDependenciesString()); + this.delayQueue = initDelayQueue(); + this.jobStateCache = CuratorCache.build((CuratorFramework) regCenter.getRawClient(), this.dagNodeStorage.pathOfJobNodeState()); + this.startJobStateListener(); + } + + private String genDependenciesString() { + return jobDagConfig.getDagDependencies(); + } + + /** + * 1. select leader ; + * 2. ReGraph DAG ; + * 3. 
Change DAG states to running + */ + public void changeDagStatesAndReGraph() { + if (null == mutex) { + log.error("Need root job when change dag states and regraph!"); + throw new DagRuntimeException("Need root job when change dag states and regraph!"); + } + + if (!acquireDagLeader()) { + blockUntilCompleted(); + return; + } + + if (getDagStates() == DagStates.RUNNING) { + log.info("Dag-{} states already RUNNING", dagName); + return; + } + + try { + String batchNo = getBatchNo(); + Map<String, Set<String>> allDagNode = dagNodeStorage.getAllDagConfigJobs(); + checkCycle(allDagNode); + dagNodeStorage.initDagGraph(allDagNode, batchNo); + dagNodeStorage.updateDagStates(DagStates.RUNNING); + dagNodeStorage.updateDagJobStates(JobStateEnum.RUNNING); + // create graph event + postEvent(DagJobStates.INIT.getValue(), "Create graph success"); + //CHECKSTYLE:OFF + } catch (Exception ex) { + //CHECKSTYLE:ON + postEvent(DagJobStates.INIT.getValue(), "Create graph error:" + ex.getMessage()); + } finally { + releaseDagLeader(); + } + } + + private void blockUntilCompleted() { + int count = 0; + while (getDagStates() != DagStates.RUNNING) { + count++; + log.debug("DAG '{}' sleep short time until DAG graph completed. {}", dagName, count); + BlockUtils.sleep(300L); + if (count > 200) { + log.error("Dag-{} Wait a long time with Dag graph NOT complete", dagName); + throw new DagRuntimeException("Dag graph not complete!"); + } + } + } + + private boolean acquireDagLeader() { + try { + return mutex.acquire(200, TimeUnit.MILLISECONDS); + //CHECKSTYLE:OFF + } catch (Exception exp) { + //CHECKSTYLE:ON + log.debug("Dag-{} acquire lock error!", dagName, exp); + } + return false; + } + + private void releaseDagLeader() { + try { + if (mutex.isAcquiredInThisProcess()) { + mutex.release(); + } + //CHECKSTYLE:OFF + } catch (Exception exp) { + //CHECKSTYLE:ON + log.debug("Dag-{} release lock error!", dagName, exp); + } + } + + /** + * Check dag has cycle. + * + * @param allDagNode dag config info. 
+ */ + private void checkCycle(final Map<String, Set<String>> allDagNode) { + Map<String, Set<String>> cloneMap = Maps.newHashMap(); + allDagNode.forEach((key, value) -> cloneMap.put(key, Sets.newHashSet(value))); + + while (removeSelf(cloneMap)) { + if (log.isDebugEnabled()) { + log.debug("Dag-{} remove root job.", dagName); + } + } + if (!cloneMap.isEmpty()) { + log.error("Dag {} find cycle {}", dagName, cloneMap.keySet().size()); + printCycleNode(cloneMap); + throw new DagRuntimeException("Dag job find cycles"); + } + log.info("Dag {} checkCycle success", dagName); + } + + private void printCycleNode(final Map<String, Set<String>> cloneMap) { + cloneMap.forEach((k, v) -> { + log.error("{} has cycle with {}", k, Joiner.on("|").join(v)); + }); + } + + private boolean removeSelf(final Map<String, Set<String>> cloneMap) { + Iterator<Map.Entry<String, Set<String>>> iterator = cloneMap.entrySet().iterator(); + boolean removed = false; + while (iterator.hasNext()) { + Map.Entry<String, Set<String>> next = iterator.next(); + Set<String> value = next.getValue(); + value.remove("self"); + if (value.isEmpty()) { + markKeyAsSelf(cloneMap, next.getKey()); + iterator.remove(); + removed = true; + } + } + return removed; + } + + private void markKeyAsSelf(final Map<String, Set<String>> cloneMap, final String key) { + cloneMap.values().forEach(s -> s.remove(key)); + } + + private String getBatchNo() { + String date = DateFormatUtils.format(new Date(), "yyMMddHHmmss"); + return dagName + IpUtils.getIp() + ManagementFactory.getRuntimeMXBean().getName().split("@")[0] + date; + } + + /** + * When dag job start run ,check it's dependencies jobs states. 
+ */ + public void checkJobDependenciesState() { + DagJobStates currentJobRunStates = dagNodeStorage.getDagJobRunStates(jobName); + if (currentJobRunStates == DagJobStates.SUCCESS || currentJobRunStates == DagJobStates.FAIL) { + log.info("DAG- {} job- {} 's states is {},Can not run again!", jobDagConfig.getDagName(), jobName, currentJobRunStates); + throw new DagRuntimeException("Dag job has been completed"); + } + if (isDagRootJob()) { + log.debug("DAG {} job {} is root,No deps.", jobDagConfig.getDagName(), jobName); + return; + } + + // 要求dep skip 或 success Review comment: Need to change to English ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
