shaofengshi closed pull request #286: KYLIN-3617 Use job's cache in job 
scheduler
URL: https://github.com/apache/kylin/pull/286
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork-based) pull request once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java 
b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 0cc6c8e5fb..8352005234 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -241,6 +242,10 @@ private long writeJobOutputResource(String path, 
ExecutableOutputPO output) thro
         }
     }
 
+    public ExecutableOutputPO getJobOutputDigest(String uuid) {
+        return executableOutputDigestMap.get(uuid);
+    }
+
     public List<ExecutableOutputPO> getJobOutputDigests(long timeStart, long 
timeEndExclusive) {
         List<ExecutableOutputPO> jobOutputDigests = Lists.newArrayList();
         for (ExecutableOutputPO po : executableOutputDigestMap.values()) {
@@ -268,6 +273,10 @@ private long writeJobOutputResource(String path, 
ExecutableOutputPO output) thro
         }
     }
 
+    public ExecutablePO getJobDigest(String uuid) {
+        return executableDigestMap.get(uuid);
+    }
+
     public List<ExecutablePO> getJobDigests(long timeStart, long 
timeEndExclusive) {
         List<ExecutablePO> jobDigests = Lists.newArrayList();
         for (ExecutablePO po : executableDigestMap.values()) {
@@ -277,6 +286,11 @@ private long writeJobOutputResource(String path, 
ExecutableOutputPO output) thro
         return jobDigests;
     }
 
+    public List<String> getJobIdsInCache() {
+        Set<String> idSet = executableDigestMap.keySet();
+        return Lists.newArrayList(idSet);
+    }
+
     public List<String> getJobIds() throws PersistentException {
         try {
             NavigableSet<String> resources = 
store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
@@ -391,4 +405,13 @@ public void deleteJobOutput(String uuid) throws 
PersistentException {
             throw new PersistentException(e);
         }
     }
+
+    public void reloadAll() throws IOException {
+        try (AutoReadWriteLock.AutoLock lock = 
executableDigestMapLock.lockForWrite()) {
+            executableDigestCrud.reloadAll();
+        }
+        try (AutoReadWriteLock.AutoLock lock = 
executableOutputDigestMapLock.lockForWrite()) {
+            executableOutputDigestCrud.reloadAll();
+        }
+    }
 }
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 5cc8a0f7d7..b866618e05 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -155,6 +155,10 @@ public AbstractExecutable getJob(String uuid) {
         }
     }
 
+    public AbstractExecutable getJobDigest(String uuid) {
+        return parseTo(executableDao.getJobDigest(uuid));
+    }
+
     public Output getOutput(String uuid) {
         try {
             final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(uuid);
@@ -166,6 +170,12 @@ public Output getOutput(String uuid) {
         }
     }
 
+    public Output getOutputDigest(String uuid) {
+        final ExecutableOutputPO jobOutput = 
executableDao.getJobOutputDigest(uuid);
+        Preconditions.checkArgument(jobOutput != null, "there is no related 
output for job id:" + uuid);
+        return parseOutput(jobOutput);
+    }
+
     private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
         final DefaultOutput result = new DefaultOutput();
         result.setExtra(jobOutput.getInfo());
@@ -286,6 +296,10 @@ public void updateAllRunningJobsToError() {
         }
     }
 
+    public List<String> getAllJobIdsInCache() {
+        return executableDao.getJobIdsInCache();
+    }
+
     public void resumeAllRunningJobs() {
         try {
             final List<ExecutableOutputPO> jobOutputs = 
executableDao.getJobOutputs();
@@ -439,6 +453,10 @@ public void updateJobOutput(String jobId, ExecutableState 
newStatus, Map<String,
         }
     }
 
+    public void reloadAll() throws IOException {
+        executableDao.reloadAll();
+    }
+
     public void forceKillJob(String jobId) {
         try {
             final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(jobId);
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
index e5f15fe49d..877c0d01d0 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -50,7 +50,7 @@ synchronized public void run() {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIds()) {
+            for (final String id : executableManager.getAllJobIdsInCache()) {
                 if (isJobPoolFull()) {
                     return;
                 }
@@ -60,16 +60,16 @@ synchronized public void run() {
                     continue;
                 }
 
-                final Output output = executableManager.getOutput(id);
-                if ((output.getState() != ExecutableState.READY)) {
+                final Output outputDigest = 
executableManager.getOutputDigest(id);
+                if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
-                    if (output.getState() == ExecutableState.SUCCEED) {
+                    if (outputDigest.getState() == ExecutableState.SUCCEED) {
                         nSUCCEED++;
-                    } else if (output.getState() == ExecutableState.ERROR) {
+                    } else if (outputDigest.getState() == 
ExecutableState.ERROR) {
                         nError++;
-                    } else if (output.getState() == ExecutableState.DISCARDED) 
{
+                    } else if (outputDigest.getState() == 
ExecutableState.DISCARDED) {
                         nDiscarded++;
-                    } else if (output.getState() == ExecutableState.STOPPED) {
+                    } else if (outputDigest.getState() == 
ExecutableState.STOPPED) {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 5dd2c7c80d..bcd6c81844 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -78,7 +78,6 @@ public static synchronized void destroyInstance() {
     // 
============================================================================
 
     private JobLock jobLock;
-    private ExecutableManager executableManager;
     private FetcherRunner fetcher;
     private ScheduledExecutorService fetcherPool;
     private ExecutorService jobPool;
@@ -95,6 +94,10 @@ public DefaultScheduler() {
         }
     }
 
+    public ExecutableManager getExecutableManager() {
+        return ExecutableManager.getInstance(jobEngineConfig.getConfig());
+    }
+
     public FetcherRunner getFetcherRunner() {
         return fetcher;
     }
@@ -159,7 +162,6 @@ public synchronized void init(JobEngineConfig 
jobEngineConfig, JobLock lock) thr
             throw new IllegalStateException("Cannot start job scheduler due to 
lack of job lock");
         }
 
-        executableManager = 
ExecutableManager.getInstance(jobEngineConfig.getConfig());
         //load all executable, set them to a consistent status
         fetcherPool = Executors.newScheduledThreadPool(1);
         int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
@@ -168,6 +170,7 @@ public synchronized void init(JobEngineConfig 
jobEngineConfig, JobLock lock) thr
         context = new DefaultContext(Maps.<String, Executable> 
newConcurrentMap(), jobEngineConfig.getConfig());
 
         logger.info("Staring resume all running jobs.");
+        ExecutableManager executableManager = getExecutableManager();
         executableManager.resumeAllRunningJobs();
         logger.info("Finishing resume all running jobs.");
 
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
index b562fac8c1..1d13afdcf8 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -86,23 +86,23 @@ synchronized public void run() {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIds()) {
+            for (final String id : executableManager.getAllJobIdsInCache()) {
                 if (runningJobs.containsKey(id)) {
                     // logger.debug("Job id:" + id + " is already running");
                     nRunning++;
                     continue;
                 }
 
-                final Output output = executableManager.getOutput(id);
-                if ((output.getState() != ExecutableState.READY)) {
+                final Output outputDigest = 
executableManager.getOutputDigest(id);
+                if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
-                    if (output.getState() == ExecutableState.SUCCEED) {
+                    if (outputDigest.getState() == ExecutableState.SUCCEED) {
                         nSUCCEED++;
-                    } else if (output.getState() == ExecutableState.ERROR) {
+                    } else if (outputDigest.getState() == 
ExecutableState.ERROR) {
                         nError++;
-                    } else if (output.getState() == ExecutableState.DISCARDED) 
{
+                    } else if (outputDigest.getState() == 
ExecutableState.DISCARDED) {
                         nDiscarded++;
-                    } else if (output.getState() == ExecutableState.STOPPED) {
+                    } else if (outputDigest.getState() == 
ExecutableState.STOPPED) {
                         nStopped++;
                     } else {
                         if (fetchFailed) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to