KYLIN-2389 Improve resource utilization for DistributedScheduler

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/837bd820
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/837bd820
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/837bd820

Branch: refs/heads/master-hbase1.x
Commit: 837bd8200b250f38fcfb2d221764d5aca0c66403
Parents: e894465
Author: kangkaisen <kangkai...@163.com>
Authored: Fri Jan 13 19:58:41 2017 +0800
Committer: kangkaisen <kangkai...@163.com>
Committed: Wed Jan 18 16:14:24 2017 +0800

----------------------------------------------------------------------
 .../impl/threadpool/DistributedScheduler.java   |  8 +--
 .../kylin/job/lock/DistributedJobLock.java      |  2 +
 .../apache/kylin/rest/service/JobService.java   | 45 --------------
 .../hbase/util/ZookeeperDistributedJobLock.java | 63 ++++++++++++++++----
 4 files changed, 58 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 3436529..84e62d5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -195,13 +195,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
             }
         }
 
-        //release job lock only when the all tasks of the job finish and the job server keep the cube lock.
+        //release the segment lock when the job state is neither ready nor running and this job server keeps the segment lock.
         private void releaseJobLock(AbstractExecutable executable) {
             if (executable instanceof DefaultChainedExecutable) {
                 String segmentId = executable.getParam(SEGMENT_ID);
                 ExecutableState state = executable.getStatus();
 
-                if (state == ExecutableState.SUCCEED || state == 
ExecutableState.ERROR || state == ExecutableState.DISCARDED) {
+                if (state != ExecutableState.READY && state != 
ExecutableState.RUNNING) {
                     if (segmentWithLocks.contains(segmentId)) {
                         logger.info(executable.toString() + " will release the 
lock for the segment: " + segmentId);
                         jobLock.unlockWithName(segmentId);
@@ -232,7 +232,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
                     if (executable instanceof DefaultChainedExecutable && 
executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && 
!nodeData.equalsIgnoreCase(serverName)) {
                         try {
                             logger.warn(nodeData + " has released the lock 
for: " + segmentId + " but the job still running. so " + serverName + " resume 
the job");
-                            if (jobLock.lockWithName(segmentId, serverName)) {
+                            if (!jobLock.isHasLocked(segmentId)) {
                                 
executableManager.resumeRunningJobForce(executable.getId());
                                 fetcherPool.schedule(fetcher, 0, 
TimeUnit.SECONDS);
                                 break;
@@ -302,7 +302,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
             AbstractExecutable executable = executableManager.getJob(id);
             if (output.getState() == ExecutableState.RUNNING && executable 
instanceof DefaultChainedExecutable) {
                 try {
-                    if (jobLock.lockWithName(executable.getParam(SEGMENT_ID), 
serverName)) {
+                    if (!jobLock.isHasLocked(executable.getParam(SEGMENT_ID))) 
{
                         
executableManager.resumeRunningJobForce(executable.getId());
                         fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
                     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
index 9335e56..1c173ec 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -24,6 +24,8 @@ public interface DistributedJobLock extends JobLock {
     
     boolean lockWithName(String name, String serverName);
 
+    boolean isHasLocked(String segmentId);
+
     void unlockWithName(String name);
 
     void watchLock(ExecutorService pool, DoWatchLock doWatch);

http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4709a91..ed24a9d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -19,8 +19,6 @@
 package org.apache.kylin.rest.service;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
@@ -56,7 +54,6 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.DistributedJobLock;
 import org.apache.kylin.job.lock.JobLock;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
@@ -282,15 +279,12 @@ public class JobService extends BasicService implements InitializingBean {
                 SourcePartition sourcePartition = new 
SourcePartition(startDate, endDate, startOffset, endOffset, 
sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
                 sourcePartition = source.parsePartitionBeforeBuild(cube, 
sourcePartition);
                 newSeg = getCubeManager().appendSegment(cube, sourcePartition);
-                lockSegment(newSeg.getUuid());
                 job = EngineFactory.createBatchCubingJob(newSeg, submitter);
             } else if (buildType == CubeBuildTypeEnum.MERGE) {
                 newSeg = getCubeManager().mergeSegments(cube, startDate, 
endDate, startOffset, endOffset, force);
-                lockSegment(newSeg.getUuid());
                 job = EngineFactory.createBatchMergeJob(newSeg, submitter);
             } else if (buildType == CubeBuildTypeEnum.REFRESH) {
                 newSeg = getCubeManager().refreshSegment(cube, startDate, 
endDate, startOffset, endOffset);
-                lockSegment(newSeg.getUuid());
                 job = EngineFactory.createBatchCubingJob(newSeg, submitter);
             } else {
                 throw new JobException("invalid build type:" + buildType);
@@ -312,7 +306,6 @@ public class JobService extends BasicService implements InitializingBean {
                 }
             }
             throw e;
-
         }
 
         JobInstance jobInstance = getSingleJobInstance(job);
@@ -454,15 +447,11 @@ public class JobService extends BasicService implements InitializingBean {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 
'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 
'MANAGEMENT')")
     public void resumeJob(JobInstance job) throws IOException, JobException {
-        lockSegment(job.getRelatedSegment());
-
         getExecutableManager().resumeJob(job.getId());
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 
'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 
'MANAGEMENT')")
     public void rollbackJob(JobInstance job, String stepId) throws 
IOException, JobException {
-        lockSegment(job.getRelatedSegment());
-
         getExecutableManager().rollbackJob(job.getId(), stepId);
     }
 
@@ -486,47 +475,15 @@ public class JobService extends BasicService implements InitializingBean {
         }
         getExecutableManager().discardJob(job.getId());
 
-        //release the segment lock when discarded the job but the job hasn't 
scheduled
-        releaseSegmentLock(job.getRelatedSegment());
-
         return job;
     }
 
-
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 
'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 
'MANAGEMENT')")
     public JobInstance pauseJob(JobInstance job) throws IOException, 
JobException {
         getExecutableManager().pauseJob(job.getId());
-
-        //release the segment lock when discarded the job but the job hasn't 
scheduled
-        releaseSegmentLock(job.getRelatedSegment());
-
         return job;
     }
 
-    private void lockSegment(String segmentId) throws JobException {
-        if (jobLock instanceof DistributedJobLock) {
-            if (!((DistributedJobLock) jobLock).lockWithName(segmentId, 
getServerName())) {
-                throw new JobException("Fail to get the segment lock, the 
segment may be building in another job server");
-            }
-        }
-    }
-
-    private void releaseSegmentLock(String segmentId) {
-        if (jobLock instanceof DistributedJobLock) {
-            ((DistributedJobLock) jobLock).unlockWithName(segmentId);
-        }
-    }
-
-    private String getServerName() {
-        String serverName = null;
-        try {
-            serverName = InetAddress.getLocalHost().getHostName();
-        } catch (UnknownHostException e) {
-            logger.error("fail to get the hostname");
-        }
-        return serverName;
-    }
-    
     public List<CubingJob> listAllCubingJobs(final String cubeName, final 
String projectName, final Set<ExecutableState> statusList, final Map<String, 
Output> allOutputs) {
         return listAllCubingJobs(cubeName, projectName, statusList, -1L, -1L, 
allOutputs);
     }
@@ -584,6 +541,4 @@ public class JobService extends BasicService implements InitializingBean {
     public List<CubingJob> listAllCubingJobs(final String cubeName, final 
String projectName) {
         return listAllCubingJobs(cubeName, projectName, 
EnumSet.allOf(ExecutableState.class), getExecutableManager().getAllOutputs());
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 5a44cc1..ee7cd50 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -96,6 +96,8 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
      * @param serverName the hostname of job server
      *
      * @return <tt>true</tt> if the segment locked successfully
+     *
+     * @since 2.0
      */
 
     @Override
@@ -110,13 +112,13 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
                 return false;
             }
             if (zkClient.checkExists().forPath(lockPath) != null) {
-                if (hasLock(serverName, lockPath)) {
+                if (isKeepLock(serverName, lockPath)) {
                     hasLock = true;
                     logger.info(serverName + " has kept the lock for segment: 
" + segmentId);
                 }
             } else {
                 
zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, 
serverName.getBytes(Charset.forName("UTF-8")));
-                if (hasLock(serverName, lockPath)) {
+                if (isKeepLock(serverName, lockPath)) {
                     hasLock = true;
                     logger.info(serverName + " lock the segment: " + segmentId 
+ " successfully");
                 }
@@ -131,19 +133,54 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         return true;
     }
 
-    private boolean hasLock(String serverName, String lockPath) {
-        String lockServerName = null;
+    /**
+     *
+     * Returns <tt>true</tt> if the lock node at lockPath exists and its data matches serverName (case-insensitively)
+     *
+     * @param serverName the hostname of job server
+     *
+     * @param lockPath the zookeeper node path of segment
+     *
+     * @return <tt>true</tt> if the job server is keeping the lock for the 
lockPath, otherwise
+     * <tt>false</tt>
+     *
+     * @since 2.0
+     */
+
+    private boolean isKeepLock(String serverName, String lockPath) {
         try {
             if (zkClient.checkExists().forPath(lockPath) != null) {
                 byte[] data = zkClient.getData().forPath(lockPath);
-                lockServerName = new String(data, Charset.forName("UTF-8"));
+                String lockServerName = new String(data, 
Charset.forName("UTF-8"));
+                return lockServerName.equalsIgnoreCase(serverName);
             }
         } catch (Exception e) {
             logger.error("fail to get the serverName for the path: " + 
lockPath, e);
         }
-        if(lockServerName == null)
-            return false;
-        return lockServerName.equalsIgnoreCase(serverName);
+        return false;
+    }
+
+    /**
+     *
+     * Returns <tt>true</tt> if the lock node for the segment exists; returns <tt>false</tt> if it does not exist or the check fails.
+     *
+     * @param segmentId the id of the segment whose lock is checked.
+     *
+     * @return <tt>true</tt> if the segment has been locked, otherwise
+     * <tt>false</tt>
+     *
+     * @since 2.0
+     */
+
+    @Override
+    public boolean isHasLocked(String segmentId) {
+        String lockPath = getLockPath(segmentId);
+        try {
+            return zkClient.checkExists().forPath(lockPath) != null;
+        } catch (Exception e) {
+            logger.error("fail to get the path: " + lockPath, e);
+        }
+        return false;
     }
 
     /**
@@ -151,7 +188,9 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
      *
      * <p> the segment related zookeeper node will be deleted.
      *
-     * @param segmentId the name of segment need to release the lock
+     * @param segmentId the id of the segment whose lock is released
+     *
+     * @since 2.0
      */
 
     @Override
@@ -177,7 +216,10 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
      * in order to when one job server is down, other job server can take over 
the running jobs.
      *
      * @param pool the threadPool watching the zookeeper node change
+     *
      * @param doWatch do the concrete action with the zookeeper node path and 
zookeeper node data
+     *
+     * @since 2.0
      */
 
     @Override
@@ -229,9 +271,8 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
 
     @Override
     public void unlock() {
-
     }
-    
+
     public void close() {
         try {
             childrenCache.close();

Reply via email to