Repository: incubator-distributedlog Updated Branches: refs/heads/master b23291a2a -> 89613fb75
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java index 2074c9e..1be7fe3 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java @@ -127,7 +127,7 @@ public class TestDistributedLock extends TestDistributedLogBase { this.lockStateExecutor = lockStateExecutor; } - public DistributedLock createLock(int id, ZooKeeperClient zkc) throws Exception { + public ZKDistributedLock createLock(int id, ZooKeeperClient zkc) throws Exception { SessionLockFactory lockFactory = new ZKSessionLockFactory( zkc, clientId + id, @@ -136,7 +136,7 @@ public class TestDistributedLock extends TestDistributedLogBase { Long.MAX_VALUE, sessionTimeoutMs, NullStatsLogger.INSTANCE); - return new DistributedLock( + return new ZKDistributedLock( this.lockStateExecutor, lockFactory, this.lockPath, @@ -190,9 +190,9 @@ public class TestDistributedLock extends TestDistributedLogBase { NullStatsLogger.INSTANCE); } - private static void checkLockAndReacquire(DistributedLock lock, boolean sync) throws Exception { + private static void checkLockAndReacquire(ZKDistributedLock lock, boolean sync) throws Exception { lock.checkOwnershipAndReacquire(); - Future<DistributedLock> reacquireFuture = lock.getLockReacquireFuture(); + Future<ZKDistributedLock> reacquireFuture = lock.getLockReacquireFuture(); if (null != reacquireFuture && sync) { FutureUtils.result(reacquireFuture); } @@ -210,7 +210,7 @@ public class TestDistributedLock extends TestDistributedLogBase { SessionLockFactory lockFactory = createLockFactory(clientId, zkc, Long.MAX_VALUE, 0); try { try { - DistributedLock lock = new DistributedLock(lockStateExecutor, lockFactory, lockPath, + ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); FutureUtils.result(lock.asyncAcquire()); fail("Should fail on creating lock if couldn't establishing connections to zookeeper"); @@ -226,7 +226,7 @@ public class TestDistributedLock extends TestDistributedLogBase { lockFactory = createLockFactory(clientId, zkc, Long.MAX_VALUE, 3); try { try { - DistributedLock lock = new DistributedLock(lockStateExecutor, lockFactory, lockPath, + ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); FutureUtils.result(lock.asyncAcquire()); fail("Should fail on creating lock if couldn't establishing connections to zookeeper after 3 retries"); @@ -241,7 +241,7 @@ public class TestDistributedLock extends TestDistributedLogBase { new CountDownThrowFailPointAction(0, 3)); lockFactory = createLockFactory(clientId, zkc, Long.MAX_VALUE, 5); try { - DistributedLock lock = new DistributedLock(lockStateExecutor, lockFactory, lockPath, + ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); FutureUtils.result(lock.asyncAcquire()); @@ -266,7 +266,7 @@ public class TestDistributedLock extends TestDistributedLogBase { createLockPath(zkc.get(), lockPath); SessionLockFactory lockFactory = createLockFactory(clientId, zkc); - DistributedLock lock = new DistributedLock(lockStateExecutor, lockFactory, lockPath, + ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); FutureUtils.result(lock.asyncAcquire()); @@ -283,7 +283,7 @@ public class TestDistributedLock extends TestDistributedLogBase { assertEquals(0, children.size()); assertFalse(lock.haveLock()); - lock = new DistributedLock(lockStateExecutor, lockFactory, lockPath, + lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); FutureUtils.result(lock.asyncAcquire()); @@ -321,8 +321,8 @@ public class TestDistributedLock extends TestDistributedLogBase { createLockPath(zkc.get(), lockPath); SessionLockFactory lockFactory0 = createLockFactory(clientId, zkc0); - DistributedLock lock0 = - new DistributedLock(lockStateExecutor, lockFactory0, lockPath, + ZKDistributedLock lock0 = + new ZKDistributedLock(lockStateExecutor, lockFactory0, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); FutureUtils.result(lock0.asyncAcquire()); @@ -351,8 +351,8 @@ public class TestDistributedLock extends TestDistributedLogBase { SessionLockFactory lockFactory = createLockFactory(clientId, zkc); - final DistributedLock lock1 = - new DistributedLock(lockStateExecutor, lockFactory, lockPath, + final ZKDistributedLock lock1 = + new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); final CountDownLatch lockLatch = new CountDownLatch(1); Thread lockThread = new Thread(new Runnable() { @@ -421,8 +421,8 @@ public class TestDistributedLock extends TestDistributedLogBase { createLockPath(zkc.get(), lockPath); SessionLockFactory lockFactory0 = createLockFactory(clientId, zkc0); - DistributedLock lock0 = - new DistributedLock(lockStateExecutor, lockFactory0, lockPath, + ZKDistributedLock lock0 = + new ZKDistributedLock(lockStateExecutor, lockFactory0, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); FutureUtils.result(lock0.asyncAcquire()); @@ -441,7 +441,7 @@ public class TestDistributedLock extends TestDistributedLogBase { checkLockAndReacquire(lock0, false); } else { // session expire will trigger lock re-acquisition - Future<DistributedLock> asyncLockAcquireFuture; + Future<ZKDistributedLock> asyncLockAcquireFuture; do { Thread.sleep(1); asyncLockAcquireFuture = lock0.getLockReacquireFuture(); @@ -492,15 +492,15 @@ public class TestDistributedLock extends TestDistributedLogBase { createLockPath(zkc.get(), lockPath); SessionLockFactory lockFactory0 = createLockFactory(clientId, zkc0); - DistributedLock lock0 = - new DistributedLock(lockStateExecutor, lockFactory0, lockPath, + ZKDistributedLock lock0 = + new ZKDistributedLock(lockStateExecutor, lockFactory0, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); FutureUtils.result(lock0.asyncAcquire()); final CountDownLatch lock1DoneLatch = new CountDownLatch(1); SessionLockFactory lockFactory1 = createLockFactory(clientId, zkc); - final DistributedLock lock1 = - new DistributedLock(lockStateExecutor, lockFactory1, lockPath, + final ZKDistributedLock lock1 = + new ZKDistributedLock(lockStateExecutor, lockFactory1, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); Thread lock1Thread = new Thread(new Runnable() { @Override @@ -553,7 +553,7 @@ public class TestDistributedLock extends TestDistributedLogBase { } else { logger.info("Waiting lock0 to attempt acquisition after session expired"); // session expire will trigger lock re-acquisition - Future<DistributedLock> asyncLockAcquireFuture; + Future<ZKDistributedLock> asyncLockAcquireFuture; do { Thread.sleep(1); asyncLockAcquireFuture = lock0.getLockReacquireFuture(); @@ -595,7 +595,7 @@ public class TestDistributedLock extends TestDistributedLogBase { CreateMode.PERSISTENT); String clientId = "lockHolder"; SessionLockFactory lockFactory = createLockFactory(clientId, zkc, conf.getLockTimeoutMilliSeconds(), 0); - DistributedLock lock = new DistributedLock(lockStateExecutor, lockFactory, lockPath, + ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE); FutureUtils.result(lock.asyncAcquire()); @@ -609,7 +609,7 @@ public class TestDistributedLock extends TestDistributedLogBase { assertEquals(true, lock.getInternalLock().isLockHeld()); lockFactory = createLockFactory(clientId + "_2", zkc, conf.getLockTimeoutMilliSeconds(), 0); - DistributedLock lock2 = new DistributedLock(lockStateExecutor, lockFactory, lockPath, + ZKDistributedLock lock2 = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, 0, NullStatsLogger.INSTANCE); boolean exceptionEncountered = false; @@ -631,7 +631,7 @@ public class TestDistributedLock extends TestDistributedLogBase { CreateMode.PERSISTENT); String clientId = "lockHolder"; SessionLockFactory factory = createLockFactory(clientId, zkc, conf.getLockTimeoutMilliSeconds(), 0); - DistributedLock lock = new DistributedLock(lockStateExecutor, factory, lockPath, + ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, factory, lockPath, conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE); FutureUtils.result(lock.asyncAcquire()); @@ -645,7 +645,7 @@ public class TestDistributedLock extends TestDistributedLogBase { assertEquals(true, lock.getInternalLock().isLockHeld()); factory = createLockFactory(clientId + "_2", zkc, 0, 0); - DistributedLock lock2 = new DistributedLock(lockStateExecutor, factory, lockPath, + ZKDistributedLock lock2 = new ZKDistributedLock(lockStateExecutor, factory, lockPath, 0, NullStatsLogger.INSTANCE); boolean exceptionEncountered = false; @@ -663,7 +663,7 @@ public class TestDistributedLock extends TestDistributedLogBase { assertEquals(false, lock.getInternalLock().isLockHeld()); factory = createLockFactory(clientId + "_3", zkc, 0, 0); - DistributedLock lock3 = new DistributedLock(lockStateExecutor, factory, lockPath, + ZKDistributedLock lock3 = new ZKDistributedLock(lockStateExecutor, factory, lockPath, 0, NullStatsLogger.INSTANCE); FutureUtils.result(lock3.asyncAcquire()); @@ -682,8 +682,8 @@ public class TestDistributedLock extends TestDistributedLogBase { } // Assert key lock state (is locked, is internal locked, lock count, etc.) for two dlocks. - void assertLockState(DistributedLock lock0, boolean owned0, boolean intOwned0, - DistributedLock lock1, boolean owned1, boolean intOwned1, + void assertLockState(ZKDistributedLock lock0, boolean owned0, boolean intOwned0, + ZKDistributedLock lock1, boolean owned1, boolean intOwned1, int waiters, String lockPath) throws Exception { assertEquals(owned0, lock0.haveLock()); assertEquals(intOwned0, lock0.getInternalLock() != null && lock0.getInternalLock().isLockHeld()); @@ -697,9 +697,9 @@ public class TestDistributedLock extends TestDistributedLogBase { TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor); int count = 3; - ArrayList<Future<DistributedLock>> results = - new ArrayList<Future<DistributedLock>>(count); - DistributedLock[] lockArray = new DistributedLock[count]; + ArrayList<Future<ZKDistributedLock>> results = + new ArrayList<Future<ZKDistributedLock>>(count); + ZKDistributedLock[] lockArray = new ZKDistributedLock[count]; final CountDownLatch[] latches = new CountDownLatch[count]; // Set up <count> waiters, save async results, count down a latch when lock is acquired in @@ -709,9 +709,9 @@ public class TestDistributedLock extends TestDistributedLogBase { lockArray[i] = locks.createLock(i, zkc); final int index = i; results.add(lockArray[i].asyncAcquire().addEventListener( - new FutureEventListener<DistributedLock>() { + new FutureEventListener<ZKDistributedLock>() { @Override - public void onSuccess(DistributedLock lock) { + public void onSuccess(ZKDistributedLock lock) { latches[index].countDown(); } @Override @@ -735,8 +735,8 @@ public class TestDistributedLock extends TestDistributedLogBase { @Test(timeout = 60000) public void testAsyncAcquireSyncThenAsyncOnSameLock() throws Exception { TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor); - final DistributedLock lock0 = locks.createLock(0, zkc); - final DistributedLock lock1 = locks.createLock(1, zkc0); + final ZKDistributedLock lock0 = locks.createLock(0, zkc); + final ZKDistributedLock lock1 = locks.createLock(1, zkc0); FutureUtils.result(lock0.asyncAcquire()); @@ -774,11 +774,11 @@ public class TestDistributedLock extends TestDistributedLogBase { @Test(timeout = 60000) public void testAsyncAcquireExpireDuringWait() throws Exception { TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor); - final DistributedLock lock0 = locks.createLock(0, zkc); - final DistributedLock lock1 = locks.createLock(1, zkc0); + final ZKDistributedLock lock0 = locks.createLock(0, zkc); + final ZKDistributedLock lock1 = locks.createLock(1, zkc0); FutureUtils.result(lock0.asyncAcquire()); - Future<DistributedLock> result = lock1.asyncAcquire(); + Future<ZKDistributedLock> result = lock1.asyncAcquire(); // make sure we place a waiter for lock1 while (null == lock1.getLockWaiter()) { TimeUnit.MILLISECONDS.sleep(20); @@ -800,11 +800,11 @@ public class TestDistributedLock extends TestDistributedLogBase { @Test(timeout = 60000) public void testAsyncAcquireCloseDuringWait() throws Exception { TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor); - final DistributedLock lock0 = locks.createLock(0, zkc); - final DistributedLock lock1 = locks.createLock(1, zkc0); + final ZKDistributedLock lock0 = locks.createLock(0, zkc); + final ZKDistributedLock lock1 = locks.createLock(1, zkc0); FutureUtils.result(lock0.asyncAcquire()); - Future<DistributedLock> result = lock1.asyncAcquire(); + Future<ZKDistributedLock> result = lock1.asyncAcquire(); FutureUtils.result(lock1.asyncClose()); try { Await.result(result); @@ -819,9 +819,9 @@ public class TestDistributedLock extends TestDistributedLogBase { @Test(timeout = 60000) public void testAsyncAcquireCloseAfterAcquire() throws Exception { TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor); - final DistributedLock lock0 = locks.createLock(0, zkc); + final ZKDistributedLock lock0 = locks.createLock(0, zkc); - Future<DistributedLock> result = lock0.asyncAcquire(); + Future<ZKDistributedLock> result = lock0.asyncAcquire(); Await.result(result); FutureUtils.result(lock0.asyncClose()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java index 87e2fa3..3e7948d 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java @@ -30,7 +30,6 @@ import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.LocalBookKeeper; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -268,7 +267,13 @@ public class DistributedLogCluster { } else { this.zks = null; } - this.dlmEmulator = new LocalDLMEmulator(numBookies, zkServers, zkPort, bkConf); + this.dlmEmulator = LocalDLMEmulator.newBuilder() + .numBookies(numBookies) + .zkHost(zkServers) + .zkPort(zkPort) + .serverConf(bkConf) + .shouldStartZK(false) + .build(); this.shouldStartProxy = shouldStartProxy; this.proxyPort = proxyPort; }
