http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/ownership/TestOwnershipCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/ownership/TestOwnershipCache.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/ownership/TestOwnershipCache.java
deleted file mode 100644
index c0e077b..0000000
--- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/ownership/TestOwnershipCache.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.ownership;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import com.twitter.distributedlog.client.ClientConfig;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.Set;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Case for Ownership Cache.
- */
-public class TestOwnershipCache {
-
-    @Rule
-    public TestName runtime = new TestName();
-
-    private static OwnershipCache createOwnershipCache() {
-        ClientConfig clientConfig = new ClientConfig();
-        return new OwnershipCache(clientConfig, null,
-                                  NullStatsReceiver.get(), NullStatsReceiver.get());
-    }
-
-    private static SocketAddress createSocketAddress(int port) {
-        return new InetSocketAddress("127.0.0.1", port);
-    }
-
-    @Test(timeout = 60000)
-    public void testUpdateOwner() {
-        OwnershipCache cache = createOwnershipCache();
-        SocketAddress addr = createSocketAddress(1000);
-        String stream = runtime.getMethodName();
-
-        assertTrue("Should successfully update owner if no owner exists before",
-                cache.updateOwner(stream, addr));
-        assertEquals("Owner should be " + addr + " for stream " + stream,
-                addr, cache.getOwner(stream));
-        assertTrue("Should successfully update owner if old owner is same",
-                cache.updateOwner(stream, addr));
-        assertEquals("Owner should be " + addr + " for stream " + stream,
-                addr, cache.getOwner(stream));
-    }
-
-    @Test(timeout = 60000)
-    public void testRemoveOwnerFromStream() {
-        OwnershipCache cache = createOwnershipCache();
-        int initialPort = 2000;
-        int numProxies = 2;
-        int numStreamsPerProxy = 2;
-        for (int i = 0; i < numProxies; i++) {
-            SocketAddress addr = createSocketAddress(initialPort + i);
-            for (int j = 0; j < numStreamsPerProxy; j++) {
-                String stream = runtime.getMethodName() + "_" + i + "_" + j;
-                cache.updateOwner(stream, addr);
-            }
-        }
-        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-
-        String stream = runtime.getMethodName() + "_0_0";
-        SocketAddress owner = createSocketAddress(initialPort);
-
-        // remove non-existent mapping won't change anything
-        SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
-        cache.removeOwnerFromStream(stream, nonExistentAddr, "remove-non-existent-addr");
-        assertEquals("Owner " + owner + " should not be removed",
-                owner, cache.getOwner(stream));
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-
-        // remove existent mapping should remove ownership mapping
-        cache.removeOwnerFromStream(stream, owner, "remove-owner");
-        assertNull("Owner " + owner + " should be removed", cache.getOwner(stream));
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy - 1) + " entries left in cache",
-                numProxies * numStreamsPerProxy - 1, ownershipMap.size());
-        ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should still be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-        Set<String> ownedStreams = ownershipDistribution.get(owner);
-        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned for " + owner,
-                numStreamsPerProxy - 1, ownedStreams.size());
-        assertFalse("Stream " + stream + " should not be owned by " + owner,
-                ownedStreams.contains(stream));
-    }
-
-    @Test(timeout = 60000)
-    public void testRemoveAllStreamsFromOwner() {
-        OwnershipCache cache = createOwnershipCache();
-        int initialPort = 2000;
-        int numProxies = 2;
-        int numStreamsPerProxy = 2;
-        for (int i = 0; i < numProxies; i++) {
-            SocketAddress addr = createSocketAddress(initialPort + i);
-            for (int j = 0; j < numStreamsPerProxy; j++) {
-                String stream = runtime.getMethodName() + "_" + i + "_" + j;
-                cache.updateOwner(stream, addr);
-            }
-        }
-        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-
-        SocketAddress owner = createSocketAddress(initialPort);
-
-        // remove non-existent host won't change anything
-        SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
-        cache.removeAllStreamsFromOwner(nonExistentAddr);
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should still be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should still be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-
-        // remove existent host should remove ownership mapping
-        cache.removeAllStreamsFromOwner(owner);
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + ((numProxies - 1) * numStreamsPerProxy) + " entries left in cache",
-                (numProxies - 1) * numStreamsPerProxy, ownershipMap.size());
-        ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + (numProxies - 1) + " proxies cached",
-                numProxies - 1, ownershipDistribution.size());
-        assertFalse("Host " + owner + " should not be cached",
-                ownershipDistribution.containsKey(owner));
-    }
-
-    @Test(timeout = 60000)
-    public void testReplaceOwner() {
-        OwnershipCache cache = createOwnershipCache();
-        int initialPort = 2000;
-        int numProxies = 2;
-        int numStreamsPerProxy = 2;
-        for (int i = 0; i < numProxies; i++) {
-            SocketAddress addr = createSocketAddress(initialPort + i);
-            for (int j = 0; j < numStreamsPerProxy; j++) {
-                String stream = runtime.getMethodName() + "_" + i + "_" + j;
-                cache.updateOwner(stream, addr);
-            }
-        }
-        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-
-        String stream = runtime.getMethodName() + "_0_0";
-        SocketAddress oldOwner = createSocketAddress(initialPort);
-        SocketAddress newOwner = createSocketAddress(initialPort + 999);
-
-        cache.updateOwner(stream, newOwner);
-        assertEquals("Owner of " + stream + " should be changed from " + oldOwner + " to " + newOwner,
-                newOwner, cache.getOwner(stream));
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        assertEquals("Owner of " + stream + " should be " + newOwner,
-                newOwner, ownershipMap.get(stream));
-        ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + (numProxies + 1) + " proxies cached",
-                numProxies + 1, ownershipDistribution.size());
-        Set<String> oldOwnedStreams = ownershipDistribution.get(oldOwner);
-        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + oldOwner,
-                numStreamsPerProxy - 1, oldOwnedStreams.size());
-        assertFalse("Stream " + stream + " should not be owned by " + oldOwner,
-                oldOwnedStreams.contains(stream));
-        Set<String> newOwnedStreams = ownershipDistribution.get(newOwner);
-        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + newOwner,
-                1, newOwnedStreams.size());
-        assertTrue("Stream " + stream + " should be owned by " + newOwner,
-                newOwnedStreams.contains(stream));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
deleted file mode 100644
index f088c0d..0000000
--- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.proxy;
-
-import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
-import com.twitter.distributedlog.thrift.service.ClientInfo;
-import com.twitter.distributedlog.thrift.service.DistributedLogService;
-import com.twitter.distributedlog.thrift.service.HeartbeatOptions;
-import com.twitter.distributedlog.thrift.service.ServerInfo;
-import com.twitter.distributedlog.thrift.service.WriteContext;
-import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.util.Future;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * Mock DistributedLog Related Services.
- */
-public class MockDistributedLogServices {
-
-    /**
-     * Mock basic service.
-     */
-    static class MockBasicService implements DistributedLogService.ServiceIface {
-
-        @Override
-        public Future<ServerInfo> handshake() {
-            return Future.value(new ServerInfo());
-        }
-
-        @Override
-        public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
-            return Future.value(new ServerInfo());
-        }
-
-        @Override
-        public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> heartbeatWithOptions(String stream,
-                                                          WriteContext ctx,
-                                                          HeartbeatOptions options) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> write(String stream,
-                                           ByteBuffer data) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> writeWithContext(String stream,
-                                                      ByteBuffer data,
-                                                      WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<BulkWriteResponse> writeBulkWithContext(String stream,
-                                                              List<ByteBuffer> data,
-                                                              WriteContext ctx) {
-            return Future.value(new BulkWriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> truncate(String stream,
-                                              String dlsn,
-                                              WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> release(String stream,
-                                             WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> create(String stream, WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> delete(String stream,
-                                            WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> getOwner(String stream, WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<Void> setAcceptNewStream(boolean enabled) {
-            return Future.value(null);
-        }
-    }
-
-    /**
-     * Mock server info service.
-     */
-    public static class MockServerInfoService extends MockBasicService {
-
-        protected ServerInfo serverInfo;
-
-        public MockServerInfoService() {
-            serverInfo = new ServerInfo();
-        }
-
-        public void updateServerInfo(ServerInfo serverInfo) {
-            this.serverInfo = serverInfo;
-        }
-
-        @Override
-        public Future<ServerInfo> handshake() {
-            return Future.value(serverInfo);
-        }
-
-        @Override
-        public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
-            return Future.value(serverInfo);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockProxyClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockProxyClientBuilder.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockProxyClientBuilder.java
deleted file mode 100644
index ff0bd05..0000000
--- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockProxyClientBuilder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.proxy;
-
-import com.twitter.distributedlog.thrift.service.DistributedLogService;
-import java.net.SocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Mock Proxy Client Builder.
- */
-class MockProxyClientBuilder implements ProxyClient.Builder {
-
-    static class MockProxyClient extends ProxyClient {
-        MockProxyClient(SocketAddress address,
-                        DistributedLogService.ServiceIface service) {
-            super(address, new MockThriftClient(), service);
-        }
-    }
-
-    private final ConcurrentMap<SocketAddress, MockProxyClient> clients =
-            new ConcurrentHashMap<SocketAddress, MockProxyClient>();
-
-    public void provideProxyClient(SocketAddress address,
-                                   MockProxyClient proxyClient) {
-        clients.put(address, proxyClient);
-    }
-
-    @Override
-    public ProxyClient build(SocketAddress address) {
-        return clients.get(address);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockThriftClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockThriftClient.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockThriftClient.java
deleted file mode 100644
index 7877ed7..0000000
--- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockThriftClient.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.proxy;
-
-import com.twitter.finagle.Service;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Future;
-
-/**
- * Mock Thrift Client.
- */
-class MockThriftClient extends Service<ThriftClientRequest, byte[]> {
-    @Override
-    public Future<byte[]> apply(ThriftClientRequest request) {
-        return Future.value(request.message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/TestProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/TestProxyClientManager.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/TestProxyClientManager.java
deleted file mode 100644
index 11e1e58..0000000
--- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/TestProxyClientManager.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.proxy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.distributedlog.client.ClientConfig;
-import com.twitter.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService;
-import com.twitter.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService;
-import com.twitter.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient;
-import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
-import com.twitter.distributedlog.client.stats.ClientStats;
-import com.twitter.distributedlog.thrift.service.ServerInfo;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang3.tuple.Pair;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Proxy Client Manager.
- */
-public class TestProxyClientManager {
-
-    @Rule
-    public TestName runtime = new TestName();
-
-    static class TestHostProvider implements HostProvider {
-
-        Set<SocketAddress> hosts = new HashSet<SocketAddress>();
-
-        synchronized void addHost(SocketAddress host) {
-            hosts.add(host);
-        }
-
-        @Override
-        public synchronized Set<SocketAddress> getHosts() {
-            return ImmutableSet.copyOf(hosts);
-        }
-
-    }
-
-    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
-                                                               long periodicHandshakeIntervalMs) {
-        HostProvider provider = new TestHostProvider();
-        return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs);
-    }
-
-    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
-                                                               HostProvider hostProvider,
-                                                               long periodicHandshakeIntervalMs) {
-        ClientConfig clientConfig = new ClientConfig();
-        clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs);
-        clientConfig.setPeriodicOwnershipSyncIntervalMs(-1);
-        HashedWheelTimer dlTimer = new HashedWheelTimer(
-                new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(),
-                clientConfig.getRedirectBackoffStartMs(),
-                TimeUnit.MILLISECONDS);
-        return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider,
-                new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver()));
-    }
-
-    private static SocketAddress createSocketAddress(int port) {
-        return new InetSocketAddress("127.0.0.1", port);
-    }
-
-    private static MockProxyClient createMockProxyClient(SocketAddress address) {
-        return new MockProxyClient(address, new MockBasicService());
-    }
-
-    private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient(
-            SocketAddress address, ServerInfo serverInfo) {
-        MockServerInfoService service = new MockServerInfoService();
-        MockProxyClient proxyClient = new MockProxyClient(address, service);
-        service.updateServerInfo(serverInfo);
-        return Pair.of(proxyClient, service);
-    }
-
-    @Test(timeout = 60000)
-    public void testBasicCreateRemove() throws Exception {
-        SocketAddress address = createSocketAddress(1000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        MockProxyClient mockProxyClient = createMockProxyClient(address);
-        builder.provideProxyClient(address, mockProxyClient);
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        ProxyClient proxyClient =  clientManager.createClient(address);
-        assertEquals("Create client should build the proxy client",
-                1, clientManager.getNumProxies());
-        assertTrue("The client returned should be the same client that builder built",
-                mockProxyClient == proxyClient);
-    }
-
-    @Test(timeout = 60000)
-    public void testGetShouldCreateClient() throws Exception {
-        SocketAddress address = createSocketAddress(2000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        MockProxyClient mockProxyClient = createMockProxyClient(address);
-        builder.provideProxyClient(address, mockProxyClient);
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        ProxyClient proxyClient =  clientManager.getClient(address);
-        assertEquals("Get client should build the proxy client",
-                1, clientManager.getNumProxies());
-        assertTrue("The client returned should be the same client that builder built",
-                mockProxyClient == proxyClient);
-    }
-
-    @Test(timeout = 60000)
-    public void testConditionalRemoveClient() throws Exception {
-        SocketAddress address = createSocketAddress(3000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        MockProxyClient mockProxyClient = createMockProxyClient(address);
-        MockProxyClient anotherMockProxyClient = createMockProxyClient(address);
-        builder.provideProxyClient(address, mockProxyClient);
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        clientManager.createClient(address);
-        assertEquals("Create client should build the proxy client",
-                1, clientManager.getNumProxies());
-        clientManager.removeClient(address, anotherMockProxyClient);
-        assertEquals("Conditional remove should not remove proxy client",
-                1, clientManager.getNumProxies());
-        clientManager.removeClient(address, mockProxyClient);
-        assertEquals("Conditional remove should remove proxy client",
-                0, clientManager.getNumProxies());
-    }
-
-    @Test(timeout = 60000)
-    public void testRemoveClient() throws Exception {
-        SocketAddress address = createSocketAddress(3000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        MockProxyClient mockProxyClient = createMockProxyClient(address);
-        builder.provideProxyClient(address, mockProxyClient);
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        clientManager.createClient(address);
-        assertEquals("Create client should build the proxy client",
-                1, clientManager.getNumProxies());
-        clientManager.removeClient(address);
-        assertEquals("Remove should remove proxy client",
-                0, clientManager.getNumProxies());
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateClientShouldHandshake() throws Exception {
-        SocketAddress address = createSocketAddress(3000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        ServerInfo serverInfo = new ServerInfo();
-        serverInfo.putToOwnerships(runtime.getMethodName() + "_stream",
-                runtime.getMethodName() + "_owner");
-        Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
-                createMockProxyClient(address, serverInfo);
-        builder.provideProxyClient(address, mockProxyClient.getLeft());
-
-        final AtomicReference<ServerInfo> resultHolder = new AtomicReference<ServerInfo>(null);
-        final CountDownLatch doneLatch = new CountDownLatch(1);
-        ProxyListener listener = new ProxyListener() {
-            @Override
-            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
-                resultHolder.set(serverInfo);
-                doneLatch.countDown();
-            }
-            @Override
-            public void onHandshakeFailure(SocketAddress address, ProxyClient 
client, Throwable cause) {
-            }
-        };
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 
0L);
-        clientManager.registerProxyListener(listener);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        clientManager.createClient(address);
-        assertEquals("Create client should build the proxy client",
-                1, clientManager.getNumProxies());
-
-        // When a client is created, it would handshake with that proxy
-        doneLatch.await();
-        assertEquals("Handshake should return server info",
-                serverInfo, resultHolder.get());
-    }
-
-    @Test(timeout = 60000)
-    public void testHandshake() throws Exception {
-        final int numHosts = 3;
-        final int numStreamsPerHost = 3;
-        final int initialPort = 4000;
-
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        Map<SocketAddress, ServerInfo> serverInfoMap =
-                new HashMap<SocketAddress, ServerInfo>();
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            ServerInfo serverInfo = new ServerInfo();
-            for (int j = 0; j < numStreamsPerHost; j++) {
-                serverInfo.putToOwnerships(runtime.getMethodName() + 
"_stream_" + j,
-                        address.toString());
-            }
-            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
-                    createMockProxyClient(address, serverInfo);
-            builder.provideProxyClient(address, mockProxyClient.getLeft());
-            serverInfoMap.put(address, serverInfo);
-        }
-
-        final Map<SocketAddress, ServerInfo> results = new 
HashMap<SocketAddress, ServerInfo>();
-        final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts);
-        ProxyListener listener = new ProxyListener() {
-            @Override
-            public void onHandshakeSuccess(SocketAddress address, ProxyClient 
client, ServerInfo serverInfo) {
-                synchronized (results) {
-                    results.put(address, serverInfo);
-                }
-                doneLatch.countDown();
-            }
-
-            @Override
-            public void onHandshakeFailure(SocketAddress address, ProxyClient 
client, Throwable cause) {
-            }
-        };
-
-        TestHostProvider rs = new TestHostProvider();
-        ProxyClientManager clientManager = createProxyClientManager(builder, 
rs, 0L);
-        clientManager.registerProxyListener(listener);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        for (int i = 0; i < numHosts; i++) {
-            rs.addHost(createSocketAddress(initialPort + i));
-        }
-        // handshake would handshake with 3 hosts again
-        clientManager.handshake();
-        doneLatch.await();
-        assertEquals("Handshake should return server info",
-                numHosts, results.size());
-        assertTrue("Handshake should get all server infos",
-                Maps.difference(serverInfoMap, results).areEqual());
-    }
-
-    @Test(timeout = 60000)
-    public void testPeriodicHandshake() throws Exception {
-        final int numHosts = 3;
-        final int numStreamsPerHost = 3;
-        final int initialPort = 5000;
-
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        Map<SocketAddress, ServerInfo> serverInfoMap =
-                new HashMap<SocketAddress, ServerInfo>();
-        Map<SocketAddress, MockServerInfoService> mockServiceMap =
-                new HashMap<SocketAddress, MockServerInfoService>();
-        final Map<SocketAddress, CountDownLatch> hostDoneLatches =
-                new HashMap<SocketAddress, CountDownLatch>();
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            ServerInfo serverInfo = new ServerInfo();
-            for (int j = 0; j < numStreamsPerHost; j++) {
-                serverInfo.putToOwnerships(runtime.getMethodName() + 
"_stream_" + j,
-                        address.toString());
-            }
-            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
-                    createMockProxyClient(address, serverInfo);
-            builder.provideProxyClient(address, mockProxyClient.getLeft());
-            serverInfoMap.put(address, serverInfo);
-            mockServiceMap.put(address, mockProxyClient.getRight());
-            hostDoneLatches.put(address, new CountDownLatch(2));
-        }
-
-        final Map<SocketAddress, ServerInfo> results = new 
HashMap<SocketAddress, ServerInfo>();
-        final CountDownLatch doneLatch = new CountDownLatch(numHosts);
-        ProxyListener listener = new ProxyListener() {
-            @Override
-            public void onHandshakeSuccess(SocketAddress address, ProxyClient 
client, ServerInfo serverInfo) {
-                synchronized (results) {
-                    results.put(address, serverInfo);
-                    CountDownLatch latch = hostDoneLatches.get(address);
-                    if (null != latch) {
-                        latch.countDown();
-                    }
-                }
-                doneLatch.countDown();
-            }
-
-            @Override
-            public void onHandshakeFailure(SocketAddress address, ProxyClient 
client, Throwable cause) {
-            }
-        };
-
-        TestHostProvider rs = new TestHostProvider();
-        ProxyClientManager clientManager = createProxyClientManager(builder, 
rs, 50L);
-        clientManager.setPeriodicHandshakeEnabled(false);
-        clientManager.registerProxyListener(listener);
-
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            rs.addHost(address);
-            clientManager.createClient(address);
-        }
-
-        // make sure the first 3 handshakes going through
-        doneLatch.await();
-
-        assertEquals("Handshake should return server info",
-                numHosts, results.size());
-        assertTrue("Handshake should get all server infos",
-                Maps.difference(serverInfoMap, results).areEqual());
-
-        // update server info
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            ServerInfo serverInfo = new ServerInfo();
-            for (int j = 0; j < numStreamsPerHost; j++) {
-                serverInfo.putToOwnerships(runtime.getMethodName() + 
"_new_stream_" + j,
-                        address.toString());
-            }
-            MockServerInfoService service = mockServiceMap.get(address);
-            serverInfoMap.put(address, serverInfo);
-            service.updateServerInfo(serverInfo);
-        }
-
-        clientManager.setPeriodicHandshakeEnabled(true);
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            CountDownLatch latch = hostDoneLatches.get(address);
-            latch.await();
-        }
-
-        assertTrue("Periodic handshake should update all server infos",
-                Maps.difference(serverInfoMap, results).areEqual());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService.java
 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService.java
deleted file mode 100644
index 0f4804c..0000000
--- 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
-import com.twitter.distributedlog.service.DLSocketAddress;
-import com.twitter.finagle.Address;
-import com.twitter.finagle.Addresses;
-import com.twitter.finagle.ChannelWriteException;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-
-/**
- * Test Case for {@link ConsistentHashRoutingService}.
- */
-public class TestConsistentHashRoutingService {
-
-    @Test(timeout = 60000)
-    public void testBlackoutHost() throws Exception {
-        TestName name = new TestName();
-        RoutingService routingService = 
ConsistentHashRoutingService.newBuilder()
-                .serverSet(new NameServerSet(name))
-                .resolveFromName(true)
-                .numReplicas(997)
-                .blackoutSeconds(2)
-                .build();
-
-        InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
3181);
-        Address address = Addresses.newInetAddress(inetAddress);
-        List<Address> addresses = new ArrayList<Address>(1);
-        addresses.add(address);
-        name.changeAddrs(addresses);
-
-        routingService.startService();
-
-        RoutingService.RoutingContext routingContext =
-                RoutingService.RoutingContext.of(new DefaultRegionResolver());
-
-        String streamName = "test-blackout-host";
-        assertEquals(inetAddress, routingService.getHost(streamName, 
routingContext));
-        routingService.removeHost(inetAddress, new ChannelWriteException(new 
IOException("test exception")));
-        try {
-            routingService.getHost(streamName, routingContext);
-            fail("Should fail to get host since no brokers are available");
-        } catch (NoBrokersAvailableException nbae) {
-            // expected
-        }
-
-        TimeUnit.SECONDS.sleep(3);
-        assertEquals(inetAddress, routingService.getHost(streamName, 
routingContext));
-
-        routingService.stopService();
-    }
-
-    @Test(timeout = 60000)
-    public void testPerformServerSetChangeOnName() throws Exception {
-        TestName name = new TestName();
-        ConsistentHashRoutingService routingService = 
(ConsistentHashRoutingService)
-                ConsistentHashRoutingService.newBuilder()
-                        .serverSet(new NameServerSet(name))
-                        .resolveFromName(true)
-                        .numReplicas(997)
-                        .build();
-
-        int basePort = 3180;
-        int numHosts = 4;
-        List<Address> addresses1 = Lists.newArrayListWithExpectedSize(4);
-        List<Address> addresses2 = Lists.newArrayListWithExpectedSize(4);
-        List<Address> addresses3 = Lists.newArrayListWithExpectedSize(4);
-
-        // fill up the addresses1
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + i);
-            Address address = Addresses.newInetAddress(inetAddress);
-            addresses1.add(address);
-        }
-        // fill up the addresses2 - overlap with addresses1
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + 2 + i);
-            Address address = Addresses.newInetAddress(inetAddress);
-            addresses2.add(address);
-        }
-        // fill up the addresses3 - not overlap with addresses2
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + 10 + i);
-            Address address = Addresses.newInetAddress(inetAddress);
-            addresses3.add(address);
-        }
-
-        final List<SocketAddress> leftAddresses = Lists.newArrayList();
-        final List<SocketAddress> joinAddresses = Lists.newArrayList();
-
-        RoutingService.RoutingListener routingListener = new 
RoutingService.RoutingListener() {
-            @Override
-            public void onServerLeft(SocketAddress address) {
-                synchronized (leftAddresses) {
-                    leftAddresses.add(address);
-                    leftAddresses.notifyAll();
-                }
-            }
-
-            @Override
-            public void onServerJoin(SocketAddress address) {
-                synchronized (joinAddresses) {
-                    joinAddresses.add(address);
-                    joinAddresses.notifyAll();
-                }
-            }
-        };
-
-        routingService.registerListener(routingListener);
-        name.changeAddrs(addresses1);
-
-        routingService.startService();
-
-        synchronized (joinAddresses) {
-            while (joinAddresses.size() < numHosts) {
-                joinAddresses.wait();
-            }
-        }
-
-        // validate 4 nodes joined
-        synchronized (joinAddresses) {
-            assertEquals(numHosts, joinAddresses.size());
-        }
-        synchronized (leftAddresses) {
-            assertEquals(0, leftAddresses.size());
-        }
-        assertEquals(numHosts, routingService.shardId2Address.size());
-        assertEquals(numHosts, routingService.address2ShardId.size());
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + i);
-            
assertTrue(routingService.address2ShardId.containsKey(inetAddress));
-            int shardId = routingService.address2ShardId.get(inetAddress);
-            SocketAddress sa = routingService.shardId2Address.get(shardId);
-            assertNotNull(sa);
-            assertEquals(inetAddress, sa);
-        }
-
-        // update addresses2 - 2 new hosts joined, 2 old hosts left
-        name.changeAddrs(addresses2);
-        synchronized (joinAddresses) {
-            while (joinAddresses.size() < numHosts + 2) {
-                joinAddresses.wait();
-            }
-        }
-        synchronized (leftAddresses) {
-            while (leftAddresses.size() < numHosts - 2) {
-                leftAddresses.wait();
-            }
-        }
-        assertEquals(numHosts, routingService.shardId2Address.size());
-        assertEquals(numHosts, routingService.address2ShardId.size());
-
-        // first 2 shards should leave
-        for (int i = 0; i < 2; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + i);
-            
assertFalse(routingService.address2ShardId.containsKey(inetAddress));
-        }
-
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + 2 + i);
-            
assertTrue(routingService.address2ShardId.containsKey(inetAddress));
-            int shardId = routingService.address2ShardId.get(inetAddress);
-            SocketAddress sa = routingService.shardId2Address.get(shardId);
-            assertNotNull(sa);
-            assertEquals(inetAddress, sa);
-        }
-
-        // update addresses3 - 2 new hosts joined, 2 old hosts left
-        name.changeAddrs(addresses3);
-        synchronized (joinAddresses) {
-            while (joinAddresses.size() < numHosts + 2 + numHosts) {
-                joinAddresses.wait();
-            }
-        }
-        synchronized (leftAddresses) {
-            while (leftAddresses.size() < numHosts - 2 + numHosts) {
-                leftAddresses.wait();
-            }
-        }
-        assertEquals(numHosts, routingService.shardId2Address.size());
-        assertEquals(numHosts, routingService.address2ShardId.size());
-
-        // first 6 shards should leave
-        for (int i = 0; i < 2 + numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + i);
-            
assertFalse(routingService.address2ShardId.containsKey(inetAddress));
-        }
-        // new 4 shards should exist
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + 10 + i);
-            
assertTrue(routingService.address2ShardId.containsKey(inetAddress));
-            int shardId = routingService.address2ShardId.get(inetAddress);
-            SocketAddress sa = routingService.shardId2Address.get(shardId);
-            assertNotNull(sa);
-            assertEquals(inetAddress, sa);
-        }
-
-    }
-
-    private static class TestServerSetWatcher implements ServerSetWatcher {
-
-        final LinkedBlockingQueue<ImmutableSet<DLSocketAddress>> changeQueue =
-                new LinkedBlockingQueue<ImmutableSet<DLSocketAddress>>();
-        final CopyOnWriteArrayList<ServerSetMonitor> monitors =
-                new CopyOnWriteArrayList<ServerSetMonitor>();
-
-        @Override
-        public void watch(ServerSetMonitor monitor) throws MonitorException {
-            monitors.add(monitor);
-            ImmutableSet<DLSocketAddress> change;
-            while ((change = changeQueue.poll()) != null) {
-                notifyChanges(change);
-            }
-        }
-
-        void notifyChanges(ImmutableSet<DLSocketAddress> addresses) {
-            if (monitors.isEmpty()) {
-                changeQueue.add(addresses);
-            } else {
-                for (ServerSetMonitor monitor : monitors) {
-                    monitor.onChange(addresses);
-                }
-            }
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testPerformServerSetChangeOnServerSet() throws Exception {
-        TestServerSetWatcher serverSetWatcher = new TestServerSetWatcher();
-        ConsistentHashRoutingService routingService = new 
ConsistentHashRoutingService(
-                serverSetWatcher, 997, Integer.MAX_VALUE, 
NullStatsReceiver.get());
-
-        int basePort = 3180;
-        int numHosts = 4;
-        Set<DLSocketAddress> addresses1 = Sets.newConcurrentHashSet();
-        Set<DLSocketAddress> addresses2 = Sets.newConcurrentHashSet();
-        Set<DLSocketAddress> addresses3 = Sets.newConcurrentHashSet();
-
-        // fill up the addresses1
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + i);
-            DLSocketAddress dsa = new DLSocketAddress(i, inetAddress);
-            addresses1.add(dsa);
-        }
-        // fill up the addresses2 - overlap with addresses1
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + numHosts + i);
-            DLSocketAddress dsa = new DLSocketAddress(i + 2, inetAddress);
-            addresses2.add(dsa);
-        }
-        // fill up the addresses3 - not overlap with addresses2
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + 10 + i);
-            DLSocketAddress dsa = new DLSocketAddress(i, inetAddress);
-            addresses3.add(dsa);
-        }
-
-        final List<SocketAddress> leftAddresses = Lists.newArrayList();
-        final List<SocketAddress> joinAddresses = Lists.newArrayList();
-
-        RoutingService.RoutingListener routingListener = new 
RoutingService.RoutingListener() {
-            @Override
-            public void onServerLeft(SocketAddress address) {
-                synchronized (leftAddresses) {
-                    leftAddresses.add(address);
-                    leftAddresses.notifyAll();
-                }
-            }
-
-            @Override
-            public void onServerJoin(SocketAddress address) {
-                synchronized (joinAddresses) {
-                    joinAddresses.add(address);
-                    joinAddresses.notifyAll();
-                }
-            }
-        };
-
-        routingService.registerListener(routingListener);
-        serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses1));
-
-        routingService.startService();
-
-        synchronized (joinAddresses) {
-            while (joinAddresses.size() < numHosts) {
-                joinAddresses.wait();
-            }
-        }
-
-        // validate 4 nodes joined
-        synchronized (joinAddresses) {
-            assertEquals(numHosts, joinAddresses.size());
-        }
-        synchronized (leftAddresses) {
-            assertEquals(0, leftAddresses.size());
-        }
-        assertEquals(numHosts, routingService.shardId2Address.size());
-        assertEquals(numHosts, routingService.address2ShardId.size());
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + i);
-            
assertTrue(routingService.address2ShardId.containsKey(inetAddress));
-            int shardId = routingService.address2ShardId.get(inetAddress);
-            assertEquals(i, shardId);
-            SocketAddress sa = routingService.shardId2Address.get(shardId);
-            assertNotNull(sa);
-            assertEquals(inetAddress, sa);
-        }
-
-        // update addresses2 - 2 new hosts joined, 2 old hosts left
-        serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses2));
-        synchronized (joinAddresses) {
-            while (joinAddresses.size() < numHosts + 2) {
-                joinAddresses.wait();
-            }
-        }
-        synchronized (leftAddresses) {
-            while (leftAddresses.size() < 2) {
-                leftAddresses.wait();
-            }
-        }
-
-        assertEquals(numHosts + 2, routingService.shardId2Address.size());
-        assertEquals(numHosts + 2, routingService.address2ShardId.size());
-        // first 2 shards should not leave
-        for (int i = 0; i < 2; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + i);
-            
assertTrue(routingService.address2ShardId.containsKey(inetAddress));
-            int shardId = routingService.address2ShardId.get(inetAddress);
-            assertEquals(i, shardId);
-            SocketAddress sa = routingService.shardId2Address.get(shardId);
-            assertNotNull(sa);
-            assertEquals(inetAddress, sa);
-        }
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + numHosts + i);
-            
assertTrue(routingService.address2ShardId.containsKey(inetAddress));
-            int shardId = routingService.address2ShardId.get(inetAddress);
-            assertEquals(i + 2, shardId);
-            SocketAddress sa = routingService.shardId2Address.get(shardId);
-            assertNotNull(sa);
-            assertEquals(inetAddress, sa);
-        }
-
-        // update addresses3
-        serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses3));
-        synchronized (joinAddresses) {
-            while (joinAddresses.size() < numHosts + 2 + numHosts) {
-                joinAddresses.wait();
-            }
-        }
-        synchronized (leftAddresses) {
-            while (leftAddresses.size() < 2 + numHosts) {
-                leftAddresses.wait();
-            }
-        }
-        assertEquals(numHosts + 2, routingService.shardId2Address.size());
-        assertEquals(numHosts + 2, routingService.address2ShardId.size());
-
-        // first 4 shards should leave
-        for (int i = 0; i < numHosts; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + 10 + i);
-            
assertTrue(routingService.address2ShardId.containsKey(inetAddress));
-            int shardId = routingService.address2ShardId.get(inetAddress);
-            assertEquals(i, shardId);
-            SocketAddress sa = routingService.shardId2Address.get(shardId);
-            assertNotNull(sa);
-            assertEquals(inetAddress, sa);
-        }
-        // the other 2 shards should be still there
-        for (int i = 0; i < 2; i++) {
-            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 
basePort + numHosts + 2 + i);
-            
assertTrue(routingService.address2ShardId.containsKey(inetAddress));
-            int shardId = routingService.address2ShardId.get(inetAddress);
-            assertEquals(numHosts + i, shardId);
-            SocketAddress sa = routingService.shardId2Address.get(shardId);
-            assertNotNull(sa);
-            assertEquals(inetAddress, sa);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestInetNameResolution.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestInetNameResolution.java
 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestInetNameResolution.java
deleted file mode 100644
index 2552f9e..0000000
--- 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestInetNameResolution.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.net.pool.DynamicHostSet;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for `inet` name resolution.
- */
-public class TestInetNameResolution {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(TestRoutingService.class);
-
-    @Test(timeout = 10000)
-    public void testInetNameResolution() throws Exception {
-        String nameStr = "inet!127.0.0.1:3181";
-        final CountDownLatch resolved = new CountDownLatch(1);
-        final AtomicBoolean validationFailed = new AtomicBoolean(false);
-
-        NameServerSet serverSet = new NameServerSet(nameStr);
-        serverSet.watch(new 
DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
-            @Override
-            public void onChange(ImmutableSet<ServiceInstance> hostSet) {
-                if (hostSet.size() > 1) {
-                    logger.error("HostSet has more elements than expected {}", 
hostSet);
-                    validationFailed.set(true);
-                    resolved.countDown();
-                } else if (hostSet.size() == 1) {
-                    ServiceInstance serviceInstance = 
hostSet.iterator().next();
-                    Endpoint endpoint = 
serviceInstance.getAdditionalEndpoints().get("thrift");
-                    InetSocketAddress address = new 
InetSocketAddress(endpoint.getHost(), endpoint.getPort());
-                    if (endpoint.getPort() != 3181) {
-                        logger.error("Port does not match the expected port 
{}", endpoint.getPort());
-                        validationFailed.set(true);
-                    } else if 
(!address.getAddress().getHostAddress().equals("127.0.0.1")) {
-                        logger.error("Host address does not match the expected 
address {}",
-                            address.getAddress().getHostAddress());
-                        validationFailed.set(true);
-                    }
-                    resolved.countDown();
-                }
-            }
-        });
-
-        resolved.await();
-        Assert.assertEquals(false, validationFailed.get());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRegionsRoutingService.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRegionsRoutingService.java
 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRegionsRoutingService.java
deleted file mode 100644
index 49a375c..0000000
--- 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRegionsRoutingService.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
-import com.twitter.distributedlog.thrift.service.StatusCode;
-import com.twitter.finagle.NoBrokersAvailableException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.Test;
-
-/**
- * Test Case for {@link RegionsRoutingService}.
- */
-public class TestRegionsRoutingService {
-
-    @Test(timeout = 60000)
-    public void testRoutingListener() throws Exception {
-        int numRoutingServices = 5;
-        RoutingService.Builder[] routingServiceBuilders = new 
RoutingService.Builder[numRoutingServices];
-        Set<SocketAddress> hosts = new HashSet<SocketAddress>();
-        Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, 
String>();
-        for (int i = 0; i < numRoutingServices; i++) {
-            String finagleNameStr = "inet!127.0.0.1:" + (3181 + i);
-            routingServiceBuilders[i] = 
RoutingUtils.buildRoutingService(finagleNameStr);
-            SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + 
i);
-            hosts.add(address);
-            regionMap.put(address, "region-" + i);
-        }
-
-        final CountDownLatch doneLatch = new 
CountDownLatch(numRoutingServices);
-        final AtomicInteger numHostsLeft = new AtomicInteger(0);
-        final Set<SocketAddress> jointHosts = new HashSet<SocketAddress>();
-        RegionsRoutingService regionsRoutingService =
-                RegionsRoutingService.newBuilder()
-                    .routingServiceBuilders(routingServiceBuilders)
-                    .resolver(new DefaultRegionResolver(regionMap))
-                    .build();
-        regionsRoutingService.registerListener(new 
RoutingService.RoutingListener() {
-            @Override
-            public void onServerLeft(SocketAddress address) {
-                numHostsLeft.incrementAndGet();
-            }
-
-            @Override
-            public void onServerJoin(SocketAddress address) {
-                jointHosts.add(address);
-                doneLatch.countDown();
-            }
-        });
-
-        regionsRoutingService.startService();
-
-        doneLatch.await();
-
-        assertEquals(numRoutingServices, jointHosts.size());
-        assertEquals(0, numHostsLeft.get());
-        assertTrue(Sets.difference(hosts, 
jointHosts).immutableCopy().isEmpty());
-    }
-
-    @Test(timeout = 60000)
-    public void testGetHost() throws Exception {
-        int numRoutingServices = 3;
-        RoutingService.Builder[] routingServiceBuilders = new 
RoutingService.Builder[numRoutingServices];
-        Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, 
String>();
-        for (int i = 0; i < numRoutingServices; i++) {
-            String finagleNameStr = "inet!127.0.0.1:" + (3181 + i);
-            routingServiceBuilders[i] = 
RoutingUtils.buildRoutingService(finagleNameStr);
-            SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + 
i);
-            regionMap.put(address, "region-" + i);
-        }
-
-        RegionsRoutingService regionsRoutingService =
-                RegionsRoutingService.newBuilder()
-                    .resolver(new DefaultRegionResolver(regionMap))
-                    .routingServiceBuilders(routingServiceBuilders)
-                    .build();
-        regionsRoutingService.startService();
-
-        RoutingService.RoutingContext routingContext =
-                RoutingService.RoutingContext.of(new DefaultRegionResolver())
-                        .addTriedHost(new InetSocketAddress("127.0.0.1", 
3183), StatusCode.WRITE_EXCEPTION);
-        assertEquals(new InetSocketAddress("127.0.0.1", 3181),
-                regionsRoutingService.getHost("any", routingContext));
-
-        routingContext =
-                RoutingService.RoutingContext.of(new DefaultRegionResolver())
-                        .addTriedHost(new InetSocketAddress("127.0.0.1", 
3181), StatusCode.WRITE_EXCEPTION);
-        assertEquals(new InetSocketAddress("127.0.0.1", 3182),
-                regionsRoutingService.getHost("any", routingContext));
-
-        // add 3182 to routing context as tried host
-        routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3182), 
StatusCode.WRITE_EXCEPTION);
-        assertEquals(new InetSocketAddress("127.0.0.1", 3183),
-                regionsRoutingService.getHost("any", routingContext));
-
-        // add 3183 to routing context as tried host
-        routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3183), 
StatusCode.WRITE_EXCEPTION);
-        try {
-            regionsRoutingService.getHost("any", routingContext);
-            fail("Should fail to get host since all regions are tried.");
-        } catch (NoBrokersAvailableException nbae) {
-            // expected
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRoutingService.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRoutingService.java
 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRoutingService.java
deleted file mode 100644
index b79557e..0000000
--- 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRoutingService.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import static org.junit.Assert.assertEquals;
-
-import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
-import com.twitter.finagle.Address;
-import com.twitter.finagle.Addresses;
-import com.twitter.finagle.addr.WeightedAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for {@link RoutingService}.
- */
-@RunWith(Parameterized.class)
-public class TestRoutingService {
-
-    static final Logger LOG = 
LoggerFactory.getLogger(TestRoutingService.class);
-
-    @Parameterized.Parameters
-    public static Collection<Object[]> configs() {
-        ArrayList<Object[]> list = new ArrayList<Object[]>();
-        for (int i = 0; i <= 1; i++) {
-            for (int j = 0; j <= 1; j++) {
-                for (int k = 0; k <= 1; k++) {
-                    list.add(new Boolean[] {i == 1, j == 1, k == 1});
-                }
-            }
-        }
-        return list;
-    }
-
-    private final boolean consistentHash;
-    private final boolean weightedAddresses;
-    private final boolean asyncResolution;
-
-    public TestRoutingService(boolean consistentHash, boolean 
weightedAddresses, boolean asyncResolution) {
-        this.consistentHash = consistentHash;
-        this.weightedAddresses = weightedAddresses;
-        this.asyncResolution = asyncResolution;
-    }
-
-    private List<Address> getAddresses(boolean weightedAddresses) {
-        ArrayList<Address> addresses = new ArrayList<Address>();
-        addresses.add(Addresses.newInetAddress(new 
InetSocketAddress("127.0.0.1", 3181)));
-        addresses.add(Addresses.newInetAddress(new 
InetSocketAddress("127.0.0.2", 3181)));
-        addresses.add(Addresses.newInetAddress(new 
InetSocketAddress("127.0.0.3", 3181)));
-        addresses.add(Addresses.newInetAddress(new 
InetSocketAddress("127.0.0.4", 3181)));
-        addresses.add(Addresses.newInetAddress(new 
InetSocketAddress("127.0.0.5", 3181)));
-        addresses.add(Addresses.newInetAddress(new 
InetSocketAddress("127.0.0.6", 3181)));
-        addresses.add(Addresses.newInetAddress(new 
InetSocketAddress("127.0.0.7", 3181)));
-
-        if (weightedAddresses) {
-            ArrayList<Address> wAddresses = new ArrayList<Address>();
-            for (Address address: addresses) {
-                wAddresses.add(WeightedAddress.apply(address, 1.0));
-            }
-            return wAddresses;
-        } else {
-            return addresses;
-        }
-    }
-
-    private void testRoutingServiceHelper(boolean consistentHash,
-                                          boolean weightedAddresses,
-                                          boolean asyncResolution)
-        throws Exception {
-        ExecutorService executorService = null;
-        final List<Address> addresses = getAddresses(weightedAddresses);
-        final TestName name = new TestName();
-        RoutingService routingService;
-        if (consistentHash) {
-            routingService = ConsistentHashRoutingService.newBuilder()
-                    .serverSet(new NameServerSet(name))
-                    .resolveFromName(true)
-                    .numReplicas(997)
-                    .build();
-        } else {
-            routingService = 
ServerSetRoutingService.newServerSetRoutingServiceBuilder()
-                    .serverSetWatcher(new TwitterServerSetWatcher(new 
NameServerSet(name), true)).build();
-        }
-
-        if (asyncResolution) {
-            executorService = Executors.newSingleThreadExecutor();
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    name.changeAddrs(addresses);
-                }
-            });
-        } else {
-            name.changeAddrs(addresses);
-        }
-        routingService.startService();
-
-        HashSet<SocketAddress> mapping = new HashSet<SocketAddress>();
-
-        for (int i = 0; i < 1000; i++) {
-            for (int j = 0; j < 5; j++) {
-                String stream = "TestStream-" + i + "-" + j;
-                mapping.add(routingService.getHost(stream,
-                        RoutingService.RoutingContext.of(new 
DefaultRegionResolver())));
-            }
-        }
-
-        assertEquals(mapping.size(), addresses.size());
-
-        if (null != executorService) {
-            executorService.shutdown();
-        }
-
-    }
-
-    @Test(timeout = 5000)
-    public void testRoutingService() throws Exception {
-        testRoutingServiceHelper(this.consistentHash, this.weightedAddresses, 
this.asyncResolution);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
deleted file mode 100644
index 71d0b01..0000000
--- 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.speculative;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-import com.twitter.util.CountDownLatch;
-import com.twitter.util.Future;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Test {@link TestDefaultSpeculativeRequestExecutionPolicy}.
- */
-public class TestDefaultSpeculativeRequestExecutionPolicy {
-
-    @Test(timeout = 20000, expected = IllegalArgumentException.class)
-    public void testInvalidBackoffMultiplier() throws Exception {
-        new DefaultSpeculativeRequestExecutionPolicy(100, 200, -1);
-    }
-
-    @Test(timeout = 20000, expected = IllegalArgumentException.class)
-    public void testInvalidMaxSpeculativeTimeout() throws Exception {
-        new DefaultSpeculativeRequestExecutionPolicy(100, Integer.MAX_VALUE, 
2);
-    }
-
-    @Test(timeout = 20000)
-    public void testSpeculativeRequests() throws Exception {
-        DefaultSpeculativeRequestExecutionPolicy policy =
-                new DefaultSpeculativeRequestExecutionPolicy(10, 10000, 2);
-        SpeculativeRequestExecutor executor = 
mock(SpeculativeRequestExecutor.class);
-
-        final AtomicInteger callCount = new AtomicInteger(0);
-        final CountDownLatch latch = new CountDownLatch(3);
-
-        Mockito.doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable 
{
-                try {
-                    return Future.value(callCount.incrementAndGet() < 3);
-                } finally {
-                    latch.countDown();
-                }
-            }
-        }).when(executor).issueSpeculativeRequest();
-
-        ScheduledExecutorService executorService =
-                Executors.newSingleThreadScheduledExecutor();
-        policy.initiateSpeculativeRequest(executorService, executor);
-
-        latch.await();
-
-        assertEquals(40, policy.getNextSpeculativeRequestTimeout());
-    }
-
-    @Test(timeout = 20000)
-    public void testSpeculativeRequestsWithMaxTimeout() throws Exception {
-        DefaultSpeculativeRequestExecutionPolicy policy =
-                new DefaultSpeculativeRequestExecutionPolicy(10, 15, 2);
-        SpeculativeRequestExecutor executor = 
mock(SpeculativeRequestExecutor.class);
-
-        final AtomicInteger callCount = new AtomicInteger(0);
-        final CountDownLatch latch = new CountDownLatch(3);
-
-        Mockito.doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable 
{
-                try {
-                    return Future.value(callCount.incrementAndGet() < 3);
-                } finally {
-                    latch.countDown();
-                }
-            }
-        }).when(executor).issueSpeculativeRequest();
-
-        ScheduledExecutorService executorService =
-                Executors.newSingleThreadScheduledExecutor();
-        policy.initiateSpeculativeRequest(executorService, executor);
-
-        latch.await();
-
-        assertEquals(15, policy.getNextSpeculativeRequestTimeout());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/service/TestDistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/service/TestDistributedLogClientBuilder.java
 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/service/TestDistributedLogClientBuilder.java
deleted file mode 100644
index 986cdd3..0000000
--- 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/service/TestDistributedLogClientBuilder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.service;
-
-import static org.junit.Assert.assertFalse;
-
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration;
-import org.junit.Test;
-
-/**
- * Test Case of {@link 
com.twitter.distributedlog.service.DistributedLogClientBuilder}.
- */
-public class TestDistributedLogClientBuilder {
-
-    @Test(timeout = 60000)
-    public void testBuildClientsFromSameBuilder() throws Exception {
-        DistributedLogClientBuilder builder = 
DistributedLogClientBuilder.newBuilder()
-                .name("build-clients-from-same-builder")
-                .clientId(ClientId$.MODULE$.apply("test-builder"))
-                .finagleNameStr("inet!127.0.0.1:7001")
-                .streamNameRegex(".*")
-                .handshakeWithClientInfo(true)
-                .clientBuilder(ClientBuilder.get()
-                    .hostConnectionLimit(1)
-                    .connectTimeout(Duration.fromSeconds(1))
-                    .tcpConnectTimeout(Duration.fromSeconds(1))
-                    .requestTimeout(Duration.fromSeconds(10)));
-        DistributedLogClient client1 = builder.build();
-        DistributedLogClient client2 = builder.build();
-        assertFalse(client1 == client2);
-    }
-}

Reply via email to