http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java new file mode 100644 index 0000000..c3c5d81 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream.admin; + +import com.google.common.base.Stopwatch; +import org.apache.distributedlog.exceptions.ChecksumFailedException; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.protocol.util.ProtocolUtils; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.service.stream.StreamManager; +import org.apache.distributedlog.thrift.service.WriteResponse; +import com.twitter.util.Future; +import com.twitter.util.FutureTransformer; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.OpStatsLogger; + +/** + * Stream admin op. + */ +public abstract class StreamAdminOp implements AdminOp<WriteResponse> { + + protected final String stream; + protected final StreamManager streamManager; + protected final OpStatsLogger opStatsLogger; + protected final Stopwatch stopwatch = Stopwatch.createUnstarted(); + protected final Long checksum; + protected final Feature checksumDisabledFeature; + + protected StreamAdminOp(String stream, + StreamManager streamManager, + OpStatsLogger statsLogger, + Long checksum, + Feature checksumDisabledFeature) { + this.stream = stream; + this.streamManager = streamManager; + this.opStatsLogger = statsLogger; + // start here in case the operation is failed before executing. + stopwatch.reset().start(); + this.checksum = checksum; + this.checksumDisabledFeature = checksumDisabledFeature; + } + + protected Long computeChecksum() { + return ProtocolUtils.streamOpCRC32(stream); + } + + @Override + public void preExecute() throws DLException { + if (!checksumDisabledFeature.isAvailable() && null != checksum) { + Long serverChecksum = computeChecksum(); + if (null != serverChecksum && !checksum.equals(serverChecksum)) { + throw new ChecksumFailedException(); + } + } + } + + /** + * Execute the operation. + * + * @return execute operation + */ + protected abstract Future<WriteResponse> executeOp(); + + @Override + public Future<WriteResponse> execute() { + return executeOp().transformedBy(new FutureTransformer<WriteResponse, WriteResponse>() { + + @Override + public WriteResponse map(WriteResponse response) { + opStatsLogger.registerSuccessfulEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS)); + return response; + } + + @Override + public WriteResponse handle(Throwable cause) { + opStatsLogger.registerFailedEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS)); + return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause)); + } + + }); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java new file mode 100644 index 0000000..5b583e1 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Stream Related Admin Operations. + */ +package org.apache.distributedlog.service.stream.admin; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java new file mode 100644 index 0000000..5db2037 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream.limiter; + +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.OverCapacityException; +import org.apache.distributedlog.limiter.RequestLimiter; +import java.io.Closeable; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.commons.configuration.event.ConfigurationEvent; +import org.apache.commons.configuration.event.ConfigurationListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Dynamically rebuild a rate limiter when the supplied dynamic config changes. + * + * <p>Subclasses implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister + * the config listener. + */ +public abstract class DynamicRequestLimiter<Req> implements RequestLimiter<Req>, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class); + + private final ConfigurationListener listener; + private final Feature rateLimitDisabledFeature; + volatile RequestLimiter<Req> limiter; + final DynamicDistributedLogConfiguration dynConf; + + public DynamicRequestLimiter(DynamicDistributedLogConfiguration dynConf, + StatsLogger statsLogger, + Feature rateLimitDisabledFeature) { + final StatsLogger limiterStatsLogger = statsLogger.scope("dynamic"); + this.dynConf = dynConf; + this.rateLimitDisabledFeature = rateLimitDisabledFeature; + this.listener = new ConfigurationListener() { + @Override + public void configurationChanged(ConfigurationEvent event) { + // Note that this method may be called several times if several config options + // are changed. The effect is harmless except that we create and discard more + // objects than we need to. + LOG.debug("Config changed callback invoked with event {} {} {} {}", new Object[] { + event.getPropertyName(), event.getPropertyValue(), event.getType(), + event.isBeforeUpdate()}); + if (!event.isBeforeUpdate()) { + limiterStatsLogger.getCounter("config_changed").inc(); + LOG.debug("Rebuilding limiter"); + limiter = build(); + } + } + }; + LOG.debug("Registering config changed callback"); + dynConf.addConfigurationListener(listener); + } + + public void initialize() { + this.limiter = build(); + } + + @Override + public void apply(Req request) throws OverCapacityException { + if (rateLimitDisabledFeature.isAvailable()) { + return; + } + limiter.apply(request); + } + + @Override + public void close() { + boolean success = dynConf.removeConfigurationListener(listener); + LOG.debug("Deregistering config changed callback success={}", success); + } + + /** + * Build the underlying limiter. Called when DynamicRequestLimiter detects config has changed. + * This may be called multiple times so the method should be cheap. + */ + protected abstract RequestLimiter<Req> build(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java new file mode 100644 index 0000000..fc30599 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream.limiter; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.distributedlog.exceptions.OverCapacityException; +import org.apache.distributedlog.limiter.ComposableRequestLimiter; +import org.apache.distributedlog.limiter.ComposableRequestLimiter.CostFunction; +import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; +import org.apache.distributedlog.limiter.GuavaRateLimiter; +import org.apache.distributedlog.limiter.RateLimiter; +import org.apache.distributedlog.limiter.RequestLimiter; +import org.apache.distributedlog.service.stream.StreamOp; +import org.apache.distributedlog.service.stream.WriteOpWithPayload; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * Request limiter builder. + */ +public class RequestLimiterBuilder { + private OverlimitFunction<StreamOp> overlimitFunction = NOP_OVERLIMIT_FUNCTION; + private RateLimiter limiter; + private CostFunction<StreamOp> costFunction; + private StatsLogger statsLogger = NullStatsLogger.INSTANCE; + + /** + * Function to calculate the `RPS` (Request per second) cost of a given stream operation. + */ + public static final CostFunction<StreamOp> RPS_COST_FUNCTION = new CostFunction<StreamOp>() { + @Override + public int apply(StreamOp op) { + if (op instanceof WriteOpWithPayload) { + return 1; + } else { + return 0; + } + } + }; + + /** + * Function to calculate the `BPS` (Bytes per second) cost of a given stream operation. + */ + public static final CostFunction<StreamOp> BPS_COST_FUNCTION = new CostFunction<StreamOp>() { + @Override + public int apply(StreamOp op) { + if (op instanceof WriteOpWithPayload) { + WriteOpWithPayload writeOp = (WriteOpWithPayload) op; + return (int) Math.min(writeOp.getPayloadSize(), Integer.MAX_VALUE); + } else { + return 0; + } + } + }; + + /** + * Function to check if a stream operation will cause {@link OverCapacityException}. + */ + public static final OverlimitFunction<StreamOp> NOP_OVERLIMIT_FUNCTION = new OverlimitFunction<StreamOp>() { + @Override + public void apply(StreamOp op) throws OverCapacityException { + return; + } + }; + + public RequestLimiterBuilder limit(int limit) { + this.limiter = GuavaRateLimiter.of(limit); + return this; + } + + public RequestLimiterBuilder overlimit(OverlimitFunction<StreamOp> overlimitFunction) { + this.overlimitFunction = overlimitFunction; + return this; + } + + public RequestLimiterBuilder cost(CostFunction<StreamOp> costFunction) { + this.costFunction = costFunction; + return this; + } + + public RequestLimiterBuilder statsLogger(StatsLogger statsLogger) { + this.statsLogger = statsLogger; + return this; + } + + public static RequestLimiterBuilder newRpsLimiterBuilder() { + return new RequestLimiterBuilder().cost(RPS_COST_FUNCTION); + } + + public static RequestLimiterBuilder newBpsLimiterBuilder() { + return new RequestLimiterBuilder().cost(BPS_COST_FUNCTION); + } + + public RequestLimiter<StreamOp> build() { + checkNotNull(limiter); + checkNotNull(overlimitFunction); + checkNotNull(costFunction); + return new ComposableRequestLimiter(limiter, overlimitFunction, costFunction, statsLogger); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java new file mode 100644 index 0000000..de805aa --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream.limiter; + +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.OverCapacityException; +import org.apache.distributedlog.limiter.ChainedRequestLimiter; +import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; +import org.apache.distributedlog.limiter.RequestLimiter; +import org.apache.distributedlog.rate.MovingAverageRate; +import org.apache.distributedlog.service.stream.StreamManager; +import org.apache.distributedlog.service.stream.StreamOp; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * Request limiter for the service instance (global request limiter). + */ +public class ServiceRequestLimiter extends DynamicRequestLimiter<StreamOp> { + private final StatsLogger limiterStatLogger; + private final MovingAverageRate serviceRps; + private final MovingAverageRate serviceBps; + private final StreamManager streamManager; + + public ServiceRequestLimiter(DynamicDistributedLogConfiguration dynConf, + StatsLogger statsLogger, + MovingAverageRate serviceRps, + MovingAverageRate serviceBps, + StreamManager streamManager, + Feature disabledFeature) { + super(dynConf, statsLogger, disabledFeature); + this.limiterStatLogger = statsLogger; + this.streamManager = streamManager; + this.serviceRps = serviceRps; + this.serviceBps = serviceBps; + this.limiter = build(); + } + + @Override + public RequestLimiter<StreamOp> build() { + int rpsStreamAcquireLimit = dynConf.getRpsStreamAcquireServiceLimit(); + int rpsSoftServiceLimit = dynConf.getRpsSoftServiceLimit(); + int rpsHardServiceLimit = dynConf.getRpsHardServiceLimit(); + int bpsStreamAcquireLimit = dynConf.getBpsStreamAcquireServiceLimit(); + int bpsSoftServiceLimit = dynConf.getBpsSoftServiceLimit(); + int bpsHardServiceLimit = dynConf.getBpsHardServiceLimit(); + + RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder() + .statsLogger(limiterStatLogger.scope("rps_hard_limit")) + .limit(rpsHardServiceLimit) + .overlimit(new OverlimitFunction<StreamOp>() { + @Override + public void apply(StreamOp request) throws OverCapacityException { + throw new OverCapacityException("Being rate limited: RPS limit exceeded for the service instance"); + } + }); + + RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder() + .statsLogger(limiterStatLogger.scope("rps_soft_limit")) + .limit(rpsSoftServiceLimit); + + RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder() + .statsLogger(limiterStatLogger.scope("bps_hard_limit")) + .limit(bpsHardServiceLimit) + .overlimit(new OverlimitFunction<StreamOp>() { + @Override + public void apply(StreamOp request) throws OverCapacityException { + throw new OverCapacityException("Being rate limited: BPS limit exceeded for the service instance"); + } + }); + + RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder() + .statsLogger(limiterStatLogger.scope("bps_soft_limit")) + .limit(bpsSoftServiceLimit); + + ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>(); + builder.addLimiter(new StreamAcquireLimiter( + streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire"))); + builder.addLimiter(new StreamAcquireLimiter( + streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire"))); + builder.addLimiter(bpsHardLimiterBuilder.build()); + builder.addLimiter(bpsSoftLimiterBuilder.build()); + builder.addLimiter(rpsHardLimiterBuilder.build()); + builder.addLimiter(rpsSoftLimiterBuilder.build()); + builder.statsLogger(limiterStatLogger); + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java new file mode 100644 index 0000000..7675d6f --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream.limiter; + +import org.apache.distributedlog.exceptions.OverCapacityException; +import org.apache.distributedlog.exceptions.TooManyStreamsException; +import org.apache.distributedlog.limiter.RequestLimiter; +import org.apache.distributedlog.rate.MovingAverageRate; +import org.apache.distributedlog.service.stream.StreamManager; +import org.apache.distributedlog.service.stream.StreamOp; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * A special limiter on limiting acquiring new streams. + */ +public class StreamAcquireLimiter implements RequestLimiter<StreamOp> { + private final StreamManager streamManager; + private final MovingAverageRate serviceRps; + private final double serviceRpsLimit; + private final Counter overlimitCounter; + + public StreamAcquireLimiter(StreamManager streamManager, + MovingAverageRate serviceRps, + double serviceRpsLimit, + StatsLogger statsLogger) { + this.streamManager = streamManager; + this.serviceRps = serviceRps; + this.serviceRpsLimit = serviceRpsLimit; + this.overlimitCounter = statsLogger.getCounter("overlimit"); + } + + @Override + public void apply(StreamOp op) throws OverCapacityException { + String streamName = op.streamName(); + if (serviceRpsLimit > -1 && serviceRps.get() > serviceRpsLimit && !streamManager.isAcquired(streamName)) { + overlimitCounter.inc(); + throw new TooManyStreamsException("Request rate is too high to accept new stream " + streamName + "."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java new file mode 100644 index 0000000..42b4e1e --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream.limiter; + +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.OverCapacityException; +import org.apache.distributedlog.limiter.ChainedRequestLimiter; +import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; +import org.apache.distributedlog.limiter.RequestLimiter; +import org.apache.distributedlog.service.stream.StreamOp; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * A dynamic request limiter on limiting stream operations. + */ +public class StreamRequestLimiter extends DynamicRequestLimiter<StreamOp> { + private final DynamicDistributedLogConfiguration dynConf; + private final StatsLogger limiterStatLogger; + private final String streamName; + + public StreamRequestLimiter(String streamName, + DynamicDistributedLogConfiguration dynConf, + StatsLogger statsLogger, + Feature disabledFeature) { + super(dynConf, statsLogger, disabledFeature); + this.limiterStatLogger = statsLogger; + this.dynConf = dynConf; + this.streamName = streamName; + this.limiter = build(); + } + + @Override + public RequestLimiter<StreamOp> build() { + + // RPS hard, soft limits + RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder() + .statsLogger(limiterStatLogger.scope("rps_hard_limit")) + .limit(dynConf.getRpsHardWriteLimit()) + .overlimit(new OverlimitFunction<StreamOp>() { + @Override + public void apply(StreamOp op) throws OverCapacityException { + throw new OverCapacityException("Being rate limited: RPS limit exceeded for stream " + streamName); + } + }); + RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder() + .statsLogger(limiterStatLogger.scope("rps_soft_limit")) + .limit(dynConf.getRpsSoftWriteLimit()); + + // BPS hard, soft limits + RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder() + .statsLogger(limiterStatLogger.scope("bps_hard_limit")) + .limit(dynConf.getBpsHardWriteLimit()) + .overlimit(new OverlimitFunction<StreamOp>() { + @Override + public void apply(StreamOp op) throws OverCapacityException { + throw new OverCapacityException("Being rate limited: BPS limit exceeded for stream " + streamName); + } + }); + RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder() + .statsLogger(limiterStatLogger.scope("bps_soft_limit")) + .limit(dynConf.getBpsSoftWriteLimit()); + + ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>(); + builder.addLimiter(rpsSoftLimiterBuilder.build()); + builder.addLimiter(rpsHardLimiterBuilder.build()); + builder.addLimiter(bpsSoftLimiterBuilder.build()); + builder.addLimiter(bpsHardLimiterBuilder.build()); + builder.statsLogger(limiterStatLogger); + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java new file mode 100644 index 0000000..c666b08 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Request Rate Limiting. + */ +package org.apache.distributedlog.service.stream.limiter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java new file mode 100644 index 0000000..7429a85 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Stream Related Operations. + */ +package org.apache.distributedlog.service.stream; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java new file mode 100644 index 0000000..72668c2 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.streamset; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * A stream-to-partition converter that caches the mapping between stream and partitions. + */ +public abstract class CacheableStreamPartitionConverter implements StreamPartitionConverter { + + private final ConcurrentMap<String, Partition> partitions; + + protected CacheableStreamPartitionConverter() { + this.partitions = new ConcurrentHashMap<String, Partition>(); + } + + @Override + public Partition convert(String streamName) { + Partition p = partitions.get(streamName); + if (null != p) { + return p; + } + // not found + Partition newPartition = newPartition(streamName); + Partition oldPartition = partitions.putIfAbsent(streamName, newPartition); + if (null == oldPartition) { + return newPartition; + } else { + return oldPartition; + } + } + + /** + * Create the partition from <code>streamName</code>. + * + * @param streamName + * stream name + * @return partition id of the stream + */ + protected abstract Partition newPartition(String streamName); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java new file mode 100644 index 0000000..30b2896 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.streamset; + +import org.apache.commons.lang3.StringUtils; + +/** + * Stream Partition Converter that converts the stream name into a stream-to-partition mapping by delimiter. + */ +public class DelimiterStreamPartitionConverter extends CacheableStreamPartitionConverter { + + private final String delimiter; + + public DelimiterStreamPartitionConverter() { + this("_"); + } + + public DelimiterStreamPartitionConverter(String delimiter) { + this.delimiter = delimiter; + } + + @Override + protected Partition newPartition(String streamName) { + String[] parts = StringUtils.split(streamName, delimiter); + if (null != parts && parts.length == 2) { + try { + int partition = Integer.parseInt(parts[1]); + return new Partition(parts[0], partition); + } catch (NumberFormatException nfe) { + // ignore the exception + } + } + return new Partition(streamName, 0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java new file mode 100644 index 0000000..5be172f --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.streamset; + +/** + * Map stream name to partition of the same name. + */ +public class IdentityStreamPartitionConverter extends CacheableStreamPartitionConverter { + @Override + protected Partition newPartition(String streamName) { + return new Partition(streamName, 0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java new file mode 100644 index 0000000..aa69276 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.streamset; + +import com.google.common.base.Objects; + +/** + * `Partition` defines the relationship between a `virtual` stream and a + * physical DL stream. + * + * <p>A `virtual` stream could be partitioned into multiple partitions + * and each partition is effectively a DL stream. + */ +public class Partition { + + // Name of its parent stream. + private final String stream; + + // Unique id of the partition within the stream. + // It can be just simply an index id. + public final int id; + + public Partition(String stream, int id) { + this.stream = stream; + this.id = id; + } + + /** + * Get the `virtual` stream name. + * + * @return the stream name. + */ + public String getStream() { + return stream; + } + + /** + * Get the partition id of this partition. + * + * @return partition id + */ + public int getId() { + return id; + } + + /** + * Get the 6 digit 0 padded id of this partition as a String. + * @return partition id + */ + public String getPaddedId() { + return String.format("%06d", getId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Partition)) { + return false; + } + Partition partition = (Partition) o; + + return id == partition.id && Objects.equal(stream, partition.stream); + } + + @Override + public int hashCode() { + int result = stream.hashCode(); + result = 31 * result + id; + return result; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Partition(") + .append(stream) + .append(", ") + .append(id) + .append(")"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java new file mode 100644 index 0000000..bfcc5db --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.streamset; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A mapping between a logical stream and a set of physical partitions. + */ +public class PartitionMap { + + private final Map<String, Set<Partition>> partitionMap; + + public PartitionMap() { + partitionMap = new HashMap<String, Set<Partition>>(); + } + + public synchronized boolean addPartition(Partition partition, int maxPartitions) { + if (maxPartitions <= 0) { + return true; + } + Set<Partition> partitions = partitionMap.get(partition.getStream()); + if (null == partitions) { + partitions = new HashSet<Partition>(); + partitions.add(partition); + partitionMap.put(partition.getStream(), partitions); + return true; + } + if (partitions.contains(partition) || partitions.size() < maxPartitions) { + partitions.add(partition); + return true; + } + return false; + } + + public synchronized boolean removePartition(Partition partition) { + Set<Partition> partitions = partitionMap.get(partition.getStream()); + return null != partitions && partitions.remove(partition); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java new file mode 100644 index 0000000..3ea1337 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.streamset; + +/** + * Map stream name to a partition. + * + * @see Partition + */ +public interface StreamPartitionConverter { + + /** + * Convert the stream name to partition. + * + * @param streamName + * stream name + * @return partition + */ + Partition convert(String streamName); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java new file mode 100644 index 0000000..d185e88 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * StreamSet - A logical set of streams. + */ +package org.apache.distributedlog.service.streamset; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java new file mode 100644 index 0000000..3934eb5 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.tools; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.client.serverset.DLZkServerSet; +import org.apache.distributedlog.service.ClientUtils; +import org.apache.distributedlog.service.DLSocketAddress; +import org.apache.distributedlog.service.DistributedLogClient; +import org.apache.distributedlog.service.DistributedLogClientBuilder; +import org.apache.distributedlog.tools.Tool; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.thrift.ClientId$; +import com.twitter.util.Await; +import com.twitter.util.Duration; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tools to interact with proxies. + */ +public class ProxyTool extends Tool { + + private static final Logger logger = LoggerFactory.getLogger(ProxyTool.class); + + /** + * Abstract Cluster level command. + */ + protected abstract static class ClusterCommand extends OptsCommand { + + protected Options options = new Options(); + protected URI uri; + protected final List<String> streams = new ArrayList<String>(); + + protected ClusterCommand(String name, String description) { + super(name, description); + options.addOption("u", "uri", true, "DistributedLog URI"); + options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'."); + options.addOption("e", "expression", true, "Expression to generate stream suffix. " + + "Currently we support range '0-9', list '1,2,3' and name '143'"); + } + + @Override + protected int runCmd(CommandLine commandLine) throws Exception { + try { + parseCommandLine(commandLine); + } catch (ParseException pe) { + System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'"); + printUsage(); + return -1; + } + + DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000); + logger.info("Created serverset for {}", uri); + try { + DistributedLogClient client = DistributedLogClientBuilder.newBuilder() + .name("proxy_tool") + .clientId(ClientId$.MODULE$.apply("proxy_tool")) + .maxRedirects(2) + .serverSet(serverSet.getServerSet()) + .clientBuilder(ClientBuilder.get() + .connectionTimeout(Duration.fromSeconds(2)) + .tcpConnectTimeout(Duration.fromSeconds(2)) + .requestTimeout(Duration.fromSeconds(10)) + .hostConnectionLimit(1) + .hostConnectionCoresize(1) + .keepAlive(true) + .failFast(false)) + .build(); + try { + return runCmd(client); + } finally { + client.close(); + } + } finally { + serverSet.close(); + } + } + + protected abstract int runCmd(DistributedLogClient client) throws Exception; + + @Override + protected Options getOptions() { + return options; + } + + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + if (!cmdline.hasOption("u")) { + throw new ParseException("No distributedlog uri provided."); + } + this.uri = URI.create(cmdline.getOptionValue("u")); + + // get stream names + String streamPrefix = cmdline.hasOption("r") ? cmdline.getOptionValue("r") : ""; + String streamExpression = null; + if (cmdline.hasOption("e")) { + streamExpression = cmdline.getOptionValue("e"); + } + if (null == streamPrefix || null == streamExpression) { + throw new ParseException("Please specify stream prefix & expression."); + } + // parse the stream expression + if (streamExpression.contains("-")) { + // a range expression + String[] parts = streamExpression.split("-"); + if (parts.length != 2) { + throw new ParseException("Invalid stream index range : " + streamExpression); + } + try { + int start = Integer.parseInt(parts[0]); + int end = Integer.parseInt(parts[1]); + if (start > end) { + throw new ParseException("Invalid stream index range : " + streamExpression); + } + for (int i = start; i <= end; i++) { + streams.add(streamPrefix + i); + } + } catch (NumberFormatException nfe) { + throw new ParseException("Invalid stream index range : " + streamExpression); + } + } else if (streamExpression.contains(",")) { + // a list expression + String[] parts = streamExpression.split(","); + try { + for (String part : parts) { + streams.add(streamPrefix + part); + } + } catch (NumberFormatException nfe) { + throw new ParseException("Invalid stream suffix list : " + streamExpression); + } + } else { + streams.add(streamPrefix + streamExpression); + } + } + } + + /** + * Command to release ownership of a log stream. + */ + static class ReleaseCommand extends ClusterCommand { + + double rate = 100f; + + ReleaseCommand() { + super("release", "Release Stream Ownerships"); + options.addOption("t", "rate", true, "Rate to release streams"); + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + if (cmdline.hasOption("t")) { + rate = Double.parseDouble(cmdline.getOptionValue("t", "100")); + } + } + + @Override + protected int runCmd(DistributedLogClient client) throws Exception { + RateLimiter rateLimiter = RateLimiter.create(rate); + for (String stream : streams) { + rateLimiter.acquire(); + try { + Await.result(client.release(stream)); + System.out.println("Release ownership of stream " + stream); + } catch (Exception e) { + System.err.println("Failed to release ownership of stream " + stream); + throw e; + } + } + return 0; + } + + @Override + protected String getUsage() { + return "release [options]"; + } + } + + /** + * Command to truncate a log stream. + */ + static class TruncateCommand extends ClusterCommand { + + DLSN dlsn = DLSN.InitialDLSN; + + TruncateCommand() { + super("truncate", "Truncate streams until given dlsn."); + options.addOption("d", "dlsn", true, "DLSN to truncate until"); + } + + @Override + protected int runCmd(DistributedLogClient client) throws Exception { + System.out.println("Truncating streams : " + streams); + for (String stream : streams) { + boolean success = Await.result(client.truncate(stream, dlsn)); + System.out.println("Truncate " + stream + " to " + dlsn + " : " + success); + } + return 0; + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + if (!cmdline.hasOption("d")) { + throw new ParseException("No DLSN provided"); + } + String[] dlsnStrs = cmdline.getOptionValue("d").split(","); + if (dlsnStrs.length != 3) { + throw new ParseException("Invalid DLSN : " + cmdline.getOptionValue("d")); + } + dlsn = new DLSN(Long.parseLong(dlsnStrs[0]), Long.parseLong(dlsnStrs[1]), Long.parseLong(dlsnStrs[2])); + } + + @Override + protected String getUsage() { + return "truncate [options]"; + } + } + + /** + * Abstract command to operate on a single proxy server. + */ + protected abstract static class ProxyCommand extends OptsCommand { + + protected Options options = new Options(); + protected InetSocketAddress address; + + protected ProxyCommand(String name, String description) { + super(name, description); + options.addOption("H", "host", true, "Single Proxy Address"); + } + + @Override + protected Options getOptions() { + return options; + } + + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + if (!cmdline.hasOption("H")) { + throw new ParseException("No proxy address provided"); + } + address = DLSocketAddress.parseSocketAddress(cmdline.getOptionValue("H")); + } + + @Override + protected int runCmd(CommandLine commandLine) throws Exception { + try { + parseCommandLine(commandLine); + } catch (ParseException pe) { + System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'"); + printUsage(); + return -1; + } + + DistributedLogClientBuilder clientBuilder = DistributedLogClientBuilder.newBuilder() + .name("proxy_tool") + .clientId(ClientId$.MODULE$.apply("proxy_tool")) + .maxRedirects(2) + .host(address) + .clientBuilder(ClientBuilder.get() + .connectionTimeout(Duration.fromSeconds(2)) + .tcpConnectTimeout(Duration.fromSeconds(2)) + .requestTimeout(Duration.fromSeconds(10)) + .hostConnectionLimit(1) + .hostConnectionCoresize(1) + .keepAlive(true) + .failFast(false)); + Pair<DistributedLogClient, MonitorServiceClient> clientPair = + ClientUtils.buildClient(clientBuilder); + try { + return runCmd(clientPair); + } finally { + clientPair.getLeft().close(); + } + } + + protected abstract int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) throws Exception; + } + + /** + * Command to enable/disable accepting new streams. + */ + static class AcceptNewStreamCommand extends ProxyCommand { + + boolean enabled = false; + + AcceptNewStreamCommand() { + super("accept-new-stream", "Enable/Disable accepting new streams for one proxy"); + options.addOption("e", "enabled", true, "Enable/Disable accepting new streams"); + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + if (!cmdline.hasOption("e")) { + throw new ParseException("No action 'enable/disable' provided"); + } + enabled = Boolean.parseBoolean(cmdline.getOptionValue("e")); + } + + @Override + protected int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) + throws Exception { + Await.result(client.getRight().setAcceptNewStream(enabled)); + return 0; + } + + @Override + protected String getUsage() { + return "accept-new-stream [options]"; + } + } + + public ProxyTool() { + super(); + addCommand(new ReleaseCommand()); + addCommand(new TruncateCommand()); + addCommand(new AcceptNewStreamCommand()); + } + + @Override + protected String getName() { + return "proxy_tool"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java new file mode 100644 index 0000000..92d0a7d --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Service related tools. + */ +package org.apache.distributedlog.service.tools; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java new file mode 100644 index 0000000..9ee93b4 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.utils; + +import java.io.IOException; +import java.net.InetAddress; + +/** + * Utils that used by servers. + */ +public class ServerUtils { + + /** + * Retrieve the ledger allocator pool name. + * + * @param serverRegionId region id that that server is running + * @param shardId shard id of the server + * @param useHostname whether to use hostname as the ledger allocator pool name + * @return ledger allocator pool name + * @throws IOException + */ + public static String getLedgerAllocatorPoolName(int serverRegionId, + int shardId, + boolean useHostname) + throws IOException { + if (useHostname) { + return String.format("allocator_%04d_%s", serverRegionId, + InetAddress.getLocalHost().getHostAddress()); + } else { + return String.format("allocator_%04d_%010d", serverRegionId, shardId); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java new file mode 100644 index 0000000..99cf736 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utilities used by proxy servers. + */ +package org.apache.distributedlog.service.utils; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/resources/config/server_decider.conf ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/resources/config/server_decider.conf b/distributedlog-proxy-server/src/main/resources/config/server_decider.conf new file mode 100644 index 0000000..d2fddf5 --- /dev/null +++ b/distributedlog-proxy-server/src/main/resources/config/server_decider.conf @@ -0,0 +1,31 @@ +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +region_stop_accept_new_stream=0 +disable_durability_enforcement=0 +disable_write_limit=0 +bkc.repp_disable_durability_enforcement=0 +bkc.disable_ensemble_change=0 +dl.disable_logsegment_rolling=0 +dl.disable_write_limit=0 +bkc.atla.disallow_bookie_placement=0 +bkc.atlb.disallow_bookie_placement=0 +bkc.smf1.disallow_bookie_placement=0 +service_rate_limit_disabled=0 +service_checksum_disabled=0 +service_global_limiter_disabled=0 http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/resources/config/server_decider.yml ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/resources/config/server_decider.yml b/distributedlog-proxy-server/src/main/resources/config/server_decider.yml new file mode 100644 index 0000000..7df24bb --- /dev/null +++ b/distributedlog-proxy-server/src/main/resources/config/server_decider.yml @@ -0,0 +1,44 @@ +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +region_stop_accept_new_stream: + default_availability: 0 +disable_durability_enforcement: + default_availability: 0 +disable_write_limit: + default_availability: 0 +bkc.repp_disable_durability_enforcement: + default_availability: 0 +bkc.disable_ensemble_change: + default_availability: 0 +dl.disable_logsegment_rolling: + default_availability: 0 +dl.disable_write_limit: + default_availability: 0 +bkc.atla.disallow_bookie_placement: + default_availability: 0 +bkc.atlb.disallow_bookie_placement: + default_availability: 0 +bkc.smf1.disallow_bookie_placement: + default_availability: 0 +service_rate_limit_disabled: + default_availability: 0 +service_checksum_disabled: + default_availability: 0 +service_global_limiter_disabled: + default_availability: 0 http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml b/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml new file mode 100644 index 0000000..e101a4d --- /dev/null +++ b/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml @@ -0,0 +1,39 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//--> +<FindBugsFilter> + <Match> + <!-- generated code, we can't be held responsible for findbugs in it //--> + <Class name="~org\.apache\.distributedlog\.thrift.*" /> + </Match> + <Match> + <!-- generated code, we can't be held responsible for findbugs in it //--> + <Class name="~org\.apache\.distributedlog\.service\.placement\.thrift.*" /> + </Match> + <Match> + <!-- it is safe to cast exception here. //--> + <Class name="org.apache.distributedlog.service.DistributedLogServiceImpl$Stream$2" /> + <Method name="onFailure" /> + <Bug pattern="BC_UNCONFIRMED_CAST" /> + </Match> + <Match> + <!-- it is safe to cast exception here. //--> + <Class name="org.apache.distributedlog.service.stream.BulkWriteOp" /> + <Method name="isDefiniteFailure" /> + <Bug pattern="BC_IMPOSSIBLE_INSTANCEOF" /> + </Match> +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/thrift/metadata.thrift ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/thrift/metadata.thrift b/distributedlog-proxy-server/src/main/thrift/metadata.thrift new file mode 100644 index 0000000..9cb3c72 --- /dev/null +++ b/distributedlog-proxy-server/src/main/thrift/metadata.thrift @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +namespace java org.apache.distributedlog.service.placement.thrift + +struct StreamLoad { + 1: optional string stream + 2: optional i32 load +} + +struct ServerLoad { + 1: optional string server + 2: optional i64 load + 3: optional list<StreamLoad> streams +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java new file mode 100644 index 0000000..a9ddae5 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import com.google.common.collect.Sets; +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.finagle.stats.StatsReceiver; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * A local routing service that used for testing. + */ +public class LocalRoutingService implements RoutingService { + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder to build a local routing service for testing. + */ + public static class Builder implements RoutingService.Builder { + + private Builder() {} + + @Override + public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) { + return this; + } + + @Override + public LocalRoutingService build() { + return new LocalRoutingService(); + } + } + + private final Map<String, LinkedHashSet<SocketAddress>> localAddresses = + new HashMap<String, LinkedHashSet<SocketAddress>>(); + private final CopyOnWriteArrayList<RoutingListener> listeners = + new CopyOnWriteArrayList<RoutingListener>(); + + boolean allowRetrySameHost = true; + + @Override + public void startService() { + // nop + } + + @Override + public void stopService() { + // nop + } + + @Override + public synchronized Set<SocketAddress> getHosts() { + Set<SocketAddress> hosts = Sets.newHashSet(); + for (LinkedHashSet<SocketAddress> addresses : localAddresses.values()) { + hosts.addAll(addresses); + } + return hosts; + } + + @Override + public RoutingService registerListener(RoutingListener listener) { + listeners.add(listener); + return this; + } + + @Override + public RoutingService unregisterListener(RoutingListener listener) { + listeners.remove(listener); + return this; + } + + public LocalRoutingService setAllowRetrySameHost(boolean enabled) { + allowRetrySameHost = enabled; + return this; + } + + public LocalRoutingService addHost(String stream, SocketAddress address) { + boolean notify = false; + synchronized (this) { + LinkedHashSet<SocketAddress> addresses = localAddresses.get(stream); + if (null == addresses) { + addresses = new LinkedHashSet<SocketAddress>(); + localAddresses.put(stream, addresses); + } + if (addresses.add(address)) { + notify = true; + } + } + if (notify) { + for (RoutingListener listener : listeners) { + listener.onServerJoin(address); + } + } + return this; + } + + @Override + public synchronized SocketAddress getHost(String key, RoutingContext rContext) + throws NoBrokersAvailableException { + LinkedHashSet<SocketAddress> addresses = localAddresses.get(key); + + SocketAddress candidate = null; + if (null != addresses) { + for (SocketAddress host : addresses) { + if (rContext.isTriedHost(host) && !allowRetrySameHost) { + continue; + } else { + candidate = host; + break; + } + } + } + if (null != candidate) { + return candidate; + } + throw new NoBrokersAvailableException("No host available"); + } + + @Override + public void removeHost(SocketAddress address, Throwable reason) { + // nop + } +}