http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java deleted file mode 100644 index 3f28c42..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream.admin; - -import com.google.common.base.Stopwatch; -import org.apache.distributedlog.exceptions.ChecksumFailedException; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.service.stream.StreamManager; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.ProtocolUtils; -import com.twitter.util.Future; -import com.twitter.util.FutureTransformer; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.OpStatsLogger; - -/** - * Stream admin op. - */ -public abstract class StreamAdminOp implements AdminOp<WriteResponse> { - - protected final String stream; - protected final StreamManager streamManager; - protected final OpStatsLogger opStatsLogger; - protected final Stopwatch stopwatch = Stopwatch.createUnstarted(); - protected final Long checksum; - protected final Feature checksumDisabledFeature; - - protected StreamAdminOp(String stream, - StreamManager streamManager, - OpStatsLogger statsLogger, - Long checksum, - Feature checksumDisabledFeature) { - this.stream = stream; - this.streamManager = streamManager; - this.opStatsLogger = statsLogger; - // start here in case the operation is failed before executing. - stopwatch.reset().start(); - this.checksum = checksum; - this.checksumDisabledFeature = checksumDisabledFeature; - } - - protected Long computeChecksum() { - return ProtocolUtils.streamOpCRC32(stream); - } - - @Override - public void preExecute() throws DLException { - if (!checksumDisabledFeature.isAvailable() && null != checksum) { - Long serverChecksum = computeChecksum(); - if (null != serverChecksum && !checksum.equals(serverChecksum)) { - throw new ChecksumFailedException(); - } - } - } - - /** - * Execute the operation. - * - * @return execute operation - */ - protected abstract Future<WriteResponse> executeOp(); - - @Override - public Future<WriteResponse> execute() { - return executeOp().transformedBy(new FutureTransformer<WriteResponse, WriteResponse>() { - - @Override - public WriteResponse map(WriteResponse response) { - opStatsLogger.registerSuccessfulEvent( - stopwatch.elapsed(TimeUnit.MICROSECONDS)); - return response; - } - - @Override - public WriteResponse handle(Throwable cause) { - opStatsLogger.registerFailedEvent( - stopwatch.elapsed(TimeUnit.MICROSECONDS)); - return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause)); - } - - }); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java deleted file mode 100644 index 5b583e1..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Stream Related Admin Operations. - */ -package org.apache.distributedlog.service.stream.admin; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java deleted file mode 100644 index 5db2037..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream.limiter; - -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.exceptions.OverCapacityException; -import org.apache.distributedlog.limiter.RequestLimiter; -import java.io.Closeable; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.commons.configuration.event.ConfigurationEvent; -import org.apache.commons.configuration.event.ConfigurationListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Dynamically rebuild a rate limiter when the supplied dynamic config changes. - * - * <p>Subclasses implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister - * the config listener. - */ -public abstract class DynamicRequestLimiter<Req> implements RequestLimiter<Req>, Closeable { - private static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class); - - private final ConfigurationListener listener; - private final Feature rateLimitDisabledFeature; - volatile RequestLimiter<Req> limiter; - final DynamicDistributedLogConfiguration dynConf; - - public DynamicRequestLimiter(DynamicDistributedLogConfiguration dynConf, - StatsLogger statsLogger, - Feature rateLimitDisabledFeature) { - final StatsLogger limiterStatsLogger = statsLogger.scope("dynamic"); - this.dynConf = dynConf; - this.rateLimitDisabledFeature = rateLimitDisabledFeature; - this.listener = new ConfigurationListener() { - @Override - public void configurationChanged(ConfigurationEvent event) { - // Note that this method may be called several times if several config options - // are changed. The effect is harmless except that we create and discard more - // objects than we need to. - LOG.debug("Config changed callback invoked with event {} {} {} {}", new Object[] { - event.getPropertyName(), event.getPropertyValue(), event.getType(), - event.isBeforeUpdate()}); - if (!event.isBeforeUpdate()) { - limiterStatsLogger.getCounter("config_changed").inc(); - LOG.debug("Rebuilding limiter"); - limiter = build(); - } - } - }; - LOG.debug("Registering config changed callback"); - dynConf.addConfigurationListener(listener); - } - - public void initialize() { - this.limiter = build(); - } - - @Override - public void apply(Req request) throws OverCapacityException { - if (rateLimitDisabledFeature.isAvailable()) { - return; - } - limiter.apply(request); - } - - @Override - public void close() { - boolean success = dynConf.removeConfigurationListener(listener); - LOG.debug("Deregistering config changed callback success={}", success); - } - - /** - * Build the underlying limiter. Called when DynamicRequestLimiter detects config has changed. - * This may be called multiple times so the method should be cheap. - */ - protected abstract RequestLimiter<Req> build(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java deleted file mode 100644 index fc30599..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream.limiter; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.distributedlog.exceptions.OverCapacityException; -import org.apache.distributedlog.limiter.ComposableRequestLimiter; -import org.apache.distributedlog.limiter.ComposableRequestLimiter.CostFunction; -import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; -import org.apache.distributedlog.limiter.GuavaRateLimiter; -import org.apache.distributedlog.limiter.RateLimiter; -import org.apache.distributedlog.limiter.RequestLimiter; -import org.apache.distributedlog.service.stream.StreamOp; -import org.apache.distributedlog.service.stream.WriteOpWithPayload; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * Request limiter builder. - */ -public class RequestLimiterBuilder { - private OverlimitFunction<StreamOp> overlimitFunction = NOP_OVERLIMIT_FUNCTION; - private RateLimiter limiter; - private CostFunction<StreamOp> costFunction; - private StatsLogger statsLogger = NullStatsLogger.INSTANCE; - - /** - * Function to calculate the `RPS` (Request per second) cost of a given stream operation. - */ - public static final CostFunction<StreamOp> RPS_COST_FUNCTION = new CostFunction<StreamOp>() { - @Override - public int apply(StreamOp op) { - if (op instanceof WriteOpWithPayload) { - return 1; - } else { - return 0; - } - } - }; - - /** - * Function to calculate the `BPS` (Bytes per second) cost of a given stream operation. - */ - public static final CostFunction<StreamOp> BPS_COST_FUNCTION = new CostFunction<StreamOp>() { - @Override - public int apply(StreamOp op) { - if (op instanceof WriteOpWithPayload) { - WriteOpWithPayload writeOp = (WriteOpWithPayload) op; - return (int) Math.min(writeOp.getPayloadSize(), Integer.MAX_VALUE); - } else { - return 0; - } - } - }; - - /** - * Function to check if a stream operation will cause {@link OverCapacityException}. - */ - public static final OverlimitFunction<StreamOp> NOP_OVERLIMIT_FUNCTION = new OverlimitFunction<StreamOp>() { - @Override - public void apply(StreamOp op) throws OverCapacityException { - return; - } - }; - - public RequestLimiterBuilder limit(int limit) { - this.limiter = GuavaRateLimiter.of(limit); - return this; - } - - public RequestLimiterBuilder overlimit(OverlimitFunction<StreamOp> overlimitFunction) { - this.overlimitFunction = overlimitFunction; - return this; - } - - public RequestLimiterBuilder cost(CostFunction<StreamOp> costFunction) { - this.costFunction = costFunction; - return this; - } - - public RequestLimiterBuilder statsLogger(StatsLogger statsLogger) { - this.statsLogger = statsLogger; - return this; - } - - public static RequestLimiterBuilder newRpsLimiterBuilder() { - return new RequestLimiterBuilder().cost(RPS_COST_FUNCTION); - } - - public static RequestLimiterBuilder newBpsLimiterBuilder() { - return new RequestLimiterBuilder().cost(BPS_COST_FUNCTION); - } - - public RequestLimiter<StreamOp> build() { - checkNotNull(limiter); - checkNotNull(overlimitFunction); - checkNotNull(costFunction); - return new ComposableRequestLimiter(limiter, overlimitFunction, costFunction, statsLogger); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java deleted file mode 100644 index de805aa..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream.limiter; - -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.exceptions.OverCapacityException; -import org.apache.distributedlog.limiter.ChainedRequestLimiter; -import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; -import org.apache.distributedlog.limiter.RequestLimiter; -import org.apache.distributedlog.rate.MovingAverageRate; -import org.apache.distributedlog.service.stream.StreamManager; -import org.apache.distributedlog.service.stream.StreamOp; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * Request limiter for the service instance (global request limiter). - */ -public class ServiceRequestLimiter extends DynamicRequestLimiter<StreamOp> { - private final StatsLogger limiterStatLogger; - private final MovingAverageRate serviceRps; - private final MovingAverageRate serviceBps; - private final StreamManager streamManager; - - public ServiceRequestLimiter(DynamicDistributedLogConfiguration dynConf, - StatsLogger statsLogger, - MovingAverageRate serviceRps, - MovingAverageRate serviceBps, - StreamManager streamManager, - Feature disabledFeature) { - super(dynConf, statsLogger, disabledFeature); - this.limiterStatLogger = statsLogger; - this.streamManager = streamManager; - this.serviceRps = serviceRps; - this.serviceBps = serviceBps; - this.limiter = build(); - } - - @Override - public RequestLimiter<StreamOp> build() { - int rpsStreamAcquireLimit = dynConf.getRpsStreamAcquireServiceLimit(); - int rpsSoftServiceLimit = dynConf.getRpsSoftServiceLimit(); - int rpsHardServiceLimit = dynConf.getRpsHardServiceLimit(); - int bpsStreamAcquireLimit = dynConf.getBpsStreamAcquireServiceLimit(); - int bpsSoftServiceLimit = dynConf.getBpsSoftServiceLimit(); - int bpsHardServiceLimit = dynConf.getBpsHardServiceLimit(); - - RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder() - .statsLogger(limiterStatLogger.scope("rps_hard_limit")) - .limit(rpsHardServiceLimit) - .overlimit(new OverlimitFunction<StreamOp>() { - @Override - public void apply(StreamOp request) throws OverCapacityException { - throw new OverCapacityException("Being rate limited: RPS limit exceeded for the service instance"); - } - }); - - RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder() - .statsLogger(limiterStatLogger.scope("rps_soft_limit")) - .limit(rpsSoftServiceLimit); - - RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder() - .statsLogger(limiterStatLogger.scope("bps_hard_limit")) - .limit(bpsHardServiceLimit) - .overlimit(new OverlimitFunction<StreamOp>() { - @Override - public void apply(StreamOp request) throws OverCapacityException { - throw new OverCapacityException("Being rate limited: BPS limit exceeded for the service instance"); - } - }); - - RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder() - .statsLogger(limiterStatLogger.scope("bps_soft_limit")) - .limit(bpsSoftServiceLimit); - - ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>(); - builder.addLimiter(new StreamAcquireLimiter( - streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire"))); - builder.addLimiter(new StreamAcquireLimiter( - streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire"))); - builder.addLimiter(bpsHardLimiterBuilder.build()); - builder.addLimiter(bpsSoftLimiterBuilder.build()); - builder.addLimiter(rpsHardLimiterBuilder.build()); - builder.addLimiter(rpsSoftLimiterBuilder.build()); - builder.statsLogger(limiterStatLogger); - return builder.build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java deleted file mode 100644 index 7675d6f..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream.limiter; - -import org.apache.distributedlog.exceptions.OverCapacityException; -import org.apache.distributedlog.exceptions.TooManyStreamsException; -import org.apache.distributedlog.limiter.RequestLimiter; -import org.apache.distributedlog.rate.MovingAverageRate; -import org.apache.distributedlog.service.stream.StreamManager; -import org.apache.distributedlog.service.stream.StreamOp; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * A special limiter on limiting acquiring new streams. - */ -public class StreamAcquireLimiter implements RequestLimiter<StreamOp> { - private final StreamManager streamManager; - private final MovingAverageRate serviceRps; - private final double serviceRpsLimit; - private final Counter overlimitCounter; - - public StreamAcquireLimiter(StreamManager streamManager, - MovingAverageRate serviceRps, - double serviceRpsLimit, - StatsLogger statsLogger) { - this.streamManager = streamManager; - this.serviceRps = serviceRps; - this.serviceRpsLimit = serviceRpsLimit; - this.overlimitCounter = statsLogger.getCounter("overlimit"); - } - - @Override - public void apply(StreamOp op) throws OverCapacityException { - String streamName = op.streamName(); - if (serviceRpsLimit > -1 && serviceRps.get() > serviceRpsLimit && !streamManager.isAcquired(streamName)) { - overlimitCounter.inc(); - throw new TooManyStreamsException("Request rate is too high to accept new stream " + streamName + "."); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java deleted file mode 100644 index 42b4e1e..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream.limiter; - -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.exceptions.OverCapacityException; -import org.apache.distributedlog.limiter.ChainedRequestLimiter; -import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; -import org.apache.distributedlog.limiter.RequestLimiter; -import org.apache.distributedlog.service.stream.StreamOp; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * A dynamic request limiter on limiting stream operations. - */ -public class StreamRequestLimiter extends DynamicRequestLimiter<StreamOp> { - private final DynamicDistributedLogConfiguration dynConf; - private final StatsLogger limiterStatLogger; - private final String streamName; - - public StreamRequestLimiter(String streamName, - DynamicDistributedLogConfiguration dynConf, - StatsLogger statsLogger, - Feature disabledFeature) { - super(dynConf, statsLogger, disabledFeature); - this.limiterStatLogger = statsLogger; - this.dynConf = dynConf; - this.streamName = streamName; - this.limiter = build(); - } - - @Override - public RequestLimiter<StreamOp> build() { - - // RPS hard, soft limits - RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder() - .statsLogger(limiterStatLogger.scope("rps_hard_limit")) - .limit(dynConf.getRpsHardWriteLimit()) - .overlimit(new OverlimitFunction<StreamOp>() { - @Override - public void apply(StreamOp op) throws OverCapacityException { - throw new OverCapacityException("Being rate limited: RPS limit exceeded for stream " + streamName); - } - }); - RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder() - .statsLogger(limiterStatLogger.scope("rps_soft_limit")) - .limit(dynConf.getRpsSoftWriteLimit()); - - // BPS hard, soft limits - RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder() - .statsLogger(limiterStatLogger.scope("bps_hard_limit")) - .limit(dynConf.getBpsHardWriteLimit()) - .overlimit(new OverlimitFunction<StreamOp>() { - @Override - public void apply(StreamOp op) throws OverCapacityException { - throw new OverCapacityException("Being rate limited: BPS limit exceeded for stream " + streamName); - } - }); - RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder() - .statsLogger(limiterStatLogger.scope("bps_soft_limit")) - .limit(dynConf.getBpsSoftWriteLimit()); - - ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>(); - builder.addLimiter(rpsSoftLimiterBuilder.build()); - builder.addLimiter(rpsHardLimiterBuilder.build()); - builder.addLimiter(bpsSoftLimiterBuilder.build()); - builder.addLimiter(bpsHardLimiterBuilder.build()); - builder.statsLogger(limiterStatLogger); - return builder.build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java deleted file mode 100644 index c666b08..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Request Rate Limiting. - */ -package org.apache.distributedlog.service.stream.limiter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/package-info.java deleted file mode 100644 index 7429a85..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Stream Related Operations. - */ -package org.apache.distributedlog.service.stream; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java deleted file mode 100644 index 72668c2..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.streamset; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * A stream-to-partition converter that caches the mapping between stream and partitions. - */ -public abstract class CacheableStreamPartitionConverter implements StreamPartitionConverter { - - private final ConcurrentMap<String, Partition> partitions; - - protected CacheableStreamPartitionConverter() { - this.partitions = new ConcurrentHashMap<String, Partition>(); - } - - @Override - public Partition convert(String streamName) { - Partition p = partitions.get(streamName); - if (null != p) { - return p; - } - // not found - Partition newPartition = newPartition(streamName); - Partition oldPartition = partitions.putIfAbsent(streamName, newPartition); - if (null == oldPartition) { - return newPartition; - } else { - return oldPartition; - } - } - - /** - * Create the partition from <code>streamName</code>. - * - * @param streamName - * stream name - * @return partition id of the stream - */ - protected abstract Partition newPartition(String streamName); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java deleted file mode 100644 index 30b2896..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.streamset; - -import org.apache.commons.lang3.StringUtils; - -/** - * Stream Partition Converter that converts the stream name into a stream-to-partition mapping by delimiter. - */ -public class DelimiterStreamPartitionConverter extends CacheableStreamPartitionConverter { - - private final String delimiter; - - public DelimiterStreamPartitionConverter() { - this("_"); - } - - public DelimiterStreamPartitionConverter(String delimiter) { - this.delimiter = delimiter; - } - - @Override - protected Partition newPartition(String streamName) { - String[] parts = StringUtils.split(streamName, delimiter); - if (null != parts && parts.length == 2) { - try { - int partition = Integer.parseInt(parts[1]); - return new Partition(parts[0], partition); - } catch (NumberFormatException nfe) { - // ignore the exception - } - } - return new Partition(streamName, 0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java deleted file mode 100644 index 5be172f..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.streamset; - -/** - * Map stream name to partition of the same name. - */ -public class IdentityStreamPartitionConverter extends CacheableStreamPartitionConverter { - @Override - protected Partition newPartition(String streamName) { - return new Partition(streamName, 0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/Partition.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/Partition.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/Partition.java deleted file mode 100644 index aa69276..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/Partition.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.streamset; - -import com.google.common.base.Objects; - -/** - * `Partition` defines the relationship between a `virtual` stream and a - * physical DL stream. - * - * <p>A `virtual` stream could be partitioned into multiple partitions - * and each partition is effectively a DL stream. - */ -public class Partition { - - // Name of its parent stream. - private final String stream; - - // Unique id of the partition within the stream. - // It can be just simply an index id. - public final int id; - - public Partition(String stream, int id) { - this.stream = stream; - this.id = id; - } - - /** - * Get the `virtual` stream name. - * - * @return the stream name. - */ - public String getStream() { - return stream; - } - - /** - * Get the partition id of this partition. - * - * @return partition id - */ - public int getId() { - return id; - } - - /** - * Get the 6 digit 0 padded id of this partition as a String. - * @return partition id - */ - public String getPaddedId() { - return String.format("%06d", getId()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof Partition)) { - return false; - } - Partition partition = (Partition) o; - - return id == partition.id && Objects.equal(stream, partition.stream); - } - - @Override - public int hashCode() { - int result = stream.hashCode(); - result = 31 * result + id; - return result; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Partition(") - .append(stream) - .append(", ") - .append(id) - .append(")"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java deleted file mode 100644 index bfcc5db..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.streamset; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * A mapping between a logical stream and a set of physical partitions. - */ -public class PartitionMap { - - private final Map<String, Set<Partition>> partitionMap; - - public PartitionMap() { - partitionMap = new HashMap<String, Set<Partition>>(); - } - - public synchronized boolean addPartition(Partition partition, int maxPartitions) { - if (maxPartitions <= 0) { - return true; - } - Set<Partition> partitions = partitionMap.get(partition.getStream()); - if (null == partitions) { - partitions = new HashSet<Partition>(); - partitions.add(partition); - partitionMap.put(partition.getStream(), partitions); - return true; - } - if (partitions.contains(partition) || partitions.size() < maxPartitions) { - partitions.add(partition); - return true; - } - return false; - } - - public synchronized boolean removePartition(Partition partition) { - Set<Partition> partitions = partitionMap.get(partition.getStream()); - return null != partitions && partitions.remove(partition); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java deleted file mode 100644 index 3ea1337..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.streamset; - -/** - * Map stream name to a partition. - * - * @see Partition - */ -public interface StreamPartitionConverter { - - /** - * Convert the stream name to partition. - * - * @param streamName - * stream name - * @return partition - */ - Partition convert(String streamName); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/package-info.java deleted file mode 100644 index d185e88..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * StreamSet - A logical set of streams. - */ -package org.apache.distributedlog.service.streamset; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java deleted file mode 100644 index 3934eb5..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java +++ /dev/null @@ -1,350 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.tools; - -import com.google.common.util.concurrent.RateLimiter; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.client.monitor.MonitorServiceClient; -import org.apache.distributedlog.client.serverset.DLZkServerSet; -import org.apache.distributedlog.service.ClientUtils; -import org.apache.distributedlog.service.DLSocketAddress; -import org.apache.distributedlog.service.DistributedLogClient; -import org.apache.distributedlog.service.DistributedLogClientBuilder; -import org.apache.distributedlog.tools.Tool; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.thrift.ClientId$; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tools to interact with proxies. - */ -public class ProxyTool extends Tool { - - private static final Logger logger = LoggerFactory.getLogger(ProxyTool.class); - - /** - * Abstract Cluster level command. - */ - protected abstract static class ClusterCommand extends OptsCommand { - - protected Options options = new Options(); - protected URI uri; - protected final List<String> streams = new ArrayList<String>(); - - protected ClusterCommand(String name, String description) { - super(name, description); - options.addOption("u", "uri", true, "DistributedLog URI"); - options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'."); - options.addOption("e", "expression", true, "Expression to generate stream suffix. " - + "Currently we support range '0-9', list '1,2,3' and name '143'"); - } - - @Override - protected int runCmd(CommandLine commandLine) throws Exception { - try { - parseCommandLine(commandLine); - } catch (ParseException pe) { - System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'"); - printUsage(); - return -1; - } - - DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000); - logger.info("Created serverset for {}", uri); - try { - DistributedLogClient client = DistributedLogClientBuilder.newBuilder() - .name("proxy_tool") - .clientId(ClientId$.MODULE$.apply("proxy_tool")) - .maxRedirects(2) - .serverSet(serverSet.getServerSet()) - .clientBuilder(ClientBuilder.get() - .connectionTimeout(Duration.fromSeconds(2)) - .tcpConnectTimeout(Duration.fromSeconds(2)) - .requestTimeout(Duration.fromSeconds(10)) - .hostConnectionLimit(1) - .hostConnectionCoresize(1) - .keepAlive(true) - .failFast(false)) - .build(); - try { - return runCmd(client); - } finally { - client.close(); - } - } finally { - serverSet.close(); - } - } - - protected abstract int runCmd(DistributedLogClient client) throws Exception; - - @Override - protected Options getOptions() { - return options; - } - - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - if (!cmdline.hasOption("u")) { - throw new ParseException("No distributedlog uri provided."); - } - this.uri = URI.create(cmdline.getOptionValue("u")); - - // get stream names - String streamPrefix = cmdline.hasOption("r") ? cmdline.getOptionValue("r") : ""; - String streamExpression = null; - if (cmdline.hasOption("e")) { - streamExpression = cmdline.getOptionValue("e"); - } - if (null == streamPrefix || null == streamExpression) { - throw new ParseException("Please specify stream prefix & expression."); - } - // parse the stream expression - if (streamExpression.contains("-")) { - // a range expression - String[] parts = streamExpression.split("-"); - if (parts.length != 2) { - throw new ParseException("Invalid stream index range : " + streamExpression); - } - try { - int start = Integer.parseInt(parts[0]); - int end = Integer.parseInt(parts[1]); - if (start > end) { - throw new ParseException("Invalid stream index range : " + streamExpression); - } - for (int i = start; i <= end; i++) { - streams.add(streamPrefix + i); - } - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid stream index range : " + streamExpression); - } - } else if (streamExpression.contains(",")) { - // a list expression - String[] parts = streamExpression.split(","); - try { - for (String part : parts) { - streams.add(streamPrefix + part); - } - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid stream suffix list : " + streamExpression); - } - } else { - streams.add(streamPrefix + streamExpression); - } - } - } - - /** - * Command to release ownership of a log stream. - */ - static class ReleaseCommand extends ClusterCommand { - - double rate = 100f; - - ReleaseCommand() { - super("release", "Release Stream Ownerships"); - options.addOption("t", "rate", true, "Rate to release streams"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (cmdline.hasOption("t")) { - rate = Double.parseDouble(cmdline.getOptionValue("t", "100")); - } - } - - @Override - protected int runCmd(DistributedLogClient client) throws Exception { - RateLimiter rateLimiter = RateLimiter.create(rate); - for (String stream : streams) { - rateLimiter.acquire(); - try { - Await.result(client.release(stream)); - System.out.println("Release ownership of stream " + stream); - } catch (Exception e) { - System.err.println("Failed to release ownership of stream " + stream); - throw e; - } - } - return 0; - } - - @Override - protected String getUsage() { - return "release [options]"; - } - } - - /** - * Command to truncate a log stream. - */ - static class TruncateCommand extends ClusterCommand { - - DLSN dlsn = DLSN.InitialDLSN; - - TruncateCommand() { - super("truncate", "Truncate streams until given dlsn."); - options.addOption("d", "dlsn", true, "DLSN to truncate until"); - } - - @Override - protected int runCmd(DistributedLogClient client) throws Exception { - System.out.println("Truncating streams : " + streams); - for (String stream : streams) { - boolean success = Await.result(client.truncate(stream, dlsn)); - System.out.println("Truncate " + stream + " to " + dlsn + " : " + success); - } - return 0; - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (!cmdline.hasOption("d")) { - throw new ParseException("No DLSN provided"); - } - String[] dlsnStrs = cmdline.getOptionValue("d").split(","); - if (dlsnStrs.length != 3) { - throw new ParseException("Invalid DLSN : " + cmdline.getOptionValue("d")); - } - dlsn = new DLSN(Long.parseLong(dlsnStrs[0]), Long.parseLong(dlsnStrs[1]), Long.parseLong(dlsnStrs[2])); - } - - @Override - protected String getUsage() { - return "truncate [options]"; - } - } - - /** - * Abstract command to operate on a single proxy server. - */ - protected abstract static class ProxyCommand extends OptsCommand { - - protected Options options = new Options(); - protected InetSocketAddress address; - - protected ProxyCommand(String name, String description) { - super(name, description); - options.addOption("H", "host", true, "Single Proxy Address"); - } - - @Override - protected Options getOptions() { - return options; - } - - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - if (!cmdline.hasOption("H")) { - throw new ParseException("No proxy address provided"); - } - address = DLSocketAddress.parseSocketAddress(cmdline.getOptionValue("H")); - } - - @Override - protected int runCmd(CommandLine commandLine) throws Exception { - try { - parseCommandLine(commandLine); - } catch (ParseException pe) { - System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'"); - printUsage(); - return -1; - } - - DistributedLogClientBuilder clientBuilder = DistributedLogClientBuilder.newBuilder() - .name("proxy_tool") - .clientId(ClientId$.MODULE$.apply("proxy_tool")) - .maxRedirects(2) - .host(address) - .clientBuilder(ClientBuilder.get() - .connectionTimeout(Duration.fromSeconds(2)) - .tcpConnectTimeout(Duration.fromSeconds(2)) - .requestTimeout(Duration.fromSeconds(10)) - .hostConnectionLimit(1) - .hostConnectionCoresize(1) - .keepAlive(true) - .failFast(false)); - Pair<DistributedLogClient, MonitorServiceClient> clientPair = - ClientUtils.buildClient(clientBuilder); - try { - return runCmd(clientPair); - } finally { - clientPair.getLeft().close(); - } - } - - protected abstract int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) throws Exception; - } - - /** - * Command to enable/disable accepting new streams. - */ - static class AcceptNewStreamCommand extends ProxyCommand { - - boolean enabled = false; - - AcceptNewStreamCommand() { - super("accept-new-stream", "Enable/Disable accepting new streams for one proxy"); - options.addOption("e", "enabled", true, "Enable/Disable accepting new streams"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (!cmdline.hasOption("e")) { - throw new ParseException("No action 'enable/disable' provided"); - } - enabled = Boolean.parseBoolean(cmdline.getOptionValue("e")); - } - - @Override - protected int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) - throws Exception { - Await.result(client.getRight().setAcceptNewStream(enabled)); - return 0; - } - - @Override - protected String getUsage() { - return "accept-new-stream [options]"; - } - } - - public ProxyTool() { - super(); - addCommand(new ReleaseCommand()); - addCommand(new TruncateCommand()); - addCommand(new AcceptNewStreamCommand()); - } - - @Override - protected String getName() { - return "proxy_tool"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/package-info.java deleted file mode 100644 index 92d0a7d..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Service related tools. - */ -package org.apache.distributedlog.service.tools; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java deleted file mode 100644 index 9ee93b4..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.utils; - -import java.io.IOException; -import java.net.InetAddress; - -/** - * Utils that used by servers. - */ -public class ServerUtils { - - /** - * Retrieve the ledger allocator pool name. - * - * @param serverRegionId region id that that server is running - * @param shardId shard id of the server - * @param useHostname whether to use hostname as the ledger allocator pool name - * @return ledger allocator pool name - * @throws IOException - */ - public static String getLedgerAllocatorPoolName(int serverRegionId, - int shardId, - boolean useHostname) - throws IOException { - if (useHostname) { - return String.format("allocator_%04d_%s", serverRegionId, - InetAddress.getLocalHost().getHostAddress()); - } else { - return String.format("allocator_%04d_%010d", serverRegionId, shardId); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/package-info.java deleted file mode 100644 index 99cf736..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Utilities used by proxy servers. - */ -package org.apache.distributedlog.service.utils; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/resources/config/server_decider.conf ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/resources/config/server_decider.conf b/distributedlog-service/src/main/resources/config/server_decider.conf deleted file mode 100644 index d2fddf5..0000000 --- a/distributedlog-service/src/main/resources/config/server_decider.conf +++ /dev/null @@ -1,31 +0,0 @@ -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -region_stop_accept_new_stream=0 -disable_durability_enforcement=0 -disable_write_limit=0 -bkc.repp_disable_durability_enforcement=0 -bkc.disable_ensemble_change=0 -dl.disable_logsegment_rolling=0 -dl.disable_write_limit=0 -bkc.atla.disallow_bookie_placement=0 -bkc.atlb.disallow_bookie_placement=0 -bkc.smf1.disallow_bookie_placement=0 -service_rate_limit_disabled=0 -service_checksum_disabled=0 -service_global_limiter_disabled=0 http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/resources/config/server_decider.yml ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/resources/config/server_decider.yml b/distributedlog-service/src/main/resources/config/server_decider.yml deleted file mode 100644 index 7df24bb..0000000 --- a/distributedlog-service/src/main/resources/config/server_decider.yml +++ /dev/null @@ -1,44 +0,0 @@ -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -region_stop_accept_new_stream: - default_availability: 0 -disable_durability_enforcement: - default_availability: 0 -disable_write_limit: - default_availability: 0 -bkc.repp_disable_durability_enforcement: - default_availability: 0 -bkc.disable_ensemble_change: - default_availability: 0 -dl.disable_logsegment_rolling: - default_availability: 0 -dl.disable_write_limit: - default_availability: 0 -bkc.atla.disallow_bookie_placement: - default_availability: 0 -bkc.atlb.disallow_bookie_placement: - default_availability: 0 -bkc.smf1.disallow_bookie_placement: - default_availability: 0 -service_rate_limit_disabled: - default_availability: 0 -service_checksum_disabled: - default_availability: 0 -service_global_limiter_disabled: - default_availability: 0 http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/resources/findbugsExclude.xml b/distributedlog-service/src/main/resources/findbugsExclude.xml deleted file mode 100644 index e101a4d..0000000 --- a/distributedlog-service/src/main/resources/findbugsExclude.xml +++ /dev/null @@ -1,39 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -//--> -<FindBugsFilter> - <Match> - <!-- generated code, we can't be held responsible for findbugs in it //--> - <Class name="~org\.apache\.distributedlog\.thrift.*" /> - </Match> - <Match> - <!-- generated code, we can't be held responsible for findbugs in it //--> - <Class name="~org\.apache\.distributedlog\.service\.placement\.thrift.*" /> - </Match> - <Match> - <!-- it is safe to cast exception here. //--> - <Class name="org.apache.distributedlog.service.DistributedLogServiceImpl$Stream$2" /> - <Method name="onFailure" /> - <Bug pattern="BC_UNCONFIRMED_CAST" /> - </Match> - <Match> - <!-- it is safe to cast exception here. //--> - <Class name="org.apache.distributedlog.service.stream.BulkWriteOp" /> - <Method name="isDefiniteFailure" /> - <Bug pattern="BC_IMPOSSIBLE_INSTANCEOF" /> - </Match> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/thrift/metadata.thrift ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/thrift/metadata.thrift b/distributedlog-service/src/main/thrift/metadata.thrift deleted file mode 100644 index 9cb3c72..0000000 --- a/distributedlog-service/src/main/thrift/metadata.thrift +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -namespace java org.apache.distributedlog.service.placement.thrift - -struct StreamLoad { - 1: optional string stream - 2: optional i32 load -} - -struct ServerLoad { - 1: optional string server - 2: optional i64 load - 3: optional list<StreamLoad> streams -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-service/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java deleted file mode 100644 index a9ddae5..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.client.routing; - -import com.google.common.collect.Sets; -import com.twitter.finagle.NoBrokersAvailableException; -import com.twitter.finagle.stats.StatsReceiver; -import java.net.SocketAddress; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * A local routing service that used for testing. - */ -public class LocalRoutingService implements RoutingService { - - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder to build a local routing service for testing. - */ - public static class Builder implements RoutingService.Builder { - - private Builder() {} - - @Override - public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) { - return this; - } - - @Override - public LocalRoutingService build() { - return new LocalRoutingService(); - } - } - - private final Map<String, LinkedHashSet<SocketAddress>> localAddresses = - new HashMap<String, LinkedHashSet<SocketAddress>>(); - private final CopyOnWriteArrayList<RoutingListener> listeners = - new CopyOnWriteArrayList<RoutingListener>(); - - boolean allowRetrySameHost = true; - - @Override - public void startService() { - // nop - } - - @Override - public void stopService() { - // nop - } - - @Override - public synchronized Set<SocketAddress> getHosts() { - Set<SocketAddress> hosts = Sets.newHashSet(); - for (LinkedHashSet<SocketAddress> addresses : localAddresses.values()) { - hosts.addAll(addresses); - } - return hosts; - } - - @Override - public RoutingService registerListener(RoutingListener listener) { - listeners.add(listener); - return this; - } - - @Override - public RoutingService unregisterListener(RoutingListener listener) { - listeners.remove(listener); - return this; - } - - public LocalRoutingService setAllowRetrySameHost(boolean enabled) { - allowRetrySameHost = enabled; - return this; - } - - public LocalRoutingService addHost(String stream, SocketAddress address) { - boolean notify = false; - synchronized (this) { - LinkedHashSet<SocketAddress> addresses = localAddresses.get(stream); - if (null == addresses) { - addresses = new LinkedHashSet<SocketAddress>(); - localAddresses.put(stream, addresses); - } - if (addresses.add(address)) { - notify = true; - } - } - if (notify) { - for (RoutingListener listener : listeners) { - listener.onServerJoin(address); - } - } - return this; - } - - @Override - public synchronized SocketAddress getHost(String key, RoutingContext rContext) - throws NoBrokersAvailableException { - LinkedHashSet<SocketAddress> addresses = localAddresses.get(key); - - SocketAddress candidate = null; - if (null != addresses) { - for (SocketAddress host : addresses) { - if (rContext.isTriedHost(host) && !allowRetrySameHost) { - continue; - } else { - candidate = host; - break; - } - } - } - if (null != candidate) { - return candidate; - } - throw new NoBrokersAvailableException("No host available"); - } - - @Override - public void removeHost(SocketAddress address, Throwable reason) { - // nop - } -}