DL-132: Enable checkstyle for the distributedlog service module. Author: Xi Liu <xiliu...@gmail.com>
Reviewers: Sijie Guo <si...@apache.org> Closes #89 from xiliuant/xi/checkstyle_service Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/1a30b0ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/1a30b0ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/1a30b0ce Branch: refs/heads/master Commit: 1a30b0ceb76f33eda08b611d97c150f45f239a95 Parents: 32a52a9 Author: Xi Liu <xiliu...@gmail.com> Authored: Wed Jan 4 00:43:56 2017 -0800 Committer: Sijie Guo <si...@apache.org> Committed: Wed Jan 4 00:43:56 2017 -0800 ---------------------------------------------------------------------- .../resources/distributedlog/checkstyle.xml | 2 +- distributedlog-service/pom.xml | 33 ++ .../distributedlog/service/ClientUtils.java | 3 + .../service/DistributedLogCluster.java | 78 ++--- .../service/DistributedLogServer.java | 64 ++-- .../service/DistributedLogServerApp.java | 38 ++- .../service/DistributedLogServiceImpl.java | 48 +-- .../service/FatalErrorHandler.java | 7 - .../distributedlog/service/MonitorService.java | 29 +- .../service/MonitorServiceApp.java | 16 +- .../service/ServerFeatureKeys.java | 2 +- .../distributedlog/service/StatsFilter.java | 8 +- .../service/announcer/Announcer.java | 2 +- .../service/announcer/NOPAnnouncer.java | 3 + .../service/announcer/ServerSetAnnouncer.java | 10 +- .../service/announcer/package-info.java | 21 ++ .../service/balancer/Balancer.java | 5 + .../service/balancer/BalancerTool.java | 42 ++- .../service/balancer/ClusterBalancer.java | 27 +- .../balancer/CountBasedStreamChooser.java | 9 +- .../service/balancer/LimitedStreamChooser.java | 10 + .../service/balancer/SimpleBalancer.java | 17 +- .../service/balancer/StreamChooser.java | 2 +- .../service/balancer/StreamMover.java | 5 +- .../service/balancer/StreamMoverImpl.java | 2 +- .../service/balancer/package-info.java | 21 ++ 
.../config/DefaultStreamConfigProvider.java | 29 +- .../service/config/ServerConfiguration.java | 106 ++++--- .../config/ServiceStreamConfigProvider.java | 19 +- .../service/config/package-info.java | 21 ++ .../distributedlog/service/package-info.java | 21 ++ .../service/placement/EqualLoadAppraiser.java | 26 +- .../placement/LeastLoadPlacementPolicy.java | 311 ++++++++++--------- .../service/placement/LoadAppraiser.java | 18 +- .../service/placement/PlacementPolicy.java | 194 ++++++------ .../placement/PlacementStateManager.java | 80 +++-- .../service/placement/ServerLoad.java | 227 +++++++------- .../service/placement/StreamLoad.java | 141 +++++---- .../placement/ZKPlacementStateManager.java | 236 +++++++------- .../service/placement/package-info.java | 21 ++ .../service/stream/AbstractStreamOp.java | 23 +- .../service/stream/AbstractWriteOp.java | 9 +- .../service/stream/BulkWriteOp.java | 29 +- .../distributedlog/service/stream/DeleteOp.java | 7 +- .../service/stream/HeartbeatOp.java | 11 +- .../service/stream/ReleaseOp.java | 7 +- .../distributedlog/service/stream/Stream.java | 7 +- .../service/stream/StreamFactory.java | 3 + .../service/stream/StreamFactoryImpl.java | 3 + .../service/stream/StreamImpl.java | 34 +- .../service/stream/StreamManager.java | 4 +- .../service/stream/StreamManagerImpl.java | 5 +- .../distributedlog/service/stream/StreamOp.java | 2 - .../service/stream/StreamOpStats.java | 2 +- .../service/stream/TruncateOp.java | 11 +- .../distributedlog/service/stream/WriteOp.java | 17 +- .../service/stream/WriteOpWithPayload.java | 3 + .../service/stream/admin/AdminOp.java | 6 +- .../service/stream/admin/CreateOp.java | 7 +- .../service/stream/admin/StreamAdminOp.java | 5 +- .../service/stream/admin/package-info.java | 21 ++ .../stream/limiter/DynamicRequestLimiter.java | 21 +- .../stream/limiter/RequestLimiterBuilder.java | 24 +- .../stream/limiter/ServiceRequestLimiter.java | 8 +- .../stream/limiter/StreamAcquireLimiter.java | 6 +- 
.../stream/limiter/StreamRequestLimiter.java | 5 +- .../service/stream/limiter/package-info.java | 21 ++ .../service/stream/package-info.java | 21 ++ .../CacheableStreamPartitionConverter.java | 3 + .../DelimiterStreamPartitionConverter.java | 2 +- .../service/streamset/Partition.java | 1 + .../service/streamset/PartitionMap.java | 3 + .../service/streamset/package-info.java | 21 ++ .../distributedlog/service/tools/ProxyTool.java | 30 +- .../service/tools/package-info.java | 21 ++ .../service/utils/package-info.java | 21 ++ .../stats/CodahaleMetricsServletProvider.java | 4 +- .../HealthCheckServletContextListener.java | 2 +- .../stats/MetricsServletContextListener.java | 3 + .../bookkeeper/stats/ServletReporter.java | 2 +- .../apache/bookkeeper/stats/package-info.java | 21 ++ .../client/routing/LocalRoutingService.java | 7 +- .../service/DistributedLogServerTestCase.java | 33 +- .../service/TestDistributedLogServerBase.java | 62 ++-- .../TestDistributedLogServerClientRouting.java | 9 +- .../service/TestDistributedLogService.java | 88 +++--- .../service/TestRegionUnavailable.java | 22 +- .../distributedlog/service/TestStatsFilter.java | 11 +- .../service/balancer/TestBalancerUtils.java | 8 +- .../service/balancer/TestClusterBalancer.java | 26 +- .../balancer/TestCountBasedStreamChooser.java | 16 +- .../service/balancer/TestSimpleBalancer.java | 22 +- .../service/balancer/TestStreamMover.java | 12 +- .../service/config/TestServerConfiguration.java | 8 +- .../config/TestStreamConfigProvider.java | 14 +- .../placement/TestLeastLoadPlacementPolicy.java | 247 ++++++++------- .../service/placement/TestServerLoad.java | 50 +-- .../service/placement/TestStreamLoad.java | 28 +- .../placement/TestZKPlacementStateManager.java | 197 ++++++------ .../service/stream/TestStreamManager.java | 24 +- .../service/stream/TestStreamOp.java | 27 +- .../limiter/TestServiceRequestLimiter.java | 32 +- .../TestDelimiterStreamPartitionConverter.java | 6 +- 
.../TestIdentityStreamPartitionConverter.java | 8 +- .../service/streamset/TestPartitionMap.java | 7 +- 105 files changed, 1974 insertions(+), 1400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml ---------------------------------------------------------------------- diff --git a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml index e1117c8..db3549f 100644 --- a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml +++ b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml @@ -260,7 +260,7 @@ page at http://checkstyle.sourceforge.net/config.html --> T, K, V, W, X or else be capital-case terminated with a T, such as MyGenericParameterT --> <module name="ClassTypeParameterName"> - <property name="format" value="^(((T|K|V|W|X)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/> + <property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*))$"/> <property name="severity" value="error"/> </module> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml index 052ce15..154dedb 100644 --- a/distributedlog-service/pom.xml +++ b/distributedlog-service/pom.xml @@ -197,6 +197,39 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>6.19</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> 
+ <artifactId>distributedlog-build-tools</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <configuration> + <configLocation>distributedlog/checkstyle.xml</configLocation> + <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation> + <consoleOutput>true</consoleOutput> + <failOnViolation>true</failOnViolation> + <includeResources>false</includeResources> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + </configuration> + <executions> + <execution> + <phase>test-compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> <profiles> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java index dd9961d..da36014 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java @@ -21,6 +21,9 @@ import com.twitter.distributedlog.client.DistributedLogClientImpl; import com.twitter.distributedlog.client.monitor.MonitorServiceClient; import org.apache.commons.lang3.tuple.Pair; +/** + * DistributedLog Client Related Utils. 
+ */ public class ClientUtils { public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java index a2a0ca6..029c822 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java @@ -25,6 +25,13 @@ import com.twitter.distributedlog.metadata.DLMetadata; import com.twitter.distributedlog.service.placement.EqualLoadAppraiser; import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter; import com.twitter.finagle.builder.Server; +import java.io.File; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim; import org.apache.bookkeeper.stats.NullStatsProvider; @@ -35,14 +42,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.net.BindException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * DistributedLog Cluster is an emulator to run distributedlog components. 
*/ @@ -59,18 +58,18 @@ public class DistributedLogCluster { */ public static class Builder { - int _numBookies = 3; - boolean _shouldStartZK = true; - String _zkHost = "127.0.0.1"; - int _zkPort = 0; - boolean _shouldStartProxy = true; - int _proxyPort = 7000; - boolean _thriftmux = false; - DistributedLogConfiguration _dlConf = new DistributedLogConfiguration() + int numBookies = 3; + boolean shouldStartZK = true; + String zkHost = "127.0.0.1"; + int zkPort = 0; + boolean shouldStartProxy = true; + int proxyPort = 7000; + boolean thriftmux = false; + DistributedLogConfiguration dlConf = new DistributedLogConfiguration() .setLockTimeout(10) .setOutputBufferSize(0) .setImmediateFlushEnabled(true); - ServerConfiguration _bkConf = new ServerConfiguration(); + ServerConfiguration bkConf = new ServerConfiguration(); private Builder() {} @@ -80,7 +79,7 @@ public class DistributedLogCluster { * @return builder */ public Builder numBookies(int numBookies) { - this._numBookies = numBookies; + this.numBookies = numBookies; return this; } @@ -92,7 +91,7 @@ public class DistributedLogCluster { * @return builder */ public Builder shouldStartZK(boolean startZK) { - this._shouldStartZK = startZK; + this.shouldStartZK = startZK; return this; } @@ -104,7 +103,7 @@ public class DistributedLogCluster { * @return builder */ public Builder zkServers(String zkServers) { - this._zkHost = zkServers; + this.zkHost = zkServers; return this; } @@ -116,7 +115,7 @@ public class DistributedLogCluster { * @return builder. 
*/ public Builder zkPort(int zkPort) { - this._zkPort = zkPort; + this.zkPort = zkPort; return this; } @@ -128,7 +127,7 @@ public class DistributedLogCluster { * @return builder */ public Builder shouldStartProxy(boolean startProxy) { - this._shouldStartProxy = startProxy; + this.shouldStartProxy = startProxy; return this; } @@ -140,60 +139,63 @@ public class DistributedLogCluster { * @return builder */ public Builder proxyPort(int proxyPort) { - this._proxyPort = proxyPort; + this.proxyPort = proxyPort; return this; } /** - * DistributedLog Configuration + * Set the distributedlog configuration. * * @param dlConf * distributedlog configuration * @return builder */ public Builder dlConf(DistributedLogConfiguration dlConf) { - this._dlConf = dlConf; + this.dlConf = dlConf; return this; } /** - * Bookkeeper server configuration + * Set the Bookkeeper server configuration. * * @param bkConf * bookkeeper server configuration * @return builder */ public Builder bkConf(ServerConfiguration bkConf) { - this._bkConf = bkConf; + this.bkConf = bkConf; return this; } /** - * Enable thriftmux for the dl server + * Enable thriftmux for the dl server. * * @param enabled flag to enable thriftmux * @return builder */ public Builder thriftmux(boolean enabled) { - this._thriftmux = enabled; + this.thriftmux = enabled; return this; } public DistributedLogCluster build() throws Exception { // build the cluster return new DistributedLogCluster( - _dlConf, - _bkConf, - _numBookies, - _shouldStartZK, - _zkHost, - _zkPort, - _shouldStartProxy, - _proxyPort, - _thriftmux); + dlConf, + bkConf, + numBookies, + shouldStartZK, + zkHost, + zkPort, + shouldStartProxy, + proxyPort, + thriftmux); } } + /** + * Run a distributedlog proxy server. 
+ */ public static class DLServer { static final int MAX_RETRIES = 20; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java index a9ba125..248bcf7 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java @@ -17,32 +17,8 @@ */ package com.twitter.distributedlog.service; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.MalformedURLException; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import scala.Option; -import scala.Tuple2; - import com.google.common.base.Optional; import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.client.routing.RoutingService; import com.twitter.distributedlog.config.DynamicConfigurationFactory; @@ -72,10 +48,33 @@ import 
com.twitter.finagle.thrift.ClientIdRequiredFilter; import com.twitter.finagle.thrift.ThriftServerFramedCodec; import com.twitter.finagle.transport.Transport; import com.twitter.util.Duration; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +/** + * Running the distributedlog proxy server. 
+ */ public class DistributedLogServer { - static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class); + private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class); private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName(); private DistributedLogServiceImpl dlService = null; @@ -124,7 +123,8 @@ public class DistributedLogServer { this.loadAppraiserClassStr = loadAppraiserClass; } - public void runServer() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException { + public void runServer() + throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException { if (!uri.isPresent()) { throw new IllegalArgumentException("No distributedlog uri provided."); } @@ -135,7 +135,8 @@ public class DistributedLogServer { try { dlConf.loadConf(new File(configFile).toURI().toURL()); } catch (ConfigurationException e) { - throw new IllegalArgumentException("Failed to load distributedlog configuration from " + configFile + "."); + throw new IllegalArgumentException("Failed to load distributedlog configuration from " + + configFile + "."); } catch (MalformedURLException e) { throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed " + configFile + "."); @@ -185,7 +186,8 @@ public class DistributedLogServer { } Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER)); LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass); - logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get() + " Instantiated " + loadAppraiser.getClass().getCanonicalName()); + logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get() + + " Instantiated " + loadAppraiser.getClass().getCanonicalName()); StreamConfigProvider streamConfProvider = getStreamConfigProvider(dlConf, converter); @@ -227,7 +229,8 @@ public 
class DistributedLogServer { } } - private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf) throws ConfigurationException { + private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf) + throws ConfigurationException { Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent(); if (conf.isPresent()) { DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory( @@ -341,7 +344,8 @@ public class DistributedLogServer { logger.info("Using thriftmux."); Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness( Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk(); - serverBuilder = serverBuilder.stack(ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2())); + serverBuilder = serverBuilder.stack( + ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2())); } logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java index 1c3d8d4..55ed84f 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java @@ -17,15 +17,26 @@ */ package com.twitter.distributedlog.service; +import static com.google.common.base.Preconditions.checkArgument; +import static 
com.twitter.distributedlog.util.CommandLineUtils.getOptionalBooleanArg; +import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalIntegerArg; +import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalStringArg; + import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.client.routing.RoutingService; import com.twitter.distributedlog.client.routing.RoutingUtils; import com.twitter.distributedlog.client.serverset.DLZkServerSet; import com.twitter.finagle.stats.NullStatsReceiver; import com.twitter.finagle.stats.StatsReceiver; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.bookkeeper.util.ReflectionUtils; @@ -38,21 +49,14 @@ import org.apache.commons.configuration.ConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.io.File; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URI; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -import static com.twitter.distributedlog.util.CommandLineUtils.*; - +/** + * The launcher of the distributedlog proxy server. 
+ */ public class DistributedLogServerApp { - static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class); + private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class); - private final static String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]"; + private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]"; private final String[] args; private final Options options = new Options(); @@ -104,7 +108,8 @@ public class DistributedLogServerApp { } } - private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException { + private void runCmd(CommandLine cmdline) + throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException { final StatsReceiver statsReceiver = NullStatsReceiver.get(); Optional<String> confOptional = getOptionalStringArg(cmdline, "c"); DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); @@ -113,7 +118,8 @@ public class DistributedLogServerApp { try { dlConf.loadConf(new File(configFile).toURI().toURL()); } catch (ConfigurationException e) { - throw new IllegalArgumentException("Failed to load distributedlog configuration from " + configFile + "."); + throw new IllegalArgumentException("Failed to load distributedlog configuration from " + + configFile + "."); } catch (MalformedURLException e) { throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed " + configFile + "."); @@ -130,7 +136,7 @@ public class DistributedLogServerApp { }).or(new NullStatsProvider()); final Optional<String> uriOption = getOptionalStringArg(cmdline, "u"); - Preconditions.checkArgument(uriOption.isPresent(), "No distributedlog uri provided."); + checkArgument(uriOption.isPresent(), "No distributedlog uri provided."); URI dlUri = URI.create(uriOption.get()); DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) 
TimeUnit.SECONDS.toMillis(60)); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index 5dee7fd..db1346e 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -47,7 +47,6 @@ import com.twitter.distributedlog.service.placement.PlacementPolicy; import com.twitter.distributedlog.service.placement.ZKPlacementStateManager; import com.twitter.distributedlog.service.stream.BulkWriteOp; import com.twitter.distributedlog.service.stream.DeleteOp; -import com.twitter.distributedlog.service.stream.admin.CreateOp; import com.twitter.distributedlog.service.stream.HeartbeatOp; import com.twitter.distributedlog.service.stream.ReleaseOp; import com.twitter.distributedlog.service.stream.Stream; @@ -59,8 +58,8 @@ import com.twitter.distributedlog.service.stream.StreamOp; import com.twitter.distributedlog.service.stream.StreamOpStats; import com.twitter.distributedlog.service.stream.TruncateOp; import com.twitter.distributedlog.service.stream.WriteOp; -import com.twitter.distributedlog.service.stream.admin.StreamAdminOp; import com.twitter.distributedlog.service.stream.WriteOpWithPayload; +import com.twitter.distributedlog.service.stream.admin.CreateOp; import com.twitter.distributedlog.service.stream.admin.StreamAdminOp; import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter; import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; @@ -86,7 +85,6 @@ 
import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.ScheduledThreadPoolTimer; import com.twitter.util.Timer; - import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; @@ -96,7 +94,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.stats.Counter; @@ -105,15 +102,17 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.runtime.BoxedUnit; +/** + * Implementation of distributedlog thrift service. + */ public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface, FatalErrorHandler { - static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class); + private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class); - private final int MOVING_AVERAGE_WINDOW_SECS = 60; + private static final int MOVING_AVERAGE_WINDOW_SECS = 60; private final ServerConfiguration serverConfig; private final DistributedLogConfiguration dlConfig; @@ -294,8 +293,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI @Override public Number getSample() { - return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable() ? - 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2)); + return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable() + ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 
1 : 2)); } }; this.movingAvgRpsGauge = new Gauge<Number>() { @@ -364,8 +363,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI streamsStatsLogger.registerGauge("cached", this.streamCachedGauge); // Setup complete - logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {}, dlsn version {}.", - new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion }); + logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {}," + + " dlsn version {}.", + new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion }); } private void countStatusCode(StatusCode code) { @@ -440,7 +440,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI } @Override - public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) { + public Future<BulkWriteResponse> writeBulkWithContext(final String stream, + List<ByteBuffer> data, + WriteContext ctx) { bulkWritePendingStat.inc(); receivedRecordCounter.add(data.size()); BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter, @@ -480,8 +482,14 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI @Override public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) { - TruncateOp op = new TruncateOp(stream, DLSN.deserialize(dlsn), statsLogger, perStreamStatsLogger, getChecksum(ctx), - featureChecksumDisabled, accessControlManager); + TruncateOp op = new TruncateOp( + stream, + DLSN.deserialize(dlsn), + statsLogger, + perStreamStatsLogger, + getChecksum(ctx), + featureChecksumDisabled, + accessControlManager); executeStreamOp(op); return op.result(); } @@ -730,14 +738,14 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI } /** - * clean up the gauge 
before we close to help GC + * clean up the gauge before we close to help GC. */ private void unregisterGauge(){ - this.statsLogger.unregisterGauge("proxy_status",this.proxyStatusGauge); - this.statsLogger.unregisterGauge("moving_avg_rps",this.movingAvgRpsGauge); - this.statsLogger.unregisterGauge("moving_avg_bps",this.movingAvgBpsGauge); - this.statsLogger.unregisterGauge("acquired",this.streamAcquiredGauge); - this.statsLogger.unregisterGauge("cached",this.streamCachedGauge); + this.statsLogger.unregisterGauge("proxy_status", this.proxyStatusGauge); + this.statsLogger.unregisterGauge("moving_avg_rps", this.movingAvgRpsGauge); + this.statsLogger.unregisterGauge("moving_avg_bps", this.movingAvgBpsGauge); + this.statsLogger.unregisterGauge("acquired", this.streamAcquiredGauge); + this.statsLogger.unregisterGauge("cached", this.streamCachedGauge); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java index e0a15e6..d6922b9 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java @@ -17,13 +17,6 @@ */ package com.twitter.distributedlog.service; -import com.google.common.base.Optional; -import com.twitter.util.Future; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * Implement handling for an unrecoverable error. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java index 7edb778..4ff5b87 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java @@ -17,9 +17,9 @@ */ package com.twitter.distributedlog.service; +import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Sets; import com.google.common.hash.HashFunction; @@ -41,14 +41,6 @@ import com.twitter.finagle.stats.StatsReceiver; import com.twitter.finagle.thrift.ClientId$; import com.twitter.util.Duration; import com.twitter.util.FutureEventListener; - -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.io.IOException; import java.net.MalformedURLException; @@ -65,10 +57,19 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; +/** + * Monitor Service. + */ public class MonitorService implements NamespaceListener { - static final Logger logger = LoggerFactory.getLogger(MonitorService.class); + private static final Logger logger = LoggerFactory.getLogger(MonitorService.class); private DistributedLogNamespace dlNamespace = null; private MonitorServiceClient dlClient = null; @@ -271,9 +272,9 @@ public class MonitorService implements NamespaceListener { } public void runServer() throws IllegalArgumentException, IOException { - Preconditions.checkArgument(uriArg.isPresent(), + checkArgument(uriArg.isPresent(), "No distributedlog uri provided."); - Preconditions.checkArgument(serverSetArg.isPresent(), + checkArgument(serverSetArg.isPresent(), "No proxy server set provided."); if (intervalArg.isPresent()) { interval = intervalArg.get(); @@ -422,7 +423,7 @@ public class MonitorService implements NamespaceListener { } /** - * Close the server + * Close the server. */ public void close() { logger.info("Closing monitor service."); @@ -459,7 +460,7 @@ public class MonitorService implements NamespaceListener { } /** - * clean up the gauge before we close to help GC + * clean up the gauge before we close to help GC. 
*/ private void unregisterGauge(){ statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java index a51a6a9..b5b4ca8 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java @@ -17,8 +17,13 @@ */ package com.twitter.distributedlog.service; +import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalBooleanArg; +import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalIntegerArg; +import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalStringArg; + import com.twitter.finagle.stats.NullStatsReceiver; import com.twitter.finagle.stats.StatsReceiver; +import java.io.IOException; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.bookkeeper.util.ReflectionUtils; @@ -30,15 +35,16 @@ import org.apache.commons.cli.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - -import static com.twitter.distributedlog.util.CommandLineUtils.*; +/** + * The launcher to run monitor service. 
+ */ public class MonitorServiceApp { - static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class); + private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class); + + static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]"; - final static String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]"; final String[] args; final Options options = new Options(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java index 798dcf5..d779cd0 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java @@ -18,7 +18,7 @@ package com.twitter.distributedlog.service; /** - * List of feature keys used by distributedlog server + * List of feature keys used by distributedlog server. 
*/ public enum ServerFeatureKeys { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java index 6c570f6..bd0a992 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java @@ -18,17 +18,13 @@ package com.twitter.distributedlog.service; import com.google.common.base.Stopwatch; - import com.twitter.finagle.Service; import com.twitter.finagle.SimpleFilter; import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; - -import org.apache.bookkeeper.stats.StatsLogger; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; - -import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.StatsLogger; /** * Track distributedlog server finagle-service stats. 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java index 89e0665..cb37088 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java @@ -20,7 +20,7 @@ package com.twitter.distributedlog.service.announcer; import java.io.IOException; /** - * Announce service information + * Announce service information. */ public interface Announcer { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java index c686408..471f954 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java @@ -19,6 +19,9 @@ package com.twitter.distributedlog.service.announcer; import java.io.IOException; +/** + * A no-op implementation of {@link Announcer}. 
+ */ public class NOPAnnouncer implements Announcer { @Override public void announce() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java index ada8710..eaf4c26 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java @@ -20,9 +20,6 @@ package com.twitter.distributedlog.service.announcer; import com.twitter.common.zookeeper.Group; import com.twitter.common.zookeeper.ServerSet; import com.twitter.distributedlog.client.serverset.DLZkServerSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -30,10 +27,15 @@ import java.net.URI; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * ServerSet based announcer. 
+ */ public class ServerSetAnnouncer implements Announcer { - static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class); + private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class); final String localAddr; final InetSocketAddress serviceEndpoint; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java new file mode 100644 index 0000000..bca36df --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Announcers to announce servers to server set. 
+ */ +package com.twitter.distributedlog.service.announcer; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java index 9dd84d0..3ffe54b 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java @@ -20,6 +20,11 @@ package com.twitter.distributedlog.service.balancer; import com.google.common.base.Optional; import com.google.common.util.concurrent.RateLimiter; +/** + * Balancer Interface. + * + * <p>A balancer is used for balance the streams across the proxy cluster. 
+ */ public interface Balancer { /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java index 2e49d92..48430df 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java @@ -17,8 +17,9 @@ */ package com.twitter.distributedlog.service.balancer; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.RateLimiter; import com.twitter.common.zookeeper.ServerSet; import com.twitter.distributedlog.client.monitor.MonitorServiceClient; @@ -33,6 +34,8 @@ import com.twitter.finagle.builder.ClientBuilder; import com.twitter.finagle.thrift.ClientId$; import com.twitter.util.Await; import com.twitter.util.Duration; +import java.net.InetSocketAddress; +import java.net.URI; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; @@ -40,15 +43,12 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetSocketAddress; -import java.net.URI; - /** - * Tool to rebalance cluster + * Tool to rebalance cluster. 
*/ public class BalancerTool extends Tool { - static final Logger logger = LoggerFactory.getLogger(BalancerTool.class); + private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class); static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) { return DistributedLogClientBuilder.newBuilder() @@ -66,6 +66,9 @@ public class BalancerTool extends Tool { .failFast(false)); } + /** + * Base Command to run balancer. + */ protected abstract static class BalancerCommand extends OptsCommand { protected Options options = new Options(); @@ -78,7 +81,8 @@ public class BalancerTool extends Tool { BalancerCommand(String name, String description) { super(name, description); options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy"); - options.addOption("rtp", "rebalance-tolerance-percentage", true, "Rebalance tolerance percentage per proxy"); + options.addOption("rtp", "rebalance-tolerance-percentage", true, + "Rebalance tolerance percentage per proxy"); options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution"); options.addOption("r", "rate", true, "Rebalance rate"); } @@ -105,11 +109,11 @@ public class BalancerTool extends Tool { if (cmdline.hasOption("r")) { this.rate = Double.parseDouble(cmdline.getOptionValue("r")); } - Preconditions.checkArgument(rebalanceWaterMark >= 0, + checkArgument(rebalanceWaterMark >= 0, "Rebalance Water Mark should be a non-negative number"); - Preconditions.checkArgument(rebalanceTolerancePercentage >= 0.0f, + checkArgument(rebalanceTolerancePercentage >= 0.0f, "Rebalance Tolerance Percentage should be a non-negative number"); - Preconditions.checkArgument(rebalanceConcurrency > 0, + checkArgument(rebalanceConcurrency > 0, "Rebalance Concurrency should be a positive number"); if (null == rate || rate <= 0.0f) { rateLimiter = Optional.absent(); @@ -133,6 +137,9 @@ public class BalancerTool extends Tool { protected abstract int 
executeCommand(CommandLine cmdline) throws Exception; } + /** + * Command to balance streams within a cluster. + */ protected static class ClusterBalancerCommand extends BalancerCommand { protected URI uri; @@ -188,7 +195,11 @@ public class BalancerTool extends Tool { ClusterBalancer balancer) throws Exception { if (null == source) { - balancer.balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, getRateLimiter()); + balancer.balance( + rebalanceWaterMark, + rebalanceTolerancePercentage, + rebalanceConcurrency, + getRateLimiter()); } else { balanceFromSource(clientBuilder, balancer, source, getRateLimiter()); } @@ -217,6 +228,9 @@ public class BalancerTool extends Tool { } } + /** + * Command to balance streams between regions. + */ protected static class RegionBalancerCommand extends BalancerCommand { protected URI region1; @@ -288,7 +302,11 @@ public class BalancerTool extends Tool { protected int runBalancer(SimpleBalancer balancer) throws Exception { if (null == source) { - balancer.balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, getRateLimiter()); + balancer.balance( + rebalanceWaterMark, + rebalanceTolerancePercentage, + rebalanceConcurrency, + getRateLimiter()); } else { balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter()); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java index 6fef648..3a3dc1f 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java +++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java @@ -28,10 +28,6 @@ import com.twitter.util.Await; import com.twitter.util.Function; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.Serializable; import java.net.SocketAddress; import java.util.ArrayList; @@ -42,13 +38,16 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * A balancer balances ownerships with a cluster of targets + * A balancer balances ownerships with a cluster of targets. */ public class ClusterBalancer implements Balancer { - static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class); + private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class); /** * Represent a single host. Ordered by number of streams in desc order. 
@@ -205,7 +204,8 @@ public class ClusterBalancer implements Balancer { } int moveFromLowWaterMark; - int moveToHighWaterMark = Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f)); + int moveToHighWaterMark = + Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f)); if (hostIdxMoveFrom >= 0) { moveFromLowWaterMark = Math.max(0, rebalanceWaterMark); @@ -220,7 +220,8 @@ public class ClusterBalancer implements Balancer { AtomicInteger moveFrom = new AtomicInteger(0); AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1); while (moveFrom.get() < moveTo.get()) { - moveStreams(hosts, moveFrom, moveFromLowWaterMark, moveTo, moveToHighWaterMark, rebalanceRateLimiter); + moveStreams(hosts, moveFrom, moveFromLowWaterMark, + moveTo, moveToHighWaterMark, rebalanceRateLimiter); moveFrom.incrementAndGet(); } } @@ -244,8 +245,14 @@ public class ClusterBalancer implements Balancer { } if (logger.isDebugEnabled()) { - logger.debug("Moving streams : hosts = {}, from = {}, to = {} : from_low_water_mark = {}, to_high_water_mark = {}", - new Object[] { hosts, hostIdxMoveFrom.get(), hostIdxMoveTo.get(), moveFromLowWaterMark, moveToHighWaterMark }); + logger.debug("Moving streams : hosts = {}, from = {}, to = {} :" + + " from_low_water_mark = {}, to_high_water_mark = {}", + new Object[] { + hosts, + hostIdxMoveFrom.get(), + hostIdxMoveTo.get(), + moveFromLowWaterMark, + moveToHighWaterMark }); } Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java index 0a267ea..fab37b3 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java @@ -17,8 +17,7 @@ */ package com.twitter.distributedlog.service.balancer; -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.tuple.Pair; +import static com.google.common.base.Preconditions.checkArgument; import java.io.Serializable; import java.net.SocketAddress; @@ -29,7 +28,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; +/** + * A stream chooser based on number of streams. + */ class CountBasedStreamChooser implements StreamChooser, Serializable, Comparator<Pair<SocketAddress, LinkedList<String>>> { @@ -46,7 +49,7 @@ class CountBasedStreamChooser implements StreamChooser, Serializable, int next; CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) { - Preconditions.checkArgument(streams.size() > 0, "Only support no-empty streams distribution"); + checkArgument(streams.size() > 0, "Only support no-empty streams distribution"); streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size()); for (Map.Entry<SocketAddress, Set<String>> entry : streams.entrySet()) { LinkedList<String> randomizedStreams = new LinkedList<String>(entry.getValue()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java index d0e294d..069e596 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java @@ -17,8 +17,18 @@ */ package com.twitter.distributedlog.service.balancer; +/** + * A stream chooser that can only choose limited number of streams. + */ public class LimitedStreamChooser implements StreamChooser { + /** + * Create a limited stream chooser by {@code limit}. + * + * @param underlying the underlying stream chooser. + * @param limit the limit of number of streams to choose. + * @return the limited stream chooser. + */ public static LimitedStreamChooser of(StreamChooser underlying, int limit) { return new LimitedStreamChooser(underlying, limit); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java index 6913e4a..b205d5f 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java @@ -21,9 +21,6 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.RateLimiter; import com.twitter.distributedlog.client.monitor.MonitorServiceClient; import com.twitter.distributedlog.service.DistributedLogClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.SocketAddress; import 
java.util.HashMap; import java.util.Map; @@ -31,13 +28,15 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A balancer balances ownerships between two targets. */ public class SimpleBalancer implements Balancer { - static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class); + private static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class); protected final String target1; protected final String target2; @@ -120,8 +119,8 @@ public class SimpleBalancer implements Balancer { loadDistribution.put(target, targetStreamCount); // Calculate how many streams to be rebalanced from src region to target region - int numStreamsToRebalance = - BalancerUtils.calculateNumStreamsToRebalance(source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage); + int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance( + source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage); if (numStreamsToRebalance <= 0) { logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, target); @@ -130,7 +129,8 @@ public class SimpleBalancer implements Balancer { StreamChooser streamChooser = LimitedStreamChooser.of(new CountBasedStreamChooser(srcDistribution), numStreamsToRebalance); - StreamMover streamMover = new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor); + StreamMover streamMover = + new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor); moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter); } @@ -166,7 +166,8 @@ public class SimpleBalancer implements Balancer { } StreamChooser streamChooser = new CountBasedStreamChooser(distribution); - StreamMover streamMover = new StreamMoverImpl(source, sourceClient, sourceMonitor, target, 
targetClient, targetMonitor); + StreamMover streamMover = + new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor); moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java index e0fcaf1..d92aef0 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java @@ -18,7 +18,7 @@ package com.twitter.distributedlog.service.balancer; /** - * Choose a stream to rebalance + * Choose a stream to rebalance. */ public interface StreamChooser { /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java index 6857560..6e4205b 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java @@ -17,10 +17,13 @@ */ package com.twitter.distributedlog.service.balancer; +/** + * A stream mover to move streams between proxies. 
+ */ public interface StreamMover { /** - * Move given stream <i>streamName</i> + * Move given stream <i>streamName</i>. * * @param streamName * stream name to move http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java index 75df4ea..fc67fb2 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java @@ -48,7 +48,7 @@ public class StreamMoverImpl implements StreamMover { } /** - * Move given stream <i>streamName</i> + * Move given stream <i>streamName</i>. * * @param streamName * stream name to move http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java new file mode 100644 index 0000000..4ae8d44 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Balancer to move streams around to balance the traffic. + */ +package com.twitter.distributedlog.service.balancer; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java index d612195..b45b798 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java @@ -25,34 +25,41 @@ import com.twitter.distributedlog.config.ConfigurationSubscription; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.config.FileConfigurationBuilder; import com.twitter.distributedlog.config.PropertiesConfigurationBuilder; -import org.apache.commons.configuration.ConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import 
java.net.MalformedURLException; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.commons.configuration.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * For all streams return the same dynamic config based on configFile. */ public class DefaultStreamConfigProvider implements StreamConfigProvider { - static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class); + + private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class); private final Optional<DynamicDistributedLogConfiguration> dynConf; private final ConfigurationSubscription confSub; - public DefaultStreamConfigProvider(String configFilePath, ScheduledExecutorService executorService, int reloadPeriod, - TimeUnit reloadUnit) throws ConfigurationException { + public DefaultStreamConfigProvider(String configFilePath, + ScheduledExecutorService executorService, + int reloadPeriod, + TimeUnit reloadUnit) + throws ConfigurationException { try { File configFile = new File(configFilePath); - FileConfigurationBuilder properties = new PropertiesConfigurationBuilder(configFile.toURI().toURL()); - ConcurrentConstConfiguration defaultConf = new ConcurrentConstConfiguration(new DistributedLogConfiguration()); - DynamicDistributedLogConfiguration conf = new DynamicDistributedLogConfiguration(defaultConf); + FileConfigurationBuilder properties = + new PropertiesConfigurationBuilder(configFile.toURI().toURL()); + ConcurrentConstConfiguration defaultConf = + new ConcurrentConstConfiguration(new DistributedLogConfiguration()); + DynamicDistributedLogConfiguration conf = + new DynamicDistributedLogConfiguration(defaultConf); List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(properties); - confSub = new ConfigurationSubscription(conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit); + confSub = new 
ConfigurationSubscription( + conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit); this.dynConf = Optional.of(conf); } catch (MalformedURLException ex) { throw new ConfigurationException(ex); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java index 5b19f6c..b3b4c4e 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java @@ -17,7 +17,8 @@ */ package com.twitter.distributedlog.service.config; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkArgument; + import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogConstants; @@ -29,7 +30,7 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.SystemConfiguration; /** - * Configuration for DistributedLog Server + * Configuration for DistributedLog Server. 
*/ public class ServerConfiguration extends CompositeConfiguration { @@ -43,36 +44,36 @@ public class ServerConfiguration extends CompositeConfiguration { } // Server DLSN version - protected final static String SERVER_DLSN_VERSION = "server_dlsn_version"; - protected final static byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1; + protected static final String SERVER_DLSN_VERSION = "server_dlsn_version"; + protected static final byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1; // Server Durable Write Enable/Disable Flag - protected final static String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled"; - protected final static boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true; + protected static final String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled"; + protected static final boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true; // Server Region Id - protected final static String SERVER_REGION_ID = "server_region_id"; - protected final static int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID; + protected static final String SERVER_REGION_ID = "server_region_id"; + protected static final int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID; // Server Port - protected final static String SERVER_PORT = "server_port"; - protected final static int SERVER_PORT_DEFAULT = 0; + protected static final String SERVER_PORT = "server_port"; + protected static final int SERVER_PORT_DEFAULT = 0; // Server Shard Id - protected final static String SERVER_SHARD_ID = "server_shard"; - protected final static int SERVER_SHARD_ID_DEFAULT = -1; + protected static final String SERVER_SHARD_ID = "server_shard"; + protected static final int SERVER_SHARD_ID_DEFAULT = -1; // Server Threads - protected final static String SERVER_NUM_THREADS = "server_threads"; - protected final static int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors(); + protected static final String SERVER_NUM_THREADS = 
"server_threads"; + protected static final int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors(); // Server enable per stream stat - protected final static String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat"; - protected final static boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true; + protected static final String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat"; + protected static final boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true; // Server graceful shutdown period (in millis) - protected final static String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms"; - protected final static long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L; + protected static final String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms"; + protected static final long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L; // Server service timeout public static final String SERVER_SERVICE_TIMEOUT_MS = "server_service_timeout_ms"; @@ -86,17 +87,18 @@ public class ServerConfiguration extends CompositeConfiguration { // Server stream probation timeout public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms"; public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs"; - public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60*1000*5; + public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60 * 1000 * 5; // Server stream to partition converter - protected final static String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class"; + protected static final String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class"; // Use hostname as the allocator pool name - protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME - = "server_use_hostname_as_allocator_pool_name"; + protected static final String 
SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME = + "server_use_hostname_as_allocator_pool_name"; protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false; //Configure refresh interval for calculating resource placement in seconds - public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = "server_resource_placement_refresh_interval_sec"; + public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = + "server_resource_placement_refresh_interval_sec"; public static final int SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120; public ServerConfiguration() { @@ -105,7 +107,7 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Load configurations from {@link DistributedLogConfiguration} + * Load configurations from {@link DistributedLogConfiguration}. * * @param dlConf * distributedlog configuration @@ -137,7 +139,7 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Set the flag to enable/disable durable write + * Set the flag to enable/disable durable write. * * @param enabled * flag to enable/disable durable write @@ -149,7 +151,7 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Is durable write enabled? + * Is durable write enabled. * * @return true if waiting writes to be durable. otherwise false. */ @@ -158,7 +160,7 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Set the region id used to instantiate DistributedLogNamespace + * Set the region id used to instantiate DistributedLogNamespace. * * @param regionId * region id @@ -170,8 +172,7 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Get the region id used to instantiate - * {@link com.twitter.distributedlog.namespace.DistributedLogNamespace} + * Get the region id used to instantiate {@link com.twitter.distributedlog.namespace.DistributedLogNamespace}. 
* * @return region id used to instantiate DistributedLogNamespace */ @@ -213,8 +214,9 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Get the shard id of this server. It would be used to instantiate the client id - * used for DistributedLogNamespace. + * Get the shard id of this server. + * + * <p>It would be used to instantiate the client id used for DistributedLogNamespace. * * @return shard id of this server. */ @@ -286,7 +288,9 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Get timeout for stream op execution in proxy layer. 0 disables timeout. + * Get timeout for stream op execution in proxy layer. + * + * <p>0 disables timeout. * * @return timeout for stream operation in proxy layer. */ @@ -296,7 +300,9 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Set timeout for stream op execution in proxy layer. 0 disables timeout. + * Set timeout for stream op execution in proxy layer. + * + * <p>0 disables timeout. * * @param timeoutMs * timeout for stream operation in proxy layer. @@ -308,7 +314,9 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Get timeout for closing writer in proxy layer. 0 disables timeout. + * Get timeout for closing writer in proxy layer. + * + * <p>0 disables timeout. * * @return timeout for closing writer in proxy layer. */ @@ -317,7 +325,9 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Set timeout for closing writer in proxy layer. 0 disables timeout. + * Set timeout for closing writer in proxy layer. + * + * <p>0 disables timeout. * * @param timeoutMs * timeout for closing writer in proxy layer. @@ -329,8 +339,9 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * After service timeout, how long should stream be kept in cache in probationary state in order - * to prevent reacquire. In millisec. 
+ * How long should stream be kept in cache in probationary state after service timeout. + * + * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds. * * @return stream probation timeout in ms. */ @@ -340,10 +351,12 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * After service timeout, how long should stream be kept in cache in probationary state in order - * to prevent reacquire. In millisec. + * How long should stream be kept in cache in probationary state after service timeout. + * + * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds. * * @param timeoutMs probation timeout in ms. + * @return server configuration */ public ServerConfiguration setStreamProbationTimeoutMs(long timeoutMs) { setProperty(SERVER_STREAM_PROBATION_TIMEOUT_MS, timeoutMs); @@ -357,7 +370,8 @@ public class ServerConfiguration extends CompositeConfiguration { * stream partition converter class * @return server configuration */ - public ServerConfiguration setStreamPartitionConverterClass(Class<? extends StreamPartitionConverter> converterClass) { + public ServerConfiguration setStreamPartitionConverterClass( + Class<? extends StreamPartitionConverter> converterClass) { setProperty(SERVER_STREAM_PARTITION_CONVERTER_CLASS, converterClass.getName()); return this; } @@ -391,7 +405,7 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Get if use hostname as the allocator pool name + * Get if use hostname as the allocator pool name. * * @return true if use hostname as the allocator pool name. otherwise, use * {@link #getServerShardId()} as the allocator pool name. @@ -412,15 +426,17 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Validate the configuration + * Validate the configuration. + * + * @throws IllegalArgumentException when there are any invalid settings.
*/ public void validate() { byte dlsnVersion = getDlsnVersion(); - Preconditions.checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1, + checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1, "Unknown dlsn version " + dlsnVersion); - Preconditions.checkArgument(getServerThreads() > 0, + checkArgument(getServerThreads() > 0, "Invalid number of server threads : " + getServerThreads()); - Preconditions.checkArgument(getServerShardId() >= 0, + checkArgument(getServerShardId() >= 0, "Invalid server shard id : " + getServerShardId()); }