KYLIN-2389 Improve resource utilization for DistributedScheduler
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/837bd820 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/837bd820 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/837bd820 Branch: refs/heads/master-hbase1.x Commit: 837bd8200b250f38fcfb2d221764d5aca0c66403 Parents: e894465 Author: kangkaisen <kangkai...@163.com> Authored: Fri Jan 13 19:58:41 2017 +0800 Committer: kangkaisen <kangkai...@163.com> Committed: Wed Jan 18 16:14:24 2017 +0800 ---------------------------------------------------------------------- .../impl/threadpool/DistributedScheduler.java | 8 +-- .../kylin/job/lock/DistributedJobLock.java | 2 + .../apache/kylin/rest/service/JobService.java | 45 -------------- .../hbase/util/ZookeeperDistributedJobLock.java | 63 ++++++++++++++++---- 4 files changed, 58 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 3436529..84e62d5 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -195,13 +195,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } } - //release job lock only when the all tasks of the job finish and the job server keep the cube lock. + //release job lock when job state is ready or running and the job server keep the cube lock. private void releaseJobLock(AbstractExecutable executable) { if (executable instanceof DefaultChainedExecutable) { String segmentId = executable.getParam(SEGMENT_ID); ExecutableState state = executable.getStatus(); - if (state == ExecutableState.SUCCEED || state == ExecutableState.ERROR || state == ExecutableState.DISCARDED) { + if (state != ExecutableState.READY && state != ExecutableState.RUNNING) { if (segmentWithLocks.contains(segmentId)) { logger.info(executable.toString() + " will release the lock for the segment: " + segmentId); jobLock.unlockWithName(segmentId); @@ -232,7 +232,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) { try { logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job"); - if (jobLock.lockWithName(segmentId, serverName)) { + if (!jobLock.isHasLocked(segmentId)) { executableManager.resumeRunningJobForce(executable.getId()); fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); break; @@ -302,7 +302,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn AbstractExecutable executable = executableManager.getJob(id); if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) { try { - if (jobLock.lockWithName(executable.getParam(SEGMENT_ID), serverName)) { + if (!jobLock.isHasLocked(executable.getParam(SEGMENT_ID))) { executableManager.resumeRunningJobForce(executable.getId()); fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); } http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java index 9335e56..1c173ec 100644 --- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java +++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java @@ -24,6 +24,8 @@ public interface DistributedJobLock extends JobLock { boolean lockWithName(String name, String serverName); + boolean isHasLocked(String segmentId); + void unlockWithName(String name); void watchLock(ExecutorService pool, DoWatchLock doWatch); http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 4709a91..ed24a9d 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -19,8 +19,6 @@ package org.apache.kylin.rest.service; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.Calendar; import java.util.Collections; import java.util.Date; @@ -56,7 +54,6 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; -import org.apache.kylin.job.lock.DistributedJobLock; import org.apache.kylin.job.lock.JobLock; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.RealizationStatusEnum; @@ -282,15 +279,12 @@ public class JobService extends BasicService implements InitializingBean { SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd); sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition); newSeg = getCubeManager().appendSegment(cube, sourcePartition); - lockSegment(newSeg.getUuid()); job = EngineFactory.createBatchCubingJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.MERGE) { newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force); - lockSegment(newSeg.getUuid()); job = EngineFactory.createBatchMergeJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.REFRESH) { newSeg = getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset); - lockSegment(newSeg.getUuid()); job = EngineFactory.createBatchCubingJob(newSeg, submitter); } else { throw new JobException("invalid build type:" + buildType); @@ -312,7 +306,6 @@ public class JobService extends BasicService implements InitializingBean { } } throw e; - } JobInstance jobInstance = getSingleJobInstance(job); @@ -454,15 +447,11 @@ public class JobService extends BasicService implements InitializingBean { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") public void resumeJob(JobInstance job) throws IOException, JobException { - lockSegment(job.getRelatedSegment()); - getExecutableManager().resumeJob(job.getId()); } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") public void rollbackJob(JobInstance job, String stepId) throws IOException, JobException { - lockSegment(job.getRelatedSegment()); - getExecutableManager().rollbackJob(job.getId(), stepId); } @@ -486,47 +475,15 @@ public class JobService extends BasicService implements InitializingBean { } getExecutableManager().discardJob(job.getId()); - //release the segment lock when discarded the job but the job hasn't scheduled - releaseSegmentLock(job.getRelatedSegment()); - return job; } - @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") public JobInstance pauseJob(JobInstance job) throws IOException, JobException { getExecutableManager().pauseJob(job.getId()); - - //release the segment lock when discarded the job but the job hasn't scheduled - releaseSegmentLock(job.getRelatedSegment()); - return job; } - private void lockSegment(String segmentId) throws JobException { - if (jobLock instanceof DistributedJobLock) { - if (!((DistributedJobLock) jobLock).lockWithName(segmentId, getServerName())) { - throw new JobException("Fail to get the segment lock, the segment may be building in another job server"); - } - } - } - - private void releaseSegmentLock(String segmentId) { - if (jobLock instanceof DistributedJobLock) { - ((DistributedJobLock) jobLock).unlockWithName(segmentId); - } - } - - private String getServerName() { - String serverName = null; - try { - serverName = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - logger.error("fail to get the hostname"); - } - return serverName; - } - public List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, final Map<String, Output> allOutputs) { return listAllCubingJobs(cubeName, projectName, statusList, -1L, -1L, allOutputs); } @@ -584,6 +541,4 @@ public class JobService extends BasicService implements InitializingBean { public List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName) { return listAllCubingJobs(cubeName, projectName, EnumSet.allOf(ExecutableState.class), getExecutableManager().getAllOutputs()); } - - } http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java index 5a44cc1..ee7cd50 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java @@ -96,6 +96,8 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { * @param serverName the hostname of job server * * @return <tt>true</tt> if the segment locked successfully + * + * @since 2.0 */ @Override @@ -110,13 +112,13 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { return false; } if (zkClient.checkExists().forPath(lockPath) != null) { - if (hasLock(serverName, lockPath)) { + if (isKeepLock(serverName, lockPath)) { hasLock = true; logger.info(serverName + " has kept the lock for segment: " + segmentId); } } else { zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8"))); - if (hasLock(serverName, lockPath)) { + if (isKeepLock(serverName, lockPath)) { hasLock = true; logger.info(serverName + " lock the segment: " + segmentId + " successfully"); } @@ -131,19 +133,54 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { return true; } - private boolean hasLock(String serverName, String lockPath) { - String lockServerName = null; + /** + * + * Returns <tt>true</tt> if, the job server is keeping the lock for the lockPath + * + * @param serverName the hostname of job server + * + * @param lockPath the zookeeper node path of segment + * + * @return <tt>true</tt> if the job server is keeping the lock for the lockPath, otherwise + * <tt>false</tt> + * + * @since 2.0 + */ + + private boolean isKeepLock(String serverName, String lockPath) { try { if (zkClient.checkExists().forPath(lockPath) != null) { byte[] data = zkClient.getData().forPath(lockPath); - lockServerName = new String(data, Charset.forName("UTF-8")); + String lockServerName = new String(data, Charset.forName("UTF-8")); + return lockServerName.equalsIgnoreCase(serverName); } } catch (Exception e) { logger.error("fail to get the serverName for the path: " + lockPath, e); } - if(lockServerName == null) - return false; - return lockServerName.equalsIgnoreCase(serverName); + return false; + } + + /** + * + * Returns <tt>true</tt> if, and only if, the segment has been locked. + * + * @param segmentId the id of segment need to release the lock. + * + * @return <tt>true</tt> if the segment has been locked, otherwise + * <tt>false</tt> + * + * @since 2.0 + */ + + @Override + public boolean isHasLocked(String segmentId) { + String lockPath = getLockPath(segmentId); + try { + return zkClient.checkExists().forPath(lockPath) != null; + } catch (Exception e) { + logger.error("fail to get the path: " + lockPath, e); + } + return false; } /** @@ -151,7 +188,9 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { * * <p> the segment related zookeeper node will be deleted. * - * @param segmentId the name of segment need to release the lock + * @param segmentId the id of segment need to release the lock + * + * @since 2.0 */ @Override @@ -177,7 +216,10 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { * in order to when one job server is down, other job server can take over the running jobs. * * @param pool the threadPool watching the zookeeper node change + * * @param doWatch do the concrete action with the zookeeper node path and zookeeper node data + * + * @since 2.0 */ @Override @@ -229,9 +271,8 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { @Override public void unlock() { - } - + public void close() { try { childrenCache.close();