This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch 1.3.x in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/1.3.x by this push: new c0eb2d9 [SCB-2135]merge flow control function from 2.1.x to 1.3.x (#2100) c0eb2d9 is described below commit c0eb2d9f79500f87acf75f24d7c5db9b0410c800 Author: bao liu <bi...@qq.com> AuthorDate: Fri Dec 4 13:05:16 2020 +0800 [SCB-2135]merge flow control function from 2.1.x to 1.3.x (#2100) --- .../servicecomb/demo/pojo/client/PojoClient.java | 6 + .../demo/pojo/client/TestFlowControl.java | 82 +++++++ .../src/main/resources/microservice.yaml | 10 +- .../demo/pojo/server/FlowControlClientSchema.java | 31 +++ .../demo/pojo/server/FlowControlSchema.java | 31 +++ .../src/main/resources/microservice.yaml | 17 +- .../java/org/apache/servicecomb/qps/Config.java | 37 +++- .../qps/ConsumerQpsFlowControlHandler.java | 7 +- .../qps/ProviderQpsFlowControlHandler.java | 16 +- .../servicecomb/qps/QpsControllerManager.java | 218 +++++++++++++------ .../org/apache/servicecomb/qps/QpsStrategy.java | 25 +++ .../qps/strategy/AbstractQpsStrategy.java | 60 +++++ .../qps/strategy/DefaultStrategyFactory.java | 33 +++ .../FixedWindowStrategy.java} | 37 +--- .../servicecomb/qps/strategy/IStrategyFactory.java | 23 ++ .../qps/strategy/LeakyBucketStrategy.java | 70 ++++++ .../qps/strategy/TokenBucketStrategy.java | 28 +++ ...pache.servicecomb.qps.strategy.IStrategyFactory | 18 ++ .../servicecomb/qps/QpsControllerManagerTest.java | 241 ++++++++++++--------- .../org/apache/servicecomb/qps/TestConfig.java | 4 - .../qps/TestConsumerQpsFlowControlHandler.java | 54 ++--- .../qps/TestProviderQpsFlowControlHandler.java | 50 ++--- .../apache/servicecomb/qps/TestQpsStrategy.java | 56 +++++ 23 files changed, 871 insertions(+), 283 deletions(-) diff --git a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java index ccb01ca..e325391 100644 --- a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java +++ b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java @@ -108,6 +108,7 @@ public class PojoClient { TestMgr.setMsg(microserviceName, transport); LOGGER.info("test {}, transport {}", microserviceName, transport); + testFlowControl(); testNull(testFromXml); testNull(test); testEmpty(test); @@ -240,6 +241,11 @@ public class PojoClient { TestMgr.check("code is ''", test.getTestString("")); } + private static void testFlowControl() throws Exception { + TestFlowControl flowControl = BeanUtils.getBean("TestFlowControl"); + flowControl.testAllTransport(); + } + private static void testNull(Test test) { TestMgr.check("code is 'null'", test.getTestString(null)); TestMgr.check(null, test.wrapParam(null)); diff --git a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/TestFlowControl.java b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/TestFlowControl.java new file mode 100644 index 0000000..b558d22 --- /dev/null +++ b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/TestFlowControl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.demo.pojo.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import org.apache.servicecomb.demo.TestMgr; +import org.apache.servicecomb.provider.pojo.RpcReference; +import org.apache.servicecomb.swagger.invocation.exception.InvocationException; +import org.springframework.stereotype.Component; + +@Component("TestFlowControl") +public class TestFlowControl { + interface Client { + int foo(int num); + + int bar(int num); + } + + @RpcReference(microserviceName = "pojo", schemaId = "FlowControlSchema") + Client client1; + + @RpcReference(microserviceName = "pojo", schemaId = "FlowControlClientSchema") + Client client2; + + public void testAllTransport() throws Exception { + // 1.3.2 未统一。 2.1.5 统一了。 + String serverMsg = "InvocationException: code=429;msg={message=rejected by qps flowcontrol}"; + String clientMsg = "InvocationException: code=429;msg=CommonExceptionData [message=rejected by qps flowcontrol]"; + + testFlowControl((num) -> client1.foo(num), true, serverMsg); + testFlowControl((num) -> client1.bar(num), false, serverMsg); + testFlowControl((num) -> client2.foo(num), true, clientMsg); + testFlowControl((num) -> client2.bar(num), false, clientMsg); + } + + private void testFlowControl(Function<Integer, Integer> function, boolean expected, String message) + throws InterruptedException { + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch countDownLatch = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + new Thread() { + public void run() { + for (int i = 0; i < 10; i++) { + try { + int result = function.apply(10); + if (result != 10) { + TestMgr.failed("", new Exception("not expected")); + } + } catch (InvocationException e) { + TestMgr.check(e.getStatusCode(), 429); + TestMgr.check(e.getMessage(), message); + failed.set(true); + break; + } + } + countDownLatch.countDown(); + } + }.start(); + } + countDownLatch.await(10, TimeUnit.SECONDS); + TestMgr.check(expected, failed.get()); + } +} diff --git a/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml b/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml index 47b01f5..fd5b8e3 100644 --- a/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml +++ b/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml @@ -38,4 +38,12 @@ servicecomb: enabled: false loadbalance: strategy: - name: Random \ No newline at end of file + name: Random + flowcontrol: + Consumer: + qps: + limit: + pojo: + FlowControlClientSchema: + foo: 3 + bar: 3000 \ No newline at end of file diff --git a/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlClientSchema.java b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlClientSchema.java new file mode 100644 index 0000000..d94bcba --- /dev/null +++ b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlClientSchema.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.demo.pojo.server; + +import org.apache.servicecomb.provider.pojo.RpcSchema; + +@RpcSchema(schemaId = "FlowControlClientSchema") +public class FlowControlClientSchema { + public int foo(int num) { + return num; + } + + public int bar(int num) { + return num; + } +} diff --git a/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlSchema.java b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlSchema.java new file mode 100644 index 0000000..ba2c293 --- /dev/null +++ b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlSchema.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.demo.pojo.server; + +import org.apache.servicecomb.provider.pojo.RpcSchema; + +@RpcSchema(schemaId = "FlowControlSchema") +public class FlowControlSchema { + public int foo(int num) { + return num; + } + + public int bar(int num) { + return num; + } +} diff --git a/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml b/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml index e705183..a18bb7e 100644 --- a/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml +++ b/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml @@ -27,9 +27,16 @@ servicecomb: address: 0.0.0.0:8080?protocol=http2 highway: address: 0.0.0.0:7070 - #executors: - #default: test - #Provider: - #server: test - #server.wrapParam: test + handler: + chain: + Provider: + default: qps-flowcontrol-provider + flowcontrol: + Provider: + qps: + limit: + ANY: + FlowControlSchema: + foo: 3 + bar: 3000 diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java index 2c6d69a..8078428 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java @@ -26,6 +26,20 @@ import com.netflix.config.DynamicPropertyFactory; public final class Config { private static final Logger LOGGER = LoggerFactory.getLogger(Config.class); + public static final String STRATEGY_KEY = "servicecomb.flowcontrol.strategy"; + + public static final String ANY_SERVICE = "ANY"; + + public static final String CONSUMER_BUCKET_KEY_PREFIX = "servicecomb.flowcontrol.Consumer.qps.bucket."; + + public static final String PROVIDER_BUCKET_KEY_PREFIX = "servicecomb.flowcontrol.Provider.qps.bucket."; + + public static final String PROVIDER_BUCKET_KEY_GLOBAL = + "servicecomb.flowcontrol.Provider.qps.global.bucket"; + + public static final String CONSUMER_BUCKET_KEY_GLOBAL = + "servicecomb.flowcontrol.Consumer.qps.global.bucket"; + public static final String CONSUMER_LIMIT_KEY_PREFIX = "servicecomb.flowcontrol.Consumer.qps.limit."; public static final String PROVIDER_LIMIT_KEY_PREFIX = "servicecomb.flowcontrol.Provider.qps.limit."; @@ -33,35 +47,38 @@ public final class Config { public static final String PROVIDER_LIMIT_KEY_GLOBAL = "servicecomb.flowcontrol.Provider.qps.global.limit"; + public static final String CONSUMER_LIMIT_KEY_GLOBAL = + "servicecomb.flowcontrol.Consumer.qps.global.limit"; + public static final String CONSUMER_ENABLED = "servicecomb.flowcontrol.Consumer.qps.enabled"; public static final String PROVIDER_ENABLED = "servicecomb.flowcontrol.Provider.qps.enabled"; public static Config INSTANCE = new Config(); - private final DynamicBooleanProperty consumerEanbled = + private final DynamicBooleanProperty consumerEnabled = DynamicPropertyFactory.getInstance().getBooleanProperty(CONSUMER_ENABLED, true); - private final DynamicBooleanProperty providerEanbled = + private final DynamicBooleanProperty providerEnabled = DynamicPropertyFactory.getInstance().getBooleanProperty(PROVIDER_ENABLED, true); public Config() { - consumerEanbled.addCallback(() -> { - boolean newValue = consumerEanbled.get(); - LOGGER.info("{} changed from {} to {}", CONSUMER_ENABLED, consumerEanbled, newValue); + consumerEnabled.addCallback(() -> { + boolean newValue = consumerEnabled.get(); + LOGGER.info("{} changed from {} to {}", CONSUMER_ENABLED, consumerEnabled, newValue); }); - providerEanbled.addCallback(() -> { - boolean newValue = providerEanbled.get(); - LOGGER.info("{} changed from {} to {}", PROVIDER_ENABLED, providerEanbled, newValue); + providerEnabled.addCallback(() -> { + boolean newValue = providerEnabled.get(); + LOGGER.info("{} changed from {} to {}", PROVIDER_ENABLED, providerEnabled, newValue); }); } public boolean isConsumerEnabled() { - return consumerEanbled.get(); + return consumerEnabled.get(); } public boolean isProviderEnabled() { - return providerEanbled.get(); + return providerEnabled.get(); } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java index bc82c6c..1d81311 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java @@ -28,8 +28,7 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException; * Support 3 levels of microservice/schema/operation. */ public class ConsumerQpsFlowControlHandler implements Handler { - static final QpsControllerManager qpsControllerMgr = new QpsControllerManager() - .setConfigKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); + private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(false); @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { @@ -38,8 +37,8 @@ public class ConsumerQpsFlowControlHandler implements Handler { return; } - QpsController qpsController = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation); - if (qpsController.isLimitNewRequest()) { + QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation); + if (qpsStrategy.isLimitNewRequest()) { // return http status 429 CommonExceptionData errorData = new CommonExceptionData("rejected by qps flowcontrol"); asyncResp.consumerFail( diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java index 67ade94..0abdd47 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java @@ -23,12 +23,9 @@ import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; -import org.springframework.util.StringUtils; public class ProviderQpsFlowControlHandler implements Handler { - static final QpsControllerManager qpsControllerMgr = new QpsControllerManager() - .setConfigKeyPrefix(Config.PROVIDER_LIMIT_KEY_PREFIX) - .setGlobalQpsController(Config.PROVIDER_LIMIT_KEY_GLOBAL); + private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(true); @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { @@ -46,15 +43,12 @@ public class ProviderQpsFlowControlHandler implements Handler { } String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE); - QpsController qpsController = - StringUtils.isEmpty(microserviceName) - ? qpsControllerMgr.getGlobalQpsController() - : qpsControllerMgr.getOrCreate(microserviceName, invocation); - isLimitNewRequest(qpsController, asyncResp); + QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(microserviceName, invocation); + isLimitNewRequest(qpsStrategy, asyncResp); } - private boolean isLimitNewRequest(QpsController qpsController, AsyncResponse asyncResp) { - if (qpsController.isLimitNewRequest()) { + private boolean isLimitNewRequest(QpsStrategy qpsStrategy, AsyncResponse asyncResp) { + if (qpsStrategy.isLimitNewRequest()) { CommonExceptionData errorData = new CommonExceptionData("rejected by qps flowcontrol"); asyncResp.producerFail(new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData)); return true; diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java index d637e1f..57d1c96 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java @@ -17,11 +17,17 @@ package org.apache.servicecomb.qps; +import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx; +import org.apache.servicecomb.foundation.common.exceptions.ServiceCombException; +import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils; +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; +import org.apache.servicecomb.qps.strategy.IStrategyFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,34 +36,82 @@ import com.netflix.config.DynamicProperty; public class QpsControllerManager { private static final Logger LOGGER = LoggerFactory.getLogger(QpsControllerManager.class); + public static final String SEPARATOR = "."; + /** * Describe the relationship between configuration and qpsController. */ - protected final Map<String, QpsController> configQpsControllerMap = new ConcurrentHashMapEx<>(); + private final Map<String, AbstractQpsStrategy> configQpsControllerMap = new ConcurrentHashMapEx<>(); /** * Describe the relationship between qualifiedKey(format is "microservice.schema.operation") and qpsController. */ - protected final Map<String, QpsController> qualifiedNameControllerMap = new ConcurrentHashMapEx<>(); + private final Map<String, AbstractQpsStrategy> qualifiedNameControllerMap = new ConcurrentHashMapEx<>(); - protected QpsController globalQpsController; + private AbstractQpsStrategy globalQpsStrategy; - public static final String SEPARATOR = "."; + private final String limitKeyPrefix; + + private final String bucketKeyPrefix; + + private final String globalLimitKey; - private String configKeyPrefix; + private final String globalBucketKey; - public QpsController getOrCreate(String microserviceName, Invocation invocation) { + public QpsControllerManager(boolean isProvider) { + if (isProvider) { + limitKeyPrefix = Config.PROVIDER_LIMIT_KEY_PREFIX; + bucketKeyPrefix = Config.PROVIDER_BUCKET_KEY_PREFIX; + globalLimitKey = Config.PROVIDER_LIMIT_KEY_GLOBAL; + globalBucketKey = Config.PROVIDER_BUCKET_KEY_GLOBAL; + } else { + limitKeyPrefix = Config.CONSUMER_LIMIT_KEY_PREFIX; + bucketKeyPrefix = Config.CONSUMER_BUCKET_KEY_PREFIX; + globalLimitKey = Config.CONSUMER_LIMIT_KEY_GLOBAL; + globalBucketKey = Config.CONSUMER_BUCKET_KEY_GLOBAL; + } + + initGlobalQpsController(); + } + + public QpsStrategy getOrCreate(String microserviceName, Invocation invocation) { + final String name = validatedName(microserviceName); return qualifiedNameControllerMap - .computeIfAbsent(microserviceName + SEPARATOR + invocation.getOperationMeta().getSchemaQualifiedName(), key -> { - return create(key, microserviceName, invocation); - }); + .computeIfAbsent( + name + SEPARATOR + invocation.getOperationMeta().getSchemaQualifiedName(), + key -> create(key, name, invocation)); + } + + private String validatedName(String microserviceName) { + String name = microserviceName; + if (StringUtils.isEmpty(microserviceName)) { + name = Config.ANY_SERVICE; + } + return name; } /** * Create relevant qpsLimit dynamicProperty and watch the configuration change. * Search and return a valid qpsController. */ - protected QpsController create(String qualifiedNameKey, String microserviceName, Invocation invocation) { + private AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName, + Invocation invocation) { + createForService(qualifiedNameKey, microserviceName, invocation); + String qualifiedAnyServiceName = Config.ANY_SERVICE + qualifiedNameKey.substring(microserviceName.length()); + createForService(qualifiedAnyServiceName, Config.ANY_SERVICE, invocation); + + AbstractQpsStrategy strategy = searchQpsController(qualifiedNameKey); + if (strategy == null) { + strategy = searchQpsController(qualifiedAnyServiceName); + } + if (strategy == null) { + return globalQpsStrategy; + } + return strategy; + } + + private void createForService(String qualifiedNameKey, String microserviceName, + Invocation invocation) { // create "microservice" createQpsControllerIfNotExist(microserviceName); // create "microservice.schema" @@ -65,12 +119,10 @@ public class QpsControllerManager { qualifiedNameKey.substring(0, microserviceName.length() + invocation.getSchemaId().length() + 1)); // create "microservice.schema.operation" createQpsControllerIfNotExist(qualifiedNameKey); - - return searchQpsController(qualifiedNameKey); } /** - * <p> Use qualifiedNameKey to search {@link QpsController}. + * <p> Use qualifiedNameKey to search {@link QpsStrategy}. * Firstly try to search "microservice.schema.operation". If no valid result found, then try "microservice.schema", * and then "microservice" or global qpsController(If there is a global qpsController).</p> * <p> This method ensures that there is always an existing qpsController returned, as the relevant qpsController has @@ -79,96 +131,130 @@ public class QpsControllerManager { * @param qualifiedNameKey qualifiedNameKey in {@link #qualifiedNameControllerMap} * @return a qps controller, lower level controllers with valid qpsLimit have priority. */ - protected QpsController searchQpsController(String qualifiedNameKey) { - QpsController qpsController = configQpsControllerMap.get(qualifiedNameKey); - if (isValidQpsController(qpsController)) { - return qpsController; + private AbstractQpsStrategy searchQpsController(String qualifiedNameKey) { + AbstractQpsStrategy qpsStrategy = configQpsControllerMap.get(qualifiedNameKey); + if (isValidQpsController(qpsStrategy)) { + return qpsStrategy; } int index = qualifiedNameKey.lastIndexOf(SEPARATOR); while (index > 0) { - qpsController = configQpsControllerMap.get(qualifiedNameKey.substring(0, index)); - if (isValidQpsController(qpsController)) { - return qpsController; + qpsStrategy = configQpsControllerMap.get(qualifiedNameKey.substring(0, index)); + if (isValidQpsController(qpsStrategy)) { + return qpsStrategy; } index = qualifiedNameKey.lastIndexOf(SEPARATOR, index - 1); } - if (isValidQpsController(qpsController)) { - return qpsController; - } - - if (null != globalQpsController) { - return globalQpsController; + if (isValidQpsController(qpsStrategy)) { + return qpsStrategy; } - // if null is returned, maybe the operation qps controller is not initiated correctly. - // getOrCreateQpsController() should be invoked before. - return qpsController; + return null; } - private boolean keyMatch(String configKey, Entry<String, QpsController> controllerEntry) { + private boolean keyMatch(String configKey, Entry<String, AbstractQpsStrategy> controllerEntry) { return controllerEntry.getKey().equals(configKey) || controllerEntry.getKey().startsWith(configKey + SEPARATOR); } - private boolean isValidQpsController(QpsController qpsController) { - return null != qpsController && null != qpsController.getQpsLimit(); + private boolean isValidQpsController(AbstractQpsStrategy qpsStrategy) { + return null != qpsStrategy && null != qpsStrategy.getQpsLimit(); } private void createQpsControllerIfNotExist(String configKey) { - if (configQpsControllerMap.keySet().contains(configKey)) { + if (configQpsControllerMap.containsKey(configKey)) { return; } LOGGER.info("Create qpsController, configKey = [{}]", configKey); - DynamicProperty property = getDynamicProperty(configKey); - QpsController qpsController = new QpsController(configKey, property.getInteger()); - - configQpsControllerMap.put(configKey, qpsController); - - property.addCallback(() -> { - qpsController.setQpsLimit(property.getInteger()); - LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey, property.getString()); + DynamicProperty limitProperty = DynamicProperty.getInstance(limitKeyPrefix + configKey); + DynamicProperty bucketProperty = DynamicProperty.getInstance(bucketKeyPrefix + configKey); + DynamicProperty strategyProperty = DynamicProperty.getInstance(Config.STRATEGY_KEY); + AbstractQpsStrategy qpsStrategy = chooseStrategy(configKey, limitProperty.getLong(), + bucketProperty.getLong(), strategyProperty.getString()); + + strategyProperty.addCallback(() -> { + AbstractQpsStrategy innerQpsStrategy = chooseStrategy(configKey, limitProperty.getLong(), + bucketProperty.getLong(), strategyProperty.getString()); + configQpsControllerMap.put(configKey, innerQpsStrategy); + LOGGER.info("Global flow control strategy update, value = [{}]", + strategyProperty.getString()); + updateObjMap(configKey); + }); + limitProperty.addCallback(() -> { + qpsStrategy.setQpsLimit(limitProperty.getLong()); + LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey, + limitProperty.getString()); + updateObjMap(configKey); + }); + bucketProperty.addCallback(() -> { + qpsStrategy.setBucketLimit(bucketProperty.getLong()); + LOGGER.info("bucket limit updated, configKey = [{}], value = [{}]", configKey, + bucketProperty.getString()); updateObjMap(configKey); }); + + configQpsControllerMap.put(configKey, qpsStrategy); } protected void updateObjMap(String configKey) { - for (Entry<String, QpsController> controllerEntry : qualifiedNameControllerMap.entrySet()) { + for (Entry<String, AbstractQpsStrategy> controllerEntry : qualifiedNameControllerMap + .entrySet()) { if (keyMatch(configKey, controllerEntry)) { - QpsController qpsController = searchQpsController(controllerEntry.getKey()); - controllerEntry.setValue(qpsController); + AbstractQpsStrategy qpsStrategy = searchQpsController(controllerEntry.getKey()); + controllerEntry.setValue(qpsStrategy); LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]", - controllerEntry.getKey(), qpsController.getKey(), qpsController.getQpsLimit()); + controllerEntry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit()); } } } - public QpsControllerManager setConfigKeyPrefix(String configKeyPrefix) { - this.configKeyPrefix = configKeyPrefix; - return this; - } - - public QpsControllerManager setGlobalQpsController(String globalConfigKey) { - DynamicProperty globalQpsProperty = DynamicProperty.getInstance(globalConfigKey); - QpsController qpsController = new QpsController(globalConfigKey, globalQpsProperty.getInteger()); - - globalQpsProperty.addCallback(() -> { - qpsController.setQpsLimit(globalQpsProperty.getInteger()); - LOGGER.info("Global qps limit update, value = [{}]", globalQpsProperty.getInteger()); + private void initGlobalQpsController() { + DynamicProperty globalLimitProperty = DynamicProperty.getInstance(globalLimitKey); + DynamicProperty globalBucketProperty = DynamicProperty.getInstance(globalBucketKey); + DynamicProperty globalStrategyProperty = DynamicProperty + .getInstance(Config.STRATEGY_KEY); + globalQpsStrategy = chooseStrategy(globalLimitKey, globalLimitProperty.getLong((long) Integer.MAX_VALUE), + globalBucketProperty.getLong(), globalStrategyProperty.getString()); + globalStrategyProperty.addCallback(() -> { + globalQpsStrategy = chooseStrategy(globalLimitKey, globalLimitProperty.getLong((long) Integer.MAX_VALUE), + globalBucketProperty.getLong(), globalStrategyProperty.getString()); + LOGGER.info("Global flow control strategy update, value = [{}]", + globalStrategyProperty.getString()); + }); + globalLimitProperty.addCallback(() -> { + globalQpsStrategy.setQpsLimit(globalLimitProperty.getLong((long) Integer.MAX_VALUE)); + LOGGER.info("Global qps limit update, value = [{}]", globalLimitProperty.getLong()); + }); + globalBucketProperty.addCallback(() -> { + globalQpsStrategy.setBucketLimit(globalBucketProperty.getLong()); + LOGGER.info("Global bucket limit update, value = [{}]", globalBucketProperty.getLong()); }); - - this.globalQpsController = qpsController; - return this; - } - - public QpsController getGlobalQpsController() { - return globalQpsController; } - protected DynamicProperty getDynamicProperty(String configKey) { - return DynamicProperty.getInstance(configKeyPrefix + configKey); + private AbstractQpsStrategy chooseStrategy(String configKey, Long limit, Long bucket, + String strategyName) { + if (StringUtils.isEmpty(strategyName)) { + strategyName = "FixedWindow"; + } + AbstractQpsStrategy strategy = null; + List<IStrategyFactory> strategyFactories = SPIServiceUtils + .getOrLoadSortedService(IStrategyFactory.class); + for (IStrategyFactory strategyFactory : strategyFactories) { + strategy = strategyFactory.createStrategy(strategyName); + if (strategy != null) { + break; + } + } + if (strategy == null) { + throw new ServiceCombException( + "the qps strategy name " + strategyName + " is not exist , please check."); + } + strategy.setKey(configKey); + strategy.setQpsLimit(limit); + strategy.setBucketLimit(bucket); + return strategy; } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsStrategy.java new file mode 100644 index 0000000..8a712e3 --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsStrategy.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps; + +public interface QpsStrategy { + + boolean isLimitNewRequest(); + + String name(); +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/AbstractQpsStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/AbstractQpsStrategy.java new file mode 100644 index 0000000..65d36aa --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/AbstractQpsStrategy.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps.strategy; + +import org.apache.servicecomb.qps.QpsStrategy; + + +public abstract class AbstractQpsStrategy implements QpsStrategy { + + private Long qpsLimit; + + private Long bucketLimit; + + private String key; + + public Long getBucketLimit() { + return bucketLimit; + } + + public void setBucketLimit(Long bucketLimit) { + this.bucketLimit = bucketLimit; + } + + @Override + public abstract boolean isLimitNewRequest(); + + @Override + public abstract String name(); + + public void setQpsLimit(Long qpsLimit) { + this.qpsLimit = qpsLimit; + } + + public Long getQpsLimit() { + return qpsLimit; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/DefaultStrategyFactory.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/DefaultStrategyFactory.java new file mode 100644 index 0000000..79037f7 --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/DefaultStrategyFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.qps.strategy; + +public class DefaultStrategyFactory implements IStrategyFactory { + + public AbstractQpsStrategy createStrategy(String strategyName) { + switch (strategyName) { + case "TokenBucket": + return new TokenBucketStrategy(); + case "LeakyBucket": + return new LeakyBucketStrategy(); + case "FixedWindow": + return new FixedWindowStrategy(); + default: + return null; + } + } +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java similarity index 75% rename from handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java rename to handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java index 5f294ba..f91f87f 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java @@ -14,15 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.servicecomb.qps; +package org.apache.servicecomb.qps.strategy; import java.util.concurrent.atomic.AtomicLong; -public class QpsController { - private String key; - - private Integer qpsLimit; +public class FixedWindowStrategy extends AbstractQpsStrategy { // Interval begin time private volatile long msCycleBegin; @@ -35,26 +31,13 @@ public class QpsController { private static final int CYCLE_LENGTH = 1000; - public QpsController(String key, Integer qpsLimit) { - this.key = key; - this.qpsLimit = qpsLimit; - this.msCycleBegin = System.currentTimeMillis(); - } - - public String getKey() { - return key; - } - - public Integer getQpsLimit() { - return qpsLimit; - } - - public void setQpsLimit(Integer qpsLimit) { - this.qpsLimit = qpsLimit; - } + private static final String STRATEGY_NAME = "FixedWindow"; // return true means new request need to be rejected public boolean isLimitNewRequest() { + if (this.getQpsLimit() == null) { + throw new IllegalStateException("should not happen"); + } long newCount = requestCount.incrementAndGet(); long msNow = System.currentTimeMillis(); //Time jump cause the new request injected @@ -66,7 +49,11 @@ public class QpsController { // Configuration update and use is at the situation of multi-threaded concurrency // It is possible that operation level updated to null,but schema level or microservice level does not updated - int limitValue = (qpsLimit == null) ? Integer.MAX_VALUE : qpsLimit; - return newCount - lastRequestCount >= limitValue; + return newCount - lastRequestCount >= this.getQpsLimit(); + } + + @Override + public String name() { + return STRATEGY_NAME; } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/IStrategyFactory.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/IStrategyFactory.java new file mode 100644 index 0000000..bebe8ce --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/IStrategyFactory.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps.strategy; + +public interface IStrategyFactory { + + AbstractQpsStrategy createStrategy(String strategyName); +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java new file mode 100644 index 0000000..d65d43b --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps.strategy; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * leaky bucket algorithm include 2 implementation : + * 1. as a meter : it's same as the token bucket. + * 2. as a queue : the bucket size equal to qpsLimit. + * + **/ +public class LeakyBucketStrategy extends AbstractQpsStrategy { + + // Request count between Interval begin and now in one interval + private volatile AtomicLong requestCount = new AtomicLong(); + + private volatile long lastTime; + + private long remainder = 0; + + private static final String STRATEGY_NAME = "LeakyBucket"; + + @Override + public boolean isLimitNewRequest() { + if (this.getQpsLimit() == null) { + throw new IllegalStateException("should not happen"); + } + if (this.getBucketLimit() == null) { + this.setBucketLimit(Math.max(2 * this.getQpsLimit(), Integer.MAX_VALUE)); + } + long nowTime = System.currentTimeMillis(); + //get the num of te period time + long leakCount = ((nowTime - lastTime + remainder) / 1000) * this.getQpsLimit(); + remainder = (nowTime - lastTime + remainder) % 1000; + // leak the request + if (requestCount.longValue() > leakCount) { + requestCount.addAndGet(-leakCount); + } else { + requestCount.set(0); + } + lastTime = nowTime; + //compute this time + if (requestCount.longValue() < this.getBucketLimit()) { + requestCount.incrementAndGet(); + return false; + } + return true; + } + + @Override + public String name() { + return STRATEGY_NAME; + } +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/TokenBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/TokenBucketStrategy.java new file mode 100644 index 0000000..082906f --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/TokenBucketStrategy.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps.strategy; + +public class TokenBucketStrategy extends LeakyBucketStrategy { + + private static final String STRATEGY_NAME = "TokenBucket"; + + @Override + public String name() { + return STRATEGY_NAME; + } +} diff --git a/handlers/handler-flowcontrol-qps/src/main/resources/META-INF/services/org.apache.servicecomb.qps.strategy.IStrategyFactory b/handlers/handler-flowcontrol-qps/src/main/resources/META-INF/services/org.apache.servicecomb.qps.strategy.IStrategyFactory new file mode 100644 index 0000000..32f53fa --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/resources/META-INF/services/org.apache.servicecomb.qps.strategy.IStrategyFactory @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.servicecomb.qps.strategy.DefaultStrategyFactory diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java index 44b86b2..990b44d 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java @@ -17,19 +17,17 @@ package org.apache.servicecomb.qps; -import java.util.Map; - import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.core.definition.SchemaMeta; import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils; +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import mockit.Deencapsulation; import mockit.Expectations; import mockit.Mocked; @@ -57,33 +55,33 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - QpsControllerManager testQpsControllerManager = new QpsControllerManager() - .setConfigKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); - initTestQpsControllerManager(testQpsControllerManager, invocation, operationMeta); + QpsControllerManager testQpsControllerManager = new QpsControllerManager(false); + initTestQpsControllerManager(false, testQpsControllerManager, invocation, operationMeta); // pojo - setConfigWithDefaultPrefix("pojo", 100); - QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertTrue(100 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals("pojo2", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + setConfigWithDefaultPrefix(false, "pojo", 100); + QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); + + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); ArchaiusUtils.setProperty("servicecomb.flowcontrol.Consumer.qps.limit.poj.server", 10000); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj.server", qpsController.getKey()); - Assert.assertEquals(qpsController.getQpsLimit(), (Integer) 10000); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals("poj.server", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(((AbstractQpsStrategy) qpsStrategy).getQpsLimit(), (Long) 10000L); ArchaiusUtils.setProperty("servicecomb.flowcontrol.Consumer.qps.limit.poj.server.test", 20000); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj.server.test", qpsController.getKey()); - Assert.assertEquals(qpsController.getQpsLimit(), (Integer) 20000); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals("poj.server.test", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(((AbstractQpsStrategy) qpsStrategy).getQpsLimit(), (Long) 20000L); - testGetOrCreateCommon(testQpsControllerManager, invocation, operationMeta); + testGetOrCreateCommon(false, testQpsControllerManager, invocation, operationMeta); } @Test @@ -99,35 +97,33 @@ public class QpsControllerManagerTest { } }; - QpsControllerManager testQpsControllerManager = new QpsControllerManager() - .setGlobalQpsController(Config.PROVIDER_LIMIT_KEY_GLOBAL) - .setConfigKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); + QpsControllerManager testQpsControllerManager = new QpsControllerManager(true); // global setConfig(Config.PROVIDER_LIMIT_KEY_GLOBAL, 50); - QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); + QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); // pojo - setConfigWithDefaultPrefix("pojo", 100); - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertTrue(100 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); - - testGetOrCreateCommon(testQpsControllerManager, invocation, operationMeta); + setConfigWithDefaultPrefix(true, "pojo", 100); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + + testGetOrCreateCommon(true, testQpsControllerManager, invocation, operationMeta); } @Test @@ -142,10 +138,11 @@ public class QpsControllerManagerTest { result = "schema.opr"; } }; - QpsControllerManager qpsControllerManager = new QpsControllerManager(); - QpsController qpsController = qpsControllerManager.getOrCreate("service", invocation); - Assert.assertEquals("service", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + QpsControllerManager qpsControllerManager = new QpsControllerManager(true); + QpsStrategy qpsStrategy = qpsControllerManager.getOrCreate("service", invocation); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); new Expectations() { { @@ -157,9 +154,10 @@ public class QpsControllerManagerTest { result = "test_schema.test_opr"; } }; - qpsController = qpsControllerManager.getOrCreate("test_service", invocation); - Assert.assertEquals("test_service", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = qpsControllerManager.getOrCreate("test_service", invocation); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); new Expectations() { { @@ -171,9 +169,10 @@ public class QpsControllerManagerTest { result = "test-schema.test-opr"; } }; - qpsController = qpsControllerManager.getOrCreate("test-service", invocation); - Assert.assertEquals("test-service", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = qpsControllerManager.getOrCreate("test-service", invocation); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); new Expectations() { { @@ -185,9 +184,10 @@ public class QpsControllerManagerTest { result = "schema.opr.tail"; } }; - qpsController = qpsControllerManager.getOrCreate("svc", invocation); - Assert.assertEquals("svc", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = qpsControllerManager.getOrCreate("svc", invocation); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); new Expectations() { { @@ -199,12 +199,14 @@ public class QpsControllerManagerTest { result = "schema.opr2.tail"; } }; - qpsController = qpsControllerManager.getOrCreate("svc", invocation); - Assert.assertEquals("svc", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = qpsControllerManager.getOrCreate("svc", invocation); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); } - private void testGetOrCreateCommon(QpsControllerManager testQpsControllerManager, Invocation invocation, + private void testGetOrCreateCommon(boolean isProvider, QpsControllerManager testQpsControllerManager, + Invocation invocation, OperationMeta operationMeta) { new Expectations() { { @@ -214,10 +216,10 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - setConfigWithDefaultPrefix("pojo.server", 200); - QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo.server", qpsController.getKey()); - Assert.assertTrue(200 == qpsController.getQpsLimit()); + setConfigWithDefaultPrefix(isProvider, "pojo.server", 200); + QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { invocation.getOperationMeta(); @@ -226,9 +228,9 @@ public class QpsControllerManagerTest { result = "server2.test"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertTrue(100 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { invocation.getOperationMeta(); @@ -237,9 +239,9 @@ public class QpsControllerManagerTest { result = "serve.test"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertTrue(100 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); // pojo.server.test new Expectations() { @@ -250,10 +252,10 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - setConfigWithDefaultPrefix("pojo.server.test", 300); - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo.server.test", qpsController.getKey()); - Assert.assertTrue(300 == qpsController.getQpsLimit()); + setConfigWithDefaultPrefix(isProvider, "pojo.server.test", 300); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo.server.test", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(300 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { invocation.getOperationMeta(); @@ -262,9 +264,9 @@ public class QpsControllerManagerTest { result = "server.test2"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo.server", qpsController.getKey()); - Assert.assertTrue(200 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { invocation.getOperationMeta(); @@ -274,15 +276,16 @@ public class QpsControllerManagerTest { result = "server.tes"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo.server", qpsController.getKey()); - Assert.assertTrue(200 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); } /** * Init testQpsControllerManager to test search function. */ - private void initTestQpsControllerManager(QpsControllerManager testQpsControllerManager, Invocation invocation, + private void initTestQpsControllerManager(boolean isProvider, QpsControllerManager testQpsControllerManager, + Invocation invocation, OperationMeta operationMeta) { // pojo.server.test new Expectations() { @@ -295,9 +298,13 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo.server.test2 new Expectations() { @@ -311,6 +318,12 @@ public class QpsControllerManagerTest { } }; testQpsControllerManager.getOrCreate("pojo", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo.server.tes new Expectations() { @@ -324,6 +337,12 @@ public class QpsControllerManagerTest { } }; testQpsControllerManager.getOrCreate("pojo", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo.server2.test new Expectations() { @@ -337,6 +356,12 @@ public class QpsControllerManagerTest { } }; testQpsControllerManager.getOrCreate("pojo", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo.serve.test new Expectations() { @@ -350,6 +375,12 @@ public class QpsControllerManagerTest { } }; testQpsControllerManager.getOrCreate("pojo", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo2.server.test new Expectations() { @@ -362,9 +393,13 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals("pojo2", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // poj.server.test new Expectations() { @@ -377,9 +412,13 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); } @Test @@ -425,16 +464,12 @@ public class QpsControllerManagerTest { ArchaiusUtils.setProperty(key, value); } - public static void setConfigWithDefaultPrefix(String key, int value) { + private static void setConfigWithDefaultPrefix(boolean isProvider, String key, int value) { String configKey = Config.CONSUMER_LIMIT_KEY_PREFIX + key; - ArchaiusUtils.setProperty(configKey, value); - } + if (isProvider) { + configKey = Config.PROVIDER_LIMIT_KEY_PREFIX + key; + } - public static void clearState(QpsControllerManager qpsControllerManager) { - Map<String, QpsController> objMap = Deencapsulation.getField(qpsControllerManager, "qualifiedNameControllerMap"); - objMap.clear(); - Map<String, QpsController> configQpsControllerMap = Deencapsulation - .getField(qpsControllerManager, "configQpsControllerMap"); - configQpsControllerMap.clear(); + ArchaiusUtils.setProperty(configKey, value); } } diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConfig.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConfig.java index 04f9400..9035ba5 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConfig.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConfig.java @@ -23,10 +23,6 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -/** - * - * - */ public class TestConfig { @BeforeClass public static void classSetup() { diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java index 3e6bbba..5aef2dd 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java @@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils; +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; +import org.apache.servicecomb.qps.strategy.FixedWindowStrategy; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; @@ -42,7 +44,7 @@ import mockit.MockUp; public class TestConsumerQpsFlowControlHandler { - ConsumerQpsFlowControlHandler handler = new ConsumerQpsFlowControlHandler(); + ConsumerQpsFlowControlHandler handler; Invocation invocation = Mockito.mock(Invocation.class); @@ -56,42 +58,38 @@ public class TestConsumerQpsFlowControlHandler { @Before public void setUP() { ArchaiusUtils.resetConfig(); - QpsControllerManagerTest.clearState(ConsumerQpsFlowControlHandler.qpsControllerMgr); + handler = new ConsumerQpsFlowControlHandler(); } @After public void afterTest() { ArchaiusUtils.resetConfig(); - QpsControllerManagerTest.clearState(ConsumerQpsFlowControlHandler.qpsControllerMgr); } @Test public void testQpsController() { - // to avoid time influence on QpsController - new MockUp<System>() { - @Mock - long currentTimeMillis() { - return 1L; - } - }; - QpsController qpsController = new QpsController("abc", 100); - Assert.assertEquals(false, qpsController.isLimitNewRequest()); + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy(); + qpsStrategy.setKey("abc"); + qpsStrategy.setQpsLimit(100L); + Assert.assertEquals(false, qpsStrategy.isLimitNewRequest()); - qpsController.setQpsLimit(1); - Assert.assertEquals(true, qpsController.isLimitNewRequest()); + qpsStrategy.setQpsLimit(1L); + Assert.assertEquals(true, qpsStrategy.isLimitNewRequest()); } @Test public void testHandle() throws Exception { String key = "svc.schema.opr"; - QpsController qpsController = new QpsController("key", 12); + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy(); + qpsStrategy.setKey("key"); + qpsStrategy.setQpsLimit(12L); Mockito.when(invocation.getOperationMeta()).thenReturn(operationMeta); Mockito.when(operationMeta.getSchemaQualifiedName()).thenReturn("schema.opr"); Mockito.when(invocation.getSchemaId()).thenReturn("schema"); Mockito.when(invocation.getMicroserviceName()).thenReturn("svc"); - setQpsController(key, qpsController); - new MockUp<QpsController>() { + setQpsController(key, qpsStrategy); + new MockUp<FixedWindowStrategy>() { @Mock public boolean isLimitNewRequest() { return true; @@ -100,8 +98,8 @@ public class TestConsumerQpsFlowControlHandler { new MockUp<QpsControllerManager>() { @Mock - protected QpsController create(String qualifiedNameKey) { - return qpsController; + protected QpsStrategy create(String qualifiedNameKey) { + return qpsStrategy; } }; @@ -118,14 +116,16 @@ public class TestConsumerQpsFlowControlHandler { @Test public void testHandleIsLimitNewRequestAsFalse() throws Exception { String key = "service.schema.id"; - QpsController qpsController = new QpsController("service", 12); + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy(); + qpsStrategy.setKey("service"); + qpsStrategy.setQpsLimit(12L); Mockito.when(invocation.getMicroserviceName()).thenReturn("service"); Mockito.when(invocation.getOperationMeta()).thenReturn(operationMeta); Mockito.when(operationMeta.getSchemaQualifiedName()).thenReturn("schema.id"); - setQpsController(key, qpsController); + setQpsController(key, qpsStrategy); - new MockUp<QpsController>() { + new MockUp<QpsStrategy>() { @Mock public boolean isLimitNewRequest() { return false; @@ -135,8 +135,8 @@ public class TestConsumerQpsFlowControlHandler { new MockUp<QpsControllerManager>() { @Mock - protected QpsController create(String qualifiedNameKey) { - return qpsController; + protected QpsStrategy create(String qualifiedNameKey) { + return qpsStrategy; } }; handler.handle(invocation, asyncResp); @@ -144,10 +144,10 @@ public class TestConsumerQpsFlowControlHandler { Mockito.verify(invocation).next(asyncResp); } - private void setQpsController(String key, QpsController qpsController) { + private void setQpsController(String key, QpsStrategy qpsStrategy) { QpsControllerManager qpsControllerManager = Deencapsulation.getField(handler, "qpsControllerMgr"); - ConcurrentHashMap<String, QpsController> objMap = Deencapsulation + ConcurrentHashMap<String, QpsStrategy> objMap = Deencapsulation .getField(qpsControllerManager, "qualifiedNameControllerMap"); - objMap.put(key, qpsController); + objMap.put(key, qpsStrategy); } } diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java index 9f73155..dfbecfd 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java @@ -26,6 +26,8 @@ import org.apache.servicecomb.core.Const; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils; +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; +import org.apache.servicecomb.qps.strategy.FixedWindowStrategy; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; @@ -43,28 +45,25 @@ import mockit.Mock; import mockit.MockUp; public class TestProviderQpsFlowControlHandler { - ProviderQpsFlowControlHandler handler = new ProviderQpsFlowControlHandler(); + ProviderQpsFlowControlHandler handler; Invocation invocation = Mockito.mock(Invocation.class); AsyncResponse asyncResp = Mockito.mock(AsyncResponse.class); - OperationMeta operationMeta = Mockito.mock(OperationMeta.class); - @Rule public ExpectedException expectedException = ExpectedException.none(); @Before public void setUP() { ArchaiusUtils.resetConfig(); - QpsControllerManagerTest.clearState(ProviderQpsFlowControlHandler.qpsControllerMgr); + handler = new ProviderQpsFlowControlHandler(); ArchaiusUtils.setProperty(Config.PROVIDER_LIMIT_KEY_PREFIX + "test", 1); } @After public void afterTest() { ArchaiusUtils.resetConfig(); - QpsControllerManagerTest.clearState(ProviderQpsFlowControlHandler.qpsControllerMgr); } @Test @@ -84,7 +83,6 @@ public class TestProviderQpsFlowControlHandler { result = new RuntimeException("test error"); } }; - mockUpSystemTime(); ProviderQpsFlowControlHandler gHandler = new ProviderQpsFlowControlHandler(); gHandler.handle(invocation, asyncResp); @@ -100,22 +98,24 @@ public class TestProviderQpsFlowControlHandler { @Test public void testQpsController() { - mockUpSystemTime(); - QpsController qpsController = new QpsController("abc", 100); - assertFalse(qpsController.isLimitNewRequest()); + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy(); + qpsStrategy.setKey("abc"); + qpsStrategy.setQpsLimit(100L); + assertFalse(qpsStrategy.isLimitNewRequest()); - qpsController.setQpsLimit(1); - assertTrue(qpsController.isLimitNewRequest()); + qpsStrategy.setQpsLimit(1L); + assertTrue(qpsStrategy.isLimitNewRequest()); } @Test public void testHandleOnSourceMicroserviceNameIsNull() throws Exception { Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null); + OperationMeta operationMeta = QpsControllerManagerTest.getMockOperationMeta("pojo", "server", "opr"); + Mockito.when(invocation.getOperationMeta()).thenReturn(operationMeta); + Mockito.when(invocation.getSchemaId()).thenReturn("server"); // only when handler index <= 0, the qps logic works Mockito.when(invocation.getHandlerIndex()).thenReturn(0); ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", 1); - ProviderQpsFlowControlHandler.qpsControllerMgr - .setGlobalQpsController("servicecomb.flowcontrol.Provider.qps.global.limit"); handler.handle(invocation, asyncResp); handler.handle(invocation, asyncResp); @@ -145,8 +145,11 @@ public class TestProviderQpsFlowControlHandler { new MockUp<QpsControllerManager>() { @Mock - protected QpsController create(String qualifiedNameKey) { - return new QpsController(qualifiedNameKey, 1); + protected QpsStrategy create(String qualifiedNameKey) { + AbstractQpsStrategy strategy = new FixedWindowStrategy(); + strategy.setKey(qualifiedNameKey); + strategy.setQpsLimit(1L); + return strategy; } }; @@ -172,8 +175,11 @@ public class TestProviderQpsFlowControlHandler { new MockUp<QpsControllerManager>() { @Mock - protected QpsController create(String qualifiedNameKey) { - return new QpsController(qualifiedNameKey, 1); + protected QpsStrategy create(String qualifiedNameKey) { + AbstractQpsStrategy strategy = new FixedWindowStrategy(); + strategy.setKey(qualifiedNameKey); + strategy.setQpsLimit(1L); + return strategy; } }; handler.handle(invocation, asyncResp); @@ -181,14 +187,4 @@ public class TestProviderQpsFlowControlHandler { Mockito.verify(invocation, times(0)).next(asyncResp); Mockito.verify(asyncResp, times(0)).producerFail(Mockito.any(Exception.class)); } - - private void mockUpSystemTime() { - // to avoid time influence on QpsController - new MockUp<System>() { - @Mock - long currentTimeMillis() { - return 1L; - } - }; - } } diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestQpsStrategy.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestQpsStrategy.java new file mode 100644 index 0000000..04c6f02 --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestQpsStrategy.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps; + +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; +import org.apache.servicecomb.qps.strategy.FixedWindowStrategy; +import org.apache.servicecomb.qps.strategy.LeakyBucketStrategy; +import org.junit.Assert; +import org.junit.Test; + +/** + * @Author GuoYl123 + * @Date 2020/7/16 + **/ +public class TestQpsStrategy { + + @Test + public void testFixedWindowStrategy() { + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy(); + qpsStrategy.setKey("abc"); + qpsStrategy.setQpsLimit(100L); + Assert.assertEquals(false, qpsStrategy.isLimitNewRequest()); + + qpsStrategy.setQpsLimit(1L); + Assert.assertEquals(true, qpsStrategy.isLimitNewRequest()); + } + + + @Test + public void testLeakyBucketStrategy() { + LeakyBucketStrategy qpsStrategy = new LeakyBucketStrategy(); + qpsStrategy.setKey("abc"); + qpsStrategy.setQpsLimit(100L); + Assert.assertEquals(false, qpsStrategy.isLimitNewRequest()); + + qpsStrategy.setQpsLimit(1L); + qpsStrategy.setBucketLimit(1L); + Assert.assertEquals(true, qpsStrategy.isLimitNewRequest()); + } + +}