Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master b23291a2a -> 89613fb75


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java
index 2074c9e..1be7fe3 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestDistributedLock.java
@@ -127,7 +127,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             this.lockStateExecutor = lockStateExecutor;
 
         }
-        public DistributedLock createLock(int id, ZooKeeperClient zkc) throws 
Exception {
+        public ZKDistributedLock createLock(int id, ZooKeeperClient zkc) 
throws Exception {
             SessionLockFactory lockFactory = new ZKSessionLockFactory(
                     zkc,
                     clientId + id,
@@ -136,7 +136,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
                     Long.MAX_VALUE,
                     sessionTimeoutMs,
                     NullStatsLogger.INSTANCE);
-            return new DistributedLock(
+            return new ZKDistributedLock(
                     this.lockStateExecutor,
                     lockFactory,
                     this.lockPath,
@@ -190,9 +190,9 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
                 NullStatsLogger.INSTANCE);
     }
 
-    private static void checkLockAndReacquire(DistributedLock lock, boolean 
sync) throws Exception {
+    private static void checkLockAndReacquire(ZKDistributedLock lock, boolean 
sync) throws Exception {
         lock.checkOwnershipAndReacquire();
-        Future<DistributedLock> reacquireFuture = 
lock.getLockReacquireFuture();
+        Future<ZKDistributedLock> reacquireFuture = 
lock.getLockReacquireFuture();
         if (null != reacquireFuture && sync) {
             FutureUtils.result(reacquireFuture);
         }
@@ -210,7 +210,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         SessionLockFactory lockFactory = createLockFactory(clientId, zkc, 
Long.MAX_VALUE, 0);
         try {
             try {
-                DistributedLock lock = new DistributedLock(lockStateExecutor, 
lockFactory, lockPath,
+                ZKDistributedLock lock = new 
ZKDistributedLock(lockStateExecutor, lockFactory, lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
                 FutureUtils.result(lock.asyncAcquire());
                 fail("Should fail on creating lock if couldn't establishing 
connections to zookeeper");
@@ -226,7 +226,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         lockFactory = createLockFactory(clientId, zkc, Long.MAX_VALUE, 3);
         try {
             try {
-                DistributedLock lock = new DistributedLock(lockStateExecutor, 
lockFactory, lockPath,
+                ZKDistributedLock lock = new 
ZKDistributedLock(lockStateExecutor, lockFactory, lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
                 FutureUtils.result(lock.asyncAcquire());
                 fail("Should fail on creating lock if couldn't establishing 
connections to zookeeper after 3 retries");
@@ -241,7 +241,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
                 new CountDownThrowFailPointAction(0, 3));
         lockFactory = createLockFactory(clientId, zkc, Long.MAX_VALUE, 5);
         try {
-            DistributedLock lock = new DistributedLock(lockStateExecutor, 
lockFactory, lockPath,
+            ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, 
lockFactory, lockPath,
                 Long.MAX_VALUE, NullStatsLogger.INSTANCE);
             FutureUtils.result(lock.asyncAcquire());
 
@@ -266,7 +266,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         createLockPath(zkc.get(), lockPath);
 
         SessionLockFactory lockFactory = createLockFactory(clientId, zkc);
-        DistributedLock lock = new DistributedLock(lockStateExecutor, 
lockFactory, lockPath,
+        ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, 
lockFactory, lockPath,
                 Long.MAX_VALUE, NullStatsLogger.INSTANCE);
         FutureUtils.result(lock.asyncAcquire());
 
@@ -283,7 +283,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertEquals(0, children.size());
         assertFalse(lock.haveLock());
 
-        lock = new DistributedLock(lockStateExecutor, lockFactory, lockPath,
+        lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath,
                 Long.MAX_VALUE, NullStatsLogger.INSTANCE);
         FutureUtils.result(lock.asyncAcquire());
 
@@ -321,8 +321,8 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         createLockPath(zkc.get(), lockPath);
 
         SessionLockFactory lockFactory0 = createLockFactory(clientId, zkc0);
-        DistributedLock lock0 =
-                new DistributedLock(lockStateExecutor, lockFactory0, lockPath,
+        ZKDistributedLock lock0 =
+                new ZKDistributedLock(lockStateExecutor, lockFactory0, 
lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
         FutureUtils.result(lock0.asyncAcquire());
 
@@ -351,8 +351,8 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
 
 
         SessionLockFactory lockFactory = createLockFactory(clientId, zkc);
-        final DistributedLock lock1 =
-                new DistributedLock(lockStateExecutor, lockFactory, lockPath,
+        final ZKDistributedLock lock1 =
+                new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
         final CountDownLatch lockLatch = new CountDownLatch(1);
         Thread lockThread = new Thread(new Runnable() {
@@ -421,8 +421,8 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         createLockPath(zkc.get(), lockPath);
 
         SessionLockFactory lockFactory0 = createLockFactory(clientId, zkc0);
-        DistributedLock lock0 =
-                new DistributedLock(lockStateExecutor, lockFactory0, lockPath,
+        ZKDistributedLock lock0 =
+                new ZKDistributedLock(lockStateExecutor, lockFactory0, 
lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
         FutureUtils.result(lock0.asyncAcquire());
 
@@ -441,7 +441,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             checkLockAndReacquire(lock0, false);
         } else {
             // session expire will trigger lock re-acquisition
-            Future<DistributedLock> asyncLockAcquireFuture;
+            Future<ZKDistributedLock> asyncLockAcquireFuture;
             do {
                 Thread.sleep(1);
                 asyncLockAcquireFuture = lock0.getLockReacquireFuture();
@@ -492,15 +492,15 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         createLockPath(zkc.get(), lockPath);
 
         SessionLockFactory lockFactory0 = createLockFactory(clientId, zkc0);
-        DistributedLock lock0 =
-                new DistributedLock(lockStateExecutor, lockFactory0, lockPath,
+        ZKDistributedLock lock0 =
+                new ZKDistributedLock(lockStateExecutor, lockFactory0, 
lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
         FutureUtils.result(lock0.asyncAcquire());
 
         final CountDownLatch lock1DoneLatch = new CountDownLatch(1);
         SessionLockFactory lockFactory1 = createLockFactory(clientId, zkc);
-        final DistributedLock lock1 =
-                new DistributedLock(lockStateExecutor, lockFactory1, lockPath,
+        final ZKDistributedLock lock1 =
+                new ZKDistributedLock(lockStateExecutor, lockFactory1, 
lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
         Thread lock1Thread = new Thread(new Runnable() {
             @Override
@@ -553,7 +553,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         } else {
             logger.info("Waiting lock0 to attempt acquisition after session 
expired");
             // session expire will trigger lock re-acquisition
-            Future<DistributedLock> asyncLockAcquireFuture;
+            Future<ZKDistributedLock> asyncLockAcquireFuture;
             do {
                 Thread.sleep(1);
                 asyncLockAcquireFuture = lock0.getLockReacquireFuture();
@@ -595,7 +595,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
                 CreateMode.PERSISTENT);
         String clientId = "lockHolder";
         SessionLockFactory lockFactory = createLockFactory(clientId, zkc, 
conf.getLockTimeoutMilliSeconds(), 0);
-        DistributedLock lock = new DistributedLock(lockStateExecutor, 
lockFactory, lockPath,
+        ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, 
lockFactory, lockPath,
             conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE);
         FutureUtils.result(lock.asyncAcquire());
 
@@ -609,7 +609,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertEquals(true, lock.getInternalLock().isLockHeld());
 
         lockFactory = createLockFactory(clientId + "_2", zkc, 
conf.getLockTimeoutMilliSeconds(), 0);
-        DistributedLock lock2 = new DistributedLock(lockStateExecutor, 
lockFactory, lockPath,
+        ZKDistributedLock lock2 = new ZKDistributedLock(lockStateExecutor, 
lockFactory, lockPath,
             0, NullStatsLogger.INSTANCE);
 
         boolean exceptionEncountered = false;
@@ -631,7 +631,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             CreateMode.PERSISTENT);
         String clientId = "lockHolder";
         SessionLockFactory factory = createLockFactory(clientId, zkc, 
conf.getLockTimeoutMilliSeconds(), 0);
-        DistributedLock lock = new DistributedLock(lockStateExecutor, factory, 
lockPath,
+        ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, 
factory, lockPath,
             conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE);
         FutureUtils.result(lock.asyncAcquire());
 
@@ -645,7 +645,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertEquals(true, lock.getInternalLock().isLockHeld());
 
         factory = createLockFactory(clientId + "_2", zkc, 0, 0);
-        DistributedLock lock2 = new DistributedLock(lockStateExecutor, 
factory, lockPath,
+        ZKDistributedLock lock2 = new ZKDistributedLock(lockStateExecutor, 
factory, lockPath,
             0, NullStatsLogger.INSTANCE);
 
         boolean exceptionEncountered = false;
@@ -663,7 +663,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertEquals(false, lock.getInternalLock().isLockHeld());
 
         factory = createLockFactory(clientId + "_3", zkc, 0, 0);
-        DistributedLock lock3 = new DistributedLock(lockStateExecutor, 
factory, lockPath,
+        ZKDistributedLock lock3 = new ZKDistributedLock(lockStateExecutor, 
factory, lockPath,
             0, NullStatsLogger.INSTANCE);
 
         FutureUtils.result(lock3.asyncAcquire());
@@ -682,8 +682,8 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
     }
 
     // Assert key lock state (is locked, is internal locked, lock count, etc.) 
for two dlocks.
-    void assertLockState(DistributedLock lock0, boolean owned0, boolean 
intOwned0,
-                         DistributedLock lock1, boolean owned1, boolean 
intOwned1,
+    void assertLockState(ZKDistributedLock lock0, boolean owned0, boolean 
intOwned0,
+                         ZKDistributedLock lock1, boolean owned1, boolean 
intOwned1,
                          int waiters, String lockPath) throws Exception {
         assertEquals(owned0, lock0.haveLock());
         assertEquals(intOwned0, lock0.getInternalLock() != null && 
lock0.getInternalLock().isLockHeld());
@@ -697,9 +697,9 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), 
zkc, lockStateExecutor);
 
         int count = 3;
-        ArrayList<Future<DistributedLock>> results =
-                new ArrayList<Future<DistributedLock>>(count);
-        DistributedLock[] lockArray = new DistributedLock[count];
+        ArrayList<Future<ZKDistributedLock>> results =
+                new ArrayList<Future<ZKDistributedLock>>(count);
+        ZKDistributedLock[] lockArray = new ZKDistributedLock[count];
         final CountDownLatch[] latches = new CountDownLatch[count];
 
         // Set up <count> waiters, save async results, count down a latch when 
lock is acquired in
@@ -709,9 +709,9 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             lockArray[i] = locks.createLock(i, zkc);
             final int index = i;
             results.add(lockArray[i].asyncAcquire().addEventListener(
-                new FutureEventListener<DistributedLock>() {
+                new FutureEventListener<ZKDistributedLock>() {
                     @Override
-                    public void onSuccess(DistributedLock lock) {
+                    public void onSuccess(ZKDistributedLock lock) {
                         latches[index].countDown();
                     }
                     @Override
@@ -735,8 +735,8 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
     @Test(timeout = 60000)
     public void testAsyncAcquireSyncThenAsyncOnSameLock() throws Exception {
         TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), 
zkc, lockStateExecutor);
-        final DistributedLock lock0 = locks.createLock(0, zkc);
-        final DistributedLock lock1 = locks.createLock(1, zkc0);
+        final ZKDistributedLock lock0 = locks.createLock(0, zkc);
+        final ZKDistributedLock lock1 = locks.createLock(1, zkc0);
 
         FutureUtils.result(lock0.asyncAcquire());
 
@@ -774,11 +774,11 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
     @Test(timeout = 60000)
     public void testAsyncAcquireExpireDuringWait() throws Exception {
         TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), 
zkc, lockStateExecutor);
-        final DistributedLock lock0 = locks.createLock(0, zkc);
-        final DistributedLock lock1 = locks.createLock(1, zkc0);
+        final ZKDistributedLock lock0 = locks.createLock(0, zkc);
+        final ZKDistributedLock lock1 = locks.createLock(1, zkc0);
 
         FutureUtils.result(lock0.asyncAcquire());
-        Future<DistributedLock> result = lock1.asyncAcquire();
+        Future<ZKDistributedLock> result = lock1.asyncAcquire();
         // make sure we place a waiter for lock1
         while (null == lock1.getLockWaiter()) {
             TimeUnit.MILLISECONDS.sleep(20);
@@ -800,11 +800,11 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
     @Test(timeout = 60000)
     public void testAsyncAcquireCloseDuringWait() throws Exception {
         TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), 
zkc, lockStateExecutor);
-        final DistributedLock lock0 = locks.createLock(0, zkc);
-        final DistributedLock lock1 = locks.createLock(1, zkc0);
+        final ZKDistributedLock lock0 = locks.createLock(0, zkc);
+        final ZKDistributedLock lock1 = locks.createLock(1, zkc0);
 
         FutureUtils.result(lock0.asyncAcquire());
-        Future<DistributedLock> result = lock1.asyncAcquire();
+        Future<ZKDistributedLock> result = lock1.asyncAcquire();
         FutureUtils.result(lock1.asyncClose());
         try {
             Await.result(result);
@@ -819,9 +819,9 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
     @Test(timeout = 60000)
     public void testAsyncAcquireCloseAfterAcquire() throws Exception {
         TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), 
zkc, lockStateExecutor);
-        final DistributedLock lock0 = locks.createLock(0, zkc);
+        final ZKDistributedLock lock0 = locks.createLock(0, zkc);
 
-        Future<DistributedLock> result = lock0.asyncAcquire();
+        Future<ZKDistributedLock> result = lock0.asyncAcquire();
         Await.result(result);
         FutureUtils.result(lock0.asyncClose());
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index 87e2fa3..3e7948d 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -30,7 +30,6 @@ import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.LocalBookKeeper;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -268,7 +267,13 @@ public class DistributedLogCluster {
         } else {
             this.zks = null;
         }
-        this.dlmEmulator = new LocalDLMEmulator(numBookies, zkServers, zkPort, 
bkConf);
+        this.dlmEmulator = LocalDLMEmulator.newBuilder()
+                .numBookies(numBookies)
+                .zkHost(zkServers)
+                .zkPort(zkPort)
+                .serverConf(bkConf)
+                .shouldStartZK(false)
+                .build();
         this.shouldStartProxy = shouldStartProxy;
         this.proxyPort = proxyPort;
     }

Reply via email to