APACHE-KYLIN-2735: Introduce an option to make job scheduler consider job priority
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bd78cf1f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bd78cf1f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bd78cf1f Branch: refs/heads/yaho-cube-planner Commit: bd78cf1fc89067542eb05161eafab6a4ee97fe99 Parents: ab3ac1a Author: Zhong <nju_y...@apache.org> Authored: Thu Aug 31 14:56:19 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Fri Sep 8 11:33:05 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 8 ++ .../kylin/job/engine/JobEngineConfig.java | 14 +++ .../kylin/job/execution/AbstractExecutable.java | 9 ++ .../job/execution/CheckpointExecutable.java | 7 ++ .../job/execution/DefaultChainedExecutable.java | 7 ++ .../job/impl/threadpool/DefaultScheduler.java | 111 ++++++++++++++++++- .../org/apache/kylin/engine/mr/CubingJob.java | 19 +++- 7 files changed, 171 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/bd78cf1f/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index d66d7ce..53bb2ad 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -503,6 +503,14 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.job.scheduler.default", "0")); } + public boolean getSchedulerPriorityConsidered() { + return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-considered", "false")); + } + + public Integer
getSchedulerPriorityBarFetchFromQueue() { + return Integer.parseInt(getOptional("kylin.job.scheduler.priority-bar-fetch-from-queue", "20")); + } + public Integer getSchedulerPollIntervalSecond() { return Integer.parseInt(getOptional("kylin.job.scheduler.poll-interval-second", "30")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/bd78cf1f/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java index 6890557..9ba602f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java +++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java @@ -105,6 +105,20 @@ public class JobEngineConfig { return config; } + /** + * @return whether job priority is considered when scheduling jobs (kylin.job.scheduler.priority-considered) + */ + public boolean getJobPriorityConsidered() { + return config.getSchedulerPriorityConsidered(); + } + + /** + * @return the minimum priority a queued job must reach to be fetched directly from the job priority queue + */ + public int getFetchQueuePriorityBar() { + return config.getSchedulerPriorityBarFetchFromQueue(); + } + public String getHdfsWorkingDirectory() { return config.getHdfsWorkingDirectory(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/bd78cf1f/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 30b6421..a37cdc9 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -44,6 +44,8 @@ import com.google.common.collect.Maps; */ public
abstract class AbstractExecutable implements Executable, Idempotent { + public static final Integer DEFAULT_PRIORITY = 10; + protected static final String SUBMITTER = "submitter"; protected static final String NOTIFY_LIST = "notify_list"; protected static final String START_TIME = "startTime"; @@ -389,6 +391,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return output.getState() == ExecutableState.READY; } + /** + * Default scheduling priority of this job; the larger the value, the higher the priority. + */ + public int getDefaultPriority() { + return DEFAULT_PRIORITY; + } + /* * discarded is triggered by JobService, the Scheduler is not awake of that * http://git-wip-us.apache.org/repos/asf/kylin/blob/bd78cf1f/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java index 9864400..db477cb 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java @@ -29,6 +29,8 @@ public class CheckpointExecutable extends DefaultChainedExecutable { private static final Logger logger = LoggerFactory.getLogger(CheckpointExecutable.class); + public static final Integer DEFAULT_PRIORITY = 30; + private static final String DEPLOY_ENV_NAME = "envName"; private static final String PROJECT_INSTANCE_NAME = "projectName"; @@ -75,4 +77,9 @@ public class CheckpointExecutable extends DefaultChainedExecutable { public void setProjectName(String name) { setParam(PROJECT_INSTANCE_NAME, name); } + + @Override + public int getDefaultPriority() { + return DEFAULT_PRIORITY; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/bd78cf1f/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 0ade541..8a816ba 100755 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -31,6 +31,8 @@ import com.google.common.collect.Maps; */ public class DefaultChainedExecutable extends AbstractExecutable implements ChainedExecutable { + public static final Integer DEFAULT_PRIORITY = 10; + private final List<AbstractExecutable> subTasks = Lists.newArrayList(); public DefaultChainedExecutable() { @@ -168,4 +170,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai executable.setId(getId() + "-" + String.format("%02d", subTasks.size())); this.subTasks.add(executable); } + + @Override + public int getDefaultPriority() { + return DEFAULT_PRIORITY; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/bd78cf1f/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 315671c..684bd5b 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -18,7 +18,9 @@ package org.apache.kylin.job.impl.threadpool; +import java.util.Comparator; import java.util.Map; +import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -29,6 +31,7 @@ import
java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.engine.JobEngineConfig; @@ -51,7 +54,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti private JobLock jobLock; private ExecutableManager executableManager; - private FetcherRunner fetcher; + private Runnable fetcher; private ScheduledExecutorService fetcherPool; private ExecutorService jobPool; private DefaultContext context; @@ -69,6 +72,110 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } } + private class FetcherRunnerWithPriority implements Runnable { + private final PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1, + new Comparator<Pair<AbstractExecutable, Integer>>() { + @Override + public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) { + return Integer.compare(o2.getSecond(), o1.getSecond()); // higher priority first; equal priorities compare as 0 per the Comparator contract
} }); + + private void addToJobPool(AbstractExecutable executable, int priority) { + String jobDesc = executable.toString(); + logger.info(jobDesc + " prepare to schedule and its priority is " + priority); + try { + context.addRunningJob(executable); + jobPool.execute(new JobRunner(executable)); + logger.info(jobDesc + " scheduled"); + } catch (Exception ex) { + context.removeRunningJob(executable); + logger.warn(jobDesc + " fail to schedule", ex); + } + } + + @Override + synchronized public void run() { + try { + // logger.debug("Job Fetcher is running..."); + Map<String, Executable> runningJobs = context.getRunningJobs(); + + // fetch job from jobPriorityQueue first to reduce chance to scan job list + Map<String, Integer> leftJobPriorities = Maps.newHashMap(); + Pair<AbstractExecutable, Integer> executableWithPriority; + while (runningJobs.size() < jobEngineConfig.getMaxConcurrentJobLimit() && (executableWithPriority = jobPriorityQueue.peek()) != null + // respect the concurrency limit; only queued jobs at or above the priority bar are fetched here + && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) { + executableWithPriority = jobPriorityQueue.poll(); + AbstractExecutable executable = executableWithPriority.getFirst(); + int curPriority = executableWithPriority.getSecond(); + // fast-track only jobs bumped at least twice (priority > default + 1), i.e. that have waited more than one round + if (curPriority > executable.getDefaultPriority() + 1) { + addToJobPool(executable, curPriority); + } else { + leftJobPriorities.put(executable.getId(), curPriority + 1); + } + } + + if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { + logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); + return; + } + + while ((executableWithPriority = jobPriorityQueue.poll()) != null) { + leftJobPriorities.put(executableWithPriority.getFirst().getId(), + executableWithPriority.getSecond() + 1); + } + + int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; + for (final String id :
executableManager.getAllJobIds()) { + if (runningJobs.containsKey(id)) { + // logger.debug("Job id:" + id + " is already running"); + nRunning++; + continue; + } + + AbstractExecutable executable = executableManager.getJob(id); + if (!executable.isReady()) { + final Output output = executableManager.getOutput(id); + // logger.debug("Job id:" + id + " not runnable"); + if (output.getState() == ExecutableState.DISCARDED) { + nDiscarded++; + } else if (output.getState() == ExecutableState.ERROR) { + nError++; + } else if (output.getState() == ExecutableState.SUCCEED) { + nSUCCEED++; + } else if (output.getState() == ExecutableState.STOPPED) { + nStopped++; + } else { + nOthers++; + } + continue; + } + + nReady++; + Integer priority = leftJobPriorities.get(id); + if (priority == null) { + priority = executable.getDefaultPriority(); + } + jobPriorityQueue.add(new Pair<>(executable, priority)); + } + + while (runningJobs.size() < jobEngineConfig.getMaxConcurrentJobLimit() + && (executableWithPriority = jobPriorityQueue.poll()) != null) { + addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond()); + } + + logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " // + + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers + + " others"); + } catch (Exception e) { + logger.warn("Job Fetcher caught a exception", e); + } + } + } + private class FetcherRunner implements Runnable { @Override @@ -218,7 +325,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti int pollSecond = jobEngineConfig.getPollIntervalSecond(); logger.info("Fetching jobs every {} seconds", pollSecond); - fetcher = new FetcherRunner(); + fetcher = jobEngineConfig.getJobPriorityConsidered() ?
new FetcherRunnerWithPriority() : new FetcherRunner(); fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS); hasStarted = true; } http://git-wip-us.apache.org/repos/asf/kylin/blob/bd78cf1f/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index 1fa56c4..056b943 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -65,12 +65,18 @@ public class CubingJob extends DefaultChainedExecutable { } public enum CubingJobTypeEnum { - BUILD("BUILD"), OPTIMIZE("OPTIMIZE"), MERGE("MERGE"); + BUILD("BUILD", 20), OPTIMIZE("OPTIMIZE", 5), MERGE("MERGE", 25); private final String name; + private final int defaultPriority; - CubingJobTypeEnum(String name) { + CubingJobTypeEnum(String name, int priority) { this.name = name; + this.defaultPriority = priority; + } + + public int getDefaultPriority() { + return defaultPriority; } public String toString() { @@ -151,6 +157,15 @@ public class CubingJob extends DefaultChainedExecutable { super(); } + @Override + public int getDefaultPriority() { + CubingJobTypeEnum jobType = CubingJobTypeEnum.getByName(getJobType()); + if (jobType == null) { + return super.getDefaultPriority(); + } + return jobType.getDefaultPriority(); + } + protected void setDeployEnvName(String name) { setParam(DEPLOY_ENV_NAME, name); }