This is an automated email from the ASF dual-hosted git repository. wujimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push: new 3cb3649 [SCB-2135]provider flow control support for operation to any client service 3cb3649 is described below commit 3cb36498ca5732767bb93818dde11ed407e48397 Author: liubao <bi...@qq.com> AuthorDate: Thu Dec 3 15:45:30 2020 +0800 [SCB-2135]provider flow control support for operation to any client service --- .../org/apache/servicecomb/core/SCBEngine.java | 3 +- .../demo/pojo/client/TestFlowControl.java | 80 +++++++++++++ .../src/main/resources/microservice.yaml | 10 +- .../demo/pojo/server/FlowControlClientSchema.java | 31 +++++ .../demo/pojo/server/FlowControlSchema.java | 31 +++++ .../src/main/resources/microservice.yaml | 17 ++- .../java/org/apache/servicecomb/qps/Config.java | 30 +++-- .../qps/ConsumerQpsFlowControlHandler.java | 4 +- .../qps/ProviderQpsFlowControlHandler.java | 11 +- .../servicecomb/qps/QpsControllerManager.java | 115 ++++++++++-------- .../qps/strategy/FixedWindowStrategy.java | 2 +- .../qps/strategy/LeakyBucketStrategy.java | 5 +- .../servicecomb/qps/QpsControllerManagerTest.java | 129 +++++++++++++-------- .../qps/TestConsumerQpsFlowControlHandler.java | 5 +- .../qps/TestProviderQpsFlowControlHandler.java | 10 +- .../provider/pojo/PojoConsumerMetaRefresher.java | 3 +- .../swagger/invocation/response/ResponsesMeta.java | 3 + .../transport/highway/HighwayClient.java | 3 +- .../rest/client/http/RestClientInvocation.java | 3 +- 19 files changed, 355 insertions(+), 140 deletions(-) diff --git a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java index edf7645..44a912e 100644 --- a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java +++ b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java @@ -62,6 +62,7 @@ import org.apache.servicecomb.registry.consumer.MicroserviceVersions; import org.apache.servicecomb.registry.definition.MicroserviceNameParser; import org.apache.servicecomb.registry.swagger.SwaggerLoader; import org.apache.servicecomb.swagger.engine.SwaggerEnvironment; +import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -498,7 +499,7 @@ public class SCBEngine { String message = "The request is rejected. Cannot process the request due to STATUS = " + currentStatus; LOGGER.warn(message); - throw new InvocationException(Status.SERVICE_UNAVAILABLE, message); + throw new InvocationException(Status.SERVICE_UNAVAILABLE, new CommonExceptionData(message)); } } diff --git a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/TestFlowControl.java b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/TestFlowControl.java new file mode 100644 index 0000000..abcd772 --- /dev/null +++ b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/TestFlowControl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.demo.pojo.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import org.apache.servicecomb.demo.CategorizedTestCase; +import org.apache.servicecomb.demo.TestMgr; +import org.apache.servicecomb.provider.pojo.RpcReference; +import org.apache.servicecomb.swagger.invocation.exception.InvocationException; +import org.springframework.stereotype.Component; + +@Component +public class TestFlowControl implements CategorizedTestCase { + interface Client { + int foo(int num); + + int bar(int num); + } + + @RpcReference(microserviceName = "pojo", schemaId = "FlowControlSchema") + Client client1; + + @RpcReference(microserviceName = "pojo", schemaId = "FlowControlClientSchema") + Client client2; + + @Override + public void testAllTransport() throws Exception { + testFlowControl((num) -> client1.foo(num), true); + testFlowControl((num) -> client1.bar(num), false); + testFlowControl((num) -> client2.foo(num), true); + testFlowControl((num) -> client2.bar(num), false); + } + + private void testFlowControl(Function<Integer, Integer> function, boolean expected) throws InterruptedException { + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch countDownLatch = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + new Thread() { + public void run() { + for (int i = 0; i < 10; i++) { + try { + int result = function.apply(10); + if (result != 10) { + TestMgr.failed("", new Exception("not expected")); + } + } catch (InvocationException e) { + TestMgr.check(e.getStatusCode(), 429); + TestMgr.check(e.getMessage(), + "InvocationException: code=429;msg=CommonExceptionData [message=rejected by qps flowcontrol]"); + failed.set(true); + break; + } + } + countDownLatch.countDown(); + } + }.start(); + } + countDownLatch.await(10, TimeUnit.SECONDS); + TestMgr.check(expected, failed.get()); + } +} diff --git a/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml b/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml index ad15db8..0b561ff 100644 --- a/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml +++ b/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml @@ -42,4 +42,12 @@ servicecomb: name: Random metrics: window_time: 12000 - publisher.defaultLog.enabled: false # when in testing , can turn on \ No newline at end of file + publisher.defaultLog.enabled: false # when in testing , can turn on + flowcontrol: + Consumer: + qps: + limit: + pojo: + FlowControlClientSchema: + foo: 3 + bar: 3000 \ No newline at end of file diff --git a/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlClientSchema.java b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlClientSchema.java new file mode 100644 index 0000000..d94bcba --- /dev/null +++ b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlClientSchema.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.demo.pojo.server; + +import org.apache.servicecomb.provider.pojo.RpcSchema; + +@RpcSchema(schemaId = "FlowControlClientSchema") +public class FlowControlClientSchema { + public int foo(int num) { + return num; + } + + public int bar(int num) { + return num; + } +} diff --git a/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlSchema.java b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlSchema.java new file mode 100644 index 0000000..ba2c293 --- /dev/null +++ b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlSchema.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.demo.pojo.server; + +import org.apache.servicecomb.provider.pojo.RpcSchema; + +@RpcSchema(schemaId = "FlowControlSchema") +public class FlowControlSchema { + public int foo(int num) { + return num; + } + + public int bar(int num) { + return num; + } +} diff --git a/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml b/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml index 8ae2db4..b38218e 100644 --- a/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml +++ b/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml @@ -30,9 +30,16 @@ servicecomb: useAlpnEnabled: false highway: address: 0.0.0.0:7070 - #executors: - #default: test - #Provider: - #server: test - #server.wrapParam: test + handler: + chain: + Provider: + default: qps-flowcontrol-provider + flowcontrol: + Provider: + qps: + limit: + ANY: + FlowControlSchema: + foo: 3 + bar: 3000 diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java index ea8e4a1..8078428 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java @@ -26,7 +26,9 @@ import com.netflix.config.DynamicPropertyFactory; public final class Config { private static final Logger LOGGER = LoggerFactory.getLogger(Config.class); - public static final String STRATEGY_KEY_PREFIX = "servicecomb.flowcontrol.strategy"; + public static final String STRATEGY_KEY = "servicecomb.flowcontrol.strategy"; + + public static final String ANY_SERVICE = "ANY"; public static final String CONSUMER_BUCKET_KEY_PREFIX = "servicecomb.flowcontrol.Consumer.qps.bucket."; @@ -35,6 +37,9 @@ public final class Config { public static final String PROVIDER_BUCKET_KEY_GLOBAL = "servicecomb.flowcontrol.Provider.qps.global.bucket"; + public static final String CONSUMER_BUCKET_KEY_GLOBAL = + "servicecomb.flowcontrol.Consumer.qps.global.bucket"; + public static final String CONSUMER_LIMIT_KEY_PREFIX = "servicecomb.flowcontrol.Consumer.qps.limit."; public static final String PROVIDER_LIMIT_KEY_PREFIX = "servicecomb.flowcontrol.Provider.qps.limit."; @@ -42,35 +47,38 @@ public final class Config { public static final String PROVIDER_LIMIT_KEY_GLOBAL = "servicecomb.flowcontrol.Provider.qps.global.limit"; + public static final String CONSUMER_LIMIT_KEY_GLOBAL = + "servicecomb.flowcontrol.Consumer.qps.global.limit"; + public static final String CONSUMER_ENABLED = "servicecomb.flowcontrol.Consumer.qps.enabled"; public static final String PROVIDER_ENABLED = "servicecomb.flowcontrol.Provider.qps.enabled"; public static Config INSTANCE = new Config(); - private final DynamicBooleanProperty consumerEanbled = + private final DynamicBooleanProperty consumerEnabled = DynamicPropertyFactory.getInstance().getBooleanProperty(CONSUMER_ENABLED, true); - private final DynamicBooleanProperty providerEanbled = + private final DynamicBooleanProperty providerEnabled = DynamicPropertyFactory.getInstance().getBooleanProperty(PROVIDER_ENABLED, true); public Config() { - consumerEanbled.addCallback(() -> { - boolean newValue = consumerEanbled.get(); - LOGGER.info("{} changed from {} to {}", CONSUMER_ENABLED, consumerEanbled, newValue); + consumerEnabled.addCallback(() -> { + boolean newValue = consumerEnabled.get(); + LOGGER.info("{} changed from {} to {}", CONSUMER_ENABLED, consumerEnabled, newValue); }); - providerEanbled.addCallback(() -> { - boolean newValue = providerEanbled.get(); - LOGGER.info("{} changed from {} to {}", PROVIDER_ENABLED, providerEanbled, newValue); + providerEnabled.addCallback(() -> { + boolean newValue = providerEnabled.get(); + LOGGER.info("{} changed from {} to {}", PROVIDER_ENABLED, providerEnabled, newValue); }); } public boolean isConsumerEnabled() { - return consumerEanbled.get(); + return consumerEnabled.get(); } public boolean isProviderEnabled() { - return providerEanbled.get(); + return providerEnabled.get(); } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java index 0972433..1d81311 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java @@ -28,9 +28,7 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException; * Support 3 levels of microservice/schema/operation. */ public class ConsumerQpsFlowControlHandler implements Handler { - static final QpsControllerManager qpsControllerMgr = new QpsControllerManager() - .setLimitKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX) - .setBucketKeyPrefix(Config.CONSUMER_BUCKET_KEY_PREFIX); + private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(false); @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java index c5e4445..0abdd47 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java @@ -23,13 +23,9 @@ import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; -import org.springframework.util.StringUtils; public class ProviderQpsFlowControlHandler implements Handler { - static final QpsControllerManager qpsControllerMgr = new QpsControllerManager() - .setLimitKeyPrefix(Config.PROVIDER_LIMIT_KEY_PREFIX) - .setBucketKeyPrefix(Config.PROVIDER_BUCKET_KEY_PREFIX) - .setGlobalQpsStrategy(Config.PROVIDER_LIMIT_KEY_GLOBAL, Config.PROVIDER_BUCKET_KEY_GLOBAL); + private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(true); @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { @@ -47,10 +43,7 @@ public class ProviderQpsFlowControlHandler implements Handler { } String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE); - QpsStrategy qpsStrategy = - StringUtils.isEmpty(microserviceName) - ? qpsControllerMgr.getGlobalQpsStrategy() - : qpsControllerMgr.getOrCreate(microserviceName, invocation); + QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(microserviceName, invocation); isLimitNewRequest(qpsStrategy, asyncResp); } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java index a287ca2..57d1c96 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx; import org.apache.servicecomb.foundation.common.exceptions.ServiceCombException; @@ -31,41 +32,85 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.netflix.config.DynamicProperty; -import org.apache.commons.lang3.StringUtils; public class QpsControllerManager { private static final Logger LOGGER = LoggerFactory.getLogger(QpsControllerManager.class); + public static final String SEPARATOR = "."; + /** * Describe the relationship between configuration and qpsController. */ - protected final Map<String, AbstractQpsStrategy> configQpsControllerMap = new ConcurrentHashMapEx<>(); + private final Map<String, AbstractQpsStrategy> configQpsControllerMap = new ConcurrentHashMapEx<>(); /** * Describe the relationship between qualifiedKey(format is "microservice.schema.operation") and qpsController. */ - protected final Map<String, AbstractQpsStrategy> qualifiedNameControllerMap = new ConcurrentHashMapEx<>(); + private final Map<String, AbstractQpsStrategy> qualifiedNameControllerMap = new ConcurrentHashMapEx<>(); - protected AbstractQpsStrategy globalQpsStrategy; + private AbstractQpsStrategy globalQpsStrategy; - public static final String SEPARATOR = "."; + private final String limitKeyPrefix; - private String limitKeyPrefix; + private final String bucketKeyPrefix; - private String bucketKeyPrefix; + private final String globalLimitKey; + + private final String globalBucketKey; + + public QpsControllerManager(boolean isProvider) { + if (isProvider) { + limitKeyPrefix = Config.PROVIDER_LIMIT_KEY_PREFIX; + bucketKeyPrefix = Config.PROVIDER_BUCKET_KEY_PREFIX; + globalLimitKey = Config.PROVIDER_LIMIT_KEY_GLOBAL; + globalBucketKey = Config.PROVIDER_BUCKET_KEY_GLOBAL; + } else { + limitKeyPrefix = Config.CONSUMER_LIMIT_KEY_PREFIX; + bucketKeyPrefix = Config.CONSUMER_BUCKET_KEY_PREFIX; + globalLimitKey = Config.CONSUMER_LIMIT_KEY_GLOBAL; + globalBucketKey = Config.CONSUMER_BUCKET_KEY_GLOBAL; + } + + initGlobalQpsController(); + } public QpsStrategy getOrCreate(String microserviceName, Invocation invocation) { + final String name = validatedName(microserviceName); return qualifiedNameControllerMap .computeIfAbsent( - microserviceName + SEPARATOR + invocation.getOperationMeta().getSchemaQualifiedName(), - key -> create(key, microserviceName, invocation)); + name + SEPARATOR + invocation.getOperationMeta().getSchemaQualifiedName(), + key -> create(key, name, invocation)); + } + + private String validatedName(String microserviceName) { + String name = microserviceName; + if (StringUtils.isEmpty(microserviceName)) { + name = Config.ANY_SERVICE; + } + return name; } /** * Create relevant qpsLimit dynamicProperty and watch the configuration change. * Search and return a valid qpsController. */ - protected AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName, + private AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName, + Invocation invocation) { + createForService(qualifiedNameKey, microserviceName, invocation); + String qualifiedAnyServiceName = Config.ANY_SERVICE + qualifiedNameKey.substring(microserviceName.length()); + createForService(qualifiedAnyServiceName, Config.ANY_SERVICE, invocation); + + AbstractQpsStrategy strategy = searchQpsController(qualifiedNameKey); + if (strategy == null) { + strategy = searchQpsController(qualifiedAnyServiceName); + } + if (strategy == null) { + return globalQpsStrategy; + } + return strategy; + } + + private void createForService(String qualifiedNameKey, String microserviceName, Invocation invocation) { // create "microservice" createQpsControllerIfNotExist(microserviceName); @@ -74,8 +119,6 @@ public class QpsControllerManager { qualifiedNameKey.substring(0, microserviceName.length() + invocation.getSchemaId().length() + 1)); // create "microservice.schema.operation" createQpsControllerIfNotExist(qualifiedNameKey); - - return searchQpsController(qualifiedNameKey); } /** @@ -88,7 +131,7 @@ public class QpsControllerManager { * @param qualifiedNameKey qualifiedNameKey in {@link #qualifiedNameControllerMap} * @return a qps controller, lower level controllers with valid qpsLimit have priority. */ - protected AbstractQpsStrategy searchQpsController(String qualifiedNameKey) { + private AbstractQpsStrategy searchQpsController(String qualifiedNameKey) { AbstractQpsStrategy qpsStrategy = configQpsControllerMap.get(qualifiedNameKey); if (isValidQpsController(qpsStrategy)) { return qpsStrategy; @@ -108,13 +151,7 @@ public class QpsControllerManager { return qpsStrategy; } - if (null != globalQpsStrategy) { - return globalQpsStrategy; - } - - // if null is returned, maybe the operation qps controller is not initiated correctly. - // getOrCreateQpsController() should be invoked before. - return qpsStrategy; + return null; } private boolean keyMatch(String configKey, Entry<String, AbstractQpsStrategy> controllerEntry) { @@ -127,15 +164,14 @@ public class QpsControllerManager { } private void createQpsControllerIfNotExist(String configKey) { - if (configQpsControllerMap.keySet().contains(configKey)) { + if (configQpsControllerMap.containsKey(configKey)) { return; } LOGGER.info("Create qpsController, configKey = [{}]", configKey); DynamicProperty limitProperty = DynamicProperty.getInstance(limitKeyPrefix + configKey); DynamicProperty bucketProperty = DynamicProperty.getInstance(bucketKeyPrefix + configKey); - DynamicProperty strategyProperty = DynamicProperty - .getInstance(Config.STRATEGY_KEY_PREFIX); + DynamicProperty strategyProperty = DynamicProperty.getInstance(Config.STRATEGY_KEY); AbstractQpsStrategy qpsStrategy = chooseStrategy(configKey, limitProperty.getLong(), bucketProperty.getLong(), strategyProperty.getString()); @@ -175,41 +211,30 @@ public class QpsControllerManager { } } - public QpsControllerManager setLimitKeyPrefix(String limitKeyPrefix) { - this.limitKeyPrefix = limitKeyPrefix; - return this; - } - - public QpsControllerManager setBucketKeyPrefix(String bucketKeyPrefix) { - this.bucketKeyPrefix = bucketKeyPrefix; - return this; - } - - public QpsControllerManager setGlobalQpsStrategy(String globalLimitKey, String globalBucketKey) { + private void initGlobalQpsController() { DynamicProperty globalLimitProperty = DynamicProperty.getInstance(globalLimitKey); DynamicProperty globalBucketProperty = DynamicProperty.getInstance(globalBucketKey); DynamicProperty globalStrategyProperty = DynamicProperty - .getInstance(Config.STRATEGY_KEY_PREFIX); - globalQpsStrategy = chooseStrategy(globalLimitKey, globalLimitProperty.getLong(), + .getInstance(Config.STRATEGY_KEY); + globalQpsStrategy = chooseStrategy(globalLimitKey, globalLimitProperty.getLong((long) Integer.MAX_VALUE), globalBucketProperty.getLong(), globalStrategyProperty.getString()); globalStrategyProperty.addCallback(() -> { - globalQpsStrategy = chooseStrategy(globalLimitKey, globalLimitProperty.getLong(), + globalQpsStrategy = chooseStrategy(globalLimitKey, globalLimitProperty.getLong((long) Integer.MAX_VALUE), globalBucketProperty.getLong(), globalStrategyProperty.getString()); LOGGER.info("Global flow control strategy update, value = [{}]", globalStrategyProperty.getString()); }); globalLimitProperty.addCallback(() -> { - globalQpsStrategy.setQpsLimit(globalLimitProperty.getLong()); - LOGGER.info("Global qps limit update, value = [{}]", globalLimitProperty.getInteger()); + globalQpsStrategy.setQpsLimit(globalLimitProperty.getLong((long) Integer.MAX_VALUE)); + LOGGER.info("Global qps limit update, value = [{}]", globalLimitProperty.getLong()); }); globalBucketProperty.addCallback(() -> { globalQpsStrategy.setBucketLimit(globalBucketProperty.getLong()); - LOGGER.info("Global bucket limit update, value = [{}]", globalBucketProperty.getInteger()); + LOGGER.info("Global bucket limit update, value = [{}]", globalBucketProperty.getLong()); }); - return this; } - private AbstractQpsStrategy chooseStrategy(String globalConfigKey, Long limit, Long bucket, + private AbstractQpsStrategy chooseStrategy(String configKey, Long limit, Long bucket, String strategyName) { if (StringUtils.isEmpty(strategyName)) { strategyName = "FixedWindow"; @@ -227,13 +252,9 @@ public class QpsControllerManager { throw new ServiceCombException( "the qps strategy name " + strategyName + " is not exist , please check."); } - strategy.setKey(globalConfigKey); + strategy.setKey(configKey); strategy.setQpsLimit(limit); strategy.setBucketLimit(bucket); return strategy; } - - public QpsStrategy getGlobalQpsStrategy() { - return globalQpsStrategy; - } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java index 3c5fe63..f91f87f 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java @@ -36,7 +36,7 @@ public class FixedWindowStrategy extends AbstractQpsStrategy { // return true means new request need to be rejected public boolean isLimitNewRequest() { if (this.getQpsLimit() == null) { - this.setQpsLimit(Long.MAX_VALUE); + throw new IllegalStateException("should not happen"); } long newCount = requestCount.incrementAndGet(); long msNow = System.currentTimeMillis(); diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java index d6a65ee..d65d43b 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java @@ -39,11 +39,10 @@ public class LeakyBucketStrategy extends AbstractQpsStrategy { @Override public boolean isLimitNewRequest() { if (this.getQpsLimit() == null) { - this.setQpsLimit(Long.MAX_VALUE); + throw new IllegalStateException("should not happen"); } if (this.getBucketLimit() == null) { - this.setBucketLimit( - this.getQpsLimit() <= Long.MAX_VALUE / 2 ? this.getQpsLimit() * 2 : this.getQpsLimit()); + this.setBucketLimit(Math.max(2 * this.getQpsLimit(), Integer.MAX_VALUE)); } long nowTime = System.currentTimeMillis(); //get the num of te period time diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java index 6089eac..990b44d 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java @@ -17,8 +17,6 @@ package org.apache.servicecomb.qps; -import java.util.Map; - import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.core.definition.SchemaMeta; @@ -30,7 +28,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import mockit.Deencapsulation; import mockit.Expectations; import mockit.Mocked; @@ -58,21 +55,21 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - QpsControllerManager testQpsControllerManager = new QpsControllerManager() - .setLimitKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); - initTestQpsControllerManager(testQpsControllerManager, invocation, operationMeta); + QpsControllerManager testQpsControllerManager = new QpsControllerManager(false); + initTestQpsControllerManager(false, testQpsControllerManager, invocation, operationMeta); // pojo - setConfigWithDefaultPrefix("pojo", 100); + setConfigWithDefaultPrefix(false, "pojo", 100); QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals("pojo2", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); ArchaiusUtils.setProperty("servicecomb.flowcontrol.Consumer.qps.limit.poj.server", 10000); qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); @@ -84,7 +81,7 @@ public class QpsControllerManagerTest { Assert.assertEquals("poj.server.test", ((AbstractQpsStrategy) qpsStrategy).getKey()); Assert.assertEquals(((AbstractQpsStrategy) qpsStrategy).getQpsLimit(), (Long) 20000L); - testGetOrCreateCommon(testQpsControllerManager, invocation, operationMeta); + testGetOrCreateCommon(false, testQpsControllerManager, invocation, operationMeta); } @Test @@ -100,9 +97,7 @@ public class QpsControllerManagerTest { } }; - QpsControllerManager testQpsControllerManager = new QpsControllerManager() - .setGlobalQpsStrategy(Config.PROVIDER_LIMIT_KEY_GLOBAL, Config.PROVIDER_BUCKET_KEY_GLOBAL) - .setLimitKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); + QpsControllerManager testQpsControllerManager = new QpsControllerManager(true); // global setConfig(Config.PROVIDER_LIMIT_KEY_GLOBAL, 50); @@ -117,7 +112,7 @@ public class QpsControllerManagerTest { Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); // pojo - setConfigWithDefaultPrefix("pojo", 100); + setConfigWithDefaultPrefix(true, "pojo", 100); qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); @@ -128,7 +123,7 @@ public class QpsControllerManagerTest { Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); - testGetOrCreateCommon(testQpsControllerManager, invocation, operationMeta); + testGetOrCreateCommon(true, testQpsControllerManager, invocation, operationMeta); } @Test @@ -143,10 +138,11 @@ public class QpsControllerManagerTest { result = "schema.opr"; } }; - QpsControllerManager qpsControllerManager = new QpsControllerManager(); + QpsControllerManager qpsControllerManager = new QpsControllerManager(true); QpsStrategy qpsStrategy = qpsControllerManager.getOrCreate("service", invocation); - Assert.assertEquals("service", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); new Expectations() { { @@ -159,8 +155,9 @@ public class QpsControllerManagerTest { } }; qpsStrategy = qpsControllerManager.getOrCreate("test_service", invocation); - Assert.assertEquals("test_service", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); new Expectations() { { @@ -173,8 +170,9 @@ public class QpsControllerManagerTest { } }; qpsStrategy = qpsControllerManager.getOrCreate("test-service", invocation); - Assert.assertEquals("test-service", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); new Expectations() { { @@ -187,8 +185,9 @@ public class QpsControllerManagerTest { } }; qpsStrategy = qpsControllerManager.getOrCreate("svc", invocation); - Assert.assertEquals("svc", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); new Expectations() { { @@ -201,11 +200,13 @@ public class QpsControllerManagerTest { } }; qpsStrategy = qpsControllerManager.getOrCreate("svc", invocation); - Assert.assertEquals("svc", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit", + ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); } - private void testGetOrCreateCommon(QpsControllerManager testQpsControllerManager, Invocation invocation, + private void testGetOrCreateCommon(boolean isProvider, QpsControllerManager testQpsControllerManager, + Invocation invocation, OperationMeta operationMeta) { new Expectations() { { @@ -215,7 +216,7 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - setConfigWithDefaultPrefix("pojo.server", 200); + setConfigWithDefaultPrefix(isProvider, "pojo.server", 200); QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey()); Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); @@ -251,7 +252,7 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - setConfigWithDefaultPrefix("pojo.server.test", 300); + setConfigWithDefaultPrefix(isProvider, "pojo.server.test", 300); qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); Assert.assertEquals("pojo.server.test", ((AbstractQpsStrategy) qpsStrategy).getKey()); Assert.assertTrue(300 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); @@ -283,7 +284,8 @@ public class QpsControllerManagerTest { /** * Init testQpsControllerManager to test search function. */ - private void initTestQpsControllerManager(QpsControllerManager testQpsControllerManager, Invocation invocation, + private void initTestQpsControllerManager(boolean isProvider, QpsControllerManager testQpsControllerManager, + Invocation invocation, OperationMeta operationMeta) { // pojo.server.test new Expectations() { @@ -297,8 +299,12 @@ public class QpsControllerManagerTest { } }; QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo.server.test2 new Expectations() { @@ -312,6 +318,12 @@ public class QpsControllerManagerTest { } }; testQpsControllerManager.getOrCreate("pojo", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo.server.tes new Expectations() { @@ -325,6 +337,12 @@ public class QpsControllerManagerTest { } }; testQpsControllerManager.getOrCreate("pojo", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo.server2.test new Expectations() { @@ -338,6 +356,12 @@ public class QpsControllerManagerTest { } }; testQpsControllerManager.getOrCreate("pojo", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo.serve.test new Expectations() { @@ -351,6 +375,12 @@ public class QpsControllerManagerTest { } }; testQpsControllerManager.getOrCreate("pojo", invocation); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // pojo2.server.test new Expectations() { @@ -364,8 +394,12 @@ public class QpsControllerManagerTest { } }; qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals("pojo2", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); // poj.server.test new Expectations() { @@ -379,8 +413,12 @@ public class QpsControllerManagerTest { } }; qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj", ((AbstractQpsStrategy) qpsStrategy).getKey()); - Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + if (isProvider) { + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } else { + Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + } + Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue()); } @Test @@ -426,17 +464,12 @@ public class QpsControllerManagerTest { ArchaiusUtils.setProperty(key, value); } - public static void setConfigWithDefaultPrefix(String key, int value) { + private static void setConfigWithDefaultPrefix(boolean isProvider, String key, int value) { String configKey = Config.CONSUMER_LIMIT_KEY_PREFIX + key; - ArchaiusUtils.setProperty(configKey, value); - } + if (isProvider) { + configKey = Config.PROVIDER_LIMIT_KEY_PREFIX + key; + } - public static void clearState(QpsControllerManager qpsControllerManager) { - Map<String, QpsStrategy> objMap = Deencapsulation - .getField(qpsControllerManager, "qualifiedNameControllerMap"); - objMap.clear(); - Map<String, QpsStrategy> configQpsControllerMap = Deencapsulation - .getField(qpsControllerManager, "configQpsControllerMap"); - configQpsControllerMap.clear(); + ArchaiusUtils.setProperty(configKey, value); } } diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java index 1cda2cf..5aef2dd 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java @@ -44,7 +44,7 @@ import mockit.MockUp; public class TestConsumerQpsFlowControlHandler { - ConsumerQpsFlowControlHandler handler = new ConsumerQpsFlowControlHandler(); + ConsumerQpsFlowControlHandler handler; Invocation invocation = Mockito.mock(Invocation.class); @@ -58,14 +58,13 @@ public class TestConsumerQpsFlowControlHandler { @Before public void setUP() { ArchaiusUtils.resetConfig(); - QpsControllerManagerTest.clearState(ConsumerQpsFlowControlHandler.qpsControllerMgr); + handler = new ConsumerQpsFlowControlHandler(); } @After public void afterTest() { ArchaiusUtils.resetConfig(); - QpsControllerManagerTest.clearState(ConsumerQpsFlowControlHandler.qpsControllerMgr); } @Test diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java index 39cc1b9..dfbecfd 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java @@ -45,7 +45,7 @@ import mockit.Mock; import mockit.MockUp; public class TestProviderQpsFlowControlHandler { - ProviderQpsFlowControlHandler handler = new ProviderQpsFlowControlHandler(); + ProviderQpsFlowControlHandler handler; Invocation invocation = Mockito.mock(Invocation.class); @@ -57,14 +57,13 @@ public class TestProviderQpsFlowControlHandler { @Before public void setUP() { ArchaiusUtils.resetConfig(); - QpsControllerManagerTest.clearState(ProviderQpsFlowControlHandler.qpsControllerMgr); + handler = new ProviderQpsFlowControlHandler(); ArchaiusUtils.setProperty(Config.PROVIDER_LIMIT_KEY_PREFIX + "test", 1); } @After public void afterTest() { ArchaiusUtils.resetConfig(); - QpsControllerManagerTest.clearState(ProviderQpsFlowControlHandler.qpsControllerMgr); } @Test @@ -111,11 +110,12 @@ public class TestProviderQpsFlowControlHandler { @Test public void testHandleOnSourceMicroserviceNameIsNull() throws Exception { Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null); + OperationMeta operationMeta = QpsControllerManagerTest.getMockOperationMeta("pojo", "server", "opr"); + Mockito.when(invocation.getOperationMeta()).thenReturn(operationMeta); + Mockito.when(invocation.getSchemaId()).thenReturn("server"); // only when handler index <= 0, the qps logic works Mockito.when(invocation.getHandlerIndex()).thenReturn(0); ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", 1); - ProviderQpsFlowControlHandler.qpsControllerMgr - .setGlobalQpsStrategy(Config.PROVIDER_LIMIT_KEY_GLOBAL, Config.PROVIDER_BUCKET_KEY_GLOBAL); handler.handle(invocation, asyncResp); handler.handle(invocation, asyncResp); diff --git a/providers/provider-pojo/src/main/java/org/apache/servicecomb/provider/pojo/PojoConsumerMetaRefresher.java b/providers/provider-pojo/src/main/java/org/apache/servicecomb/provider/pojo/PojoConsumerMetaRefresher.java index e137761..ef31036 100644 --- a/providers/provider-pojo/src/main/java/org/apache/servicecomb/provider/pojo/PojoConsumerMetaRefresher.java +++ b/providers/provider-pojo/src/main/java/org/apache/servicecomb/provider/pojo/PojoConsumerMetaRefresher.java @@ -25,6 +25,7 @@ import org.apache.servicecomb.core.definition.SchemaMeta; import org.apache.servicecomb.core.provider.consumer.MicroserviceReferenceConfig; import org.apache.servicecomb.provider.pojo.definition.PojoConsumerMeta; import org.apache.servicecomb.swagger.engine.SwaggerConsumer; +import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +65,7 @@ public class PojoConsumerMetaRefresher { String message = "The request is rejected. Cannot process the request due to SCBEngine not ready."; LOGGER.warn(message); - throw new InvocationException(Status.SERVICE_UNAVAILABLE, message); + throw new InvocationException(Status.SERVICE_UNAVAILABLE, new CommonExceptionData(message)); } this.scbEngine = SCBEngine.getInstance(); diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/ResponsesMeta.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/ResponsesMeta.java index 0067945..a72b260 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/ResponsesMeta.java +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/ResponsesMeta.java @@ -97,6 +97,9 @@ public class ResponsesMeta { responseMap.putIfAbsent(ExceptionFactory.CONSUMER_INNER_STATUS_CODE, COMMON_EXCEPTION_JAVA_TYPE); responseMap.putIfAbsent(ExceptionFactory.PRODUCER_INNER_STATUS_CODE, COMMON_EXCEPTION_JAVA_TYPE); + responseMap.putIfAbsent(Status.TOO_MANY_REQUESTS.getStatusCode(), COMMON_EXCEPTION_JAVA_TYPE); + responseMap.putIfAbsent(Status.REQUEST_TIMEOUT.getStatusCode(), COMMON_EXCEPTION_JAVA_TYPE); + responseMap.putIfAbsent(Status.SERVICE_UNAVAILABLE.getStatusCode(), COMMON_EXCEPTION_JAVA_TYPE); if (defaultResponse == null) { // swagger中没有定义default,加上default专用于处理exception diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java index b80cd43..f54663a 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java @@ -35,6 +35,7 @@ import org.apache.servicecomb.foundation.vertx.client.ClientVerticle; import org.apache.servicecomb.foundation.vertx.client.tcp.TcpClientConfig; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.Response; +import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +118,7 @@ public class HighwayClient { if (ar.cause() instanceof TimeoutException) { // give an accurate cause for timeout exception asyncResp.consumerFail(new InvocationException(Status.REQUEST_TIMEOUT, - String.format("Request Timeout. Details: %s", ar.cause().getMessage()))); + new CommonExceptionData(String.format("Request Timeout. Details: %s", ar.cause().getMessage())))); return; } asyncResp.consumerFail(ar.cause()); diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java index 18f6f58..251b27e 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java @@ -45,6 +45,7 @@ import org.apache.servicecomb.foundation.vertx.metrics.metric.DefaultHttpSocketM import org.apache.servicecomb.registry.definition.DefinitionConst; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.Response; +import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -284,7 +285,7 @@ public class RestClientInvocation { if (e instanceof TimeoutException) { // give an accurate cause for timeout exception asyncResp.consumerFail(new InvocationException(Status.REQUEST_TIMEOUT, - String.format("Request Timeout. Details: %s", e.getMessage()))); + new CommonExceptionData(String.format("Request Timeout. Details: %s", e.getMessage())))); return; } asyncResp.fail(invocation.getInvocationType(), e);