DL-131: Enable checkstyle for distributedlog-client module Author: Xi Liu <[email protected]>
Reviewers: Sijie Guo <[email protected]> Closes #87 from xiliuant/xi/checkstyle_client Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/367335d5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/367335d5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/367335d5 Branch: refs/heads/master Commit: 367335d5e7811c2677801919f061b043bf087258 Parents: cf9bcd2 Author: Xi Liu <[email protected]> Authored: Tue Dec 27 09:06:24 2016 -0800 Committer: Sijie Guo <[email protected]> Committed: Tue Dec 27 09:06:24 2016 -0800 ---------------------------------------------------------------------- distributedlog-client/pom.xml | 33 ++++ .../distributedlog/client/ClientConfig.java | 6 +- .../client/DistributedLogClientImpl.java | 72 ++++--- .../client/DistributedLogMultiStreamWriter.java | 108 ++++++----- .../client/monitor/MonitorServiceClient.java | 10 +- .../client/monitor/package-info.java | 21 ++ .../client/ownership/OwnershipCache.java | 17 +- .../client/ownership/package-info.java | 21 ++ .../distributedlog/client/package-info.java | 2 +- .../client/proxy/ProxyClient.java | 13 +- .../client/proxy/ProxyClientManager.java | 30 +-- .../client/proxy/ProxyListener.java | 3 +- .../client/proxy/package-info.java | 21 ++ .../client/resolver/DefaultRegionResolver.java | 3 + .../client/resolver/package-info.java | 21 ++ .../routing/ConsistentHashRoutingService.java | 67 ++++--- .../client/routing/NameServerSet.java | 42 ++-- .../client/routing/RegionsRoutingService.java | 54 ++++-- .../client/routing/RoutingService.java | 16 +- .../client/routing/RoutingUtils.java | 5 +- .../client/routing/ServerSetRoutingService.java | 29 +-- .../client/routing/ServerSetWatcher.java | 23 ++- .../routing/SingleHostRoutingService.java | 23 ++- .../distributedlog/client/routing/TestName.java | 20 +- .../client/routing/TwitterServerSetWatcher.java | 17 +- .../client/routing/package-info.java | 21 ++ .../client/serverset/DLZkServerSet.java | 5 +- .../client/serverset/package-info.java | 21 ++ ...efaultSpeculativeRequestExecutionPolicy.java | 20 +- .../SpeculativeRequestExecutionPolicy.java | 5 +- .../speculative/SpeculativeRequestExecutor.java | 8 +- .../client/speculative/package-info.java | 21 ++ .../client/stats/ClientStats.java | 3 +- .../client/stats/ClientStatsLogger.java | 2 +- .../distributedlog/client/stats/OpStats.java | 3 +- .../client/stats/OpStatsLogger.java | 2 +- .../client/stats/OwnershipStatsLogger.java | 6 +- .../client/stats/package-info.java | 21 ++ .../distributedlog/service/DLSocketAddress.java | 9 +- .../service/DistributedLogClient.java | 10 +- .../service/DistributedLogClientBuilder.java | 190 ++++++++++--------- .../distributedlog/service/package-info.java | 21 ++ .../TestDistributedLogMultiStreamWriter.java | 25 ++- .../client/ownership/TestOwnershipCache.java | 16 +- .../proxy/MockDistributedLogServices.java | 10 +- .../client/proxy/MockProxyClientBuilder.java | 3 +- .../client/proxy/MockThriftClient.java | 4 +- .../client/proxy/TestProxyClientManager.java | 18 +- .../TestConsistentHashRoutingService.java | 20 +- .../client/routing/TestInetNameResolution.java | 20 +- .../routing/TestRegionsRoutingService.java | 12 +- .../client/routing/TestRoutingService.java | 23 ++- ...efaultSpeculativeRequestExecutionPolicy.java | 15 +- .../TestDistributedLogClientBuilder.java | 8 +- 54 files changed, 804 insertions(+), 415 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-client/pom.xml b/distributedlog-client/pom.xml index a65e739..1e88672 100644 --- a/distributedlog-client/pom.xml +++ b/distributedlog-client/pom.xml @@ -129,6 +129,39 @@ <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds> </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>6.19</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>distributedlog-build-tools</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <configuration> + <configLocation>distributedlog/checkstyle.xml</configLocation> + <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation> + <consoleOutput>true</consoleOutput> + <failOnViolation>true</failOnViolation> + <includeResources>false</includeResources> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + </configuration> + <executions> + <execution> + <phase>test-compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java index 4c0a933..de74f5a 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java @@ -17,12 +17,12 @@ */ package com.twitter.distributedlog.client; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkNotNull; import java.util.concurrent.TimeUnit; /** - * Client Config + * Client Config. */ public class ClientConfig { int redirectBackoffStartMs = 25; @@ -95,7 +95,7 @@ public class ClientConfig { } public ClientConfig setStreamNameRegex(String nameRegex) { - Preconditions.checkNotNull(nameRegex); + checkNotNull(nameRegex); this.streamNameRegex = nameRegex; return this; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java index dc33003..d3edb74 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java @@ -24,13 +24,12 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.LogRecordSetBuffer; -import com.twitter.distributedlog.util.ProtocolUtils; +import com.twitter.distributedlog.client.monitor.MonitorServiceClient; +import com.twitter.distributedlog.client.ownership.OwnershipCache; import com.twitter.distributedlog.client.proxy.HostProvider; import com.twitter.distributedlog.client.proxy.ProxyClient; import com.twitter.distributedlog.client.proxy.ProxyClientManager; import com.twitter.distributedlog.client.proxy.ProxyListener; -import com.twitter.distributedlog.client.monitor.MonitorServiceClient; -import com.twitter.distributedlog.client.ownership.OwnershipCache; import com.twitter.distributedlog.client.resolver.RegionResolver; import com.twitter.distributedlog.client.routing.RoutingService; import com.twitter.distributedlog.client.routing.RoutingService.RoutingContext; @@ -49,6 +48,7 @@ import com.twitter.distributedlog.thrift.service.ServerStatus; import com.twitter.distributedlog.thrift.service.StatusCode; import com.twitter.distributedlog.thrift.service.WriteContext; import com.twitter.distributedlog.thrift.service.WriteResponse; +import com.twitter.distributedlog.util.ProtocolUtils; import com.twitter.finagle.CancelledRequestException; import com.twitter.finagle.ConnectionFailedException; import com.twitter.finagle.Failure; @@ -66,16 +66,6 @@ import com.twitter.util.Function0; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; -import org.apache.thrift.TApplicationException; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.Seq; -import scala.runtime.AbstractFunction1; - import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -91,14 +81,24 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.thrift.TApplicationException; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Seq; +import scala.runtime.AbstractFunction1; + /** - * Implementation of distributedlog client + * Implementation of distributedlog client. */ public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient, RoutingService.RoutingListener, ProxyListener, HostProvider { - static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class); + private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class); private final String clientName; private final ClientId clientId; @@ -142,13 +142,13 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe void send(SocketAddress address) { long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); - if (clientConfig.getMaxRedirects() > 0 && - tries.get() >= clientConfig.getMaxRedirects()) { + if (clientConfig.getMaxRedirects() > 0 + && tries.get() >= clientConfig.getMaxRedirects()) { fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs), "Exhausted max redirects in " + elapsedMs + " ms")); return; - } else if (clientConfig.getRequestTimeoutMs() > 0 && - elapsedMs >= clientConfig.getRequestTimeoutMs()) { + } else if (clientConfig.getRequestTimeoutMs() > 0 + && elapsedMs >= clientConfig.getRequestTimeoutMs()) { fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs), "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs() + " in " + elapsedMs + " ms")); @@ -203,7 +203,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe } @Override - synchronized public void run(Timeout timeout) throws Exception { + public synchronized void run(Timeout timeout) throws Exception { if (!timeout.isCancelled() && null != nextAddressToSend) { doSend(nextAddressToSend); } else { @@ -231,18 +231,20 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe @Override Future<ResponseHeader> sendRequest(final ProxyClient sc) { - return sc.getService().writeBulkWithContext(stream, data, ctx).addEventListener(new FutureEventListener<BulkWriteResponse>() { + return sc.getService().writeBulkWithContext(stream, data, ctx) + .addEventListener(new FutureEventListener<BulkWriteResponse>() { @Override public void onSuccess(BulkWriteResponse response) { // For non-success case, the ResponseHeader handler (the caller) will handle it. - // Note success in this case means no finagle errors have occurred (such as finagle connection issues). - // In general code != SUCCESS means there's some error reported by dlog service. The caller will handle such - // errors. + // Note success in this case means no finagle errors have occurred + // (such as finagle connection issues). In general code != SUCCESS means there's some error + // reported by dlog service. The caller will handle such errors. if (response.getHeader().getCode() == StatusCode.SUCCESS) { beforeComplete(sc, response.getHeader()); BulkWriteOp.this.complete(sc.getAddress(), response); if (response.getWriteResponses().size() == 0 && data.size() > 0) { - logger.error("non-empty bulk write got back empty response without failure for stream {}", stream); + logger.error("non-empty bulk write got back empty response without failure for stream {}", + stream); } } } @@ -277,7 +279,8 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe // Should never happen, but just in case so there's some record. if (bulkWriteResponse.getWriteResponses().size() != data.size()) { - logger.error("wrong number of results, response = {} records = ", bulkWriteResponse.getWriteResponses().size(), data.size()); + logger.error("wrong number of results, response = {} records = {}", + bulkWriteResponse.getWriteResponses().size(), data.size()); } } @@ -597,8 +600,15 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe } }); - logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {}, stats_receiver = {}, thriftmux = {}", - new Object[] { name, clientId, routingService.getClass(), statsReceiver.getClass(), clientConfig.getThriftMux() }); + logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {}," + + " stats_receiver = {}, thriftmux = {}", + new Object[] { + name, + clientId, + routingService.getClass(), + statsReceiver.getClass(), + clientConfig.getThriftMux() + }); } @Override @@ -612,9 +622,9 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe @Override public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) { - if (null != serverInfo && - serverInfo.isSetServerStatus() && - ServerStatus.DOWN == serverInfo.getServerStatus()) { + if (null != serverInfo + && serverInfo.isSetServerStatus() + && ServerStatus.DOWN == serverInfo.getServerStatus()) { logger.info("{} is detected as DOWN during handshaking", address); // server is shutting down handleServiceUnavailable(address, client, Optional.<StreamOp>absent()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java index eac8f9d..8ccbbfc 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java @@ -17,7 +17,11 @@ */ package com.twitter.distributedlog.client; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE; +import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; + import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; import com.google.common.collect.Lists; @@ -37,7 +41,6 @@ import com.twitter.util.Duration; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; - import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -47,30 +50,36 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static com.twitter.distributedlog.LogRecord.*; - /** - * Write to multiple streams + * Write to multiple streams. */ public class DistributedLogMultiStreamWriter implements Runnable { + /** + * Create a new builder to create a multi stream writer. + * + * @return a new builder to create a multi stream writer. + */ public static Builder newBuilder() { return new Builder(); } + /** + * Builder for the multi stream writer. + */ public static class Builder { - private DistributedLogClient _client = null; - private List<String> _streams = null; - private int _bufferSize = 16 * 1024; // 16k - private long _flushIntervalMicros = 2000; // 2ms - private CompressionCodec.Type _codec = CompressionCodec.Type.NONE; - private ScheduledExecutorService _executorService = null; - private long _requestTimeoutMs = 500; // 500ms - private int _firstSpeculativeTimeoutMs = 50; // 50ms - private int _maxSpeculativeTimeoutMs = 200; // 200ms - private float _speculativeBackoffMultiplier = 2; - private Ticker _ticker = Ticker.systemTicker(); + private DistributedLogClient client = null; + private List<String> streams = null; + private int bufferSize = 16 * 1024; // 16k + private long flushIntervalMicros = 2000; // 2ms + private CompressionCodec.Type codec = CompressionCodec.Type.NONE; + private ScheduledExecutorService executorService = null; + private long requestTimeoutMs = 500; // 500ms + private int firstSpeculativeTimeoutMs = 50; // 50ms + private int maxSpeculativeTimeoutMs = 200; // 200ms + private float speculativeBackoffMultiplier = 2; + private Ticker ticker = Ticker.systemTicker(); private Builder() {} @@ -82,7 +91,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder client(DistributedLogClient client) { - this._client = client; + this.client = client; return this; } @@ -94,12 +103,13 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder streams(List<String> streams) { - this._streams = streams; + this.streams = streams; return this; } /** * Set the output buffer size. + * * <p>If output buffer size is 0, the writes will be transmitted to * wire immediately. * @@ -108,7 +118,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder bufferSize(int bufferSize) { - this._bufferSize = bufferSize; + this.bufferSize = bufferSize; return this; } @@ -120,7 +130,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder flushIntervalMs(int flushIntervalMs) { - this._flushIntervalMicros = TimeUnit.MILLISECONDS.toMicros(flushIntervalMs); + this.flushIntervalMicros = TimeUnit.MILLISECONDS.toMicros(flushIntervalMs); return this; } @@ -132,7 +142,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder flushIntervalMicros(int flushIntervalMicros) { - this._flushIntervalMicros = flushIntervalMicros; + this.flushIntervalMicros = flushIntervalMicros; return this; } @@ -143,7 +153,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder compressionCodec(CompressionCodec.Type codec) { - this._codec = codec; + this.codec = codec; return this; } @@ -155,7 +165,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder scheduler(ScheduledExecutorService executorService) { - this._executorService = executorService; + this.executorService = executorService; return this; } @@ -167,17 +177,19 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder requestTimeoutMs(long requestTimeoutMs) { - this._requestTimeoutMs = requestTimeoutMs; + this.requestTimeoutMs = requestTimeoutMs; return this; } /** * Set the first speculative timeout in milliseconds. + * * <p>The multi-streams writer does speculative writes on streams. * The write issues first write request to a stream, if the write request * doesn't respond within speculative timeout. it issues next write request * to a different stream. It does such speculative retries until receive * a success or request timeout ({@link #requestTimeoutMs(long)}). + * * <p>This setting is to configure the first speculative timeout, in milliseconds. * * @param timeoutMs @@ -185,17 +197,19 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder firstSpeculativeTimeoutMs(int timeoutMs) { - this._firstSpeculativeTimeoutMs = timeoutMs; + this.firstSpeculativeTimeoutMs = timeoutMs; return this; } /** * Set the max speculative timeout in milliseconds. + * * <p>The multi-streams writer does speculative writes on streams. * The write issues first write request to a stream, if the write request * doesn't respond within speculative timeout. it issues next write request * to a different stream. It does such speculative retries until receive * a success or request timeout ({@link #requestTimeoutMs(long)}). + * * <p>This setting is to configure the max speculative timeout, in milliseconds. * * @param timeoutMs @@ -203,17 +217,19 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder maxSpeculativeTimeoutMs(int timeoutMs) { - this._maxSpeculativeTimeoutMs = timeoutMs; + this.maxSpeculativeTimeoutMs = timeoutMs; return this; } /** * Set the speculative timeout backoff multiplier. + * * <p>The multi-streams writer does speculative writes on streams. * The write issues first write request to a stream, if the write request * doesn't respond within speculative timeout. it issues next write request * to a different stream. It does such speculative retries until receive * a success or request timeout ({@link #requestTimeoutMs(long)}). + * * <p>This setting is to configure the speculative timeout backoff multiplier. * * @param multiplier @@ -221,7 +237,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder speculativeBackoffMultiplier(float multiplier) { - this._speculativeBackoffMultiplier = multiplier; + this.speculativeBackoffMultiplier = multiplier; return this; } @@ -234,7 +250,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @see Ticker */ public Builder clockTicker(Ticker ticker) { - this._ticker = ticker; + this.ticker = ticker; return this; } @@ -244,29 +260,29 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return the multi stream writer. */ public DistributedLogMultiStreamWriter build() { - Preconditions.checkArgument((null != _streams && !_streams.isEmpty()), + checkArgument((null != streams && !streams.isEmpty()), "No streams provided"); - Preconditions.checkNotNull(_client, + checkNotNull(client, "No distributedlog client provided"); - Preconditions.checkNotNull(_codec, + checkNotNull(codec, "No compression codec provided"); - Preconditions.checkArgument(_firstSpeculativeTimeoutMs > 0 - && _firstSpeculativeTimeoutMs <= _maxSpeculativeTimeoutMs - && _speculativeBackoffMultiplier > 0 - && _maxSpeculativeTimeoutMs < _requestTimeoutMs, + checkArgument(firstSpeculativeTimeoutMs > 0 + && firstSpeculativeTimeoutMs <= maxSpeculativeTimeoutMs + && speculativeBackoffMultiplier > 0 + && maxSpeculativeTimeoutMs < requestTimeoutMs, "Invalid speculative timeout settings"); return new DistributedLogMultiStreamWriter( - _streams, - _client, - Math.min(_bufferSize, MAX_LOGRECORDSET_SIZE), - _flushIntervalMicros, - _requestTimeoutMs, - _firstSpeculativeTimeoutMs, - _maxSpeculativeTimeoutMs, - _speculativeBackoffMultiplier, - _codec, - _ticker, - _executorService); + streams, + client, + Math.min(bufferSize, MAX_LOGRECORDSET_SIZE), + flushIntervalMicros, + requestTimeoutMs, + firstSpeculativeTimeoutMs, + maxSpeculativeTimeoutMs, + speculativeBackoffMultiplier, + codec, + ticker, + executorService); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java index 427a793..e541578 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java @@ -18,11 +18,13 @@ package com.twitter.distributedlog.client.monitor; import com.twitter.util.Future; - import java.net.SocketAddress; import java.util.Map; import java.util.Set; +/** + * Interface for distributedlog monitor service. + */ public interface MonitorServiceClient { /** @@ -35,7 +37,7 @@ public interface MonitorServiceClient { Future<Void> check(String stream); /** - * Send heartbeat to the stream and its readers + * Send heartbeat to the stream and its readers. * * @param stream * stream. @@ -51,7 +53,7 @@ public interface MonitorServiceClient { Map<SocketAddress, Set<String>> getStreamOwnershipDistribution(); /** - * Enable/Disable accepting new stream on a given proxy + * Enable/Disable accepting new stream on a given proxy. * * @param enabled * flag to enable/disable accepting new streams on a given proxy @@ -60,7 +62,7 @@ public interface MonitorServiceClient { Future<Void> setAcceptNewStream(boolean enabled); /** - * Close the client + * Close the client. */ void close(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/package-info.java new file mode 100644 index 0000000..c4e7df0 --- /dev/null +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * DistributedLog Monitor Client. + */ +package com.twitter.distributedlog.client.monitor; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java index 72ad14e..387d727 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java @@ -21,21 +21,20 @@ import com.google.common.collect.ImmutableMap; import com.twitter.distributedlog.client.ClientConfig; import com.twitter.distributedlog.client.stats.OwnershipStatsLogger; import com.twitter.finagle.stats.StatsReceiver; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.SocketAddress; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Client Side Ownership Cache + * Client Side Ownership Cache. */ public class OwnershipCache implements TimerTask { @@ -62,8 +61,8 @@ public class OwnershipCache implements TimerTask { } private void scheduleDumpOwnershipCache() { - if (clientConfig.isPeriodicDumpOwnershipCacheEnabled() && - clientConfig.getPeriodicDumpOwnershipCacheIntervalMs() > 0) { + if (clientConfig.isPeriodicDumpOwnershipCacheEnabled() + && clientConfig.getPeriodicDumpOwnershipCacheIntervalMs() > 0) { timer.newTimeout(this, clientConfig.getPeriodicDumpOwnershipCacheIntervalMs(), TimeUnit.MILLISECONDS); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java new file mode 100644 index 0000000..721702e --- /dev/null +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utils for managing ownership at client side. + */ +package com.twitter.distributedlog.client.ownership; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java index 508753a..aa167fb 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java @@ -16,6 +16,6 @@ * limitations under the License. */ /** - * DistributedLog Client + * DistributedLog Client. */ package com.twitter.distributedlog.client; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java index 4c79327..d131e28 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java @@ -28,19 +28,21 @@ import com.twitter.finagle.thrift.ThriftClientFramedCodec; import com.twitter.finagle.thrift.ThriftClientRequest; import com.twitter.util.Duration; import com.twitter.util.Future; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import org.apache.thrift.protocol.TBinaryProtocol; import scala.Option; import scala.runtime.BoxedUnit; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - /** * Client talks to a single proxy. */ public class ProxyClient { - public static interface Builder { + /** + * Builder to build a proxy client talking to given host <code>address</code>. + */ + public interface Builder { /** * Build a proxy client to <code>address</code>. * @@ -59,6 +61,9 @@ public class ProxyClient { return new DefaultBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats); } + /** + * Default Builder for {@link ProxyClient}. + */ public static class DefaultBuilder implements Builder { private final String clientName; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClientManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClientManager.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClientManager.java index 6364509..c7d56f6 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClientManager.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClientManager.java @@ -25,12 +25,6 @@ import com.twitter.distributedlog.client.stats.OpStats; import com.twitter.distributedlog.thrift.service.ClientInfo; import com.twitter.distributedlog.thrift.service.ServerInfo; import com.twitter.util.FutureEventListener; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.SocketAddress; import java.util.Map; import java.util.Set; @@ -40,6 +34,11 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Manager manages clients (channels) to proxies. @@ -94,8 +93,8 @@ public class ProxyClientManager implements TimerTask { return; } if (periodicHandshakeEnabled) { - final boolean syncOwnerships; - syncOwnerships = lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS) >= clientConfig.getPeriodicOwnershipSyncIntervalMs(); + final boolean syncOwnerships = lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS) + >= clientConfig.getPeriodicOwnershipSyncIntervalMs(); final Set<SocketAddress> hostsSnapshot = hostProvider.getHosts(); final AtomicInteger numHosts = new AtomicInteger(hostsSnapshot.size()); @@ -130,10 +129,13 @@ public class ProxyClientManager implements TimerTask { private void complete() { if (0 == numHosts.decrementAndGet()) { if (syncOwnerships) { - logger.info("Periodic handshaked with {} hosts : {} streams returned," + - " {} hosts succeeded, {} hosts failed", - new Object[]{hostsSnapshot.size(), numStreams.get(), - numSuccesses.get(), numFailures.get()}); + logger.info("Periodic handshaked with {} hosts : {} streams returned," + + " {} hosts succeeded, {} hosts failed", + new Object[] { + hostsSnapshot.size(), + numStreams.get(), + numSuccesses.get(), + numFailures.get()}); if (clientConfig.isHandshakeTracingEnabled()) { logger.info("Periodic handshaked stream distribution : {}", streamDistributions); } @@ -265,7 +267,7 @@ public class ProxyClientManager implements TimerTask { } /** - * Handshake with a given proxy + * Handshake with a given proxy. * * @param address * proxy address @@ -299,7 +301,7 @@ public class ProxyClientManager implements TimerTask { /** * Handshake with all proxies. * - * NOTE: this is a synchronous call. + * <p>NOTE: this is a synchronous call. */ public void handshake() { Set<SocketAddress> hostsSnapshot = hostProvider.getHosts(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyListener.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyListener.java index 052ff67..e024825 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyListener.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyListener.java @@ -18,11 +18,10 @@ package com.twitter.distributedlog.client.proxy; import com.twitter.distributedlog.thrift.service.ServerInfo; - import java.net.SocketAddress; /** - * Listener on server changes + * Listener on server changes. */ public interface ProxyListener { /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/package-info.java new file mode 100644 index 0000000..dc28c76 --- /dev/null +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Clients that interact with individual proxies. + */ +package com.twitter.distributedlog.client.proxy; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/DefaultRegionResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/DefaultRegionResolver.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/DefaultRegionResolver.java index 27f315f..ab2fbed 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/DefaultRegionResolver.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/DefaultRegionResolver.java @@ -24,6 +24,9 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +/** + * Default implementation of {@link RegionResolver}. + */ public class DefaultRegionResolver implements RegionResolver { private static final String DEFAULT_REGION = "default-region"; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/package-info.java new file mode 100644 index 0000000..4bb53a5 --- /dev/null +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Resolver to resolve network addresses. + */ +package com.twitter.distributedlog.client.resolver; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java index 570c8bf..6d1e37e 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java @@ -17,8 +17,10 @@ */ package com.twitter.distributedlog.client.routing; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; @@ -34,14 +36,6 @@ import com.twitter.finagle.stats.Gauge; import com.twitter.finagle.stats.NullStatsReceiver; import com.twitter.finagle.stats.StatsReceiver; import com.twitter.util.Function0; -import org.apache.commons.lang3.tuple.Pair; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.Seq; - import java.net.SocketAddress; import java.util.Arrays; import java.util.HashMap; @@ -53,7 +47,17 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Seq; +/** + * Consistent Hashing Based {@link RoutingService}. + */ public class ConsistentHashRoutingService extends ServerSetRoutingService { private static final Logger logger = LoggerFactory.getLogger(ConsistentHashRoutingService.class); @@ -63,52 +67,60 @@ public class ConsistentHashRoutingService extends ServerSetRoutingService { return new ConsistentHashRoutingService(serverSetWatcher, numReplicas, 300, NullStatsReceiver.get()); } + /** + * Builder helper class to build a consistent hash bashed {@link RoutingService}. + * + * @return builder to build a consistent hash based {@link RoutingService}. + */ public static Builder newBuilder() { return new Builder(); } + /** + * Builder for building consistent hash based routing service. + */ public static class Builder implements RoutingService.Builder { - private ServerSet _serverSet; - private boolean _resolveFromName = false; - private int _numReplicas; - private int _blackoutSeconds = 300; - private StatsReceiver _statsReceiver = NullStatsReceiver.get(); + private ServerSet serverSet; + private boolean resolveFromName = false; + private int numReplicas; + private int blackoutSeconds = 300; + private StatsReceiver statsReceiver = NullStatsReceiver.get(); private Builder() {} public Builder serverSet(ServerSet serverSet) { - this._serverSet = serverSet; + this.serverSet = serverSet; return this; } public Builder resolveFromName(boolean enabled) { - this._resolveFromName = enabled; + this.resolveFromName = enabled; return this; } public Builder numReplicas(int numReplicas) { - this._numReplicas = numReplicas; + this.numReplicas = numReplicas; return this; } public Builder blackoutSeconds(int seconds) { - this._blackoutSeconds = seconds; + this.blackoutSeconds = seconds; return this; } public Builder statsReceiver(StatsReceiver statsReceiver) { - this._statsReceiver = statsReceiver; + this.statsReceiver = statsReceiver; return this; } @Override public RoutingService build() { - Preconditions.checkNotNull(_serverSet, "No serverset provided."); - Preconditions.checkNotNull(_statsReceiver, "No stats receiver provided."); - Preconditions.checkArgument(_numReplicas > 0, "Invalid number of replicas : " + _numReplicas); - return new ConsistentHashRoutingService(new TwitterServerSetWatcher(_serverSet, _resolveFromName), - _numReplicas, _blackoutSeconds, _statsReceiver); + checkNotNull(serverSet, "No serverset provided."); + checkNotNull(statsReceiver, "No stats receiver provided."); + checkArgument(numReplicas > 0, "Invalid number of replicas : " + numReplicas); + return new ConsistentHashRoutingService(new TwitterServerSetWatcher(serverSet, resolveFromName), + numReplicas, blackoutSeconds, statsReceiver); } } @@ -365,8 +377,9 @@ public class ConsistentHashRoutingService extends ServerSetRoutingService { circle.remove(shardId, host); if (reason.isPresent()) { if (reason.get() instanceof ChannelException) { - logger.info("Shard {} ({}) left due to ChannelException, black it out for {} seconds (message = {})", - new Object[] { shardId, host, blackoutSeconds, reason.get().toString() }); + logger.info("Shard {} ({}) left due to ChannelException, black it out for {} seconds" + + " (message = {})", + new Object[] { shardId, host, blackoutSeconds, reason.get().toString() }); BlackoutHost blackoutHost = new BlackoutHost(shardId, host); hashedWheelTimer.newTimeout(blackoutHost, blackoutSeconds, TimeUnit.SECONDS); } else { @@ -425,7 +438,7 @@ public class ConsistentHashRoutingService extends ServerSetRoutingService { // Assign a random negative shardId int shardId; do { - shardId = Math.min(-1 , (int)(Math.random() * Integer.MIN_VALUE)); + shardId = Math.min(-1 , (int) (Math.random() * Integer.MIN_VALUE)); } while (null != shardId2Address.get(shardId)); shard = shardId; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/NameServerSet.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/NameServerSet.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/NameServerSet.java index 43d7902..eeba4ac 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/NameServerSet.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/NameServerSet.java @@ -17,13 +17,6 @@ */ package com.twitter.distributedlog.client.routing; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - import com.google.common.collect.ImmutableSet; import com.twitter.common.base.Command; import com.twitter.common.base.Commands; @@ -36,16 +29,26 @@ import com.twitter.finagle.Resolver$; import com.twitter.thrift.Endpoint; import com.twitter.thrift.ServiceInstance; import com.twitter.thrift.Status; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; +/** + * Finagle Name based {@link ServerSet} implementation. + */ class NameServerSet implements ServerSet { - static final Logger logger = LoggerFactory.getLogger(NameServerSet.class); + private static final Logger logger = LoggerFactory.getLogger(NameServerSet.class); - private volatile Set<HostChangeMonitor<ServiceInstance>> watchers = new HashSet<HostChangeMonitor<ServiceInstance>>(); + private volatile Set<HostChangeMonitor<ServiceInstance>> watchers = + new HashSet<HostChangeMonitor<ServiceInstance>>(); private volatile ImmutableSet<ServiceInstance> hostSet = ImmutableSet.of(); private AtomicBoolean resolutionPending = new AtomicBoolean(true); @@ -70,21 +73,22 @@ class NameServerSet implements ServerSet { private void initialize(Name name) { if (name instanceof TestName) { - ((TestName)name).changes(new AbstractFunction1<Addr, BoxedUnit>() { + ((TestName) name).changes(new AbstractFunction1<Addr, BoxedUnit>() { @Override public BoxedUnit apply(Addr varAddr) { return NameServerSet.this.respondToChanges(varAddr); } }); } else if (name instanceof Name.Bound) { - ((Name.Bound)name).addr().changes().respond(new AbstractFunction1<Addr, BoxedUnit>() { + ((Name.Bound) name).addr().changes().respond(new AbstractFunction1<Addr, BoxedUnit>() { @Override public BoxedUnit apply(Addr varAddr) { return NameServerSet.this.respondToChanges(varAddr); } }); } else { - logger.error("NameServerSet only supports Name.Bound. While the resolved name {} was {}", name, name.getClass()); + logger.error("NameServerSet only supports Name.Bound. While the resolved name {} was {}", + name, name.getClass()); throw new UnsupportedOperationException("NameServerSet only supports Name.Bound"); } } @@ -113,7 +117,7 @@ class NameServerSet implements ServerSet { ImmutableSet<ServiceInstance> newHostSet = oldHostSet; if (addr instanceof Addr.Bound) { - scala.collection.immutable.Set<Address> endpointAddresses = ((Addr.Bound)addr).addrs(); + scala.collection.immutable.Set<Address> endpointAddresses = ((Addr.Bound) addr).addrs(); scala.collection.Iterator<Address> endpointAddressesIterator = endpointAddresses.toIterator(); HashSet<ServiceInstance> serviceInstances = new HashSet<ServiceInstance>(); while (endpointAddressesIterator.hasNext()) { @@ -122,7 +126,7 @@ class NameServerSet implements ServerSet { newHostSet = ImmutableSet.copyOf(serviceInstances); } else if (addr instanceof Addr.Failed) { - logger.error("Name resolution failed", ((Addr.Failed)addr).cause()); + logger.error("Name resolution failed", ((Addr.Failed) addr).cause()); newHostSet = ImmutableSet.of(); } else if (addr.toString().equals("Pending")) { logger.info("Name resolution pending"); @@ -176,7 +180,9 @@ class NameServerSet implements ServerSet { * @deprecated The status field is deprecated. Please use {@link #join(java.net.InetSocketAddress, java.util.Map)} */ @Override - public EndpointStatus join(InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints, Status status) + public EndpointStatus join(InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + Status status) throws Group.JoinException, InterruptedException { throw new UnsupportedOperationException("NameServerSet does not support join"); } @@ -207,7 +213,9 @@ class NameServerSet implements ServerSet { * @throws InterruptedException if interrupted while waiting to join the server set */ @Override - public EndpointStatus join(InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints, int shardId) + public EndpointStatus join(InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + int shardId) throws Group.JoinException, InterruptedException { throw new UnsupportedOperationException("NameServerSet does not support join"); } @@ -246,7 +254,7 @@ class NameServerSet implements ServerSet { watchers.add(monitor); } - if(resolutionPending.compareAndSet(false, false)) { + if (resolutionPending.compareAndSet(false, false)) { monitor.onChange(hostSet); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RegionsRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RegionsRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RegionsRoutingService.java index beed2c9..4714270 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RegionsRoutingService.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RegionsRoutingService.java @@ -17,65 +17,83 @@ */ package com.twitter.distributedlog.client.routing; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.collect.Sets; import com.twitter.distributedlog.client.resolver.RegionResolver; import com.twitter.finagle.NoBrokersAvailableException; import com.twitter.finagle.stats.NullStatsReceiver; import com.twitter.finagle.stats.StatsReceiver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.SocketAddress; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Chain multiple routing services + * Chain multiple routing services. */ public class RegionsRoutingService implements RoutingService { private static final Logger logger = LoggerFactory.getLogger(RegionsRoutingService.class); + /** + * Create a multiple regions routing services based on a list of region routing {@code services}. + * + * <p>It is deprecated. Please use {@link Builder} to build multiple regions routing service. + * + * @param regionResolver region resolver + * @param services a list of region routing services. + * @return multiple regions routing service + * @see Builder + */ @Deprecated public static RegionsRoutingService of(RegionResolver regionResolver, RoutingService...services) { return new RegionsRoutingService(regionResolver, services); } + /** + * Create a builder to build a multiple-regions routing service. + * + * @return builder to build a multiple-regions routing service. + */ public static Builder newBuilder() { return new Builder(); } + /** + * Builder to build a multiple-regions routing service. + */ public static class Builder implements RoutingService.Builder { - private RegionResolver _resolver; - private RoutingService.Builder[] _routingServiceBuilders; - private StatsReceiver _statsReceiver = NullStatsReceiver.get(); + private RegionResolver resolver; + private RoutingService.Builder[] routingServiceBuilders; + private StatsReceiver statsReceiver = NullStatsReceiver.get(); private Builder() {} public Builder routingServiceBuilders(RoutingService.Builder...builders) { - this._routingServiceBuilders = builders; + this.routingServiceBuilders = builders; return this; } public Builder resolver(RegionResolver regionResolver) { - this._resolver = regionResolver; + this.resolver = regionResolver; return this; } @Override public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) { - this._statsReceiver = statsReceiver; + this.statsReceiver = statsReceiver; return this; } @Override public RegionsRoutingService build() { - Preconditions.checkNotNull(_routingServiceBuilders, "No routing service builder provided."); - Preconditions.checkNotNull(_resolver, "No region resolver provided."); - Preconditions.checkNotNull(_statsReceiver, "No stats receiver provided"); - RoutingService[] services = new RoutingService[_routingServiceBuilders.length]; + checkNotNull(routingServiceBuilders, "No routing service builder provided."); + checkNotNull(resolver, "No region resolver provided."); + checkNotNull(statsReceiver, "No stats receiver provided"); + RoutingService[] services = new RoutingService[routingServiceBuilders.length]; for (int i = 0; i < services.length; i++) { String statsScope; if (0 == i) { @@ -83,11 +101,11 @@ public class RegionsRoutingService implements RoutingService { } else { statsScope = "remote_" + i; } - services[i] = _routingServiceBuilders[i] - .statsReceiver(_statsReceiver.scope(statsScope)) + services[i] = routingServiceBuilders[i] + .statsReceiver(statsReceiver.scope(statsScope)) .build(); } - return new RegionsRoutingService(_resolver, services); + return new RegionsRoutingService(resolver, services); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingService.java index c65cfa8..56446c1 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingService.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingService.java @@ -21,7 +21,6 @@ import com.twitter.distributedlog.client.resolver.RegionResolver; import com.twitter.distributedlog.thrift.service.StatusCode; import com.twitter.finagle.NoBrokersAvailableException; import com.twitter.finagle.stats.StatsReceiver; - import java.net.SocketAddress; import java.util.HashMap; import java.util.HashSet; @@ -33,10 +32,13 @@ import java.util.Set; */ public interface RoutingService { - public static interface Builder { + /** + * Builder to build routing service. + */ + interface Builder { /** - * Build routing service with stats receiver + * Build routing service with stats receiver. * * @param statsReceiver * stats receiver @@ -45,6 +47,8 @@ public interface RoutingService { Builder statsReceiver(StatsReceiver statsReceiver); /** + * Build the routing service. + * * @return built routing service */ RoutingService build(); @@ -52,9 +56,9 @@ public interface RoutingService { } /** - * Listener for server changes on routing service + * Listener for server changes on routing service. */ - public static interface RoutingListener { + interface RoutingListener { /** * Trigger when server left. * @@ -73,7 +77,7 @@ public interface RoutingService { /** * Routing Context of a request. */ - public static class RoutingContext { + class RoutingContext { public static RoutingContext of(RegionResolver resolver) { return new RoutingContext(resolver); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java index 44f3179..2302e18 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java @@ -18,18 +18,17 @@ package com.twitter.distributedlog.client.routing; import com.twitter.common.zookeeper.ServerSet; - import java.net.SocketAddress; /** - * Utils for routing services + * Utils for routing services. */ public class RoutingUtils { private static final int NUM_CONSISTENT_HASH_REPLICAS = 997; /** - * Building routing service from <code>finagleNameStr</code> + * Building routing service from <code>finagleNameStr</code>. * * @param finagleNameStr * finagle name str of a service http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetRoutingService.java index 02c0481..19ccfc4 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetRoutingService.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetRoutingService.java @@ -17,7 +17,8 @@ */ package com.twitter.distributedlog.client.routing; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.hash.HashFunction; @@ -25,9 +26,6 @@ import com.google.common.hash.Hashing; import com.twitter.distributedlog.service.DLSocketAddress; import com.twitter.finagle.NoBrokersAvailableException; import com.twitter.finagle.stats.StatsReceiver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -39,26 +37,31 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Routing Service based on a given {@link com.twitter.common.zookeeper.ServerSet}. */ class ServerSetRoutingService extends Thread implements RoutingService { - static final Logger logger = LoggerFactory.getLogger(ServerSetRoutingService.class); + private static final Logger logger = LoggerFactory.getLogger(ServerSetRoutingService.class); static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() { return new ServerSetRoutingServiceBuilder(); } + /** + * Builder to build {@link com.twitter.common.zookeeper.ServerSet} based routing service. + */ static class ServerSetRoutingServiceBuilder implements RoutingService.Builder { - private ServerSetWatcher _serverSetWatcher; + private ServerSetWatcher serverSetWatcher; private ServerSetRoutingServiceBuilder() {} public ServerSetRoutingServiceBuilder serverSetWatcher(ServerSetWatcher serverSetWatcher) { - this._serverSetWatcher = serverSetWatcher; + this.serverSetWatcher = serverSetWatcher; return this; } @@ -69,14 +72,14 @@ class ServerSetRoutingService extends Thread implements RoutingService { @Override public RoutingService build() { - Preconditions.checkNotNull(_serverSetWatcher, "No serverset watcher provided."); - return new ServerSetRoutingService(this._serverSetWatcher); + checkNotNull(serverSetWatcher, "No serverset watcher provided."); + return new ServerSetRoutingService(this.serverSetWatcher); } } private static class HostComparator implements Comparator<SocketAddress> { - static final HostComparator instance = new HostComparator(); + private static final HostComparator INSTANCE = new HostComparator(); @Override public int compare(SocketAddress o1, SocketAddress o2) { @@ -165,7 +168,7 @@ class ServerSetRoutingService extends Thread implements RoutingService { address = newList.get(hostId); int i = hostId; while (rContext.isTriedHost(address)) { - i = (i+1) % newList.size(); + i = (i + 1) % newList.size(); if (i == hostId) { address = null; break; @@ -188,7 +191,7 @@ class ServerSetRoutingService extends Thread implements RoutingService { logger.info("Node {} left due to : ", host, reason); } hostList = new ArrayList<SocketAddress>(hostSet); - Collections.sort(hostList, HostComparator.instance); + Collections.sort(hostList, HostComparator.INSTANCE); logger.info("Host list becomes : {}.", hostList); } } @@ -253,7 +256,7 @@ class ServerSetRoutingService extends Thread implements RoutingService { synchronized (hostSet) { hostList = new ArrayList<SocketAddress>(hostSet); - Collections.sort(hostList, HostComparator.instance); + Collections.sort(hostList, HostComparator.INSTANCE); logger.info("Host list becomes : {}.", hostList); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetWatcher.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetWatcher.java index 16f96b8..1eccb63 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetWatcher.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetWatcher.java @@ -21,11 +21,14 @@ import com.google.common.collect.ImmutableSet; import com.twitter.distributedlog.service.DLSocketAddress; /** - * Watch on server set changes + * Watch on server set changes. */ public interface ServerSetWatcher { - public static class MonitorException extends Exception { + /** + * Exception thrown when failed to monitor serverset. + */ + class MonitorException extends Exception { private static final long serialVersionUID = 392751505154339548L; @@ -39,14 +42,15 @@ public interface ServerSetWatcher { } /** - * An interface to an object that is interested in receiving notification whenever the host set - * changes. + * An interface to an object that is interested in receiving notification whenever the host set changes. */ - public static interface ServerSetMonitor { + interface ServerSetMonitor { /** - * Called when either the available set of services changes (when a service dies or a new - * instance comes on-line) or when an existing service advertises a status or health change. + * Called when either the available set of services changes. + * + * <p>It happens either when a service dies or a new INSTANCE comes on-line or + * when an existing service advertises a status or health change. * * @param hostSet the current set of available ServiceInstances */ @@ -54,8 +58,9 @@ public interface ServerSetWatcher { } /** - * Registers a monitor to receive change notices for this server set as long as this jvm process - * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. + * Registers a monitor to receive change notices for this server set as long as this jvm process is alive. + * + * <p>Blocks until the initial server set can be gathered and delivered to the monitor. * The monitor will be notified if the membership set or parameters of existing members have * changed. * http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java index 2cb9d89..15356ff 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java @@ -17,15 +17,18 @@ */ package com.twitter.distributedlog.client.routing; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.collect.Sets; import com.twitter.finagle.NoBrokersAvailableException; import com.twitter.finagle.stats.StatsReceiver; - import java.net.SocketAddress; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +/** + * Single Host Routing Service. + */ class SingleHostRoutingService implements RoutingService { @Deprecated @@ -33,18 +36,26 @@ class SingleHostRoutingService implements RoutingService { return new SingleHostRoutingService(address); } + /** + * Builder to build single host based routing service. + * + * @return builder to build single host based routing service. + */ public static Builder newBuilder() { return new Builder(); } + /** + * Builder to build single host based routing service. + */ public static class Builder implements RoutingService.Builder { - private SocketAddress _address; + private SocketAddress address; private Builder() {} public Builder address(SocketAddress address) { - this._address = address; + this.address = address; return this; } @@ -55,8 +66,8 @@ class SingleHostRoutingService implements RoutingService { @Override public RoutingService build() { - Preconditions.checkNotNull(_address, "Host is null"); - return new SingleHostRoutingService(_address); + checkNotNull(address, "Host is null"); + return new SingleHostRoutingService(address); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TestName.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TestName.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TestName.java index 8d21aa2..8101075 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TestName.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TestName.java @@ -17,21 +17,23 @@ */ package com.twitter.distributedlog.client.routing; +import com.twitter.finagle.Addr; +import com.twitter.finagle.Address; +import com.twitter.finagle.Addrs; +import com.twitter.finagle.Name; import java.util.List; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * A {@link Name} implementation for testing purpose. + */ +public class TestName implements Name { -import com.twitter.finagle.Addrs; -import com.twitter.finagle.Addr; -import com.twitter.finagle.Address; -import com.twitter.finagle.Name; + private static final Logger LOG = LoggerFactory.getLogger(TestName.class); -public class TestName implements Name { - static final Logger LOG = LoggerFactory.getLogger(TestName.class); private AbstractFunction1<Addr, BoxedUnit> callback = null; public void changes(AbstractFunction1<Addr, BoxedUnit> callback) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TwitterServerSetWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TwitterServerSetWatcher.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TwitterServerSetWatcher.java index a9414f0..cffb9b9 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TwitterServerSetWatcher.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TwitterServerSetWatcher.java @@ -18,21 +18,29 @@ package com.twitter.distributedlog.client.routing; import com.google.common.collect.ImmutableSet; - import com.google.common.collect.Sets; import com.twitter.common.net.pool.DynamicHostSet; import com.twitter.common.zookeeper.ServerSet; import com.twitter.distributedlog.service.DLSocketAddress; import com.twitter.thrift.Endpoint; import com.twitter.thrift.ServiceInstance; - import java.net.InetSocketAddress; import java.util.Set; +/** + * Twitter {@link ServerSet} based watcher. + */ public class TwitterServerSetWatcher implements ServerSetWatcher { + private final ServerSet serverSet; private final boolean resolvedFromName; + /** + * Construct a {@link ServerSet} based watcher. + * + * @param serverSet server set. + * @param resolvedFromName whether to resolve hosts from {@link com.twitter.finagle.Name}. + */ public TwitterServerSetWatcher(ServerSet serverSet, boolean resolvedFromName) { this.serverSet = serverSet; @@ -40,8 +48,9 @@ public class TwitterServerSetWatcher implements ServerSetWatcher { } /** - * Registers a monitor to receive change notices for this server set as long as this jvm process - * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. + * Registers a monitor to receive change notices for this server set as long as this jvm process is alive. + * + * <p>Blocks until the initial server set can be gathered and delivered to the monitor. * The monitor will be notified if the membership set or parameters of existing members have * changed. * http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/package-info.java new file mode 100644 index 0000000..a282b42 --- /dev/null +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Routing Mechanisms to route the traffic to the owner of streams. + */ +package com.twitter.distributedlog.client.routing; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java index 92c7c29..4ca3aa6 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java @@ -24,14 +24,13 @@ import com.twitter.common.quantity.Time; import com.twitter.common.zookeeper.ServerSet; import com.twitter.common.zookeeper.ServerSets; import com.twitter.common.zookeeper.ZooKeeperClient; +import java.net.InetSocketAddress; +import java.net.URI; import org.apache.commons.lang.StringUtils; import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetSocketAddress; -import java.net.URI; - /** * A wrapper over zookeeper client and its server set. */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/367335d5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/package-info.java new file mode 100644 index 0000000..49166ec --- /dev/null +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utils related to server set. + */ +package com.twitter.distributedlog.client.serverset; \ No newline at end of file
