Repository: kylin Updated Branches: refs/heads/KYLIN-2578 c1d5e09f7 -> 5a8a1f291
fix UT Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a8a1f29 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a8a1f29 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a8a1f29 Branch: refs/heads/KYLIN-2578 Commit: 5a8a1f291ea70e81160427d5af46d67783614e59 Parents: c1d5e09 Author: Yang Li <[email protected]> Authored: Sat May 6 23:06:10 2017 +0800 Committer: Yang Li <[email protected]> Committed: Sat May 6 23:06:10 2017 +0800 ---------------------------------------------------------------------- .../kylin/common/lock/DistributedLock.java | 5 +++++ .../hbase/util/ZookeeperDistributedLock.java | 22 ++++++++++++-------- .../storage/hbase/util/ZookeeperJobLock.java | 5 +++++ .../util/ZookeeperDistributedLockTest.java | 10 +++++---- 4 files changed, 29 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8a1f29/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java index 7f5f13b..05d5354 100644 --- a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java +++ b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java @@ -51,6 +51,11 @@ public interface DistributedLock { boolean isLocked(String lockPath); /** + * Returns if lock is available at given path. + */ + boolean isLockedByMe(String lockPath); + + /** * Returns the owner of a lock path; returns null if the path is not locked by any one. */ String peekLock(String lockPath); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8a1f29/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java index f3b477f..156dbe3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java @@ -114,6 +114,9 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { final byte[] clientBytes; private ZookeeperDistributedLock(CuratorFramework curator, String client) { + if (client == null) + throw new NullPointerException("client must not be null"); + this.curator = curator; this.client = client; this.clientBytes = client.getBytes(Charset.forName("UTF-8")); @@ -129,21 +132,17 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { logger.debug(client + " trying to lock " + lockPath); try { - if (client.equals(peekLock(lockPath))) { - return true; - } - curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockPath, clientBytes); - - if (client.equals(peekLock(lockPath))) { - logger.info(client + " acquired lock at " + lockPath); - return true; - } } catch (KeeperException.NodeExistsException ex) { logger.debug(lockPath + " is already locked"); } catch (Exception ex) { throw new RuntimeException("Error while " + client + " trying to lock " + lockPath, ex); } + + if (isLockedByMe(lockPath)) { + logger.info(client + " acquired lock at " + lockPath); + return true; + } return false; } @@ -195,6 +194,11 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { } @Override + public boolean isLockedByMe(String lockPath) { + return client.equals(peekLock(lockPath)); + } + + @Override public void unlock(String lockPath) { logger.debug(client + " trying to unlock " + lockPath); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8a1f29/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java index 1063e40..64f246a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java @@ -56,6 +56,11 @@ public class ZookeeperJobLock implements DistributedLock, JobLock { } @Override + public boolean isLockedByMe(String lockPath) { + return lock.isLockedByMe(lockPath); + } + + @Override public void unlock(String lockPath) { lock.unlock(lockPath); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8a1f29/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLockTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLockTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLockTest.java index ead5963..b544f6c 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLockTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLockTest.java @@ -171,7 +171,7 @@ public class ZookeeperDistributedLockTest extends HBaseMetadataTestCase { for (int i = 0; i < nClients; i++) { threads[i].join(); } - + // verify sum assertEquals(0, countSum.get()); int expectedScore = 0; @@ -213,9 +213,11 @@ public class ZookeeperDistributedLockTest extends HBaseMetadataTestCase { // random lock int lockIdx = rand.nextInt(nLocks); - boolean locked = client.lock(lockPaths[lockIdx]); - if (locked) - counter += (lockIdx + 1); + if (client.isLockedByMe(lockPaths[lockIdx]) == false) { + boolean locked = client.lock(lockPaths[lockIdx]); + if (locked) + counter += (lockIdx + 1); + } // random unlock try {
