This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push: new 4a18afeee update condition route config design v3.1 (#2700) 4a18afeee is described below commit 4a18afeee52d1ace88afd2faae7d18362247845d Author: YarBor <110281261+yar...@users.noreply.github.com> AuthorDate: Fri Jul 19 13:39:37 2024 +0800 update condition route config design v3.1 (#2700) --- cluster/router/affinity/factory.go | 53 +++ cluster/router/affinity/router.go | 223 ++++++++++ cluster/router/affinity/router_test.go | 231 ++++++++++ cluster/router/condition/dynamic_router.go | 118 ++++-- cluster/router/condition/route.go | 134 +++--- cluster/router/condition/router_test.go | 658 +++++++++++++++++++++-------- common/constant/key.go | 14 +- config/router_config.go | 57 ++- 8 files changed, 1183 insertions(+), 305 deletions(-) diff --git a/cluster/router/affinity/factory.go b/cluster/router/affinity/factory.go new file mode 100644 index 000000000..fb41497a6 --- /dev/null +++ b/cluster/router/affinity/factory.go @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package affinity + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/router" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" +) + +func init() { + extension.SetRouterFactory(constant.AffinityServiceRouterFactoryKey, NewServiceAffinityRouterFactory) + extension.SetRouterFactory(constant.AffinityAppRouterFactoryKey, NewApplicationAffinityRouterFactory) +} + +// ServiceAffinityRouterFactory router factory +type ServiceAffinityRouterFactory struct{} + +// NewServiceAffinityRouterFactory constructs a new PriorityRouterFactory +func NewServiceAffinityRouterFactory() router.PriorityRouterFactory { + return &ServiceAffinityRouterFactory{} +} + +func (n ServiceAffinityRouterFactory) NewPriorityRouter() (router.PriorityRouter, error) { + return newServiceAffinityRoute(), nil +} + +// ApplicationAffinityRouterFactory router factory +type ApplicationAffinityRouterFactory struct{} + +// NewApplicationAffinityRouterFactory constructs a new PriorityRouterFactory +func NewApplicationAffinityRouterFactory() router.PriorityRouterFactory { + return &ApplicationAffinityRouterFactory{} +} + +func (n ApplicationAffinityRouterFactory) NewPriorityRouter() (router.PriorityRouter, error) { + return newApplicationAffinityRouter(), nil +} diff --git a/cluster/router/affinity/router.go b/cluster/router/affinity/router.go new file mode 100644 index 000000000..1ced13dcd --- /dev/null +++ b/cluster/router/affinity/router.go @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package affinity + +import ( + "math" + "strings" + "sync" +) + +import ( + "github.com/dubbogo/gost/log/logger" + + "gopkg.in/yaml.v2" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/router/condition" + "dubbo.apache.org/dubbo-go/v3/common" + conf "dubbo.apache.org/dubbo-go/v3/common/config" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/config" + "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/remoting" +) + +type ServiceAffinityRoute struct { + affinityRoute +} + +func newServiceAffinityRoute() *ServiceAffinityRoute { + return &ServiceAffinityRoute{} +} + +func (s *ServiceAffinityRoute) Notify(invokers []protocol.Invoker) { + if len(invokers) == 0 { + return + } + + url := invokers[0].GetURL() + if url == nil { + logger.Error("Failed to notify a Service Affinity rule, because url is empty") + return + } + + dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration == nil { + logger.Infof("Config center does not start, Condition router will not be enabled") + return + } + + key := strings.Join([]string{url.ColonSeparatedKey(), constant.AffinityRuleSuffix}, "") + dynamicConfiguration.AddListener(key, s) + value, err := dynamicConfiguration.GetRule(key) + if err != nil { + logger.Errorf("Failed to query affinity rule, key=%s, err=%v", key, err) + return + } + + s.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeAdd}) +} + +type ApplicationAffinityRoute struct { + affinityRoute + application string + currentApplication string +} + +func newApplicationAffinityRouter() *ApplicationAffinityRoute { + applicationName := config.GetApplicationConfig().Name + a := &ApplicationAffinityRoute{ + currentApplication: applicationName, + } + + dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration != nil { + dynamicConfiguration.AddListener(strings.Join([]string{applicationName, constant.AffinityRuleSuffix}, ""), a) + } + return a +} + +func (s *ApplicationAffinityRoute) Notify(invokers []protocol.Invoker) { + if len(invokers) == 0 { + return + } + url := invokers[0].GetURL() + if url == nil { + logger.Error("Failed to notify a dynamically condition rule, because url is empty") + return + } + + dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration == nil { + logger.Infof("Config center does not start, Condition router will not be enabled") + return + } + + providerApplication := url.GetParam("application", "") + if providerApplication == "" || providerApplication == s.currentApplication { + logger.Warn("condition router get providerApplication is empty, will not subscribe to provider app rules.") + return + } + + if providerApplication != s.application { + if s.application != "" { + dynamicConfiguration.RemoveListener(strings.Join([]string{s.application, constant.AffinityRuleSuffix}, ""), s) + } + s.application = providerApplication + + key := strings.Join([]string{providerApplication, constant.AffinityRuleSuffix}, "") + dynamicConfiguration.AddListener(key, s) + value, err := dynamicConfiguration.GetRule(key) + if err != nil { + logger.Errorf("Failed to query condition rule, key=%s, err=%v", key, err) + return + } + + s.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeUpdate}) + } +} + +type affinityRoute struct { + mu sync.RWMutex + matcher *condition.FieldMatcher + enabled bool + key string + ratio int32 +} + +func (a *affinityRoute) Process(event *config_center.ConfigChangeEvent) { + a.mu.Lock() + defer a.mu.Unlock() + a.matcher, a.enabled, a.key, a.ratio = nil, false, "", 0 + + switch event.ConfigType { + case remoting.EventTypeDel: + case remoting.EventTypeAdd, remoting.EventTypeUpdate: + cfg, err := parseConfig(event.Value.(string)) + if err != nil { + logger.Errorf("Failed to parse affinity config, key=%s, err=%v", a.key, err) + return + } + + if cfg.AffinityAware.Ratio < 0 || cfg.AffinityAware.Ratio > 100 { + logger.Errorf("Failed to parse affinity config, affinity.ratio=%d, expect 0-100", a.ratio) + return + } + + key := strings.TrimSpace(cfg.AffinityAware.Key) + if !cfg.Enabled || key == "" { + return + } + rule := strings.Join([]string{key, key}, "=$") + f, err := condition.NewFieldMatcher(rule) + if err != nil { + logger.Errorf("Failed to parse affinity config, key=%s, rule=%s ,err=%v", a.key, rule, err) + return + } + + a.matcher, a.enabled, a.key, a.ratio = &f, true, key, cfg.AffinityAware.Ratio + } +} + +func (a *affinityRoute) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if len(invokers) == 0 { + return invokers + } + + a.mu.RLock() + enabled, matcher, ratio := a.enabled, a.matcher, a.ratio + a.mu.RUnlock() + + if !enabled { + return invokers + } + + res := make([]protocol.Invoker, 0, len(invokers)) + for _, invoker := range invokers { + if matcher.MatchInvoker(url, invoker, invocation) { + res = append(res, invoker) + } + } + if float32(len(res))/float32(len(invokers)) >= float32(ratio)/float32(100) { + return res + } + + return invokers +} + +func (a *affinityRoute) URL() *common.URL { + return nil +} + +func (a *affinityRoute) Priority() int64 { + // expect this router is the last one in the router chain + return math.MinInt64 +} + +func (a *affinityRoute) Notify(_ []protocol.Invoker) { + panic("this function should not be called") +} + +func parseConfig(c string) (config.AffinityRouter, error) { + res := config.AffinityRouter{} + err := yaml.Unmarshal([]byte(c), &res) + return res, err +} diff --git a/cluster/router/affinity/router_test.go b/cluster/router/affinity/router_test.go new file mode 100644 index 000000000..ced11b6e7 --- /dev/null +++ b/cluster/router/affinity/router_test.go @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package affinity + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/router/condition" + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/protocol/invocation" + "dubbo.apache.org/dubbo-go/v3/remoting" + "github.com/stretchr/testify/assert" + "testing" +) + +var providerUrls = []string{ + "dubbo://127.0.0.1/com.foo.BarService", + "dubbo://127.0.0.1/com.foo.BarService", + "dubbo://127.0.0.1/com.foo.BarService?env=normal", + "dubbo://127.0.0.1/com.foo.BarService?env=normal", + "dubbo://127.0.0.1/com.foo.BarService?env=normal", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=normal", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService", + "dubbo://dubbo.apache.org/com.foo.BarService", + "dubbo://dubbo.apache.org/com.foo.BarService?env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal", + "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal", +} + +func buildInvokers() []protocol.Invoker { + res := make([]protocol.Invoker, 0, len(providerUrls)) + for _, url := range providerUrls { + u, err := common.NewURL(url) + if err != nil { + panic(err) + } + res = append(res, protocol.NewBaseInvoker(u)) + } + return res +} + +func newUrl(s string) *common.URL { + res, err := common.NewURL(s) + if err != nil { + panic(err) + } + return res +} + +func gen_matcher(key string) *condition.FieldMatcher { + res, err := condition.NewFieldMatcher(key) + if err != nil { + panic(err) + } + return &res +} + +type InvokersFilters []condition.FieldMatcher + +func NewINVOKERS_FILTERS() InvokersFilters { + return []condition.FieldMatcher{} +} + +func (INV InvokersFilters) add(rule string) InvokersFilters { + m := gen_matcher(rule) + return append(INV, *m) +} + +func (INV InvokersFilters) filtrate(inv []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + for _, cond := range INV { + tmpInv := make([]protocol.Invoker, 0) + for _, invoker := range inv { + if cond.MatchInvoker(url, invoker, invocation) { + tmpInv = append(tmpInv, invoker) + } + } + inv = tmpInv + } + return inv +} + +func Test_affinityRoute_Route(t *testing.T) { + type fields struct { + content string + } + type args struct { + invokers []protocol.Invoker + url *common.URL + invocation protocol.Invocation + } + tests := []struct { + name string + fields fields + args args + invokers_filters InvokersFilters + expectLen int + }{ + { + name: "test base affinity router", + fields: fields{`configVersion: v3.1 +scope: service # Or application +key: service.apache.com +enabled: true +runtime: true +affinityAware: + key: region + ratio: 20`}, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("getComment", nil, nil), + }, + invokers_filters: NewINVOKERS_FILTERS().add("region=$region"), + }, { + name: "test bad ratio", + fields: fields{`configVersion: v3.1 +scope: service # Or application +key: service.apache.com +enabled: true +runtime: true +affinityAware: + key: region + ratio: 101`}, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("getComment", nil, nil), + }, + invokers_filters: NewINVOKERS_FILTERS(), + }, { + name: "test ratio false", + fields: fields{`configVersion: v3.1 +scope: service # Or application +key: service.apache.com +enabled: true +runtime: true +affinityAware: + key: region + ratio: 80`}, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("getComment", nil, nil), + }, + invokers_filters: NewINVOKERS_FILTERS(), + }, { + name: "test ignore affinity route", + fields: fields{`configVersion: v3.1 +scope: service # Or application +key: service.apache.com +enabled: true +runtime: true +affinityAware: + key: bad-key + ratio: 80`}, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("getComment", nil, nil), + }, + invokers_filters: NewINVOKERS_FILTERS(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := &affinityRoute{} + a.Process(&config_center.ConfigChangeEvent{ + Value: tt.fields.content, + ConfigType: remoting.EventTypeUpdate, + }) + res := a.Route(tt.args.invokers, tt.args.url, tt.args.invocation) + if tt.invokers_filters != nil { + // check expect filtrate path + ans := tt.invokers_filters.filtrate(tt.args.invokers, tt.args.url, tt.args.invocation) + if len(ans) < int(float32(len(providerUrls))*(float32(a.ratio)/100.)) { + assert.Equalf(t, 0, len(ans), "route(%v, %v, %v)", tt.args.invokers, tt.args.url, tt.args.invocation) + } else { + assert.Equalf(t, ans, res, "route(%v, %v, %v)", tt.args.invokers, tt.args.url, tt.args.invocation) + } + } else { + ans := tt.invokers_filters.filtrate(tt.args.invokers, tt.args.url, tt.args.invocation) + assert.Equalf(t, tt.expectLen, len(ans), "route(%v, %v, %v)", tt.args.invokers, tt.args.url, tt.args.invocation) + } + }) + } + +} diff --git a/cluster/router/condition/dynamic_router.go b/cluster/router/condition/dynamic_router.go index dd8feca04..7c8aef1ee 100644 --- a/cluster/router/condition/dynamic_router.go +++ b/cluster/router/condition/dynamic_router.go @@ -19,7 +19,6 @@ package condition import ( "fmt" - "sort" "strconv" "strings" "sync" @@ -58,20 +57,41 @@ func (p stateRouters) route(invokers []protocol.Invoker, url *common.URL, invoca return invokers } -type multiplyConditionRoute []*MultiDestRouter +type multiplyConditionRoute struct { + trafficDisabled []*FieldMatcher + routes []*MultiDestRouter +} -func (m multiplyConditionRoute) route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { - if len(invokers) == 0 || len(m) == 0 { - return invokers +func (m *multiplyConditionRoute) route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if len(m.trafficDisabled) != 0 { + for _, cond := range m.trafficDisabled { + if cond.MatchRequest(url, invocation) { + logger.Warnf("Request has been disabled %s by Condition.trafficDisable.match=\"%s\"", url.String(), cond.rule) + invocation.SetAttachment(constant.TrafficDisableKey, struct{}{}) + return []protocol.Invoker{} + } + } } - for _, router := range m { - res, isMatchWhen := router.Route(invokers, url, invocation) - if !isMatchWhen || (len(res) == 0 && invocation.GetAttachmentInterface(constant.TrafficDisableKey) == nil && !router.force) { - continue + + if len(invokers) != 0 && len(m.routes) != 0 { + for _, router := range m.routes { + isMatchWhen := false + invokers, isMatchWhen = router.Route(invokers, url, invocation) + if !isMatchWhen { + continue + } + if len(invokers) == 0 { + routeChains, ok := invocation.Attributes()["condition-chain"].([]string) + if ok { + logger.Errorf("request[%s] route an empty set in condition-route:: %s", url.String(), strings.Join(routeChains, "-->")) + } + return []protocol.Invoker{} + } } - return res + delete(invocation.Attributes(), "condition-chain") } - return []protocol.Invoker{} + + return invokers } type condRouter interface { @@ -174,7 +194,7 @@ func generateCondition(rawConfig string) (condRouter, bool, bool, error) { } } -func generateMultiConditionRoute(rawConfig string) (multiplyConditionRoute, bool, bool, error) { +func generateMultiConditionRoute(rawConfig string) (*multiplyConditionRoute, bool, bool, error) { routerConfig, err := parseMultiConditionRoute(rawConfig) if err != nil { logger.Warnf("[condition router]Build a new condition route config error, %s and we will use the original condition rule configuration.", err.Error()) @@ -186,41 +206,42 @@ func generateMultiConditionRoute(rawConfig string) (multiplyConditionRoute, bool return nil, false, false, nil } + // remove same condition + removeDuplicates(routerConfig.Conditions) + conditionRouters := make([]*MultiDestRouter, 0, len(routerConfig.Conditions)) + disableMultiConditions := make([]*FieldMatcher, 0) for _, conditionRule := range routerConfig.Conditions { - url, err := common.NewURL("condition://") - if err != nil { - return nil, false, false, err + // removeDuplicates will set nil + if conditionRule == nil { + continue + } + url, err1 := common.NewURL("condition://") + if err1 != nil { + return nil, false, false, err1 } url.SetAttribute(constant.RuleKey, conditionRule) - url.AddParam(constant.TrafficDisableKey, strconv.FormatBool(conditionRule.Disable)) - url.AddParam(constant.ForceKey, strconv.FormatBool(conditionRule.Force)) - if conditionRule.Priority < 0 { - logger.Warnf("got conditionRouteConfig.conditions.priority (%d < 0) is invalid, ignore priority value, use defatult %d ", conditionRule.Priority, constant.DefaultRoutePriority) - } else { - url.AddParam(constant.PriorityKey, strconv.FormatInt(int64(conditionRule.Priority), 10)) + + conditionRoute, err2 := NewConditionMultiDestRouter(url) + if err2 != nil { + return nil, false, false, err2 } - if conditionRule.Ratio < 0 || conditionRule.Ratio > 100 { - logger.Warnf("got conditionRouteConfig.conditions.ratio (%d) is invalid, hope (0 - 100), ignore ratio value, use defatult %d ", conditionRule.Ratio, constant.DefaultRouteRatio) - } else { - url.AddParam(constant.RatioKey, strconv.FormatInt(int64(conditionRule.Ratio), 10)) + // got invalid condition config, continue + if conditionRoute == nil { + continue } - - conditionRoute, err := NewConditionMultiDestRouter(url) - if err != nil { - return nil, false, false, err + if conditionRoute.thenCondition != nil && len(conditionRoute.thenCondition) != 0 { + conditionRouters = append(conditionRouters, conditionRoute) + } else { + disableMultiConditions = append(disableMultiConditions, &conditionRoute.whenCondition) } - conditionRouters = append(conditionRouters, conditionRoute) } - sort.Slice(conditionRouters, func(i, j int) bool { - if conditionRouters[i].trafficDisable { - return true - } - return conditionRouters[i].priority > conditionRouters[j].priority - }) - return conditionRouters, force, enable, nil + return &multiplyConditionRoute{ + trafficDisabled: disableMultiConditions, + routes: conditionRouters, + }, force, enable, nil } func generateConditionsRoute(rawConfig string) (stateRouters, bool, bool, error) { @@ -330,20 +351,20 @@ func (a *ApplicationRouter) Notify(invokers []protocol.Invoker) { return } - providerApplicaton := url.GetParam("application", "") - if providerApplicaton == "" || providerApplicaton == a.currentApplication { + providerApplication := url.GetParam("application", "") + if providerApplication == "" || providerApplication == a.currentApplication { logger.Warn("condition router get providerApplication is empty, will not subscribe to provider app rules.") return } - if providerApplicaton != a.application { + if providerApplication != a.application { if a.application != "" { dynamicConfiguration.RemoveListener(strings.Join([]string{a.application, constant.ConditionRouterRuleSuffix}, ""), a) } - key := strings.Join([]string{providerApplicaton, constant.ConditionRouterRuleSuffix}, "") + key := strings.Join([]string{providerApplication, constant.ConditionRouterRuleSuffix}, "") dynamicConfiguration.AddListener(key, a) - a.application = providerApplicaton + a.application = providerApplication value, err := dynamicConfiguration.GetRule(key) if err != nil { logger.Errorf("Failed to query condition rule, key=%s, err=%v", key, err) @@ -352,3 +373,16 @@ func (a *ApplicationRouter) Notify(invokers []protocol.Invoker) { a.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeUpdate}) } } + +func removeDuplicates(rules []*config.ConditionRule) { + for i := 0; i < len(rules); i++ { + if rules[i] == nil { + continue + } + for j := i + 1; j < len(rules); j++ { + if rules[j] != nil && rules[i].Equal(rules[j]) { + rules[j] = nil + } + } + } +} diff --git a/cluster/router/condition/route.go b/cluster/router/condition/route.go index 55e85bdfd..cbe2995fb 100644 --- a/cluster/router/condition/route.go +++ b/cluster/router/condition/route.go @@ -299,63 +299,84 @@ func parseConditionRoute(routeContent string) (*config.RouterConfig, error) { return routerConfig, nil } +type FieldMatcher struct { + rule string + match map[string]matcher.Matcher +} + +func NewFieldMatcher(rule string) (FieldMatcher, error) { + m, err := parseRule(rule) + if err != nil { + return FieldMatcher{}, err + } + return FieldMatcher{rule: rule, match: m}, nil +} + +func (m *FieldMatcher) MatchRequest(url *common.URL, invocation protocol.Invocation) bool { + return doMatch(url, nil, invocation, m.match, true) +} + +func (m *FieldMatcher) MatchInvoker(url *common.URL, ivk protocol.Invoker, invocation protocol.Invocation) bool { + return doMatch(ivk.GetURL(), url, nil, m.match, false) +} + // MultiDestRouter Multiply-Destination-Router type MultiDestRouter struct { - whenCondition map[string]matcher.Matcher - trafficDisable bool - thenCondition []condSet - ratio int - priority int - force bool + whenCondition FieldMatcher + thenCondition []condSet } type condSet struct { - cond map[string]matcher.Matcher + FieldMatcher subSetWeight int } -func newCondSet(cond map[string]matcher.Matcher, subSetWeight int) *condSet { +func newCondSet(rule string, subSetWeight int) (condSet, error) { if subSetWeight <= 0 { subSetWeight = constant.DefaultRouteConditionSubSetWeight } - return &condSet{cond: cond, subSetWeight: subSetWeight} + m, err := NewFieldMatcher(rule) + if err != nil { + return condSet{}, err + } + return condSet{FieldMatcher: m, subSetWeight: subSetWeight}, nil +} + +type destination struct { + matchRule string + weight int + ivks []protocol.Invoker } type destSets struct { - dest []struct { - weight int - ivks []protocol.Invoker - } - weightSum int + destinations []*destination + weightSum int } func newDestSets() *destSets { return &destSets{ - dest: make([]struct { - weight int - ivks []protocol.Invoker - }, 0), - weightSum: 0, + destinations: []*destination{}, + weightSum: 0, } } -func (s *destSets) addDest(weight int, ivks []protocol.Invoker) { - s.dest = append(s.dest, struct { - weight int - ivks []protocol.Invoker - }{weight: weight, ivks: ivks}) +func (s *destSets) addDest(weight int, rule string, ivks []protocol.Invoker) { + s.destinations = append(s.destinations, &destination{weight: weight, matchRule: rule, ivks: ivks}) s.weightSum += weight } -func (s *destSets) randDest() []protocol.Invoker { - if len(s.dest) == 1 { - return s.dest[0].ivks +func (s *destSets) randDest() *destination { + if s.weightSum == 0 { + return nil + } + if len(s.destinations) == 1 { + return s.destinations[0] } sum := rand.Intn(s.weightSum) - for _, d := range s.dest { + for _, d := range s.destinations { sum -= d.weight if sum <= 0 { - return d.ivks + return d } } return nil @@ -366,15 +387,10 @@ func (m MultiDestRouter) Route(invokers []protocol.Invoker, url *common.URL, inv return invokers, false } - if !doMatch(url, nil, invocation, m.whenCondition, true) { + if !m.whenCondition.MatchRequest(url, invocation) { return invokers, false } - if m.trafficDisable { - invocation.SetAttachment(constant.TrafficDisableKey, struct{}{}) - return []protocol.Invoker{}, true - } - if len(m.thenCondition) == 0 { logger.Warn("condition state router thenCondition is empty") return []protocol.Invoker{}, true @@ -384,23 +400,31 @@ func (m MultiDestRouter) Route(invokers []protocol.Invoker, url *common.URL, inv for _, condition := range m.thenCondition { res := make([]protocol.Invoker, 0) for _, invoker := range invokers { - if doMatch(invoker.GetURL(), url, nil, condition.cond, false) { + if condition.MatchInvoker(url, invoker, invocation) { res = append(res, invoker) } } if len(res) != 0 { - destinations.addDest(condition.subSetWeight, res) + destinations.addDest(condition.subSetWeight, condition.rule, res) } } + // use to print log, if route empty + i, ok := invocation.Attributes()["condition-chain"].([]string) + if !ok { + i = []string{} + } - if len(destinations.dest) != 0 { - res := destinations.randDest() - // check x% > m.ratio% - if len(res)*100/len(invokers) > m.ratio { - return res, true - } + d := destinations.randDest() + if d != nil { + invocation.Attributes()["condition-chain"] = append(i, "request="+m.whenCondition.rule+",invokers="+d.matchRule) + return d.ivks, true } + thenRule := make([]string, 0, len(m.thenCondition)) + for _, set := range m.thenCondition { + thenRule = append(thenRule, set.rule) + } + invocation.Attributes()["condition-chain"] = append(i, "request="+m.whenCondition.rule+",invokers!="+strings.Join(thenRule, ",")) return []protocol.Invoker{}, true } @@ -415,35 +439,31 @@ func NewConditionMultiDestRouter(url *common.URL) (*MultiDestRouter, error) { if !ok { return nil, errors.Errorf("Condition Router can't get the rule key") } - condConf, ok := rawCondConf.(config.ConditionRule) + condConf, ok := rawCondConf.(*config.ConditionRule) if !ok { return nil, errors.Errorf("Condition Router get the rule key invaild , got %T", rawCondConf) } + // ensure config effective + if (condConf.To == nil || len(condConf.To) == 0) && condConf.From.Match == "" { + return nil, nil + } c := &MultiDestRouter{ - whenCondition: make(map[string]matcher.Matcher), - thenCondition: make([]condSet, 0, len(condConf.To)), - trafficDisable: url.GetParamBool(constant.TrafficDisableKey, false), - ratio: int(url.GetParamInt32(constant.RatioKey, constant.DefaultRouteRatio)), - priority: int(url.GetParamInt32(constant.PriorityKey, constant.DefaultRoutePriority)), - force: url.GetParamBool(constant.ForceKey, false), + thenCondition: make([]condSet, 0, len(condConf.To)), } - m, err := parseRule(condConf.From.Match) + var err error + c.whenCondition, err = NewFieldMatcher(condConf.From.Match) if err != nil { return nil, err } - for k, v := range m { - // if key same, cover - c.whenCondition[k] = v - } for _, ruleTo := range condConf.To { - cond, err := parseRule(ruleTo.Match) + cs, err := newCondSet(ruleTo.Match, ruleTo.Weight) if err != nil { return nil, err } - c.thenCondition = append(c.thenCondition, *newCondSet(cond, ruleTo.Weight)) + c.thenCondition = append(c.thenCondition, cs) } return c, nil diff --git a/cluster/router/condition/router_test.go b/cluster/router/condition/router_test.go index 04d67ae6f..ae1727415 100644 --- a/cluster/router/condition/router_test.go +++ b/cluster/router/condition/router_test.go @@ -18,6 +18,7 @@ package condition import ( + "sync" "testing" ) @@ -27,9 +28,10 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/config" + commonConfig "dubbo.apache.org/dubbo-go/v3/common/config" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/config_center/configurator" "dubbo.apache.org/dubbo-go/v3/protocol" @@ -750,7 +752,7 @@ conditions: - 'method=sayHello => region=hangzhou'`, } dc, _ := mockFactory.GetDynamicConfiguration(ccURL) - config.GetEnvInstance().SetDynamicConfiguration(dc) + commonConfig.GetEnvInstance().SetDynamicConfiguration(dc) router := NewServiceRouter() router.Notify(invokerList) @@ -795,7 +797,7 @@ conditions: - 'method=sayHello => region=hangzhou'`, } dc, _ := mockFactory.GetDynamicConfiguration(ccURL) - config.GetEnvInstance().SetDynamicConfiguration(dc) + commonConfig.GetEnvInstance().SetDynamicConfiguration(dc) router := NewApplicationRouter() router.Notify(invokerList) @@ -864,224 +866,514 @@ func buildInvokers() []protocol.Invoker { return res } -func TestConditionRoutePriority(t *testing.T) { - ivks := buildInvokers() - ar := NewApplicationRouter() - ar.Process(&config_center.ConfigChangeEvent{Key: "", Value: `configVersion: v3.1 +func Test_parseMultiConditionRoute(t *testing.T) { + type args struct { + routeContent string + } + tests := []struct { + name string + args args + want *config.ConditionRouter + wantErr assert.ErrorAssertionFunc + }{ + {name: "testParseConfig", args: args{`configVersion: v3.1 scope: service +key: org.apache.dubbo.samples.CommentService force: false runtime: true enabled: true -key: shop + +####### conditions: - from: - match: + match: tag=tag1 # disable traffic + - from: + match: tag=gray to: - - match: region=$region & version=v1 - - match: region=$region & version=v2 - weight: 200 - - match: region=$region & version=v3 - weight: 300 - force: false - ratio: 20 - priority: 20 - - from: - match: - region=beijing & version=v1 + - match: tag!=gray + weight: 100 + - match: tag=gray + weight: 900 + - from: + match: version=v1 to: - - match: env=$env & region=beijing - force: false - priority: 100 -`, ConfigType: remoting.EventTypeUpdate}) - consumerUrl, err := common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing&version=v1") - if err != nil { - panic(err) - } - got := ar.Route(ivks, consumerUrl, invocation.NewRPCInvocation("getComment", nil, nil)) - expLen := 0 - for _, ivk := range ivks { - if ivk.GetURL().GetParam("region", "") == "beijing" && "gray" == ivk.GetURL().GetParam("env", "") { - expLen++ - } + - match: version=v1`}, want: &config.ConditionRouter{ + Scope: "service", + Key: "org.apache.dubbo.samples.CommentService", + Force: false, + Runtime: true, + Enabled: true, + Conditions: []*config.ConditionRule{ + { + From: config.ConditionRuleFrom{Match: "tag=tag1"}, + To: nil, + }, { + From: config.ConditionRuleFrom{ + Match: `tag=gray`, + }, + To: []config.ConditionRuleTo{{ + Match: `tag!=gray`, + Weight: 100, + }, { + Match: `tag=gray`, + Weight: 900, + }}, + }, { + From: config.ConditionRuleFrom{ + Match: `version=v1`, + }, + To: []config.ConditionRuleTo{{ + Match: `version=v1`, + Weight: 0, + }}, + }}, + }}, } - if len(ivks)*100/expLen <= 20 { - expLen = 0 + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseMultiConditionRoute(tt.args.routeContent) + assert.Nil(t, err) + assert.Equalf(t, tt.want, got, "parseMultiConditionRoute(%v)", tt.args.routeContent) + }) } - assert.Equal(t, expLen, len(got)) } -func TestConditionRouteTrafficDisable(t *testing.T) { - ivks := buildInvokers() - ar := NewApplicationRouter() - ar.Process(&config_center.ConfigChangeEvent{Key: "", Value: `configVersion: v3.1 -scope: service -force: true -runtime: true -enabled: true -key: shop -conditions: - - from: - match: - to: - - match: region=$region & version=v1 - - match: region=$region & version=v2 - weight: 200 - - match: region=$region & version=v3 - weight: 300 - force: false - ratio: 20 - priority: 20 - - from: - match: - region=beijing & version=v1 - to: - force: true - ratio: 20 - priority: 100 -`, ConfigType: remoting.EventTypeUpdate}) - consumerUrl, err := common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing") +func genMatcher(rule string) FieldMatcher { + cond, err := parseRule(rule) if err != nil { panic(err) } - got := ar.Route(ivks, consumerUrl, invocation.NewRPCInvocation("echo", nil, nil)) - assert.Equal(t, 0, len(got)) + m := FieldMatcher{ + rule: rule, + match: cond, + } + return m } -func TestConditionRouteRegionPriority(t *testing.T) { - ivks := buildInvokers() - ar := NewApplicationRouter() - ar.Process(&config_center.ConfigChangeEvent{Key: "", Value: `configVersion: v3.1 -scope: service -force: true -runtime: true -enabled: true -key: shop -conditions: - - from: - match: - to: - - match: region=$region & env=$env -`, ConfigType: remoting.EventTypeUpdate}) - consumerUrl, err := common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing") - if err != nil { - panic(err) - } - got := ar.Route(ivks, consumerUrl, invocation.NewRPCInvocation("getComment", nil, nil)) - expLen := 0 - for _, ivk := range ivks { - if ivk.GetURL().GetRawParam("env") == consumerUrl.GetRawParam("env") && - ivk.GetURL().GetRawParam("region") == consumerUrl.GetRawParam("region") { - expLen++ +type InvokersFilters []FieldMatcher + +func NewINVOKERS_FILTERS() InvokersFilters { + return []FieldMatcher{} +} + +func (INV InvokersFilters) add(rule string) InvokersFilters { + m := genMatcher(rule) + return append(INV, m) +} + +func (INV InvokersFilters) filtrate(inv []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + for _, cond := range INV { + tmpInv := make([]protocol.Invoker, 0) + for _, invoker := range inv { + if cond.MatchInvoker(url, invoker, invocation) { + tmpInv = append(tmpInv, invoker) + } } + inv = tmpInv } - assert.Equal(t, expLen, len(got)) - consumerUrl, err = common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=hangzhou") + return inv +} + +func newUrl(url string) *common.URL { + res, err := common.NewURL(url) if err != nil { panic(err) } - got = ar.Route(ivks, consumerUrl, invocation.NewRPCInvocation("getComment", nil, nil)) - expLen = 0 - for _, ivk := range ivks { - if ivk.GetURL().GetRawParam("env") == consumerUrl.GetRawParam("env") && - ivk.GetURL().GetRawParam("region") == consumerUrl.GetRawParam("region") { - expLen++ - } + return res +} + +func Test_multiplyConditionRoute_route(t *testing.T) { + type args struct { + invokers []protocol.Invoker + url *common.URL + invocation protocol.Invocation } - assert.Equal(t, expLen, len(got)) - consumerUrl, err = common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=normal®ion=shanghai") - if err != nil { - panic(err) + d := DynamicRouter{ + mu: sync.RWMutex{}, + force: false, + enable: false, + conditionRouter: nil, } - got = ar.Route(ivks, consumerUrl, invocation.NewRPCInvocation("getComment", nil, nil)) - expLen = 0 - for _, ivk := range ivks { - if ivk.GetURL().GetRawParam("region") == consumerUrl.GetRawParam("region") && - ivk.GetURL().GetRawParam("env") == consumerUrl.GetRawParam("env") { - expLen++ + tests := []struct { + name string + content string + args args + invokers_filters InvokersFilters + expResLen int + multiDestination []struct { + invokers_filters InvokersFilters + weight float32 } - } - assert.Equal(t, expLen, len(got)) -} - -func TestConditionRouteRegionPriorityFail(t *testing.T) { - ivks := buildInvokers() - ar := NewApplicationRouter() - ar.Process(&config_center.ConfigChangeEvent{Key: "", Value: `configVersion: v3.1 -scope: service -force: true -runtime: true -enabled: true -key: shop + }{ + { + name: "test base condition", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: false +runtime: true +enabled: true conditions: - from: - match: + match: env=gray to: - - match: region=$region & env=$env - ratio: 100 -`, ConfigType: remoting.EventTypeUpdate}) - consumerUrl, err := common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing") - if err != nil { - panic(err) - } - got := ar.Route(ivks, consumerUrl, invocation.NewRPCInvocation("getComment", nil, nil)) - assert.Equal(t, 0, len(got)) -} - -func TestConditionRouteMatchFail(t *testing.T) { - ivks := buildInvokers() - ar := NewApplicationRouter() - ar.Process(&config_center.ConfigChangeEvent{Key: "", Value: `configVersion: v3.1 -scope: service -force: false -runtime: true -enabled: true -key: shop + - match: env!=gray + weight: 100 +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + invokers_filters: NewINVOKERS_FILTERS().add("env!=gray"), + }, { + name: "test removeDuplicates condition", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: false +runtime: true +enabled: true conditions: - from: - match: + match: env=gray to: - - match: region=$region & env=$env & err-tag=Err-tag + - match: env!=gray + weight: 100 - from: - match: - trafficDisable: true + match: env=gray to: - - match: -`, ConfigType: remoting.EventTypeUpdate}) - consumerUrl, err := common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing") - if err != nil { - panic(err) - } - got := ar.Route(ivks, consumerUrl, invocation.NewRPCInvocation("errMethod", nil, nil)) - assert.Equal(t, 0, len(got)) -} - -func TestConditionRouteBanSpecialTraffic(t *testing.T) { - ivks := buildInvokers() - ar := NewApplicationRouter() - ar.Process(&config_center.ConfigChangeEvent{Key: "", Value: `configVersion: v3.1 -scope: service -force: true -runtime: true -enabled: true -key: shop + - match: env!=gray + weight: 100 +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + invokers_filters: NewINVOKERS_FILTERS().add("env!=gray"), + }, { + name: "test consequent condition", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: false +runtime: true +enabled: true conditions: - from: match: env=gray to: - - match: - force: true - priority: 100 + - match: env!=gray + weight: 100 - from: - match: + match: region=beijing to: - - match: - force: true - priority: 100 -`, ConfigType: remoting.EventTypeUpdate}) - consumerUrl, err := common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing") - if err != nil { - panic(err) + - match: region=beijing + weight: 100 + - from: + to: + - match: host!=127.0.0.1 +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + invokers_filters: NewINVOKERS_FILTERS().add("env!=gray").add(`region=beijing`).add(`host!=127.0.0.1`), + }, { + name: "test unMatch condition", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: false +runtime: true +enabled: true +conditions: + - from: + match: env!=gray + to: + - match: env=gray + weight: 100 + - from: + match: region!=beijing + to: + - match: region=beijing + weight: 100 + - from: + to: + - match: host!=127.0.0.1 +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + invokers_filters: NewINVOKERS_FILTERS().add(`host!=127.0.0.1`), + }, { + name: "test Match and Route zero", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: true # <--- +runtime: true +enabled: true +conditions: + - from: + match: env=gray # match success here + to: + - match: env=ErrTag # all invoker can't match this + weight: 100 + - from: + match: region!=beijing + to: + - match: region=beijing + weight: 100 + - from: + to: + - match: host!=127.0.0.1 +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + expResLen: 0, + }, { + name: "test Match, Route zero and ignore ", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: false # <--- to ignore bad result +runtime: true +enabled: true +conditions: + - from: + match: region=beijing + to: + - match: region!=beijing + weight: 100 + - from: + to: + - match: host!=127.0.0.1 + - from: + match: env=gray # match success here + to: + - match: env=ErrTag # all invoker can't match this + weight: 100 +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + invokers_filters: NewINVOKERS_FILTERS(), + }, { + name: "test traffic disabled and ignore condition-route.force", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: false +runtime: true +enabled: true +conditions: + - from: + match: host=127.0.0.1 # <--- disabled + - from: + match: env=gray + to: + - match: env!=gray + weight: 100 + - to: + - match: region!=beijing +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + expResLen: 0, + }, { + name: "test multiply destination", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: false +runtime: true +enabled: true +conditions: + - from: + match: env=gray + to: + - match: env!=gray + weight: 100 + - match: env=gray + weight: 900 + - from: + match: region=beijing + to: + - match: region!=beijing + weight: 100 + - match: region=beijing + weight: 200 +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + multiDestination: []struct { + invokers_filters InvokersFilters + weight float32 + }{{ + invokers_filters: NewINVOKERS_FILTERS().add(`env=gray`).add(`region=beijing`), + weight: float32(900) / float32(1000) * float32(200) / float32(300), + }, { + invokers_filters: NewINVOKERS_FILTERS().add(`env!=gray`).add(`region=beijing`), + weight: float32(100) / float32(1000) * float32(200) / float32(300), + }, { + invokers_filters: NewINVOKERS_FILTERS().add(`env=gray`).add(`region!=beijing`), + weight: float32(900) / float32(1000) * float32(100) / float32(300), + }, { + invokers_filters: NewINVOKERS_FILTERS().add(`env!=gray`).add(`region!=beijing`), + weight: float32(100) / float32(1000) * float32(100) / float32(300), + }}, + }, { + name: "test multiply destination with ignore some condition", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: false +runtime: true +enabled: true +conditions: + - from: + match: env=gray + to: + - match: env!=gray + weight: 100 + - match: env=gray----error # will ignore this subset + weight: 900 + - from: + match: region=beijing + to: + - match: region!=beijing + weight: 100 + - match: region=beijing + weight: 200 +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + multiDestination: []struct { + invokers_filters InvokersFilters + weight float32 + }{{ + invokers_filters: NewINVOKERS_FILTERS().add(`env!=gray`).add(`region=beijing`), + weight: float32(200) / float32(300), + }, { + invokers_filters: NewINVOKERS_FILTERS().add(`env!=gray`).add(`region!=beijing`), + weight: float32(100) / float32(300), + }}, + }, { + name: "test multiply destination with ignore some condition node", + content: `configVersion: v3.1 +scope: service +key: org.apache.dubbo.samples.CommentService +force: false +runtime: true +enabled: true +conditions: + - from: + match: env=gray + to: + - match: env!=gray + weight: 100 + - match: env=gray + weight: 900 + - from: # <-- will ignore this condition + match: region!=beijing + to: + - match: env=normal + weight: 100 + - match: env=gray + weight: 200 + - from: + match: region=beijing + to: + - match: region!=beijing + weight: 100 + - match: region=beijing + weight: 200 +`, + args: args{ + invokers: buildInvokers(), + url: newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"), + invocation: invocation.NewRPCInvocation("echo", nil, nil), + }, + multiDestination: []struct { + invokers_filters InvokersFilters + weight float32 + }{{ + invokers_filters: NewINVOKERS_FILTERS().add(`env=gray`).add(`region=beijing`), + weight: float32(900) / float32(1000) * float32(200) / float32(300), + }, { + invokers_filters: NewINVOKERS_FILTERS().add(`env!=gray`).add(`region=beijing`), + weight: float32(100) / float32(1000) * float32(200) / float32(300), + }, { + invokers_filters: NewINVOKERS_FILTERS().add(`env=gray`).add(`region!=beijing`), + weight: float32(900) / float32(1000) * float32(100) / float32(300), + }, { + invokers_filters: NewINVOKERS_FILTERS().add(`env!=gray`).add(`region!=beijing`), + weight: float32(100) / float32(1000) * float32(100) / float32(300), + }}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d.Process(&config_center.ConfigChangeEvent{ + Value: tt.content, + ConfigType: remoting.EventTypeUpdate, + }) + if tt.multiDestination == nil { + res := d.Route(tt.args.invokers, tt.args.url, tt.args.invocation) + if tt.invokers_filters != nil { + // check expect filtrate path + ans := tt.invokers_filters.filtrate(tt.args.invokers, tt.args.url, tt.args.invocation) + assert.Equalf(t, ans, res, "route(%v, %v, %v)", tt.args.invokers, tt.args.url, tt.args.invocation) + } else { + // check expect result.length + assert.Equalf(t, tt.expResLen, len(res), "route(%v, %v, %v)", tt.args.invokers, tt.args.url, tt.args.invocation) + } + } else { + // check multiply destination route successfully or not + ans := map[interface{}]float32{} + for _, s := range tt.multiDestination { + args := struct { + invokers []protocol.Invoker + url *common.URL + invocation protocol.Invocation + }{tt.args.invokers[:], tt.args.url.Clone(), tt.args.invocation} + ans[len(s.invokers_filters.filtrate(args.invokers, tt.args.url, tt.args.invocation))] = s.weight * 1000 + } + res := map[interface{}]int{} + for i := 0; i < 1000; i++ { + args := struct { + invokers []protocol.Invoker + url *common.URL + invocation protocol.Invocation + }{tt.args.invokers[:], tt.args.url.Clone(), tt.args.invocation} + res[len(d.Route(args.invokers, args.url, args.invocation))]++ + } + for k, v := range ans { + if float32(res[k]+50) > v && float32(res[k]-50) < v { + } else { + assert.Fail(t, "out of range") + } + } + } + }) } - got := ar.Route(ivks, consumerUrl, invocation.NewRPCInvocation("errMethod", nil, nil)) - assert.Equal(t, len(ivks), len(got)) } diff --git a/common/constant/key.go b/common/constant/key.go index 21b853989..393036b38 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -17,10 +17,6 @@ package constant -import ( - "math" -) - type DubboCtxKey string const ( @@ -313,6 +309,7 @@ const ( ScriptRouterRuleSuffix = ".script-router" TagRouterRuleSuffix = ".tag-router" ConditionRouterRuleSuffix = ".condition-router" // Specify condition router suffix + AffinityRuleSuffix = ".affinity-router" // Specify affinity router suffix MeshRouteSuffix = ".MESHAPPRULE" // Specify mesh router suffix ForceUseTag = "dubbo.force.tag" // the tag in attachment ForceUseCondition = "dubbo.force.condition" @@ -320,22 +317,20 @@ const ( ConditionKey = "dubbo.condition" AttachmentKey = DubboCtxKey("attachment") // key in context in invoker TagRouterFactoryKey = "tag" + AffinityAppRouterFactoryKey = "application.affinity" + AffinityServiceRouterFactoryKey = "service.affinity" ConditionAppRouterFactoryKey = "provider.condition" ConditionServiceRouterFactoryKey = "service.condition" ScriptRouterFactoryKey = "consumer.script" ForceKey = "force" TrafficDisableKey = "trafficDisable" - PriorityKey = "priority" - RatioKey = "RatioKey" Arguments = "arguments" Attachments = "attachments" Param = "param" Scope = "scope" Wildcard = "wildcard" MeshRouterFactoryKey = "mesh" - DefaultRouteRatio = 0 DefaultRouteConditionSubSetWeight = 100 - DefaultRoutePriority = 0 ) // Auth filter @@ -455,7 +450,4 @@ const ( // priority const ( DefaultPriority = 0 - HighestPriority = math.MinInt32 - // LowestPriority for metadata service - LowestPriority = math.MaxInt32 ) diff --git a/config/router_config.go b/config/router_config.go index e8f155789..7971fd58f 100644 --- a/config/router_config.go +++ b/config/router_config.go @@ -17,6 +17,10 @@ package config +import ( + "reflect" +) + import ( "github.com/creasty/defaults" ) @@ -50,12 +54,8 @@ type Tag struct { } type ConditionRule struct { - Priority int `default:"0" yaml:"priority" json:"priority,omitempty" property:"priority"` - From ConditionRuleFrom `yaml:"from" json:"from,omitempty" property:"from"` - Disable bool `default:"false" yaml:"trafficDisable" json:"trafficDisable,omitempty" property:"trafficDisable"` - To []ConditionRuleTo `yaml:"to" json:"to,omitempty" property:"to"` - Ratio int `default:"0" yaml:"ratio" json:"ratio,omitempty" property:"priority"` - Force bool `default:"false" yaml:"force" json:"force,omitempty" property:"force"` + From ConditionRuleFrom `yaml:"from" json:"from,omitempty" property:"from"` + To []ConditionRuleTo `yaml:"to" json:"to,omitempty" property:"to"` } type ConditionRuleFrom struct { @@ -67,14 +67,32 @@ type ConditionRuleTo struct { Weight int `default:"100" yaml:"weight" json:"weight,omitempty" property:"weight"` } +type ConditionRuleDisable struct { + Match string `yaml:"match" json:"match,omitempty" property:"match"` +} + +type AffinityAware struct { + Key string `default:"" yaml:"key" json:"key,omitempty" property:"key"` + Ratio int32 `default:"0" yaml:"ratio" json:"ratio,omitempty" property:"ratio"` +} + // ConditionRouter -- when RouteConfigVersion == v3.1, decode by this type ConditionRouter struct { - Scope string `validate:"required" yaml:"scope" json:"scope,omitempty" property:"scope"` // must be chosen from `service` and `application`. - Key string `validate:"required" yaml:"key" json:"key,omitempty" property:"key"` // specifies which service or application the rule body acts on. - Force bool `default:"false" yaml:"force" json:"force,omitempty" property:"force"` - Runtime bool `default:"false" yaml:"runtime" json:"runtime,omitempty" property:"runtime"` - Enabled bool `default:"true" yaml:"enabled" json:"enabled,omitempty" property:"enabled"` - Conditions []ConditionRule `yaml:"conditions" json:"conditions,omitempty" property:"conditions"` + Scope string `validate:"required" yaml:"scope" json:"scope,omitempty" property:"scope"` // must be chosen from `service` and `application`. + Key string `validate:"required" yaml:"key" json:"key,omitempty" property:"key"` // specifies which service or application the rule body acts on. + Force bool `default:"false" yaml:"force" json:"force,omitempty" property:"force"` + Runtime bool `default:"false" yaml:"runtime" json:"runtime,omitempty" property:"runtime"` + Enabled bool `default:"true" yaml:"enabled" json:"enabled,omitempty" property:"enabled"` + Conditions []*ConditionRule `yaml:"conditions" json:"conditions,omitempty" property:"conditions"` +} + +// AffinityRouter -- RouteConfigVersion == v3.1 +type AffinityRouter struct { + Scope string `validate:"required" yaml:"scope" json:"scope,omitempty" property:"scope"` // must be chosen from `service` and `application`. + Key string `validate:"required" yaml:"key" json:"key,omitempty" property:"key"` // specifies which service or application the rule body acts on. + Runtime bool `default:"false" yaml:"runtime" json:"runtime,omitempty" property:"runtime"` + Enabled bool `default:"true" yaml:"enabled" json:"enabled,omitempty" property:"enabled"` + AffinityAware AffinityAware `yaml:"affinityAware" json:"affinityAware,omitempty" property:"affinityAware"` } // Prefix dubbo.router @@ -179,3 +197,18 @@ func (rcb *RouterConfigBuilder) Build() *RouterConfig { } return rcb.routerConfig } + +func (x *ConditionRule) Equal(t *ConditionRule) bool { + if !reflect.DeepEqual(x.From, t.From) { + return false + } + if len(x.To) != len(t.To) { + return false + } + for i := range x.To { + if !reflect.DeepEqual(x.To[i], t.To[i]) { + return false + } + } + return true +}