http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java index 36904fd..55c1b48 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java @@ -20,11 +20,11 @@ package com.twitter.distributedlog.service.stream; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.exceptions.AlreadyClosedException; import com.twitter.distributedlog.AsyncLogWriter; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogManager; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; +import com.twitter.distributedlog.exceptions.AlreadyClosedException; import com.twitter.distributedlog.exceptions.DLException; import com.twitter.distributedlog.exceptions.OverCapacityException; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; @@ -51,6 +51,12 @@ import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; import com.twitter.util.TimeoutException; import com.twitter.util.Timer; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.FeatureProvider; import 
org.apache.bookkeeper.stats.Counter; @@ -65,20 +71,17 @@ import org.slf4j.LoggerFactory; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; - +/** + * Implementation of {@link Stream}. + */ public class StreamImpl implements Stream { - static final Logger logger = LoggerFactory.getLogger(StreamImpl.class); + + private static final Logger logger = LoggerFactory.getLogger(StreamImpl.class); /** * The status of the stream. * - * The status change of the stream should just go in one direction. If a stream hits + * <p>The status change of the stream should just go in one direction. If a stream hits * any error, the stream should be put in error state. If a stream is in error state, * it should be removed and not reused anymore. */ @@ -405,7 +408,7 @@ public class StreamImpl implements Stream { // Stream is closed, fail the op immediately op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); return; - } if (StreamStatus.INITIALIZED == status) { + } else if (StreamStatus.INITIALIZED == status) { completeOpNow = true; success = true; } else if (failFastOnStreamNotReady) { @@ -551,7 +554,8 @@ public class StreamImpl implements Stream { Future<Boolean> acquireStream() { final Stopwatch stopwatch = Stopwatch.createStarted(); final Promise<Boolean> acquirePromise = new Promise<Boolean>(); - manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() { + manager.openAsyncLogWriter().addEventListener( + FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() { @Override public void onSuccess(AsyncLogWriter w) { @@ -748,8 +752,8 @@ public class StreamImpl implements Stream { final boolean abort; 
closeLock.writeLock().lock(); try { - if (StreamStatus.CLOSING == status || - StreamStatus.CLOSED == status) { + if (StreamStatus.CLOSING == status + || StreamStatus.CLOSED == status) { return closePromise; } logger.info("Request to close stream {} : {}", getStreamName(), reason); @@ -875,7 +879,7 @@ public class StreamImpl implements Stream { } /** - * clean up the gauge to help GC + * clean up the gauge to help GC. */ private void unregisterGauge(){ streamLogger.unregisterGauge("stream_status", this.streamStatusGauge);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java index e171e46..7f7d44e 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java @@ -26,9 +26,9 @@ import java.util.Map; /** * Manage lifecycle of streams. * - * StreamManager is responsible for creating, destroying, and keeping track of Stream objects. + * <p>StreamManager is responsible for creating, destroying, and keeping track of Stream objects. * - * Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the + * <p>Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the * per stream request handlers, responsible fo dispatching ex. write requests to an underlying AsyncLogWriter, * managing stream lock, interpreting exceptions, error conditions, and etc. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java index df336fe..8b36d3b 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java @@ -55,13 +55,14 @@ import org.slf4j.LoggerFactory; * StreamManagerImpl is the default implementation responsible for creating, destroying, and keeping track * of Streams. * - * StreamFactory, supplied to StreamManagerImpl in the constructor below, is reposible simply for creating + * <p>StreamFactory, supplied to StreamManagerImpl in the constructor below, is responsible simply for creating * a stream object in isolation from the rest of the system. We pass a StreamFactory in instead of simply * creating StreamImpl's ourselves in order to inject dependencies without bloating the StreamManagerImpl * constructor. 
*/ public class StreamManagerImpl implements StreamManager { - static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class); + + private static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class); private final ConcurrentHashMap<String, Stream> streams = new ConcurrentHashMap<String, Stream>(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java index de01f9f..a2cbc80 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java @@ -24,8 +24,6 @@ import com.twitter.distributedlog.thrift.service.ResponseHeader; import com.twitter.distributedlog.util.Sequencer; import com.twitter.util.Future; -import java.nio.ByteBuffer; - /** * An operation applied to a stream. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java index bfbc88c..c1019c6 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java @@ -17,8 +17,8 @@ */ package com.twitter.distributedlog.service.stream; -import com.twitter.distributedlog.stats.BroadCastStatsLogger; import com.twitter.distributedlog.service.streamset.Partition; +import com.twitter.distributedlog.stats.BroadCastStatsLogger; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java index f453dc2..b0b4df2 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java @@ -19,26 +19,27 @@ package com.twitter.distributedlog.service.stream; import com.twitter.distributedlog.AsyncLogWriter; import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.util.ProtocolUtils; 
import com.twitter.distributedlog.acl.AccessControlManager; import com.twitter.distributedlog.exceptions.DLException; import com.twitter.distributedlog.exceptions.RequestDeniedException; import com.twitter.distributedlog.service.ResponseUtils; import com.twitter.distributedlog.thrift.service.WriteResponse; +import com.twitter.distributedlog.util.ProtocolUtils; import com.twitter.distributedlog.util.Sequencer; import com.twitter.util.Future; - -import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.runtime.AbstractFunction1; +/** + * Operation to truncate a log stream. + */ public class TruncateOp extends AbstractWriteOp { - static final Logger logger = LoggerFactory.getLogger(TruncateOp.class); + private static final Logger logger = LoggerFactory.getLogger(TruncateOp.class); private final Counter deniedTruncateCounter; private final DLSN dlsn; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java index e9f2f4e..69739dc 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java @@ -20,35 +20,36 @@ package com.twitter.distributedlog.service.stream; import com.twitter.distributedlog.AsyncLogWriter; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.LogRecord; -import com.twitter.distributedlog.util.ProtocolUtils; import 
com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.service.config.ServerConfiguration; import com.twitter.distributedlog.exceptions.DLException; import com.twitter.distributedlog.exceptions.RequestDeniedException; import com.twitter.distributedlog.service.ResponseUtils; +import com.twitter.distributedlog.service.config.ServerConfiguration; import com.twitter.distributedlog.service.streamset.Partition; import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; -import com.twitter.distributedlog.thrift.service.WriteResponse; import com.twitter.distributedlog.thrift.service.ResponseHeader; import com.twitter.distributedlog.thrift.service.StatusCode; +import com.twitter.distributedlog.thrift.service.WriteResponse; +import com.twitter.distributedlog.util.ProtocolUtils; import com.twitter.distributedlog.util.Sequencer; -import com.twitter.util.FutureEventListener; import com.twitter.util.Future; - +import com.twitter.util.FutureEventListener; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.runtime.AbstractFunction1; +/** + * Operation to write a single record to a log stream. 
+ */ public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload { - static final Logger logger = LoggerFactory.getLogger(WriteOp.class); + + private static final Logger logger = LoggerFactory.getLogger(WriteOp.class); private final byte[] payload; private final boolean isRecordSet; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java index eef4811..6cc9063 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java @@ -17,6 +17,9 @@ */ package com.twitter.distributedlog.service.stream; +/** + * A write operation with payload. 
+ */ public interface WriteOpWithPayload { // Return the payload size in bytes http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java index 7ac4986..6d2d2ea 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java @@ -21,9 +21,9 @@ import com.twitter.distributedlog.exceptions.DLException; import com.twitter.util.Future; /** - * An admin operation + * Admin operation interface. */ -public interface AdminOp<Response> { +public interface AdminOp<RespT> { /** * Invoked before the stream op is executed. 
@@ -35,6 +35,6 @@ public interface AdminOp<Response> { * * @return the future represents the response of the operation */ - Future<Response> execute(); + Future<RespT> execute(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java index 2e1f490..478201e 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java @@ -17,6 +17,8 @@ */ package com.twitter.distributedlog.service.stream.admin; +import static com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat; + import com.twitter.distributedlog.service.ResponseUtils; import com.twitter.distributedlog.service.stream.StreamManager; import com.twitter.distributedlog.thrift.service.WriteResponse; @@ -25,8 +27,9 @@ import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.StatsLogger; import scala.runtime.AbstractFunction1; -import static com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat; - +/** + * Operation to create log stream. 
+ */ public class CreateOp extends StreamAdminOp { public CreateOp(String stream, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java index 37c6e14..4fad542 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java @@ -26,13 +26,12 @@ import com.twitter.distributedlog.thrift.service.WriteResponse; import com.twitter.distributedlog.util.ProtocolUtils; import com.twitter.util.Future; import com.twitter.util.FutureTransformer; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.OpStatsLogger; -import java.util.concurrent.TimeUnit; - /** - * Stream admin op + * Stream admin op. 
*/ public abstract class StreamAdminOp implements AdminOp<WriteResponse> { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java new file mode 100644 index 0000000..5ec997c --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Stream Related Admin Operations. 
+ */ +package com.twitter.distributedlog.service.stream.admin; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java index be8c457..d684de5 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java @@ -20,32 +20,31 @@ package com.twitter.distributedlog.service.stream.limiter; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.OverCapacityException; import com.twitter.distributedlog.limiter.RequestLimiter; - import java.io.Closeable; - import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.configuration.event.ConfigurationEvent; import org.apache.commons.configuration.event.ConfigurationListener; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Dynamically rebuild a rate limiter when the supplied dynamic config changes. Subclasses - * implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister + * Dynamically rebuild a rate limiter when the supplied dynamic config changes. + * + * <p>Subclasses implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister * the config listener. 
*/ -public abstract class DynamicRequestLimiter<Request> implements RequestLimiter<Request>, Closeable { - static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class); +public abstract class DynamicRequestLimiter<Req> implements RequestLimiter<Req>, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class); private final ConfigurationListener listener; private final Feature rateLimitDisabledFeature; - volatile RequestLimiter<Request> limiter; + volatile RequestLimiter<Req> limiter; final DynamicDistributedLogConfiguration dynConf; public DynamicRequestLimiter(DynamicDistributedLogConfiguration dynConf, - StatsLogger statsLogger, Feature rateLimitDisabledFeature) { + StatsLogger statsLogger, + Feature rateLimitDisabledFeature) { final StatsLogger limiterStatsLogger = statsLogger.scope("dynamic"); this.dynConf = dynConf; this.rateLimitDisabledFeature = rateLimitDisabledFeature; @@ -74,7 +73,7 @@ public abstract class DynamicRequestLimiter<Request> implements RequestLimiter<R } @Override - public void apply(Request request) throws OverCapacityException { + public void apply(Req request) throws OverCapacityException { if (rateLimitDisabledFeature.isAvailable()) { return; } @@ -91,5 +90,5 @@ public abstract class DynamicRequestLimiter<Request> implements RequestLimiter<R * Build the underlying limiter. Called when DynamicRequestLimiter detects config has changed. * This may be called multiple times so the method should be cheap. 
*/ - protected abstract RequestLimiter<Request> build(); + protected abstract RequestLimiter<Req> build(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java index 157b1ec..c1a37bb 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java @@ -17,26 +17,32 @@ */ package com.twitter.distributedlog.service.stream.limiter; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkNotNull; + import com.twitter.distributedlog.exceptions.OverCapacityException; import com.twitter.distributedlog.limiter.ComposableRequestLimiter; -import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; import com.twitter.distributedlog.limiter.ComposableRequestLimiter.CostFunction; +import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; import com.twitter.distributedlog.limiter.GuavaRateLimiter; import com.twitter.distributedlog.limiter.RateLimiter; import com.twitter.distributedlog.limiter.RequestLimiter; import com.twitter.distributedlog.service.stream.StreamOp; import com.twitter.distributedlog.service.stream.WriteOpWithPayload; - import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +/** + * Request limiter builder. 
+ */ public class RequestLimiterBuilder { private OverlimitFunction<StreamOp> overlimitFunction = NOP_OVERLIMIT_FUNCTION; private RateLimiter limiter; private CostFunction<StreamOp> costFunction; private StatsLogger statsLogger = NullStatsLogger.INSTANCE; + /** + * Function to calculate the `RPS` (Request per second) cost of a given stream operation. + */ public static final CostFunction<StreamOp> RPS_COST_FUNCTION = new CostFunction<StreamOp>() { @Override public int apply(StreamOp op) { @@ -48,6 +54,9 @@ public class RequestLimiterBuilder { } }; + /** + * Function to calculate the `BPS` (Bytes per second) cost of a given stream operation. + */ public static final CostFunction<StreamOp> BPS_COST_FUNCTION = new CostFunction<StreamOp>() { @Override public int apply(StreamOp op) { @@ -60,6 +69,9 @@ public class RequestLimiterBuilder { } }; + /** + * Function to check if a stream operation will cause {@link OverCapacityException}. + */ public static final OverlimitFunction<StreamOp> NOP_OVERLIMIT_FUNCTION = new OverlimitFunction<StreamOp>() { @Override public void apply(StreamOp op) throws OverCapacityException { @@ -96,9 +108,9 @@ public class RequestLimiterBuilder { } public RequestLimiter<StreamOp> build() { - Preconditions.checkNotNull(limiter); - Preconditions.checkNotNull(overlimitFunction); - Preconditions.checkNotNull(costFunction); + checkNotNull(limiter); + checkNotNull(overlimitFunction); + checkNotNull(costFunction); return new ComposableRequestLimiter(limiter, overlimitFunction, costFunction, statsLogger); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java index 69a8470..a3e1efb 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java @@ -22,9 +22,9 @@ import com.twitter.distributedlog.exceptions.OverCapacityException; import com.twitter.distributedlog.limiter.ChainedRequestLimiter; import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; import com.twitter.distributedlog.limiter.RequestLimiter; +import com.twitter.distributedlog.rate.MovingAverageRate; import com.twitter.distributedlog.service.stream.StreamManager; import com.twitter.distributedlog.service.stream.StreamOp; -import com.twitter.distributedlog.rate.MovingAverageRate; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.StatsLogger; @@ -89,8 +89,10 @@ public class ServiceRequestLimiter extends DynamicRequestLimiter<StreamOp> { .limit(bpsSoftServiceLimit); ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>(); - builder.addLimiter(new StreamAcquireLimiter(streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire"))); - builder.addLimiter(new StreamAcquireLimiter(streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire"))); + builder.addLimiter(new StreamAcquireLimiter( + streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire"))); + builder.addLimiter(new StreamAcquireLimiter( + streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire"))); builder.addLimiter(bpsHardLimiterBuilder.build()); builder.addLimiter(bpsSoftLimiterBuilder.build()); builder.addLimiter(rpsHardLimiterBuilder.build()); 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java index a417e81..5015751 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java @@ -20,13 +20,15 @@ package com.twitter.distributedlog.service.stream.limiter; import com.twitter.distributedlog.exceptions.OverCapacityException; import com.twitter.distributedlog.exceptions.TooManyStreamsException; import com.twitter.distributedlog.limiter.RequestLimiter; +import com.twitter.distributedlog.rate.MovingAverageRate; import com.twitter.distributedlog.service.stream.StreamManager; import com.twitter.distributedlog.service.stream.StreamOp; -import com.twitter.distributedlog.rate.MovingAverageRate; - import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; +/** + * A special limiter on limiting acquiring new streams. 
+ */ public class StreamAcquireLimiter implements RequestLimiter<StreamOp> { private final StreamManager streamManager; private final MovingAverageRate serviceRps; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java index b4836d1..fa601d1 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java @@ -19,13 +19,16 @@ package com.twitter.distributedlog.service.stream.limiter; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.OverCapacityException; -import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; import com.twitter.distributedlog.limiter.ChainedRequestLimiter; +import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; import com.twitter.distributedlog.limiter.RequestLimiter; import com.twitter.distributedlog.service.stream.StreamOp; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.StatsLogger; +/** + * A dynamic request limiter on limiting stream operations. 
+ */ public class StreamRequestLimiter extends DynamicRequestLimiter<StreamOp> { private final DynamicDistributedLogConfiguration dynConf; private final StatsLogger limiterStatLogger; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java new file mode 100644 index 0000000..533c75a --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Request Rate Limiting. 
+ */ +package com.twitter.distributedlog.service.stream.limiter; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java new file mode 100644 index 0000000..389acd9 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Stream Related Operations. 
+ */ +package com.twitter.distributedlog.service.stream; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java index de35fc2..9afd234 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java @@ -20,6 +20,9 @@ package com.twitter.distributedlog.service.streamset; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +/** + * A stream-to-partition converter that caches the mapping between stream and partitions. 
+ */ public abstract class CacheableStreamPartitionConverter implements StreamPartitionConverter { private final ConcurrentMap<String, Partition> partitions; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java index 4576560..d69a393 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java @@ -20,7 +20,7 @@ package com.twitter.distributedlog.service.streamset; import org.apache.commons.lang3.StringUtils; /** - * Stream Partition Converter + * Stream Partition Converter that converts the stream name into a stream-to-partition mapping by delimiter. 
*/ public class DelimiterStreamPartitionConverter extends CacheableStreamPartitionConverter { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java index d199f88..770c631 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java @@ -22,6 +22,7 @@ import com.google.common.base.Objects; /** * `Partition` defines the relationship between a `virtual` stream and a * physical DL stream. + * * <p>A `virtual` stream could be partitioned into multiple partitions * and each partition is effectively a DL stream. */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java index 9c882a6..1962e5f 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java @@ -22,6 +22,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +/** + * A mapping between a logical stream and a set of physical partitions. 
+ */ public class PartitionMap { private final Map<String, Set<Partition>> partitionMap; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java new file mode 100644 index 0000000..3888e40 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * StreamSet - A logical set of streams. 
+ */ +package com.twitter.distributedlog.service.streamset; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java index b37de10..8ff2f26 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java @@ -30,6 +30,10 @@ import com.twitter.finagle.builder.ClientBuilder; import com.twitter.finagle.thrift.ClientId$; import com.twitter.util.Await; import com.twitter.util.Duration; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; @@ -37,18 +41,16 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - /** * Tools to interact with proxies. */ public class ProxyTool extends Tool { - static final Logger logger = LoggerFactory.getLogger(ProxyTool.class); + private static final Logger logger = LoggerFactory.getLogger(ProxyTool.class); + /** + * Abstract Cluster level command. 
+ */ protected abstract static class ClusterCommand extends OptsCommand { protected Options options = new Options(); @@ -59,8 +61,8 @@ public class ProxyTool extends Tool { super(name, description); options.addOption("u", "uri", true, "DistributedLog URI"); options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'."); - options.addOption("e", "expression", true, "Expression to generate stream suffix. " + - "Currently we support range '0-9', list '1,2,3' and name '143'"); + options.addOption("e", "expression", true, "Expression to generate stream suffix. " + + "Currently we support range '0-9', list '1,2,3' and name '143'"); } @Override @@ -157,6 +159,9 @@ public class ProxyTool extends Tool { } } + /** + * Command to release ownership of a log stream. + */ static class ReleaseCommand extends ClusterCommand { double rate = 100f; @@ -196,6 +201,9 @@ public class ProxyTool extends Tool { } } + /** + * Command to truncate a log stream. + */ static class TruncateCommand extends ClusterCommand { DLSN dlsn = DLSN.InitialDLSN; @@ -234,6 +242,9 @@ public class ProxyTool extends Tool { } } + /** + * Abstract command to operate on a single proxy server. + */ protected abstract static class ProxyCommand extends OptsCommand { protected Options options = new Options(); @@ -291,6 +302,9 @@ public class ProxyTool extends Tool { protected abstract int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) throws Exception; } + /** + * Command to enable/disable accepting new streams. 
+ */ static class AcceptNewStreamCommand extends ProxyCommand { boolean enabled = false; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java new file mode 100644 index 0000000..1e32fd3 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Service related tools. 
+ */ +package com.twitter.distributedlog.service.tools; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java new file mode 100644 index 0000000..e6dcec6 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utilities used by proxy servers. 
+ */ +package com.twitter.distributedlog.service.utils; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java index c08f0f0..8db3e90 100644 --- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java +++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java @@ -23,11 +23,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Extend the codahale metrics provider to run servlets + * Extend the codahale metrics provider to run servlets. */ public class CodahaleMetricsServletProvider extends CodahaleMetricsProvider { - private final static Logger logger = LoggerFactory.getLogger(CodahaleMetricsServletProvider.class); + private static final Logger logger = LoggerFactory.getLogger(CodahaleMetricsServletProvider.class); ServletReporter servletReporter = null; private final HealthCheckRegistry healthCheckRegistry = new HealthCheckRegistry(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java index 98cd8b3..348787a 100644 --- 
a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java +++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java @@ -21,7 +21,7 @@ import com.codahale.metrics.health.HealthCheckRegistry; import com.codahale.metrics.servlets.HealthCheckServlet; /** - * Health Check Servlet Listener + * Health Check Servlet Listener. */ public class HealthCheckServletContextListener extends HealthCheckServlet.ContextListener { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java index e401ac0..15279fe 100644 --- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java +++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java @@ -20,6 +20,9 @@ package org.apache.bookkeeper.stats; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.servlets.MetricsServlet; +/** + * A servlet to report metrics. 
+ */ public class MetricsServletContextListener extends MetricsServlet.ContextListener { private final MetricRegistry metricRegistry; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java index 9cf0610..267f75a 100644 --- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java +++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java @@ -25,7 +25,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; /** - * Starts a jetty server on a configurable port to export stats + * Starts a jetty server on a configurable port to export stats. */ public class ServletReporter { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java new file mode 100644 index 0000000..d00b64d --- /dev/null +++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Extension of {@link org.apache.bookkeeper.stats.CodahaleMetricsProvider}. + */ +package org.apache.bookkeeper.stats; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java index 10941ba..922e901 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java @@ -20,7 +20,6 @@ package com.twitter.distributedlog.client.routing; import com.google.common.collect.Sets; import com.twitter.finagle.NoBrokersAvailableException; import com.twitter.finagle.stats.StatsReceiver; - import java.net.SocketAddress; import java.util.HashMap; import java.util.LinkedHashSet; @@ -28,12 +27,18 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +/** + * A local routing service that is used for testing.
+ */ public class LocalRoutingService implements RoutingService { public static Builder newBuilder() { return new Builder(); } + /** + * Builder to build a local routing service for testing. + */ public static class Builder implements RoutingService.Builder { private Builder() {} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java index 4d29f21..f7e81dc 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java @@ -17,39 +17,38 @@ */ package com.twitter.distributedlog.service; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import com.google.common.base.Optional; import com.google.common.collect.Sets; +import com.twitter.distributedlog.DLMTestUtil; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.client.DistributedLogClientImpl; import com.twitter.distributedlog.client.resolver.DefaultRegionResolver; import com.twitter.distributedlog.client.routing.LocalRoutingService; import com.twitter.distributedlog.client.routing.RegionsRoutingService; import com.twitter.distributedlog.service.DistributedLogCluster.DLServer; -import com.twitter.distributedlog.service.stream.StreamManagerImpl; import com.twitter.distributedlog.service.stream.StreamManager; -import com.twitter.distributedlog.DLMTestUtil; -import com.twitter.finagle.thrift.ClientId$; +import 
com.twitter.distributedlog.service.stream.StreamManagerImpl; import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.thrift.ClientId$; import com.twitter.util.Duration; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.SocketAddress; import java.net.URI; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; -import static org.junit.Assert.*; - +/** + * Base test case for distributedlog servers. + */ public abstract class DistributedLogServerTestCase { - protected static final Logger LOG = LoggerFactory.getLogger(DistributedLogServerTestCase.class); - protected static DistributedLogConfiguration conf = new DistributedLogConfiguration().setLockTimeout(10) .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10); @@ -59,6 +58,9 @@ public abstract class DistributedLogServerTestCase { protected static DistributedLogCluster dlCluster; protected static DistributedLogCluster noAdHocCluster; + /** + * A distributedlog client wrapper for testing. + */ protected static class DLClient { public final LocalRoutingService routingService; public DistributedLogClientBuilder dlClientBuilder; @@ -94,6 +96,9 @@ public abstract class DistributedLogServerTestCase { } } + /** + * A distributedlog client wrapper that talks to two regions. 
+ */ protected static class TwoRegionDLClient { public final LocalRoutingService localRoutingService; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java index 218ea06..24d7f07 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java @@ -17,22 +17,30 @@ */ package com.twitter.distributedlog.service; +import static com.google.common.base.Charsets.UTF_8; +import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import com.google.common.base.Optional; import com.twitter.distributedlog.AsyncLogReader; import com.twitter.distributedlog.DLMTestUtil; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.TestZooKeeperClientBuilder; -import com.twitter.distributedlog.annotations.DistributedLogAnnotations; -import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.LogReader; import com.twitter.distributedlog.LogRecord; import com.twitter.distributedlog.LogRecordWithDLSN; +import com.twitter.distributedlog.TestZooKeeperClientBuilder; import com.twitter.distributedlog.ZooKeeperClient; import 
com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.impl.acl.ZKAccessControl; +import com.twitter.distributedlog.annotations.DistributedLogAnnotations; import com.twitter.distributedlog.client.routing.LocalRoutingService; import com.twitter.distributedlog.exceptions.DLException; +import com.twitter.distributedlog.exceptions.LogNotFoundException; +import com.twitter.distributedlog.impl.acl.ZKAccessControl; import com.twitter.distributedlog.impl.metadata.BKDLConfig; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.service.stream.StreamManagerImpl; @@ -49,13 +57,6 @@ import com.twitter.util.Await; import com.twitter.util.Duration; import com.twitter.util.Future; import com.twitter.util.Futures; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -64,17 +65,19 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static com.google.common.base.Charsets.UTF_8; -import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - +/** + * Test Case for {@link DistributedLogServer}. 
+ */ public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase { - static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class); + + private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class); @Rule public TestName testName = new TestName(); @@ -84,7 +87,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT } /** - * {@link https://issues.apache.org/jira/browse/DL-27} + * {@link https://issues.apache.org/jira/browse/DL-27}. */ @DistributedLogAnnotations.FlakyTest @Ignore @@ -212,10 +215,11 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT writes.add(null); try { - List<Future<DLSN>> futureResult = dlClient.dlClient.writeBulk(name, writes); + dlClient.dlClient.writeBulk(name, writes); fail("should not have succeeded"); } catch (NullPointerException npe) { - ; // expected + // expected + logger.info("Expected to catch NullPointException."); } } @@ -246,7 +250,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT for (int i = start; i < finish; i++) { Future<DLSN> future = futures.get(i); try { - DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + Await.result(future, Duration.fromSeconds(10)); fail("future should have failed!"); } catch (DLException cre) { ++failed; @@ -259,7 +263,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT void validateFailedAsLogRecordTooLong(Future<DLSN> future) { try { - DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + Await.result(future, Duration.fromSeconds(10)); fail("should have failed"); } catch (DLException dle) { assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode()); @@ -276,7 +280,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT final int writeCount = 100; - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount*2 + 1); + 
List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1); for (long i = 1; i <= writeCount; i++) { writes.add(ByteBuffer.wrap(("" + i).getBytes())); } @@ -293,7 +297,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT for (int i = 0; i < writeCount; i++) { Future<DLSN> future = futures.get(i); try { - DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + Await.result(future, Duration.fromSeconds(10)); ++succeeded; } catch (Exception ex) { failDueToWrongException(ex); @@ -510,7 +514,9 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT } } - /** This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing */ + /** + * This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing. + */ @Test(timeout = 60000) public void testCreateStreamTwice() throws Exception { try { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java index 0e5f187..b776543 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java @@ -17,14 +17,13 @@ */ package com.twitter.distributedlog.service; +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.fail; + import com.twitter.finagle.NoBrokersAvailableException; import com.twitter.util.Await; -import 
org.junit.Test; - import java.nio.ByteBuffer; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.fail; +import org.junit.Test; /** * Test the server with client side routing. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java index f97399d..d9d2b21 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java @@ -17,10 +17,17 @@ */ package com.twitter.distributedlog.service; +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import com.google.common.collect.Lists; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.util.ProtocolUtils; import com.twitter.distributedlog.TestDistributedLogBase; import com.twitter.distributedlog.acl.DefaultAccessControlManager; import com.twitter.distributedlog.client.routing.LocalRoutingService; @@ -29,11 +36,11 @@ import com.twitter.distributedlog.exceptions.StreamUnavailableException; import com.twitter.distributedlog.service.config.NullStreamConfigProvider; import com.twitter.distributedlog.service.config.ServerConfiguration; import 
com.twitter.distributedlog.service.placement.EqualLoadAppraiser; -import com.twitter.distributedlog.service.stream.WriteOp; -import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus; +import com.twitter.distributedlog.service.stream.Stream; import com.twitter.distributedlog.service.stream.StreamImpl; +import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus; import com.twitter.distributedlog.service.stream.StreamManagerImpl; -import com.twitter.distributedlog.service.stream.Stream; +import com.twitter.distributedlog.service.stream.WriteOp; import com.twitter.distributedlog.service.streamset.DelimiterStreamPartitionConverter; import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter; import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; @@ -43,8 +50,15 @@ import com.twitter.distributedlog.thrift.service.WriteContext; import com.twitter.distributedlog.thrift.service.WriteResponse; import com.twitter.distributedlog.util.ConfUtils; import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.ProtocolUtils; import com.twitter.util.Await; import com.twitter.util.Future; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.feature.SettableFeature; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ReflectionUtils; @@ -57,22 +71,12 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.*; - /** - * Test Case for DistributedLog Service + * Test Case for 
DistributedLog Service. */ public class TestDistributedLogService extends TestDistributedLogBase { - static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class); + private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class); @Rule public TestName testName = new TestName(); @@ -357,12 +361,12 @@ public class TestDistributedLogService extends TestDistributedLogBase { Future<Void> closeFuture0 = s.requestClose("close 0"); assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, - StreamStatus.CLOSING == s.getStatus() || - StreamStatus.CLOSED == s.getStatus()); + StreamStatus.CLOSING == s.getStatus() + || StreamStatus.CLOSED == s.getStatus()); Future<Void> closeFuture1 = s.requestClose("close 1"); assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, - StreamStatus.CLOSING == s.getStatus() || - StreamStatus.CLOSED == s.getStatus()); + StreamStatus.CLOSING == s.getStatus() + || StreamStatus.CLOSED == s.getStatus()); Await.result(closeFuture0); assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, @@ -386,8 +390,8 @@ public class TestDistributedLogService extends TestDistributedLogBase { Future<Void> closeFuture = s.requestClose("close"); assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, - StreamStatus.CLOSING == s.getStatus() || - StreamStatus.CLOSED == s.getStatus()); + StreamStatus.CLOSING == s.getStatus() + || StreamStatus.CLOSED == s.getStatus()); WriteOp op1 = createWriteOp(service, streamName, 0L); s.submit(op1); WriteResponse response1 = Await.result(op1.result()); @@ -430,8 +434,8 @@ public class TestDistributedLogService extends TestDistributedLogBase { StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName); // the stream should be set CLOSING - while (StreamStatus.CLOSING != s.getStatus() && - StreamStatus.CLOSED != s.getStatus()) { + while (StreamStatus.CLOSING != 
s.getStatus() + && StreamStatus.CLOSED != s.getStatus()) { TimeUnit.MILLISECONDS.sleep(20); } assertNotNull("Writer should be initialized", s.getWriter()); @@ -443,9 +447,9 @@ public class TestDistributedLogService extends TestDistributedLogBase { futureList.get(i).isDefined()); WriteResponse response = Await.result(futureList.get(i)); assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION, - StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() || - StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() || - StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); + StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() + || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() + || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); } while (streamManager.getCachedStreams().containsKey(streamName)) { @@ -518,7 +522,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { public void testTruncateOpNoChecksum() throws Exception { DistributedLogServiceImpl localService = createConfiguredLocalService(); WriteContext ctx = new WriteContext(); - Future<WriteResponse> result = localService.truncate("test", new DLSN(1,2,3).serialize(), ctx); + Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx); WriteResponse resp = Await.result(result); assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); localService.shutdown(); @@ -580,7 +584,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { ProtocolUtils.writeOpCRC32("test", buffer.array())); // Overwrite 1 byte to corrupt data. 
- buffer.put(1, (byte)0xab); + buffer.put(1, (byte) 0xab); Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx); WriteResponse resp = Await.result(result); assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); @@ -607,7 +611,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { public void testTruncateOpChecksumBadChecksum() throws Exception { DistributedLogServiceImpl localService = createConfiguredLocalService(); WriteContext ctx = new WriteContext().setCrc32(999); - Future<WriteResponse> result = localService.truncate("test", new DLSN(1,2,3).serialize(), ctx); + Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx); WriteResponse resp = Await.result(result); assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); localService.shutdown(); @@ -620,7 +624,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { new NullStatsLogger(), new IdentityStreamPartitionConverter(), new ServerConfiguration(), - (byte)0, + (byte) 0, checksum, false, disabledFeature, @@ -651,8 +655,14 @@ public class TestDistributedLogService extends TestDistributedLogBase { String streamName = testName.getMethodName(); SettableFeature disabledFeature = new SettableFeature("", 1); - WriteOp writeOp0 = getWriteOp(streamName, disabledFeature, ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); - WriteOp writeOp1 = getWriteOp(streamName, disabledFeature, ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); + WriteOp writeOp0 = getWriteOp( + streamName, + disabledFeature, + ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); + WriteOp writeOp1 = getWriteOp( + streamName, + disabledFeature, + ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); writeOp0.preExecute(); disabledFeature.set(0); @@ -699,9 +709,9 @@ public class TestDistributedLogService extends TestDistributedLogBase { for (Future<WriteResponse> future : 
futureList) { WriteResponse response = Await.result(future); assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(), - StatusCode.SUCCESS == response.getHeader().getCode() || - StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() || - StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode()); + StatusCode.SUCCESS == response.getHeader().getCode() + || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() + || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode()); } assertTrue("There should be no streams in the cache", streamManager.getCachedStreams().isEmpty()); @@ -757,9 +767,9 @@ public class TestDistributedLogService extends TestDistributedLogBase { WriteResponse response = Await.result(future); assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : " + response.getHeader().getCode(), - StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() || - StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() || - StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); + StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() + || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() + || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); } // acquired streams should all been removed after we close them assertTrue("There should be no streams in the acquired cache", http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java index f4c140e..50c915f 100644 --- 
a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java @@ -20,6 +20,12 @@ package com.twitter.distributedlog.service; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.feature.DefaultFeatureProvider; import com.twitter.distributedlog.service.DistributedLogCluster.DLServer; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.feature.SettableFeature; @@ -28,18 +34,14 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.*; - +/** + * Test Case for {@link com.twitter.distributedlog.exceptions.RegionUnavailableException}. + */ public class TestRegionUnavailable extends DistributedLogServerTestCase { + /** + * A feature provider for testing. 
+ */ public static class TestFeatureProvider extends DefaultFeatureProvider { public TestFeatureProvider(String rootScope, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java index eb65ad4..d8ef302 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java @@ -17,18 +17,19 @@ */ package com.twitter.distributedlog.service; +import static org.junit.Assert.assertEquals; + import com.twitter.finagle.Service; import com.twitter.finagle.service.ConstantService; import com.twitter.util.Await; import com.twitter.util.Future; - -import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.NullStatsLogger; - +import org.apache.bookkeeper.stats.StatsLogger; import org.junit.Test; -import static org.junit.Assert.*; - +/** + * Test Case for {@link StatsFilter}. 
+ */ public class TestStatsFilter { class RuntimeExService<Req, Rep> extends Service<Req, Rep> { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java index 9e04089..2ae3a23 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java @@ -17,13 +17,15 @@ */ package com.twitter.distributedlog.service.balancer; -import org.junit.Test; +import static org.junit.Assert.assertEquals; import java.util.HashMap; import java.util.Map; +import org.junit.Test; -import static org.junit.Assert.*; - +/** + * Test Case for {@link BalancerUtils}. 
+ */ public class TestBalancerUtils { @Test(timeout = 60000) http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java index 05eb724..8a24e21 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java @@ -17,6 +17,9 @@ */ package com.twitter.distributedlog.service.balancer; +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.fail; + import com.google.common.base.Optional; import com.google.common.util.concurrent.RateLimiter; import com.twitter.distributedlog.client.monitor.MonitorServiceClient; @@ -25,6 +28,11 @@ import com.twitter.distributedlog.service.DistributedLogClient; import com.twitter.distributedlog.service.DistributedLogCluster.DLServer; import com.twitter.distributedlog.service.DistributedLogServerTestCase; import com.twitter.util.Await; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.commons.lang3.tuple.Pair; import org.junit.After; import org.junit.Before; @@ -33,18 +41,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.*; - +/** + * Test Case for {@link ClusterBalancer}. 
+ */ public class TestClusterBalancer extends DistributedLogServerTestCase { - static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class); + private static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class); private final int numServers = 5; private final List<DLServer> cluster; @@ -108,7 +110,7 @@ public class TestClusterBalancer extends DistributedLogServerTestCase { for (int i = 0; i < numStreams; i++) { String name = namePrefix + (streamId++); try { - Await.result(((DistributedLogClient) client.dlClient).write(name, ByteBuffer.wrap(name.getBytes(UTF_8)))); + Await.result(client.dlClient.write(name, ByteBuffer.wrap(name.getBytes(UTF_8)))); } catch (Exception e) { logger.error("Error writing stream {} : ", name, e); throw e; @@ -145,7 +147,7 @@ public class TestClusterBalancer extends DistributedLogServerTestCase { Optional<RateLimiter> rateLimiter = Optional.absent(); Balancer balancer = new ClusterBalancer(client.dlClientBuilder, - Pair.of((DistributedLogClient)client.dlClient, (MonitorServiceClient)client.dlClient)); + Pair.of((DistributedLogClient) client.dlClient, (MonitorServiceClient) client.dlClient)); logger.info("Rebalancing from 'unknown' target"); try { balancer.balanceAll("unknown", 10, rateLimiter);