http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java deleted file mode 100644 index 0e2a152..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java +++ /dev/null @@ -1,608 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.twitter.common.zookeeper.ServerSet; -import org.apache.distributedlog.client.ClientConfig; -import org.apache.distributedlog.client.DistributedLogClientImpl; -import org.apache.distributedlog.client.monitor.MonitorServiceClient; -import org.apache.distributedlog.client.proxy.ClusterClient; -import org.apache.distributedlog.client.resolver.DefaultRegionResolver; -import org.apache.distributedlog.client.resolver.RegionResolver; -import org.apache.distributedlog.client.routing.RegionsRoutingService; -import org.apache.distributedlog.client.routing.RoutingService; -import org.apache.distributedlog.client.routing.RoutingUtils; -import org.apache.distributedlog.thrift.service.DistributedLogService; -import com.twitter.finagle.Name; -import com.twitter.finagle.Resolver$; -import com.twitter.finagle.Service; -import com.twitter.finagle.ThriftMux; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.stats.NullStatsReceiver; -import com.twitter.finagle.stats.StatsReceiver; -import com.twitter.finagle.thrift.ClientId; -import com.twitter.finagle.thrift.ThriftClientFramedCodec; -import com.twitter.finagle.thrift.ThriftClientRequest; -import com.twitter.util.Duration; -import java.net.SocketAddress; -import java.net.URI; -import java.util.Random; -import org.apache.commons.lang.StringUtils; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; - -/** - * Builder to build {@link DistributedLogClient}. 
- */ -public final class DistributedLogClientBuilder { - - private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientBuilder.class); - - private static final Random random = new Random(System.currentTimeMillis()); - - private String name = null; - private ClientId clientId = null; - private RoutingService.Builder routingServiceBuilder = null; - private ClientBuilder clientBuilder = null; - private String serverRoutingServiceFinagleName = null; - private StatsReceiver statsReceiver = new NullStatsReceiver(); - private StatsReceiver streamStatsReceiver = new NullStatsReceiver(); - private ClientConfig clientConfig = new ClientConfig(); - private boolean enableRegionStats = false; - private final RegionResolver regionResolver = new DefaultRegionResolver(); - - /** - * Create a client builder. - * - * @return client builder - */ - public static DistributedLogClientBuilder newBuilder() { - return new DistributedLogClientBuilder(); - } - - /** - * Create a new client builder from an existing {@code builder}. - * - * @param builder the existing builder. - * @return a new client builder. - */ - public static DistributedLogClientBuilder newBuilder(DistributedLogClientBuilder builder) { - DistributedLogClientBuilder newBuilder = new DistributedLogClientBuilder(); - newBuilder.name = builder.name; - newBuilder.clientId = builder.clientId; - newBuilder.clientBuilder = builder.clientBuilder; - newBuilder.routingServiceBuilder = builder.routingServiceBuilder; - newBuilder.statsReceiver = builder.statsReceiver; - newBuilder.streamStatsReceiver = builder.streamStatsReceiver; - newBuilder.enableRegionStats = builder.enableRegionStats; - newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName; - newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig); - return newBuilder; - } - - // private constructor - private DistributedLogClientBuilder() {} - - /** - * Client Name. 
- * - * @param name - * client name - * @return client builder. - */ - public DistributedLogClientBuilder name(String name) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.name = name; - return newBuilder; - } - - /** - * Client ID. - * - * @param clientId - * client id - * @return client builder. - */ - public DistributedLogClientBuilder clientId(ClientId clientId) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientId = clientId; - return newBuilder; - } - - /** - * Serverset to access proxy services. - * - * @param serverSet - * server set. - * @return client builder. - */ - public DistributedLogClientBuilder serverSet(ServerSet serverSet) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(serverSet); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Server Sets to access proxy services. - * - * <p>The <i>local</i> server set will be tried first then <i>remotes</i>. - * - * @param local local server set. - * @param remotes remote server sets. - * @return client builder. - */ - public DistributedLogClientBuilder serverSets(ServerSet local, ServerSet...remotes) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1]; - builders[0] = RoutingUtils.buildRoutingService(local); - for (int i = 1; i < builders.length; i++) { - builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]); - } - newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder() - .resolver(regionResolver) - .routingServiceBuilders(builders); - newBuilder.enableRegionStats = remotes.length > 0; - return newBuilder; - } - - /** - * Name to access proxy services. - * - * @param finagleNameStr - * finagle name string. - * @return client builder. 
- */ - public DistributedLogClientBuilder finagleNameStr(String finagleNameStr) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Finagle name strs to access proxy services. - * - * <p>The <i>local</i> finalge name str will be tried first, then <i>remotes</i>. - * - * @param local local server set. - * @param remotes remote server sets. - * @return client builder. - */ - public DistributedLogClientBuilder finagleNameStrs(String local, String...remotes) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1]; - builders[0] = RoutingUtils.buildRoutingService(local); - for (int i = 1; i < builders.length; i++) { - builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]); - } - newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder() - .routingServiceBuilders(builders) - .resolver(regionResolver); - newBuilder.enableRegionStats = remotes.length > 0; - return newBuilder; - } - - /** - * URI to access proxy services. - * - * <p>Assuming the write proxies are announced under `.write_proxy` of the provided namespace uri. - * The builder will convert the dl uri (e.g. 
distributedlog://{zkserver}/path/to/namespace) to - * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`) - * - * @param uri namespace uri to access the serverset of write proxies - * @return distributedlog builder - */ - public DistributedLogClientBuilder uri(URI uri) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - String zkServers = uri.getAuthority().replace(";", ","); - String[] zkServerList = StringUtils.split(zkServers, ','); - String finagleNameStr = String.format( - "zk!%s!%s/.write_proxy", - zkServerList[random.nextInt(zkServerList.length)], // zk server - uri.getPath()); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Address of write proxy to connect. - * - * @param address - * write proxy address. - * @return client builder. - */ - public DistributedLogClientBuilder host(SocketAddress address) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(address); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - private DistributedLogClientBuilder routingServiceBuilder(RoutingService.Builder builder) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = builder; - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Routing Service to access proxy services. - * - * @param routingService - * routing service - * @return client builder. - */ - @VisibleForTesting - public DistributedLogClientBuilder routingService(RoutingService routingService) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(routingService); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Stats receiver to expose client stats. 
- * - * @param statsReceiver - * stats receiver. - * @return client builder. - */ - public DistributedLogClientBuilder statsReceiver(StatsReceiver statsReceiver) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.statsReceiver = statsReceiver; - return newBuilder; - } - - /** - * Stream Stats Receiver to expose per stream stats. - * - * @param streamStatsReceiver - * stream stats receiver - * @return client builder. - */ - public DistributedLogClientBuilder streamStatsReceiver(StatsReceiver streamStatsReceiver) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.streamStatsReceiver = streamStatsReceiver; - return newBuilder; - } - - /** - * Set underlying finagle client builder. - * - * @param builder - * finagle client builder. - * @return client builder. - */ - public DistributedLogClientBuilder clientBuilder(ClientBuilder builder) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientBuilder = builder; - return newBuilder; - } - - /** - * Backoff time when redirecting to an already retried host. - * - * @param ms - * backoff time. - * @return client builder. - */ - public DistributedLogClientBuilder redirectBackoffStartMs(int ms) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setRedirectBackoffStartMs(ms); - return newBuilder; - } - - /** - * Max backoff time when redirecting to an already retried host. - * - * @param ms - * backoff time. - * @return client builder. - */ - public DistributedLogClientBuilder redirectBackoffMaxMs(int ms) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setRedirectBackoffMaxMs(ms); - return newBuilder; - } - - /** - * Max redirects that is allowed per request. - * - * <p>If <i>redirects</i> are exhausted, fail the request immediately. - * - * @param redirects - * max redirects allowed before failing a request. - * @return client builder. 
- */ - public DistributedLogClientBuilder maxRedirects(int redirects) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setMaxRedirects(redirects); - return newBuilder; - } - - /** - * Timeout per request in millis. - * - * @param timeoutMs - * timeout per request in millis. - * @return client builder. - */ - public DistributedLogClientBuilder requestTimeoutMs(int timeoutMs) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setRequestTimeoutMs(timeoutMs); - return newBuilder; - } - - /** - * Set thriftmux enabled. - * - * @param enabled - * is thriftmux enabled - * @return client builder. - */ - public DistributedLogClientBuilder thriftmux(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setThriftMux(enabled); - return newBuilder; - } - - /** - * Set failfast stream exception handling enabled. - * - * @param enabled - * is failfast exception handling enabled - * @return client builder. - */ - public DistributedLogClientBuilder streamFailfast(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setStreamFailfast(enabled); - return newBuilder; - } - - /** - * Set the regex to match stream names that the client cares about. - * - * @param nameRegex - * stream name regex - * @return client builder - */ - public DistributedLogClientBuilder streamNameRegex(String nameRegex) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setStreamNameRegex(nameRegex); - return newBuilder; - } - - /** - * Whether to use the new handshake endpoint to exchange ownership cache. - * - * <p>Enable this when the servers are updated to support handshaking with client info. - * - * @param enabled - * new handshake endpoint is enabled. - * @return client builder. 
- */ - public DistributedLogClientBuilder handshakeWithClientInfo(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setHandshakeWithClientInfo(enabled); - return newBuilder; - } - - /** - * Set the periodic handshake interval in milliseconds. - * - * <p>Every <code>intervalMs</code>, the DL client will handshake with existing proxies again. - * If the interval is less than ownership sync interval, the handshake won't sync ownerships. Otherwise, it will. - * - * @see #periodicOwnershipSyncIntervalMs(long) - * @param intervalMs - * handshake interval - * @return client builder. - */ - public DistributedLogClientBuilder periodicHandshakeIntervalMs(long intervalMs) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setPeriodicHandshakeIntervalMs(intervalMs); - return newBuilder; - } - - /** - * Set the periodic ownership sync interval in milliseconds. - * - * <p>If periodic handshake is enabled, the handshake will sync ownership if the elapsed time is larger than - * sync interval. - * - * @see #periodicHandshakeIntervalMs(long) - * @param intervalMs - * interval that handshake should sync ownerships. - * @return client builder - */ - public DistributedLogClientBuilder periodicOwnershipSyncIntervalMs(long intervalMs) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setPeriodicOwnershipSyncIntervalMs(intervalMs); - return newBuilder; - } - - /** - * Enable/Disable periodic dumping ownership cache. - * - * @param enabled - * flag to enable/disable periodic dumping ownership cache - * @return client builder. - */ - public DistributedLogClientBuilder periodicDumpOwnershipCache(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setPeriodicDumpOwnershipCacheEnabled(enabled); - return newBuilder; - } - - /** - * Set periodic dumping ownership cache interval. 
- * - * @param intervalMs - * interval on dumping ownership cache, in millis. - * @return client builder - */ - public DistributedLogClientBuilder periodicDumpOwnershipCacheIntervalMs(long intervalMs) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setPeriodicDumpOwnershipCacheIntervalMs(intervalMs); - return newBuilder; - } - - /** - * Enable handshake tracing. - * - * @param enabled - * flag to enable/disable handshake tracing - * @return client builder - */ - public DistributedLogClientBuilder handshakeTracing(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setHandshakeTracingEnabled(enabled); - return newBuilder; - } - - /** - * Enable checksum on requests to the proxy. - * - * @param enabled - * flag to enable/disable checksum - * @return client builder - */ - public DistributedLogClientBuilder checksum(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setChecksumEnabled(enabled); - return newBuilder; - } - - /** - * Configure the finagle name string for the server-side routing service. - * - * @param nameStr name string of the server-side routing service - * @return client builder - */ - public DistributedLogClientBuilder serverRoutingServiceFinagleNameStr(String nameStr) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.serverRoutingServiceFinagleName = nameStr; - return newBuilder; - } - - DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig = ClientConfig.newConfig(clientConfig); - return newBuilder; - } - - /** - * Build distributedlog client. - * - * @return distributedlog client. - */ - public DistributedLogClient build() { - return buildClient(); - } - - /** - * Build monitor service client. - * - * @return monitor service client. 
- */ - public MonitorServiceClient buildMonitorClient() { - - return buildClient(); - } - - @SuppressWarnings("unchecked") - ClusterClient buildServerRoutingServiceClient(String serverRoutingServiceFinagleName) { - ClientBuilder builder = this.clientBuilder; - if (null == builder) { - builder = ClientBuilder.get() - .tcpConnectTimeout(Duration.fromMilliseconds(200)) - .connectTimeout(Duration.fromMilliseconds(200)) - .requestTimeout(Duration.fromSeconds(1)) - .retries(20); - if (!clientConfig.getThriftMux()) { - builder = builder.hostConnectionLimit(1); - } - } - if (clientConfig.getThriftMux()) { - builder = builder.stack(ThriftMux.client().withClientId(clientId)); - } else { - builder = builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId))); - } - - Name name; - try { - name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName); - } catch (Exception exc) { - logger.error("Exception in Resolver.eval for name {}", serverRoutingServiceFinagleName, exc); - throw new RuntimeException(exc); - } - - // builder the client - Service<ThriftClientRequest, byte[]> client = - ClientBuilder.safeBuildFactory( - builder.dest(name).reportTo(statsReceiver.scope("routing")) - ).toService(); - DistributedLogService.ServiceIface service = - new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory()); - return new ClusterClient(client, service); - } - - DistributedLogClientImpl buildClient() { - checkNotNull(name, "No name provided."); - checkNotNull(clientId, "No client id provided."); - checkNotNull(routingServiceBuilder, "No routing service builder provided."); - checkNotNull(statsReceiver, "No stats receiver provided."); - if (null == streamStatsReceiver) { - streamStatsReceiver = new NullStatsReceiver(); - } - - Optional<ClusterClient> serverRoutingServiceClient = Optional.absent(); - if (null != serverRoutingServiceFinagleName) { - serverRoutingServiceClient = Optional.of( - buildServerRoutingServiceClient(serverRoutingServiceFinagleName)); 
- } - - RoutingService routingService = routingServiceBuilder - .statsReceiver(statsReceiver.scope("routing")) - .build(); - DistributedLogClientImpl clientImpl = - new DistributedLogClientImpl( - name, - clientId, - routingService, - clientBuilder, - clientConfig, - serverRoutingServiceClient, - statsReceiver, - streamStatsReceiver, - regionResolver, - enableRegionStats); - routingService.startService(); - clientImpl.handshake(); - return clientImpl; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java deleted file mode 100644 index 033882f..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * DistributedLog Service Client. 
- */ -package org.apache.distributedlog.service; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/resources/findbugsExclude.xml b/distributedlog-client/src/main/resources/findbugsExclude.xml deleted file mode 100644 index 05ee085..0000000 --- a/distributedlog-client/src/main/resources/findbugsExclude.xml +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
-//--> -<FindBugsFilter> - <Match> - <!-- generated code, we can't be held responsible for findbugs in it //--> - <Class name="~org\.apache\.distributedlog\.thrift.*" /> - </Match> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java deleted file mode 100644 index d7494de..0000000 --- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java +++ /dev/null @@ -1,383 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.common.collect.Lists; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.LogRecord; -import org.apache.distributedlog.LogRecordSet; -import org.apache.distributedlog.LogRecordSetBuffer; -import org.apache.distributedlog.exceptions.LogRecordTooLongException; -import org.apache.distributedlog.io.CompressionCodec; -import org.apache.distributedlog.service.DistributedLogClient; -import com.twitter.finagle.IndividualRequestTimeoutException; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import java.nio.ByteBuffer; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Test {@link DistributedLogMultiStreamWriter}. 
- */ -public class TestDistributedLogMultiStreamWriter { - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithNullStreams() throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithEmptyStreamList() throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.<String>newArrayList()) - .build(); - } - - @Test(timeout = 20000, expected = NullPointerException.class) - public void testBuildWithNullClient() throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .build(); - } - - @Test(timeout = 20000, expected = NullPointerException.class) - public void testBuildWithNullCodec() throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(null) - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithInvalidSpeculativeSettings1() - throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(-1) - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithInvalidSpeculativeSettings2() - throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(5) - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithInvalidSpeculativeSettings3() - throws Exception { - 
DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(-1) - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithInvalidSpeculativeSettings4() - throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(10) - .build(); - } - - @Test(timeout = 20000) - public void testBuildMultiStreamWriter() - throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(50) - .build(); - assertTrue(true); - } - - @Test(timeout = 20000) - public void testBuildWithPeriodicalFlushEnabled() throws Exception { - ScheduledExecutorService executorService = mock(ScheduledExecutorService.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(50) - .flushIntervalMs(1000) - .scheduler(executorService) - .build(); - verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000000, 1000000, TimeUnit.MICROSECONDS); - } - - @Test(timeout = 20000) - public void 
testBuildWithPeriodicalFlushDisabled() throws Exception { - ScheduledExecutorService executorService = mock(ScheduledExecutorService.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(50) - .flushIntervalMs(0) - .scheduler(executorService) - .build(); - verify(executorService, times(0)).scheduleAtFixedRate(writer, 1000, 1000, TimeUnit.MILLISECONDS); - writer.close(); - } - - @Test(timeout = 20000) - public void testFlushWhenBufferIsFull() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any())) - .thenReturn(Future.value(new DLSN(1L, 1L, 999L))); - - ScheduledExecutorService executorService = - Executors.newSingleThreadScheduledExecutor(); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(100000) - .maxSpeculativeTimeoutMs(200000) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(500000) - .flushIntervalMs(0) - .bufferSize(0) - .scheduler(executorService) - .build(); - - ByteBuffer buffer = ByteBuffer.wrap("test".getBytes(UTF_8)); - writer.write(buffer); - - verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - - writer.close(); - } - - @Test(timeout = 20000) - public void testFlushWhenExceedMaxLogRecordSetSize() - throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any())) - .thenReturn(Future.value(new DLSN(1L, 1L, 999L))); - 
ScheduledExecutorService executorService = - Executors.newSingleThreadScheduledExecutor(); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(100000) - .maxSpeculativeTimeoutMs(200000) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(500000) - .flushIntervalMs(0) - .bufferSize(Integer.MAX_VALUE) - .scheduler(executorService) - .build(); - - byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE - 3 * 100]; - ByteBuffer buffer1 = ByteBuffer.wrap(data); - writer.write(buffer1); - verify(client, times(0)).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - LogRecordSet.Writer recordSetWriter1 = writer.getLogRecordSetWriter(); - assertEquals(1, recordSetWriter1.getNumRecords()); - assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter1.getNumBytes()); - - ByteBuffer buffer2 = ByteBuffer.wrap(data); - writer.write(buffer2); - verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - LogRecordSet.Writer recordSetWriter2 = writer.getLogRecordSetWriter(); - assertEquals(1, recordSetWriter2.getNumRecords()); - assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter2.getNumBytes()); - assertTrue(recordSetWriter1 != recordSetWriter2); - - writer.close(); - } - - @Test(timeout = 20000) - public void testWriteTooLargeRecord() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(100000) - .maxSpeculativeTimeoutMs(200000) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(5000000) - .flushIntervalMs(0) - .bufferSize(0) - .build(); - - byte[] data = new 
byte[LogRecord.MAX_LOGRECORD_SIZE + 10]; - ByteBuffer buffer = ByteBuffer.wrap(data); - Future<DLSN> writeFuture = writer.write(buffer); - assertTrue(writeFuture.isDefined()); - try { - Await.result(writeFuture); - fail("Should fail on writing too long record"); - } catch (LogRecordTooLongException lrtle) { - // expected - } - writer.close(); - } - - @Test(timeout = 20000) - public void testSpeculativeWrite() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(5000000) - .flushIntervalMs(0) - .bufferSize(0) - .build(); - - final String secondStream = writer.getStream(1); - - final DLSN dlsn = new DLSN(99L, 88L, 0L); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - Object[] arguments = invocation.getArguments(); - String stream = (String) arguments[0]; - if (stream.equals(secondStream)) { - return Future.value(dlsn); - } else { - return new Promise<DLSN>(); - } - } - }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - - byte[] data = "test-test".getBytes(UTF_8); - ByteBuffer buffer = ByteBuffer.wrap(data); - Future<DLSN> writeFuture = writer.write(buffer); - DLSN writeDLSN = Await.result(writeFuture); - assertEquals(dlsn, writeDLSN); - writer.close(); - } - - @Test(timeout = 20000) - public void testPeriodicalFlush() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - 
.firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(5000000) - .flushIntervalMs(10) - .bufferSize(Integer.MAX_VALUE) - .build(); - - final DLSN dlsn = new DLSN(99L, 88L, 0L); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - return Future.value(dlsn); - } - }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - - byte[] data = "test-test".getBytes(UTF_8); - ByteBuffer buffer = ByteBuffer.wrap(data); - Future<DLSN> writeFuture = writer.write(buffer); - DLSN writeDLSN = Await.result(writeFuture); - assertEquals(dlsn, writeDLSN); - writer.close(); - } - - @Test(timeout = 20000) - public void testFailRequestAfterRetriedAllStreams() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any())) - .thenReturn(new Promise<DLSN>()); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(5000000) - .flushIntervalMs(10) - .bufferSize(Integer.MAX_VALUE) - .build(); - - byte[] data = "test-test".getBytes(UTF_8); - ByteBuffer buffer = ByteBuffer.wrap(data); - Future<DLSN> writeFuture = writer.write(buffer); - try { - Await.result(writeFuture); - fail("Should fail the request after retries all streams"); - } catch (IndividualRequestTimeoutException e) { - long timeoutMs = e.timeout().inMilliseconds(); - assertTrue(timeoutMs >= (10 + 20) && timeoutMs < 5000000); - } - writer.close(); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java deleted file mode 100644 index 86d1c11..0000000 --- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.ownership; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import org.apache.distributedlog.client.ClientConfig; -import com.twitter.finagle.stats.NullStatsReceiver; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Map; -import java.util.Set; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -/** - * Test Case for Ownership Cache. - */ -public class TestOwnershipCache { - - @Rule - public TestName runtime = new TestName(); - - private static OwnershipCache createOwnershipCache() { - ClientConfig clientConfig = new ClientConfig(); - return new OwnershipCache(clientConfig, null, - NullStatsReceiver.get(), NullStatsReceiver.get()); - } - - private static SocketAddress createSocketAddress(int port) { - return new InetSocketAddress("127.0.0.1", port); - } - - @Test(timeout = 60000) - public void testUpdateOwner() { - OwnershipCache cache = createOwnershipCache(); - SocketAddress addr = createSocketAddress(1000); - String stream = runtime.getMethodName(); - - assertTrue("Should successfully update owner if no owner exists before", - cache.updateOwner(stream, addr)); - assertEquals("Owner should be " + addr + " for stream " + stream, - addr, cache.getOwner(stream)); - assertTrue("Should successfully update owner if old owner is same", - cache.updateOwner(stream, addr)); - assertEquals("Owner should be " + addr + " for stream " + stream, - addr, cache.getOwner(stream)); - } - - @Test(timeout = 60000) - public void testRemoveOwnerFromStream() { - OwnershipCache cache = createOwnershipCache(); - int initialPort = 2000; - int numProxies = 2; - int numStreamsPerProxy = 2; - for (int i = 0; i < numProxies; i++) { - SocketAddress addr = createSocketAddress(initialPort + i); - for (int j = 0; j < numStreamsPerProxy; j++) { - 
String stream = runtime.getMethodName() + "_" + i + "_" + j; - cache.updateOwner(stream, addr); - } - } - Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - - String stream = runtime.getMethodName() + "_0_0"; - SocketAddress owner = createSocketAddress(initialPort); - - // remove non-existent mapping won't change anything - SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999); - cache.removeOwnerFromStream(stream, nonExistentAddr, "remove-non-existent-addr"); - assertEquals("Owner " + owner + " should not be removed", - owner, cache.getOwner(stream)); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - - // remove existent mapping should remove ownership mapping - cache.removeOwnerFromStream(stream, owner, "remove-owner"); - assertNull("Owner " + owner + " should be removed", cache.getOwner(stream)); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy - 1) + " entries left in cache", - numProxies * numStreamsPerProxy - 1, ownershipMap.size()); - ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should still be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - Set<String> ownedStreams = ownershipDistribution.get(owner); - assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned for " + owner, - numStreamsPerProxy - 1, ownedStreams.size()); - assertFalse("Stream " + 
stream + " should not be owned by " + owner, - ownedStreams.contains(stream)); - } - - @Test(timeout = 60000) - public void testRemoveAllStreamsFromOwner() { - OwnershipCache cache = createOwnershipCache(); - int initialPort = 2000; - int numProxies = 2; - int numStreamsPerProxy = 2; - for (int i = 0; i < numProxies; i++) { - SocketAddress addr = createSocketAddress(initialPort + i); - for (int j = 0; j < numStreamsPerProxy; j++) { - String stream = runtime.getMethodName() + "_" + i + "_" + j; - cache.updateOwner(stream, addr); - } - } - Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - - SocketAddress owner = createSocketAddress(initialPort); - - // remove non-existent host won't change anything - SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999); - cache.removeAllStreamsFromOwner(nonExistentAddr); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should still be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should still be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - - // remove existent host should remove ownership mapping - cache.removeAllStreamsFromOwner(owner); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + ((numProxies - 1) * numStreamsPerProxy) + " entries left in cache", - (numProxies - 1) * numStreamsPerProxy, ownershipMap.size()); - ownershipDistribution = cache.getStreamOwnershipDistribution(); - 
assertEquals("There should be " + (numProxies - 1) + " proxies cached", - numProxies - 1, ownershipDistribution.size()); - assertFalse("Host " + owner + " should not be cached", - ownershipDistribution.containsKey(owner)); - } - - @Test(timeout = 60000) - public void testReplaceOwner() { - OwnershipCache cache = createOwnershipCache(); - int initialPort = 2000; - int numProxies = 2; - int numStreamsPerProxy = 2; - for (int i = 0; i < numProxies; i++) { - SocketAddress addr = createSocketAddress(initialPort + i); - for (int j = 0; j < numStreamsPerProxy; j++) { - String stream = runtime.getMethodName() + "_" + i + "_" + j; - cache.updateOwner(stream, addr); - } - } - Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should be " + numProxies + " proxies cached", - numProxies, ownershipDistribution.size()); - - String stream = runtime.getMethodName() + "_0_0"; - SocketAddress oldOwner = createSocketAddress(initialPort); - SocketAddress newOwner = createSocketAddress(initialPort + 999); - - cache.updateOwner(stream, newOwner); - assertEquals("Owner of " + stream + " should be changed from " + oldOwner + " to " + newOwner, - newOwner, cache.getOwner(stream)); - ownershipMap = cache.getStreamOwnerMapping(); - assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", - numProxies * numStreamsPerProxy, ownershipMap.size()); - assertEquals("Owner of " + stream + " should be " + newOwner, - newOwner, ownershipMap.get(stream)); - ownershipDistribution = cache.getStreamOwnershipDistribution(); - assertEquals("There should be " + (numProxies + 1) + " proxies cached", - numProxies + 1, ownershipDistribution.size()); - Set<String> oldOwnedStreams = 
ownershipDistribution.get(oldOwner); - assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + oldOwner, - numStreamsPerProxy - 1, oldOwnedStreams.size()); - assertFalse("Stream " + stream + " should not be owned by " + oldOwner, - oldOwnedStreams.contains(stream)); - Set<String> newOwnedStreams = ownershipDistribution.get(newOwner); - assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + newOwner, - 1, newOwnedStreams.size()); - assertTrue("Stream " + stream + " should be owned by " + newOwner, - newOwnedStreams.contains(stream)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java deleted file mode 100644 index 8ef33bd..0000000 --- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.client.proxy; - -import org.apache.distributedlog.thrift.service.BulkWriteResponse; -import org.apache.distributedlog.thrift.service.ClientInfo; -import org.apache.distributedlog.thrift.service.DistributedLogService; -import org.apache.distributedlog.thrift.service.HeartbeatOptions; -import org.apache.distributedlog.thrift.service.ServerInfo; -import org.apache.distributedlog.thrift.service.WriteContext; -import org.apache.distributedlog.thrift.service.WriteResponse; -import com.twitter.util.Future; -import java.nio.ByteBuffer; -import java.util.List; - -/** - * Mock DistributedLog Related Services. - */ -public class MockDistributedLogServices { - - /** - * Mock basic service. - */ - static class MockBasicService implements DistributedLogService.ServiceIface { - - @Override - public Future<ServerInfo> handshake() { - return Future.value(new ServerInfo()); - } - - @Override - public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) { - return Future.value(new ServerInfo()); - } - - @Override - public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> heartbeatWithOptions(String stream, - WriteContext ctx, - HeartbeatOptions options) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> write(String stream, - ByteBuffer data) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> writeWithContext(String stream, - ByteBuffer data, - WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<BulkWriteResponse> writeBulkWithContext(String stream, - List<ByteBuffer> data, - WriteContext ctx) { - return Future.value(new BulkWriteResponse()); - } - - @Override - public 
Future<WriteResponse> truncate(String stream, - String dlsn, - WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> release(String stream, - WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> create(String stream, WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> delete(String stream, - WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<WriteResponse> getOwner(String stream, WriteContext ctx) { - return Future.value(new WriteResponse()); - } - - @Override - public Future<Void> setAcceptNewStream(boolean enabled) { - return Future.value(null); - } - } - - /** - * Mock server info service. - */ - public static class MockServerInfoService extends MockBasicService { - - protected ServerInfo serverInfo; - - public MockServerInfoService() { - serverInfo = new ServerInfo(); - } - - public void updateServerInfo(ServerInfo serverInfo) { - this.serverInfo = serverInfo; - } - - @Override - public Future<ServerInfo> handshake() { - return Future.value(serverInfo); - } - - @Override - public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) { - return Future.value(serverInfo); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java deleted file mode 100644 index e38c2ed..0000000 --- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java +++ /dev/null @@ -1,49 +0,0 
@@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.client.proxy; - -import org.apache.distributedlog.thrift.service.DistributedLogService; -import java.net.SocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Mock Proxy Client Builder. 
- */ -class MockProxyClientBuilder implements ProxyClient.Builder { - - static class MockProxyClient extends ProxyClient { - MockProxyClient(SocketAddress address, - DistributedLogService.ServiceIface service) { - super(address, new MockThriftClient(), service); - } - } - - private final ConcurrentMap<SocketAddress, MockProxyClient> clients = - new ConcurrentHashMap<SocketAddress, MockProxyClient>(); - - public void provideProxyClient(SocketAddress address, - MockProxyClient proxyClient) { - clients.put(address, proxyClient); - } - - @Override - public ProxyClient build(SocketAddress address) { - return clients.get(address); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java deleted file mode 100644 index ad1c878..0000000 --- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.client.proxy; - -import com.twitter.finagle.Service; -import com.twitter.finagle.thrift.ThriftClientRequest; -import com.twitter.util.Future; - -/** - * Mock Thrift Client. - */ -class MockThriftClient extends Service<ThriftClientRequest, byte[]> { - @Override - public Future<byte[]> apply(ThriftClientRequest request) { - return Future.value(request.message); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java deleted file mode 100644 index 6d9a471..0000000 --- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java +++ /dev/null @@ -1,368 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.proxy; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.distributedlog.client.ClientConfig; -import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService; -import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService; -import org.apache.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient; -import org.apache.distributedlog.client.resolver.DefaultRegionResolver; -import org.apache.distributedlog.client.stats.ClientStats; -import org.apache.distributedlog.thrift.service.ServerInfo; -import com.twitter.finagle.stats.NullStatsReceiver; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.lang3.tuple.Pair; -import org.jboss.netty.util.HashedWheelTimer; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -/** - * Test Proxy Client Manager. 
- */ -public class TestProxyClientManager { - - @Rule - public TestName runtime = new TestName(); - - static class TestHostProvider implements HostProvider { - - Set<SocketAddress> hosts = new HashSet<SocketAddress>(); - - synchronized void addHost(SocketAddress host) { - hosts.add(host); - } - - @Override - public synchronized Set<SocketAddress> getHosts() { - return ImmutableSet.copyOf(hosts); - } - - } - - private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder, - long periodicHandshakeIntervalMs) { - HostProvider provider = new TestHostProvider(); - return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs); - } - - private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder, - HostProvider hostProvider, - long periodicHandshakeIntervalMs) { - ClientConfig clientConfig = new ClientConfig(); - clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs); - clientConfig.setPeriodicOwnershipSyncIntervalMs(-1); - HashedWheelTimer dlTimer = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(), - clientConfig.getRedirectBackoffStartMs(), - TimeUnit.MILLISECONDS); - return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider, - new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver())); - } - - private static SocketAddress createSocketAddress(int port) { - return new InetSocketAddress("127.0.0.1", port); - } - - private static MockProxyClient createMockProxyClient(SocketAddress address) { - return new MockProxyClient(address, new MockBasicService()); - } - - private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient( - SocketAddress address, ServerInfo serverInfo) { - MockServerInfoService service = new MockServerInfoService(); - MockProxyClient proxyClient = new MockProxyClient(address, service); - service.updateServerInfo(serverInfo); - return Pair.of(proxyClient, 
service); - } - - @Test(timeout = 60000) - public void testBasicCreateRemove() throws Exception { - SocketAddress address = createSocketAddress(1000); - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - MockProxyClient mockProxyClient = createMockProxyClient(address); - builder.provideProxyClient(address, mockProxyClient); - - ProxyClientManager clientManager = createProxyClientManager(builder, 0L); - assertEquals("There should be no clients in the manager", - 0, clientManager.getNumProxies()); - ProxyClient proxyClient = clientManager.createClient(address); - assertEquals("Create client should build the proxy client", - 1, clientManager.getNumProxies()); - assertTrue("The client returned should be the same client that builder built", - mockProxyClient == proxyClient); - } - - @Test(timeout = 60000) - public void testGetShouldCreateClient() throws Exception { - SocketAddress address = createSocketAddress(2000); - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - MockProxyClient mockProxyClient = createMockProxyClient(address); - builder.provideProxyClient(address, mockProxyClient); - - ProxyClientManager clientManager = createProxyClientManager(builder, 0L); - assertEquals("There should be no clients in the manager", - 0, clientManager.getNumProxies()); - ProxyClient proxyClient = clientManager.getClient(address); - assertEquals("Get client should build the proxy client", - 1, clientManager.getNumProxies()); - assertTrue("The client returned should be the same client that builder built", - mockProxyClient == proxyClient); - } - - @Test(timeout = 60000) - public void testConditionalRemoveClient() throws Exception { - SocketAddress address = createSocketAddress(3000); - MockProxyClientBuilder builder = new MockProxyClientBuilder(); - MockProxyClient mockProxyClient = createMockProxyClient(address); - MockProxyClient anotherMockProxyClient = createMockProxyClient(address); - builder.provideProxyClient(address, mockProxyClient); - - 
ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        clientManager.createClient(address);
        assertEquals("Create client should build the proxy client",
                1, clientManager.getNumProxies());
        clientManager.removeClient(address, anotherMockProxyClient);
        assertEquals("Conditional remove should not remove proxy client",
                1, clientManager.getNumProxies());
        clientManager.removeClient(address, mockProxyClient);
        assertEquals("Conditional remove should remove proxy client",
                0, clientManager.getNumProxies());
    }

    /**
     * Verifies that {@code removeClient(address)} drops the proxy client that was
     * created for that address: the proxy count goes 0 -> 1 -> 0.
     */
    @Test(timeout = 60000)
    public void testRemoveClient() throws Exception {
        SocketAddress address = createSocketAddress(3000);
        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        MockProxyClient mockProxyClient = createMockProxyClient(address);
        builder.provideProxyClient(address, mockProxyClient);

        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        clientManager.createClient(address);
        assertEquals("Create client should build the proxy client",
                1, clientManager.getNumProxies());
        clientManager.removeClient(address);
        assertEquals("Remove should remove proxy client",
                0, clientManager.getNumProxies());
    }

    /**
     * Verifies that creating a client results in a successful handshake being
     * reported to a registered {@link ProxyListener}, and that the reported
     * {@link ServerInfo} equals the one the mock proxy client was built with.
     */
    @Test(timeout = 60000)
    public void testCreateClientShouldHandshake() throws Exception {
        SocketAddress address = createSocketAddress(3000);
        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        ServerInfo serverInfo = new ServerInfo();
        serverInfo.putToOwnerships(runtime.getMethodName() + "_stream",
                runtime.getMethodName() + "_owner");
        Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
                createMockProxyClient(address, serverInfo);
        builder.provideProxyClient(address, mockProxyClient.getLeft());

        final AtomicReference<ServerInfo> resultHolder = new AtomicReference<ServerInfo>(null);
        final CountDownLatch doneLatch = new CountDownLatch(1);
        ProxyListener listener = new ProxyListener() {
            @Override
            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
                resultHolder.set(serverInfo);
                doneLatch.countDown();
            }

            @Override
            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
                // Intentionally empty: a failed handshake never releases doneLatch,
                // so it surfaces as the @Test timeout.
            }
        };

        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
        clientManager.registerProxyListener(listener);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        clientManager.createClient(address);
        assertEquals("Create client should build the proxy client",
                1, clientManager.getNumProxies());

        // When a client is created, it would handshake with that proxy
        doneLatch.await();
        assertEquals("Handshake should return server info",
                serverInfo, resultHolder.get());
    }

    /**
     * Verifies that {@code handshake()} reports the {@link ServerInfo} of every
     * host known to the host provider.
     *
     * <p>Mock proxy clients are provided for {@code numHosts} addresses, the same
     * addresses are added to a {@link TestHostProvider}, and the test then waits
     * for {@code 2 * numHosts} successful handshake callbacks before comparing the
     * collected results with the expected server infos.
     */
    @Test(timeout = 60000)
    public void testHandshake() throws Exception {
        final int numHosts = 3;
        final int numStreamsPerHost = 3;
        final int initialPort = 4000;

        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        Map<SocketAddress, ServerInfo> serverInfoMap =
                new HashMap<SocketAddress, ServerInfo>();
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            ServerInfo serverInfo = new ServerInfo();
            for (int j = 0; j < numStreamsPerHost; j++) {
                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
                        address.toString());
            }
            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
                    createMockProxyClient(address, serverInfo);
            builder.provideProxyClient(address, mockProxyClient.getLeft());
            serverInfoMap.put(address, serverInfo);
        }

        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
        // Two successful callbacks are expected per host.
        // NOTE(review): presumably one when the client is first created and one from
        // the explicit handshake round below - confirm against ProxyClientManager.
        final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts);
        ProxyListener listener = new ProxyListener() {
            @Override
            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
                synchronized (results) {
                    results.put(address, serverInfo);
                }
                doneLatch.countDown();
            }

            @Override
            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
                // Intentionally empty: a failed handshake never releases doneLatch,
                // so it surfaces as the @Test timeout.
            }
        };

        TestHostProvider rs = new TestHostProvider();
        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 0L);
        clientManager.registerProxyListener(listener);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        for (int i = 0; i < numHosts; i++) {
            rs.addHost(createSocketAddress(initialPort + i));
        }
        // handshake would handshake with 3 hosts again
        clientManager.handshake();
        doneLatch.await();
        assertEquals("Handshake should return server info",
                numHosts, results.size());
        assertTrue("Handshake should get all server infos",
                Maps.difference(serverInfoMap, results).areEqual());
    }

    /**
     * Verifies that periodic handshaking picks up server info that changed after
     * the initial handshake.
     *
     * <p>The test first creates the clients with periodic handshaking disabled and
     * checks the initial results, then replaces the server info served by every
     * mock service, enables periodic handshaking, waits until each host has
     * reported two successful handshakes in total, and finally checks that the
     * collected results match the updated server infos.
     */
    @Test(timeout = 60000)
    public void testPeriodicHandshake() throws Exception {
        final int numHosts = 3;
        final int numStreamsPerHost = 3;
        final int initialPort = 5000;

        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        Map<SocketAddress, ServerInfo> serverInfoMap =
                new HashMap<SocketAddress, ServerInfo>();
        Map<SocketAddress, MockServerInfoService> mockServiceMap =
                new HashMap<SocketAddress, MockServerInfoService>();
        final Map<SocketAddress, CountDownLatch> hostDoneLatches =
                new HashMap<SocketAddress, CountDownLatch>();
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            ServerInfo serverInfo = new ServerInfo();
            for (int j = 0; j < numStreamsPerHost; j++) {
                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
                        address.toString());
            }
            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
                    createMockProxyClient(address, serverInfo);
            builder.provideProxyClient(address, mockProxyClient.getLeft());
            serverInfoMap.put(address, serverInfo);
            mockServiceMap.put(address, mockProxyClient.getRight());
            // Two successes per host: the initial handshake plus one more after the
            // server info has been replaced.
            hostDoneLatches.put(address, new CountDownLatch(2));
        }

        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
        final CountDownLatch doneLatch = new CountDownLatch(numHosts);
        ProxyListener listener = new ProxyListener() {
            @Override
            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
                synchronized (results) {
                    results.put(address, serverInfo);
                    CountDownLatch latch = hostDoneLatches.get(address);
                    if (null != latch) {
                        latch.countDown();
                    }
                }
                doneLatch.countDown();
            }

            @Override
            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
                // Intentionally empty: a failed handshake never releases the latches,
                // so it surfaces as the @Test timeout.
            }
        };

        TestHostProvider rs = new TestHostProvider();
        // NOTE(review): 50L is presumably the periodic handshake interval in
        // milliseconds - confirm against createProxyClientManager.
        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 50L);
        clientManager.setPeriodicHandshakeEnabled(false);
        clientManager.registerProxyListener(listener);

        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            rs.addHost(address);
            clientManager.createClient(address);
        }

        // make sure the first 3 handshakes going through
        doneLatch.await();

        assertEquals("Handshake should return server info",
                numHosts, results.size());
        assertTrue("Handshake should get all server infos",
                Maps.difference(serverInfoMap, results).areEqual());

        // update server info
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            ServerInfo serverInfo = new ServerInfo();
            for (int j = 0; j < numStreamsPerHost; j++) {
                serverInfo.putToOwnerships(runtime.getMethodName() + "_new_stream_" + j,
                        address.toString());
            }
            MockServerInfoService service = mockServiceMap.get(address);
            serverInfoMap.put(address, serverInfo);
            service.updateServerInfo(serverInfo);
        }

        clientManager.setPeriodicHandshakeEnabled(true);
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            CountDownLatch latch = hostDoneLatches.get(address);
            latch.await();
        }

        // Periodic handshaking is still enabled here, so the listener may keep writing
        // into `results` (a plain HashMap) from another thread. Copy it under the same
        // monitor the listener uses instead of iterating the live map unsynchronized.
        Map<SocketAddress, ServerInfo> latestResults;
        synchronized (results) {
            latestResults = new HashMap<SocketAddress, ServerInfo>(results);
        }
        assertTrue("Periodic handshake should update all server infos",
                Maps.difference(serverInfoMap, latestResults).areEqual());
    }

}