http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/ownership/TestOwnershipCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/ownership/TestOwnershipCache.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/ownership/TestOwnershipCache.java deleted file mode 100644 index c0e077b..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/ownership/TestOwnershipCache.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.client.ownership; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import com.twitter.distributedlog.client.ClientConfig; -import com.twitter.finagle.stats.NullStatsReceiver; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Map; -import java.util.Set; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -/** - * Test Case for Ownership Cache. - */ -public class TestOwnershipCache { - - @Rule - public TestName runtime = new TestName(); - - private static OwnershipCache createOwnershipCache() { - ClientConfig clientConfig = new ClientConfig(); - return new OwnershipCache(clientConfig, null, - NullStatsReceiver.get(), NullStatsReceiver.get()); - } - - private static SocketAddress createSocketAddress(int port) { - return new InetSocketAddress("127.0.0.1", port); - } - - @Test(timeout = 60000) - public void testUpdateOwner() { - OwnershipCache cache = createOwnershipCache(); - SocketAddress addr = createSocketAddress(1000); - String stream = runtime.getMethodName(); - - assertTrue("Should successfully update owner if no owner exists before", - cache.updateOwner(stream, addr)); - assertEquals("Owner should be " + addr + " for stream " + stream, - addr, cache.getOwner(stream)); - assertTrue("Should successfully update owner if old owner is same", - cache.updateOwner(stream, addr)); - assertEquals("Owner should be " + addr + " for stream " + stream, - addr, cache.getOwner(stream)); - } - - @Test(timeout = 60000) - public void testRemoveOwnerFromStream() { - OwnershipCache cache = createOwnershipCache(); - int initialPort = 2000; - int numProxies = 2; - int numStreamsPerProxy = 2; - for (int i = 0; i < numProxies; i++) { - SocketAddress addr = createSocketAddress(initialPort + i); - for (int j = 0; j < numStreamsPerProxy; j++) { - 
String stream = runtime.getMethodName() + "_" + i + "_" + j; - cache.updateOwner(stream, addr); - } - } - Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - - String stream = runtime.getMethodName() + "_0_0"; - SocketAddress owner = createSocketAddress(initialPort); - - // remove non-existent mapping won't change anything - SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999); - cache.removeOwnerFromStream(stream, nonExistentAddr, "remove-non-existent-addr"); - assertEquals("Owner " + owner + " should not be removed", - owner, cache.getOwner(stream)); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - - // remove existent mapping should remove ownership mapping - cache.removeOwnerFromStream(stream, owner, "remove-owner"); - assertNull("Owner " + owner + " should be removed", cache.getOwner(stream)); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy - 1) + " entries left in cache", - numProxies * numStreamsPerProxy - 1, ownershipMap.size()); - ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should still be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - Set<String> ownedStreams = ownershipDistribution.get(owner); - assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned for " + owner, - numStreamsPerProxy - 1, ownedStreams.size()); - assertFalse("Stream " + 
stream + " should not be owned by " + owner, - ownedStreams.contains(stream)); - } - - @Test(timeout = 60000) - public void testRemoveAllStreamsFromOwner() { - OwnershipCache cache = createOwnershipCache(); - int initialPort = 2000; - int numProxies = 2; - int numStreamsPerProxy = 2; - for (int i = 0; i < numProxies; i++) { - SocketAddress addr = createSocketAddress(initialPort + i); - for (int j = 0; j < numStreamsPerProxy; j++) { - String stream = runtime.getMethodName() + "_" + i + "_" + j; - cache.updateOwner(stream, addr); - } - } - Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - - SocketAddress owner = createSocketAddress(initialPort); - - // remove non-existent host won't change anything - SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999); - cache.removeAllStreamsFromOwner(nonExistentAddr); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should still be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should still be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - - // remove existent host should remove ownership mapping - cache.removeAllStreamsFromOwner(owner); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + ((numProxies - 1) * numStreamsPerProxy) + " entries left in cache", - (numProxies - 1) * numStreamsPerProxy, ownershipMap.size()); - ownershipDistribution = cache.getStreamOwnershipDistribution(); - 
assertEquals("There should be " + (numProxies - 1) + " proxies cached", - numProxies - 1, ownershipDistribution.size()); - assertFalse("Host " + owner + " should not be cached", - ownershipDistribution.containsKey(owner)); - } - - @Test(timeout = 60000) - public void testReplaceOwner() { - OwnershipCache cache = createOwnershipCache(); - int initialPort = 2000; - int numProxies = 2; - int numStreamsPerProxy = 2; - for (int i = 0; i < numProxies; i++) { - SocketAddress addr = createSocketAddress(initialPort + i); - for (int j = 0; j < numStreamsPerProxy; j++) { - String stream = runtime.getMethodName() + "_" + i + "_" + j; - cache.updateOwner(stream, addr); - } - } - Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - - String stream = runtime.getMethodName() + "_0_0"; - SocketAddress oldOwner = createSocketAddress(initialPort); - SocketAddress newOwner = createSocketAddress(initialPort + 999); - - cache.updateOwner(stream, newOwner); - assertEquals("Owner of " + stream + " should be changed from " + oldOwner + " to " + newOwner, - newOwner, cache.getOwner(stream)); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - assertEquals("Owner of " + stream + " should be " + newOwner, - newOwner, ownershipMap.get(stream)); - ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should be " + (numProxies + 1) + " proxies cached", - numProxies + 1, ownershipDistribution.size()); - Set<String> oldOwnedStreams = 
ownershipDistribution.get(oldOwner); - assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + oldOwner, - numStreamsPerProxy - 1, oldOwnedStreams.size()); - assertFalse("Stream " + stream + " should not be owned by " + oldOwner, - oldOwnedStreams.contains(stream)); - Set<String> newOwnedStreams = ownershipDistribution.get(newOwner); - assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + newOwner, - 1, newOwnedStreams.size()); - assertTrue("Stream " + stream + " should be owned by " + newOwner, - newOwnedStreams.contains(stream)); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java deleted file mode 100644 index f088c0d..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.client.proxy; - -import com.twitter.distributedlog.thrift.service.BulkWriteResponse; -import com.twitter.distributedlog.thrift.service.ClientInfo; -import com.twitter.distributedlog.thrift.service.DistributedLogService; -import com.twitter.distributedlog.thrift.service.HeartbeatOptions; -import com.twitter.distributedlog.thrift.service.ServerInfo; -import com.twitter.distributedlog.thrift.service.WriteContext; -import com.twitter.distributedlog.thrift.service.WriteResponse; -import com.twitter.util.Future; -import java.nio.ByteBuffer; -import java.util.List; - -/** - * Mock DistributedLog Related Services. - */ -public class MockDistributedLogServices { - - /** - * Mock basic service. - */ - static class MockBasicService implements DistributedLogService.ServiceIface { - - @Override - public Future<ServerInfo> handshake() { - return Future.value(new ServerInfo()); - } - - @Override - public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) { - return Future.value(new ServerInfo()); - } - - @Override - public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> heartbeatWithOptions(String stream, - WriteContext ctx, - HeartbeatOptions options) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> write(String stream, - ByteBuffer data) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> writeWithContext(String stream, - ByteBuffer data, - WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<BulkWriteResponse> writeBulkWithContext(String stream, - List<ByteBuffer> data, - WriteContext ctx) { - return Future.value(new BulkWriteResponse()); - } - - @Override - public Future<WriteResponse> truncate(String stream, - String dlsn, - WriteContext ctx) { - return Future.value(new 
WriteResponse()); - } - - @Override - public Future<WriteResponse> release(String stream, - WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> create(String stream, WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> delete(String stream, - WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> getOwner(String stream, WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<Void> setAcceptNewStream(boolean enabled) { - return Future.value(null); - } - } - - /** - * Mock server info service. - */ - public static class MockServerInfoService extends MockBasicService { - - protected ServerInfo serverInfo; - - public MockServerInfoService() { - serverInfo = new ServerInfo(); - } - - public void updateServerInfo(ServerInfo serverInfo) { - this.serverInfo = serverInfo; - } - - @Override - public Future<ServerInfo> handshake() { - return Future.value(serverInfo); - } - - @Override - public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) { - return Future.value(serverInfo); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockProxyClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockProxyClientBuilder.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockProxyClientBuilder.java deleted file mode 100644 index ff0bd05..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockProxyClientBuilder.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.proxy; - -import com.twitter.distributedlog.thrift.service.DistributedLogService; -import java.net.SocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Mock Proxy Client Builder. 
- */ -class MockProxyClientBuilder implements ProxyClient.Builder { - - static class MockProxyClient extends ProxyClient { - MockProxyClient(SocketAddress address, - DistributedLogService.ServiceIface service) { - super(address, new MockThriftClient(), service); - } - } - - private final ConcurrentMap<SocketAddress, MockProxyClient> clients = - new ConcurrentHashMap<SocketAddress, MockProxyClient>(); - - public void provideProxyClient(SocketAddress address, - MockProxyClient proxyClient) { - clients.put(address, proxyClient); - } - - @Override - public ProxyClient build(SocketAddress address) { - return clients.get(address); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockThriftClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockThriftClient.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockThriftClient.java deleted file mode 100644 index 7877ed7..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockThriftClient.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.proxy; - -import com.twitter.finagle.Service; -import com.twitter.finagle.thrift.ThriftClientRequest; -import com.twitter.util.Future; - -/** - * Mock Thrift Client. - */ -class MockThriftClient extends Service<ThriftClientRequest, byte[]> { - @Override - public Future<byte[]> apply(ThriftClientRequest request) { - return Future.value(request.message); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/TestProxyClientManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/TestProxyClientManager.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/TestProxyClientManager.java deleted file mode 100644 index 11e1e58..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/TestProxyClientManager.java +++ /dev/null @@ -1,368 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.client.proxy; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.distributedlog.client.ClientConfig; -import com.twitter.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService; -import com.twitter.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService; -import com.twitter.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient; -import com.twitter.distributedlog.client.resolver.DefaultRegionResolver; -import com.twitter.distributedlog.client.stats.ClientStats; -import com.twitter.distributedlog.thrift.service.ServerInfo; -import com.twitter.finagle.stats.NullStatsReceiver; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.lang3.tuple.Pair; -import org.jboss.netty.util.HashedWheelTimer; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -/** - * Test Proxy Client Manager. 
- */ -public class TestProxyClientManager { - - @Rule - public TestName runtime = new TestName(); - - static class TestHostProvider implements HostProvider { - - Set<SocketAddress> hosts = new HashSet<SocketAddress>(); - - synchronized void addHost(SocketAddress host) { - hosts.add(host); - } - - @Override - public synchronized Set<SocketAddress> getHosts() { - return ImmutableSet.copyOf(hosts); - } - - } - - private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder, - long periodicHandshakeIntervalMs) { - HostProvider provider = new TestHostProvider(); - return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs); - } - - private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder, - HostProvider hostProvider, - long periodicHandshakeIntervalMs) { - ClientConfig clientConfig = new ClientConfig(); - clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs); - clientConfig.setPeriodicOwnershipSyncIntervalMs(-1); - HashedWheelTimer dlTimer = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(), - clientConfig.getRedirectBackoffStartMs(), - TimeUnit.MILLISECONDS); - return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider, - new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver())); - } - - private static SocketAddress createSocketAddress(int port) { - return new InetSocketAddress("127.0.0.1", port); - } - - private static MockProxyClient createMockProxyClient(SocketAddress address) { - return new MockProxyClient(address, new MockBasicService()); - } - - private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient( - SocketAddress address, ServerInfo serverInfo) { - MockServerInfoService service = new MockServerInfoService(); - MockProxyClient proxyClient = new MockProxyClient(address, service); - service.updateServerInfo(serverInfo); - return Pair.of(proxyClient, 
service); - } - - @Test(timeout = 60000) - public void testBasicCreateRemove() throws Exception { - SocketAddress address = createSocketAddress(1000); - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - MockProxyClient mockProxyClient = createMockProxyClient(address); - builder.provideProxyClient(address, mockProxyClient); - - ProxyClientManager clientManager = createProxyClientManager(builder, 0L); - assertEquals("There should be no clients in the manager", - 0, clientManager.getNumProxies()); - ProxyClient proxyClient = clientManager.createClient(address); - assertEquals("Create client should build the proxy client", - 1, clientManager.getNumProxies()); - assertTrue("The client returned should be the same client that builder built", - mockProxyClient == proxyClient); - } - - @Test(timeout = 60000) - public void testGetShouldCreateClient() throws Exception { - SocketAddress address = createSocketAddress(2000); - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - MockProxyClient mockProxyClient = createMockProxyClient(address); - builder.provideProxyClient(address, mockProxyClient); - - ProxyClientManager clientManager = createProxyClientManager(builder, 0L); - assertEquals("There should be no clients in the manager", - 0, clientManager.getNumProxies()); - ProxyClient proxyClient = clientManager.getClient(address); - assertEquals("Get client should build the proxy client", - 1, clientManager.getNumProxies()); - assertTrue("The client returned should be the same client that builder built", - mockProxyClient == proxyClient); - } - - @Test(timeout = 60000) - public void testConditionalRemoveClient() throws Exception { - SocketAddress address = createSocketAddress(3000); - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - MockProxyClient mockProxyClient = createMockProxyClient(address); - MockProxyClient anotherMockProxyClient = createMockProxyClient(address); - builder.provideProxyClient(address, mockProxyClient); - - 
ProxyClientManager clientManager = createProxyClientManager(builder, 0L); - assertEquals("There should be no clients in the manager", - 0, clientManager.getNumProxies()); - clientManager.createClient(address); - assertEquals("Create client should build the proxy client", - 1, clientManager.getNumProxies()); - clientManager.removeClient(address, anotherMockProxyClient); - assertEquals("Conditional remove should not remove proxy client", - 1, clientManager.getNumProxies()); - clientManager.removeClient(address, mockProxyClient); - assertEquals("Conditional remove should remove proxy client", - 0, clientManager.getNumProxies()); - } - - @Test(timeout = 60000) - public void testRemoveClient() throws Exception { - SocketAddress address = createSocketAddress(3000); - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - MockProxyClient mockProxyClient = createMockProxyClient(address); - builder.provideProxyClient(address, mockProxyClient); - - ProxyClientManager clientManager = createProxyClientManager(builder, 0L); - assertEquals("There should be no clients in the manager", - 0, clientManager.getNumProxies()); - clientManager.createClient(address); - assertEquals("Create client should build the proxy client", - 1, clientManager.getNumProxies()); - clientManager.removeClient(address); - assertEquals("Remove should remove proxy client", - 0, clientManager.getNumProxies()); - } - - @Test(timeout = 60000) - public void testCreateClientShouldHandshake() throws Exception { - SocketAddress address = createSocketAddress(3000); - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - ServerInfo serverInfo = new ServerInfo(); - serverInfo.putToOwnerships(runtime.getMethodName() + "_stream", - runtime.getMethodName() + "_owner"); - Pair<MockProxyClient, MockServerInfoService> mockProxyClient = - createMockProxyClient(address, serverInfo); - builder.provideProxyClient(address, mockProxyClient.getLeft()); - - final AtomicReference<ServerInfo> resultHolder = new 
AtomicReference<ServerInfo>(null); - final CountDownLatch doneLatch = new CountDownLatch(1); - ProxyListener listener = new ProxyListener() { - @Override - public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) { - resultHolder.set(serverInfo); - doneLatch.countDown(); - } - @Override - public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) { - } - }; - - ProxyClientManager clientManager = createProxyClientManager(builder, 0L); - clientManager.registerProxyListener(listener); - assertEquals("There should be no clients in the manager", - 0, clientManager.getNumProxies()); - clientManager.createClient(address); - assertEquals("Create client should build the proxy client", - 1, clientManager.getNumProxies()); - - // When a client is created, it would handshake with that proxy - doneLatch.await(); - assertEquals("Handshake should return server info", - serverInfo, resultHolder.get()); - } - - @Test(timeout = 60000) - public void testHandshake() throws Exception { - final int numHosts = 3; - final int numStreamsPerHost = 3; - final int initialPort = 4000; - - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - Map<SocketAddress, ServerInfo> serverInfoMap = - new HashMap<SocketAddress, ServerInfo>(); - for (int i = 0; i < numHosts; i++) { - SocketAddress address = createSocketAddress(initialPort + i); - ServerInfo serverInfo = new ServerInfo(); - for (int j = 0; j < numStreamsPerHost; j++) { - serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j, - address.toString()); - } - Pair<MockProxyClient, MockServerInfoService> mockProxyClient = - createMockProxyClient(address, serverInfo); - builder.provideProxyClient(address, mockProxyClient.getLeft()); - serverInfoMap.put(address, serverInfo); - } - - final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>(); - final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts); - ProxyListener 
listener = new ProxyListener() { - @Override - public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) { - synchronized (results) { - results.put(address, serverInfo); - } - doneLatch.countDown(); - } - - @Override - public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) { - } - }; - - TestHostProvider rs = new TestHostProvider(); - ProxyClientManager clientManager = createProxyClientManager(builder, rs, 0L); - clientManager.registerProxyListener(listener); - assertEquals("There should be no clients in the manager", - 0, clientManager.getNumProxies()); - for (int i = 0; i < numHosts; i++) { - rs.addHost(createSocketAddress(initialPort + i)); - } - // handshake would handshake with 3 hosts again - clientManager.handshake(); - doneLatch.await(); - assertEquals("Handshake should return server info", - numHosts, results.size()); - assertTrue("Handshake should get all server infos", - Maps.difference(serverInfoMap, results).areEqual()); - } - - @Test(timeout = 60000) - public void testPeriodicHandshake() throws Exception { - final int numHosts = 3; - final int numStreamsPerHost = 3; - final int initialPort = 5000; - - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - Map<SocketAddress, ServerInfo> serverInfoMap = - new HashMap<SocketAddress, ServerInfo>(); - Map<SocketAddress, MockServerInfoService> mockServiceMap = - new HashMap<SocketAddress, MockServerInfoService>(); - final Map<SocketAddress, CountDownLatch> hostDoneLatches = - new HashMap<SocketAddress, CountDownLatch>(); - for (int i = 0; i < numHosts; i++) { - SocketAddress address = createSocketAddress(initialPort + i); - ServerInfo serverInfo = new ServerInfo(); - for (int j = 0; j < numStreamsPerHost; j++) { - serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j, - address.toString()); - } - Pair<MockProxyClient, MockServerInfoService> mockProxyClient = - createMockProxyClient(address, serverInfo); - 
builder.provideProxyClient(address, mockProxyClient.getLeft()); - serverInfoMap.put(address, serverInfo); - mockServiceMap.put(address, mockProxyClient.getRight()); - hostDoneLatches.put(address, new CountDownLatch(2)); - } - - final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>(); - final CountDownLatch doneLatch = new CountDownLatch(numHosts); - ProxyListener listener = new ProxyListener() { - @Override - public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) { - synchronized (results) { - results.put(address, serverInfo); - CountDownLatch latch = hostDoneLatches.get(address); - if (null != latch) { - latch.countDown(); - } - } - doneLatch.countDown(); - } - - @Override - public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) { - } - }; - - TestHostProvider rs = new TestHostProvider(); - ProxyClientManager clientManager = createProxyClientManager(builder, rs, 50L); - clientManager.setPeriodicHandshakeEnabled(false); - clientManager.registerProxyListener(listener); - - assertEquals("There should be no clients in the manager", - 0, clientManager.getNumProxies()); - for (int i = 0; i < numHosts; i++) { - SocketAddress address = createSocketAddress(initialPort + i); - rs.addHost(address); - clientManager.createClient(address); - } - - // make sure the first 3 handshakes going through - doneLatch.await(); - - assertEquals("Handshake should return server info", - numHosts, results.size()); - assertTrue("Handshake should get all server infos", - Maps.difference(serverInfoMap, results).areEqual()); - - // update server info - for (int i = 0; i < numHosts; i++) { - SocketAddress address = createSocketAddress(initialPort + i); - ServerInfo serverInfo = new ServerInfo(); - for (int j = 0; j < numStreamsPerHost; j++) { - serverInfo.putToOwnerships(runtime.getMethodName() + "_new_stream_" + j, - address.toString()); - } - MockServerInfoService service = 
mockServiceMap.get(address); - serverInfoMap.put(address, serverInfo); - service.updateServerInfo(serverInfo); - } - - clientManager.setPeriodicHandshakeEnabled(true); - for (int i = 0; i < numHosts; i++) { - SocketAddress address = createSocketAddress(initialPort + i); - CountDownLatch latch = hostDoneLatches.get(address); - latch.await(); - } - - assertTrue("Periodic handshake should update all server infos", - Maps.difference(serverInfoMap, results).areEqual()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService.java deleted file mode 100644 index 0f4804c..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService.java +++ /dev/null @@ -1,417 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.routing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.twitter.distributedlog.client.resolver.DefaultRegionResolver; -import com.twitter.distributedlog.service.DLSocketAddress; -import com.twitter.finagle.Address; -import com.twitter.finagle.Addresses; -import com.twitter.finagle.ChannelWriteException; -import com.twitter.finagle.NoBrokersAvailableException; -import com.twitter.finagle.stats.NullStatsReceiver; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import org.junit.Test; - -/** - * Test Case for {@link ConsistentHashRoutingService}. 
- */ -public class TestConsistentHashRoutingService { - - /** - * A host removed with removeHost is blacked out: getHost fails while it is the only host, - * and the host is routable again once the 2-second blackout has elapsed. - */ - @Test(timeout = 60000) - public void testBlackoutHost() throws Exception { - TestName name = new TestName(); - RoutingService routingService = ConsistentHashRoutingService.newBuilder() - .serverSet(new NameServerSet(name)) - .resolveFromName(true) - .numReplicas(997) - .blackoutSeconds(2) - .build(); - - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 3181); - Address address = Addresses.newInetAddress(inetAddress); - List<Address> addresses = new ArrayList<Address>(1); - addresses.add(address); - name.changeAddrs(addresses); - - routingService.startService(); - - RoutingService.RoutingContext routingContext = - RoutingService.RoutingContext.of(new DefaultRegionResolver()); - - String streamName = "test-blackout-host"; - assertEquals(inetAddress, routingService.getHost(streamName, routingContext)); - routingService.removeHost(inetAddress, new ChannelWriteException(new IOException("test exception"))); - try { - routingService.getHost(streamName, routingContext); - fail("Should fail to get host since no brokers are available"); - } catch (NoBrokersAvailableException nbae) { - // expected - } - - TimeUnit.SECONDS.sleep(3); - assertEquals(inetAddress, routingService.getHost(streamName, routingContext)); - - routingService.stopService(); - } - - /** - * Drives three successive address lists through a name-resolved routing service and checks the - * join/leave notifications and the shard/address mappings after each change. - */ - @Test(timeout = 60000) - public void testPerformServerSetChangeOnName() throws Exception { - TestName name = new TestName(); - ConsistentHashRoutingService routingService = (ConsistentHashRoutingService) - ConsistentHashRoutingService.newBuilder() - .serverSet(new NameServerSet(name)) - .resolveFromName(true) - .numReplicas(997) - .build(); - - int basePort = 3180; - int numHosts = 4; - List<Address> addresses1 = Lists.newArrayListWithExpectedSize(4); - List<Address> addresses2 = Lists.newArrayListWithExpectedSize(4); - List<Address> addresses3 = Lists.newArrayListWithExpectedSize(4); - - // fill up the addresses1 - for (int i = 0; i < numHosts; i++) {
- InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); - Address address = Addresses.newInetAddress(inetAddress); - addresses1.add(address); - } - // fill up the addresses2 - overlap with addresses1 - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i); - Address address = Addresses.newInetAddress(inetAddress); - addresses2.add(address); - } - // fill up the addresses3 - not overlap with addresses2 - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i); - Address address = Addresses.newInetAddress(inetAddress); - addresses3.add(address); - } - - final List<SocketAddress> leftAddresses = Lists.newArrayList(); - final List<SocketAddress> joinAddresses = Lists.newArrayList(); - - RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() { - @Override - public void onServerLeft(SocketAddress address) { - synchronized (leftAddresses) { - leftAddresses.add(address); - leftAddresses.notifyAll(); - } - } - - @Override - public void onServerJoin(SocketAddress address) { - synchronized (joinAddresses) { - joinAddresses.add(address); - joinAddresses.notifyAll(); - } - } - }; - - routingService.registerListener(routingListener); - name.changeAddrs(addresses1); - - routingService.startService(); - - synchronized (joinAddresses) { - while (joinAddresses.size() < numHosts) { - joinAddresses.wait(); - } - } - - // validate 4 nodes joined - synchronized (joinAddresses) { - assertEquals(numHosts, joinAddresses.size()); - } - synchronized (leftAddresses) { - assertEquals(0, leftAddresses.size()); - } - assertEquals(numHosts, routingService.shardId2Address.size()); - assertEquals(numHosts, routingService.address2ShardId.size()); - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); -
assertTrue(routingService.address2ShardId.containsKey(inetAddress)); - int shardId = routingService.address2ShardId.get(inetAddress); - SocketAddress sa = routingService.shardId2Address.get(shardId); - assertNotNull(sa); - assertEquals(inetAddress, sa); - } - - // update addresses2 - 2 new hosts joined, 2 old hosts left - name.changeAddrs(addresses2); - synchronized (joinAddresses) { - while (joinAddresses.size() < numHosts + 2) { - joinAddresses.wait(); - } - } - synchronized (leftAddresses) { - while (leftAddresses.size() < numHosts - 2) { - leftAddresses.wait(); - } - } - assertEquals(numHosts, routingService.shardId2Address.size()); - assertEquals(numHosts, routingService.address2ShardId.size()); - - // first 2 shards should leave - for (int i = 0; i < 2; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); - assertFalse(routingService.address2ShardId.containsKey(inetAddress)); - } - - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i); - assertTrue(routingService.address2ShardId.containsKey(inetAddress)); - int shardId = routingService.address2ShardId.get(inetAddress); - SocketAddress sa = routingService.shardId2Address.get(shardId); - assertNotNull(sa); - assertEquals(inetAddress, sa); - } - - // update addresses3 - 4 new hosts joined, 4 old hosts left - name.changeAddrs(addresses3); - synchronized (joinAddresses) { - while (joinAddresses.size() < numHosts + 2 + numHosts) { - joinAddresses.wait(); - } - } - synchronized (leftAddresses) { - while (leftAddresses.size() < numHosts - 2 + numHosts) { - leftAddresses.wait(); - } - } - assertEquals(numHosts, routingService.shardId2Address.size()); - assertEquals(numHosts, routingService.address2ShardId.size()); - - // first 6 shards should leave - for (int i = 0; i < 2 + numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); -
assertFalse(routingService.address2ShardId.containsKey(inetAddress)); - } - // new 4 shards should exist - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i); - assertTrue(routingService.address2ShardId.containsKey(inetAddress)); - int shardId = routingService.address2ShardId.get(inetAddress); - SocketAddress sa = routingService.shardId2Address.get(shardId); - assertNotNull(sa); - assertEquals(inetAddress, sa); - } - - } - - /** - * Watcher stub: changes published before any monitor is registered are queued and replayed - * when a monitor calls watch(); afterwards changes go straight to every registered monitor. - */ - private static class TestServerSetWatcher implements ServerSetWatcher { - - final LinkedBlockingQueue<ImmutableSet<DLSocketAddress>> changeQueue = - new LinkedBlockingQueue<ImmutableSet<DLSocketAddress>>(); - final CopyOnWriteArrayList<ServerSetMonitor> monitors = - new CopyOnWriteArrayList<ServerSetMonitor>(); - - @Override - public void watch(ServerSetMonitor monitor) throws MonitorException { - monitors.add(monitor); - ImmutableSet<DLSocketAddress> change; - while ((change = changeQueue.poll()) != null) { - notifyChanges(change); - } - } - - void notifyChanges(ImmutableSet<DLSocketAddress> addresses) { - if (monitors.isEmpty()) { - changeQueue.add(addresses); - } else { - for (ServerSetMonitor monitor : monitors) { - monitor.onChange(addresses); - } - } - } - } - - /** - * Same scenario as the name-based test, but the changes carry explicit shard ids and are - * pushed through a TestServerSetWatcher. - */ - @Test(timeout = 60000) - public void testPerformServerSetChangeOnServerSet() throws Exception { - TestServerSetWatcher serverSetWatcher = new TestServerSetWatcher(); - ConsistentHashRoutingService routingService = new ConsistentHashRoutingService( - serverSetWatcher, 997, Integer.MAX_VALUE, NullStatsReceiver.get()); - - int basePort = 3180; - int numHosts = 4; - Set<DLSocketAddress> addresses1 = Sets.newConcurrentHashSet(); - Set<DLSocketAddress> addresses2 = Sets.newConcurrentHashSet(); - Set<DLSocketAddress> addresses3 = Sets.newConcurrentHashSet(); - - // fill up the addresses1 - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); -
DLSocketAddress dsa = new DLSocketAddress(i, inetAddress); - addresses1.add(dsa); - } - // fill up the addresses2 - new hosts, shard ids 2 and 3 overlap with addresses1 - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i); - DLSocketAddress dsa = new DLSocketAddress(i + 2, inetAddress); - addresses2.add(dsa); - } - // fill up the addresses3 - not overlap with addresses2 - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i); - DLSocketAddress dsa = new DLSocketAddress(i, inetAddress); - addresses3.add(dsa); - } - - final List<SocketAddress> leftAddresses = Lists.newArrayList(); - final List<SocketAddress> joinAddresses = Lists.newArrayList(); - - RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() { - @Override - public void onServerLeft(SocketAddress address) { - synchronized (leftAddresses) { - leftAddresses.add(address); - leftAddresses.notifyAll(); - } - } - - @Override - public void onServerJoin(SocketAddress address) { - synchronized (joinAddresses) { - joinAddresses.add(address); - joinAddresses.notifyAll(); - } - } - }; - - routingService.registerListener(routingListener); - serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses1)); - - routingService.startService(); - - synchronized (joinAddresses) { - while (joinAddresses.size() < numHosts) { - joinAddresses.wait(); - } - } - - // validate 4 nodes joined - synchronized (joinAddresses) { - assertEquals(numHosts, joinAddresses.size()); - } - synchronized (leftAddresses) { - assertEquals(0, leftAddresses.size()); - } - assertEquals(numHosts, routingService.shardId2Address.size()); - assertEquals(numHosts, routingService.address2ShardId.size()); - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); - assertTrue(routingService.address2ShardId.containsKey(inetAddress)); - int shardId
= routingService.address2ShardId.get(inetAddress); - assertEquals(i, shardId); - SocketAddress sa = routingService.shardId2Address.get(shardId); - assertNotNull(sa); - assertEquals(inetAddress, sa); - } - - // update addresses2 - 2 new hosts joined, 2 old hosts left - serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses2)); - synchronized (joinAddresses) { - while (joinAddresses.size() < numHosts + 2) { - joinAddresses.wait(); - } - } - synchronized (leftAddresses) { - while (leftAddresses.size() < 2) { - leftAddresses.wait(); - } - } - - assertEquals(numHosts + 2, routingService.shardId2Address.size()); - assertEquals(numHosts + 2, routingService.address2ShardId.size()); - // first 2 shards should not leave - for (int i = 0; i < 2; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); - assertTrue(routingService.address2ShardId.containsKey(inetAddress)); - int shardId = routingService.address2ShardId.get(inetAddress); - assertEquals(i, shardId); - SocketAddress sa = routingService.shardId2Address.get(shardId); - assertNotNull(sa); - assertEquals(inetAddress, sa); - } - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i); - assertTrue(routingService.address2ShardId.containsKey(inetAddress)); - int shardId = routingService.address2ShardId.get(inetAddress); - assertEquals(i + 2, shardId); - SocketAddress sa = routingService.shardId2Address.get(shardId); - assertNotNull(sa); - assertEquals(inetAddress, sa); - } - - // update addresses3 - serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses3)); - synchronized (joinAddresses) { - while (joinAddresses.size() < numHosts + 2 + numHosts) { - joinAddresses.wait(); - } - } - synchronized (leftAddresses) { - while (leftAddresses.size() < 2 + numHosts) { - leftAddresses.wait(); - } - } - assertEquals(numHosts + 2, routingService.shardId2Address.size()); - assertEquals(numHosts + 2,
routingService.address2ShardId.size()); - - // first 4 shards should now be mapped to the addresses3 hosts - for (int i = 0; i < numHosts; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i); - assertTrue(routingService.address2ShardId.containsKey(inetAddress)); - int shardId = routingService.address2ShardId.get(inetAddress); - assertEquals(i, shardId); - SocketAddress sa = routingService.shardId2Address.get(shardId); - assertNotNull(sa); - assertEquals(inetAddress, sa); - } - // the other 2 shards should be still there - for (int i = 0; i < 2; i++) { - InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + 2 + i); - assertTrue(routingService.address2ShardId.containsKey(inetAddress)); - int shardId = routingService.address2ShardId.get(inetAddress); - assertEquals(numHosts + i, shardId); - SocketAddress sa = routingService.shardId2Address.get(shardId); - assertNotNull(sa); - assertEquals(inetAddress, sa); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestInetNameResolution.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestInetNameResolution.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestInetNameResolution.java deleted file mode 100644 index 2552f9e..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestInetNameResolution.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.routing; - -import com.google.common.collect.ImmutableSet; -import com.twitter.common.net.pool.DynamicHostSet; -import com.twitter.thrift.Endpoint; -import com.twitter.thrift.ServiceInstance; -import java.net.InetSocketAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test Case for `inet` name resolution. 
- */ -public class TestInetNameResolution { - - private static final Logger logger = LoggerFactory.getLogger(TestRoutingService.class); - - @Test(timeout = 10000) - public void testInetNameResolution() throws Exception { - String nameStr = "inet!127.0.0.1:3181"; - final CountDownLatch resolved = new CountDownLatch(1); - final AtomicBoolean validationFailed = new AtomicBoolean(false); - - NameServerSet serverSet = new NameServerSet(nameStr); - serverSet.watch(new DynamicHostSet.HostChangeMonitor<ServiceInstance>() { - @Override - public void onChange(ImmutableSet<ServiceInstance> hostSet) { - if (hostSet.size() > 1) { - logger.error("HostSet has more elements than expected {}", hostSet); - validationFailed.set(true); - resolved.countDown(); - } else if (hostSet.size() == 1) { - ServiceInstance serviceInstance = hostSet.iterator().next(); - Endpoint endpoint = serviceInstance.getAdditionalEndpoints().get("thrift"); - InetSocketAddress address = new InetSocketAddress(endpoint.getHost(), endpoint.getPort()); - if (endpoint.getPort() != 3181) { - logger.error("Port does not match the expected port {}", endpoint.getPort()); - validationFailed.set(true); - } else if (!address.getAddress().getHostAddress().equals("127.0.0.1")) { - logger.error("Host address does not match the expected address {}", - address.getAddress().getHostAddress()); - validationFailed.set(true); - } - resolved.countDown(); - } - } - }); - - resolved.await(); - Assert.assertEquals(false, validationFailed.get()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRegionsRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRegionsRoutingService.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRegionsRoutingService.java 
deleted file mode 100644 index 49a375c..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRegionsRoutingService.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.routing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.google.common.collect.Sets; -import com.twitter.distributedlog.client.resolver.DefaultRegionResolver; -import com.twitter.distributedlog.thrift.service.StatusCode; -import com.twitter.finagle.NoBrokersAvailableException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Test; - -/** - * Test Case for {@link RegionsRoutingService}. 
- */ -public class TestRegionsRoutingService { - - /** - * Builds one routing service per region (5 regions, one inet host each) and checks that the - * registered listener sees every host join and no host leave. - */ - @Test(timeout = 60000) - public void testRoutingListener() throws Exception { - int numRoutingServices = 5; - RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices]; - Set<SocketAddress> hosts = new HashSet<SocketAddress>(); - Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>(); - for (int i = 0; i < numRoutingServices; i++) { - String finagleNameStr = "inet!127.0.0.1:" + (3181 + i); - routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr); - SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i); - hosts.add(address); - regionMap.put(address, "region-" + i); - } - - final CountDownLatch doneLatch = new CountDownLatch(numRoutingServices); - final AtomicInteger numHostsLeft = new AtomicInteger(0); - final Set<SocketAddress> jointHosts = new HashSet<SocketAddress>(); - RegionsRoutingService regionsRoutingService = - RegionsRoutingService.newBuilder() - .routingServiceBuilders(routingServiceBuilders) - .resolver(new DefaultRegionResolver(regionMap)) - .build(); - regionsRoutingService.registerListener(new RoutingService.RoutingListener() { - @Override - public void onServerLeft(SocketAddress address) { - numHostsLeft.incrementAndGet(); - } - - @Override - public void onServerJoin(SocketAddress address) { - // NOTE(review): callbacks presumably arrive from the per-region services' own threads; - // guard the plain HashSet the same way the other routing tests guard their lists - synchronized (jointHosts) { - jointHosts.add(address); - } - doneLatch.countDown(); - } - }); - - regionsRoutingService.startService(); - - doneLatch.await(); - - synchronized (jointHosts) { - assertEquals(numRoutingServices, jointHosts.size()); - assertEquals(0, numHostsLeft.get()); - assertTrue(Sets.difference(hosts, jointHosts).immutableCopy().isEmpty()); - } - } - - /** - * getHost must skip hosts already recorded as tried in the routing context and throw - * NoBrokersAvailableException once the hosts of all three regions have been tried. - */ - @Test(timeout = 60000) - public void testGetHost() throws Exception { - int numRoutingServices = 3; - RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices]; - Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>(); - for (int i = 0; i <
numRoutingServices; i++) { - String finagleNameStr = "inet!127.0.0.1:" + (3181 + i); - routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr); - SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i); - regionMap.put(address, "region-" + i); - } - - RegionsRoutingService regionsRoutingService = - RegionsRoutingService.newBuilder() - .resolver(new DefaultRegionResolver(regionMap)) - .routingServiceBuilders(routingServiceBuilders) - .build(); - regionsRoutingService.startService(); - - RoutingService.RoutingContext routingContext = - RoutingService.RoutingContext.of(new DefaultRegionResolver()) - .addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION); - assertEquals(new InetSocketAddress("127.0.0.1", 3181), - regionsRoutingService.getHost("any", routingContext)); - - routingContext = - RoutingService.RoutingContext.of(new DefaultRegionResolver()) - .addTriedHost(new InetSocketAddress("127.0.0.1", 3181), StatusCode.WRITE_EXCEPTION); - assertEquals(new InetSocketAddress("127.0.0.1", 3182), - regionsRoutingService.getHost("any", routingContext)); - - // add 3182 to routing context as tried host - routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3182), StatusCode.WRITE_EXCEPTION); - assertEquals(new InetSocketAddress("127.0.0.1", 3183), - regionsRoutingService.getHost("any", routingContext)); - - // add 3183 to routing context as tried host - routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION); - try { - regionsRoutingService.getHost("any", routingContext); - fail("Should fail to get host since all regions are tried."); - } catch (NoBrokersAvailableException nbae) { - // expected - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRoutingService.java ---------------------------------------------------------------------- diff
--git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRoutingService.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRoutingService.java deleted file mode 100644 index b79557e..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/routing/TestRoutingService.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.routing; - -import static org.junit.Assert.assertEquals; - -import com.twitter.distributedlog.client.resolver.DefaultRegionResolver; -import com.twitter.finagle.Address; -import com.twitter.finagle.Addresses; -import com.twitter.finagle.addr.WeightedAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test Case for {@link RoutingService}. 
- */ -@RunWith(Parameterized.class) -public class TestRoutingService { - - static final Logger LOG = LoggerFactory.getLogger(TestRoutingService.class); - - /** - * Produces all 8 combinations of the three boolean constructor flags. - */ - @Parameterized.Parameters - public static Collection<Object[]> configs() { - ArrayList<Object[]> list = new ArrayList<Object[]>(); - for (int i = 0; i <= 1; i++) { - for (int j = 0; j <= 1; j++) { - for (int k = 0; k <= 1; k++) { - list.add(new Boolean[] {i == 1, j == 1, k == 1}); - } - } - } - return list; - } - - private final boolean consistentHash; - private final boolean weightedAddresses; - private final boolean asyncResolution; - - public TestRoutingService(boolean consistentHash, boolean weightedAddresses, boolean asyncResolution) { - this.consistentHash = consistentHash; - this.weightedAddresses = weightedAddresses; - this.asyncResolution = asyncResolution; - } - - /** - * Returns seven inet addresses (127.0.0.1 to 127.0.0.7, port 3181), each wrapped as a - * weight-1.0 WeightedAddress when {@code weightedAddresses} is set. - */ - private List<Address> getAddresses(boolean weightedAddresses) { - ArrayList<Address> addresses = new ArrayList<Address>(); - addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3181))); - addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.2", 3181))); - addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.3", 3181))); - addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.4", 3181))); - addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.5", 3181))); - addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.6", 3181))); - addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.7", 3181))); - - if (weightedAddresses) { - ArrayList<Address> wAddresses = new ArrayList<Address>(); - for (Address address: addresses) { - wAddresses.add(WeightedAddress.apply(address, 1.0)); - } - return wAddresses; - } else { - return addresses; - } - } - - /** - * Routes 5000 stream names through the selected routing service and checks that the set of - * chosen hosts has as many members as the published address list. - */ - private void testRoutingServiceHelper(boolean consistentHash, - boolean weightedAddresses, - boolean asyncResolution) - throws Exception { - ExecutorService executorService = null; - final
List<Address> addresses = getAddresses(weightedAddresses); - final TestName name = new TestName(); - RoutingService routingService; - if (consistentHash) { - routingService = ConsistentHashRoutingService.newBuilder() - .serverSet(new NameServerSet(name)) - .resolveFromName(true) - .numReplicas(997) - .build(); - } else { - routingService = ServerSetRoutingService.newServerSetRoutingServiceBuilder() - .serverSetWatcher(new TwitterServerSetWatcher(new NameServerSet(name), true)).build(); - } - - try { - if (asyncResolution) { - executorService = Executors.newSingleThreadExecutor(); - executorService.submit(new Runnable() { - @Override - public void run() { - name.changeAddrs(addresses); - } - }); - } else { - name.changeAddrs(addresses); - } - routingService.startService(); - - HashSet<SocketAddress> mapping = new HashSet<SocketAddress>(); - - for (int i = 0; i < 1000; i++) { - for (int j = 0; j < 5; j++) { - String stream = "TestStream-" + i + "-" + j; - mapping.add(routingService.getHost(stream, - RoutingService.RoutingContext.of(new DefaultRegionResolver()))); - } - } - - // expected value first, so a failure message reports the two sizes the right way round - assertEquals(addresses.size(), mapping.size()); - } finally { - // shut the resolver thread down even when routing or the assertion fails - if (null != executorService) { - executorService.shutdown(); - } - } - - } - - @Test(timeout = 5000) - public void testRoutingService() throws Exception { - testRoutingServiceHelper(this.consistentHash, this.weightedAddresses, this.asyncResolution); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java deleted file mode 100644 index 71d0b01..0000000 ---
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.speculative; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; - -import com.twitter.util.CountDownLatch; -import com.twitter.util.Future; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Test {@link DefaultSpeculativeRequestExecutionPolicy}.
- */ -public class TestDefaultSpeculativeRequestExecutionPolicy { - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testInvalidBackoffMultiplier() throws Exception { - new DefaultSpeculativeRequestExecutionPolicy(100, 200, -1); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testInvalidMaxSpeculativeTimeout() throws Exception { - new DefaultSpeculativeRequestExecutionPolicy(100, Integer.MAX_VALUE, 2); - } - - @Test(timeout = 20000) - public void testSpeculativeRequests() throws Exception { - DefaultSpeculativeRequestExecutionPolicy policy = - new DefaultSpeculativeRequestExecutionPolicy(10, 10000, 2); - SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class); - - final AtomicInteger callCount = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(3); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - try { - return Future.value(callCount.incrementAndGet() < 3); - } finally { - latch.countDown(); - } - } - }).when(executor).issueSpeculativeRequest(); - - ScheduledExecutorService executorService = - Executors.newSingleThreadScheduledExecutor(); - policy.initiateSpeculativeRequest(executorService, executor); - - latch.await(); - - assertEquals(40, policy.getNextSpeculativeRequestTimeout()); - } - - @Test(timeout = 20000) - public void testSpeculativeRequestsWithMaxTimeout() throws Exception { - DefaultSpeculativeRequestExecutionPolicy policy = - new DefaultSpeculativeRequestExecutionPolicy(10, 15, 2); - SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class); - - final AtomicInteger callCount = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(3); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - try { - return Future.value(callCount.incrementAndGet() < 3); - } finally { - 
latch.countDown(); - } - } - }).when(executor).issueSpeculativeRequest(); - - ScheduledExecutorService executorService = - Executors.newSingleThreadScheduledExecutor(); - policy.initiateSpeculativeRequest(executorService, executor); - - latch.await(); - - assertEquals(15, policy.getNextSpeculativeRequestTimeout()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/service/TestDistributedLogClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/service/TestDistributedLogClientBuilder.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/service/TestDistributedLogClientBuilder.java deleted file mode 100644 index 986cdd3..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/service/TestDistributedLogClientBuilder.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.service; - -import static org.junit.Assert.assertFalse; - -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.thrift.ClientId$; -import com.twitter.util.Duration; -import org.junit.Test; - -/** - * Test Case of {@link com.twitter.distributedlog.service.DistributedLogClientBuilder}. - */ -public class TestDistributedLogClientBuilder { - - @Test(timeout = 60000) - public void testBuildClientsFromSameBuilder() throws Exception { - DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder() - .name("build-clients-from-same-builder") - .clientId(ClientId$.MODULE$.apply("test-builder")) - .finagleNameStr("inet!127.0.0.1:7001") - .streamNameRegex(".*") - .handshakeWithClientInfo(true) - .clientBuilder(ClientBuilder.get() - .hostConnectionLimit(1) - .connectTimeout(Duration.fromSeconds(1)) - .tcpConnectTimeout(Duration.fromSeconds(1)) - .requestTimeout(Duration.fromSeconds(10))); - DistributedLogClient client1 = builder.build(); - DistributedLogClient client2 = builder.build(); - assertFalse(client1 == client2); - } -}