Similarityoung commented on code in PR #777:
URL: https://github.com/apache/dubbo-go-pixiu/pull/777#discussion_r2542630426
##########
pkg/common/router/router.go:
##########
@@ -37,94 +37,148 @@ import (
"github.com/apache/dubbo-go-pixiu/pkg/server"
)
-type (
- // RouterCoordinator the router coordinator for http connection manager
- RouterCoordinator struct {
- activeConfig *model.RouteConfiguration
- rw sync.RWMutex
- }
-)
+// RouterCoordinator the router coordinator for http connection manager
+type RouterCoordinator struct {
+ active snapshotHolder // atomic snapshot
+ mu sync.Mutex
+ store map[string]*model.Router // temp store for dynamic update, DO
NOT read directly
+ timer *time.Timer // debounce timer
+ debounce time.Duration // merge window, default 50ms
+}
// CreateRouterCoordinator create coordinator for http connection manager
func CreateRouterCoordinator(routeConfig *model.RouteConfiguration)
*RouterCoordinator {
- rc := &RouterCoordinator{activeConfig: routeConfig}
+ rc := &RouterCoordinator{
+ store: make(map[string]*model.Router),
+ debounce: 50 * time.Millisecond, // merge window
+ }
if routeConfig.Dynamic {
server.GetRouterManager().AddRouterListener(rc)
}
- rc.initTrie()
- rc.initRegex()
+ // build initial config and store snapshot
+ first := buildConfig(routeConfig.Routes)
+ rc.active.store(model.ToSnapshot(first))
+ // copy initial routes to store
+ for _, r := range routeConfig.Routes {
+ rc.store[r.ID] = r
+ }
return rc
}
-// Route find routeAction for request
func (rm *RouterCoordinator) Route(hc *http.HttpContext) (*model.RouteAction,
error) {
- rm.rw.RLock()
- defer rm.rw.RUnlock()
-
return rm.route(hc.Request)
}
func (rm *RouterCoordinator) RouteByPathAndName(path, method string)
(*model.RouteAction, error) {
- rm.rw.RLock()
- defer rm.rw.RUnlock()
-
- return rm.activeConfig.RouteByPathAndMethod(path, method)
+ s := rm.active.load()
+ if s == nil {
+ return nil, errors.New("router configuration is empty")
+ }
+ t := s.MethodTries[method]
+ if t == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(method, path))
+ }
+ node, _, ok := t.Match(stringutil.GetTrieKey(method, path))
+ if !ok || node == nil || node.GetBizInfo() == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(method, path))
+ }
+ act := node.GetBizInfo().(model.RouteAction)
+ return &act, nil
}
func (rm *RouterCoordinator) route(req *stdHttp.Request) (*model.RouteAction,
error) {
- // match those route that only contains headers first
- var matched []*model.Router
- for _, route := range rm.activeConfig.Routes {
- if len(route.Match.Prefix) > 0 {
+ s := rm.active.load()
+ if s == nil {
+ return nil, errors.New("router configuration is empty")
+ }
+
+ // header-only first
+ for _, hr := range s.HeaderOnly {
+ if !model.MethodAllowed(hr.Methods, req.Method) {
continue
}
- if route.Match.MatchHeader(req) {
- matched = append(matched, route)
+ if matchHeaders(hr.Headers, req) {
+ return &hr.Action, nil
}
}
+ // Trie
+ t := s.MethodTries[req.Method]
+ if t == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(req.Method, req.URL.Path))
- // always return the first match of header if got any
- if len(matched) > 0 {
- if len(matched[0].Route.Cluster) == 0 {
- return nil, errors.New("action is nil. please check
your configuration.")
- }
- return &matched[0].Route, nil
}
- // match those route that only contains prefix
- // TODO: may consider implementing both prefix and header in the future
- return rm.activeConfig.Route(req)
+ node, _, ok := t.Match(stringutil.GetTrieKey(req.Method, req.URL.Path))
+ if !ok || node == nil || node.GetBizInfo() == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(req.Method, req.URL.Path))
+ }
+ act := node.GetBizInfo().(model.RouteAction)
Review Comment:
recommend
```go
act, ok := node.GetBizInfo().(model.RouteAction)
if !ok {
return nil, errors.Errorf("invalid route action type for %s", ...)
}
```
##########
pkg/common/router/router.go:
##########
@@ -37,94 +38,149 @@ import (
"github.com/apache/dubbo-go-pixiu/pkg/server"
)
-type (
- // RouterCoordinator the router coordinator for http connection manager
- RouterCoordinator struct {
- activeConfig *model.RouteConfiguration
- rw sync.RWMutex
- }
-)
+// RouterCoordinator the router coordinator for http connection manager
+type RouterCoordinator struct {
+ mainSnapshot atomic.Pointer[model.RouteSnapshot] // atomic snapshot
+ mu sync.Mutex
+
+ nextSnapshot map[string]*model.Router // temp store for dynamic update,
DO NOT read directly
+
+ timer *time.Timer // debounce timer
+ debounce time.Duration // merge window, default 50ms
+}
// CreateRouterCoordinator create coordinator for http connection manager
func CreateRouterCoordinator(routeConfig *model.RouteConfiguration)
*RouterCoordinator {
- rc := &RouterCoordinator{activeConfig: routeConfig}
+ rc := &RouterCoordinator{
+ nextSnapshot: make(map[string]*model.Router),
+ debounce: 50 * time.Millisecond, // merge window
+ }
if routeConfig.Dynamic {
server.GetRouterManager().AddRouterListener(rc)
}
- rc.initTrie()
- rc.initRegex()
+ // build initial config and store snapshot
+
rc.mainSnapshot.Store(model.ToSnapshot(buildRouteConfiguration(routeConfig.Routes)))
+ // copy initial routes to store
+ for _, r := range routeConfig.Routes {
+ rc.nextSnapshot[r.ID] = r
+ }
return rc
}
-// Route find routeAction for request
func (rm *RouterCoordinator) Route(hc *http.HttpContext) (*model.RouteAction,
error) {
- rm.rw.RLock()
- defer rm.rw.RUnlock()
-
return rm.route(hc.Request)
}
func (rm *RouterCoordinator) RouteByPathAndName(path, method string)
(*model.RouteAction, error) {
- rm.rw.RLock()
- defer rm.rw.RUnlock()
-
- return rm.activeConfig.RouteByPathAndMethod(path, method)
+ s := rm.mainSnapshot.Load()
+ if s == nil {
+ return nil, errors.New("router configuration is empty")
+ }
+ t := s.MethodTries[method]
+ if t == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(method, path))
+ }
+ node, _, ok := t.Match(stringutil.GetTrieKey(method, path))
+ if !ok || node == nil || node.GetBizInfo() == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(method, path))
+ }
+ act := node.GetBizInfo().(model.RouteAction)
+ return &act, nil
}
func (rm *RouterCoordinator) route(req *stdHttp.Request) (*model.RouteAction,
error) {
- // match those route that only contains headers first
- var matched []*model.Router
- for _, route := range rm.activeConfig.Routes {
- if len(route.Match.Prefix) > 0 {
+ s := rm.mainSnapshot.Load()
+ if s == nil {
+ return nil, errors.New("router configuration is empty")
+ }
+
+ // header-only first
+ for _, hr := range s.HeaderOnly {
+ if !model.MethodAllowed(hr.Methods, req.Method) {
continue
}
- if route.Match.MatchHeader(req) {
- matched = append(matched, route)
+ if matchHeaders(hr.Headers, req) {
+ return &hr.Action, nil
}
}
+ // Trie
+ t := s.MethodTries[req.Method]
+ if t == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(req.Method, req.URL.Path))
- // always return the first match of header if got any
- if len(matched) > 0 {
- if len(matched[0].Route.Cluster) == 0 {
- return nil, errors.New("action is nil. please check
your configuration.")
- }
- return &matched[0].Route, nil
}
- // match those route that only contains prefix
- // TODO: may consider implementing both prefix and header in the future
- return rm.activeConfig.Route(req)
+ node, _, ok := t.Match(stringutil.GetTrieKey(req.Method, req.URL.Path))
+ if !ok || node == nil || node.GetBizInfo() == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(req.Method, req.URL.Path))
+ }
+ act := node.GetBizInfo().(model.RouteAction)
+ return &act, nil
}
-func getTrieKey(method string, path string, isPrefix bool) string {
- if isPrefix {
- if !strings.HasSuffix(path, constant.PathSlash) {
- path = path + constant.PathSlash
+// reset timer or publish directly
+func (rm *RouterCoordinator) schedulePublishLocked() {
+ if rm.debounce <= 0 {
+ // fallback: immediate
+ rm.publishLocked()
+ return
+ }
+ if rm.timer == nil {
+ rm.timer = time.NewTimer(rm.debounce)
+ go rm.awaitAndPublish()
+ return
+ }
+ // clear timer channel
+ if !rm.timer.Stop() {
+ select {
+ case <-rm.timer.C:
+ default:
}
- path = path + "**"
}
- return stringutil.GetTrieKey(method, path)
+ rm.timer.Reset(rm.debounce)
+}
+
+// wait for timer and publish
+func (rm *RouterCoordinator) awaitAndPublish() {
+ <-rm.timer.C
+ rm.mu.Lock()
+ defer rm.mu.Unlock()
+ rm.publishLocked()
+ rm.timer = nil
}
-func (rm *RouterCoordinator) initTrie() {
- if rm.activeConfig.RouteTrie.IsEmpty() {
- rm.activeConfig.RouteTrie = trie.NewTrie()
+// publish: clone from store -> build new config -> atomic switch
+func (rm *RouterCoordinator) publishLocked() {
+ // 1) clone routes
+ next := make([]*model.Router, 0, len(rm.nextSnapshot))
+ for _, r := range rm.nextSnapshot {
+ next = append(next, r)
}
- for _, router := range rm.activeConfig.Routes {
- rm.OnAddRouter(router)
+ // 2) build new config
+ cfg := buildRouteConfiguration(next)
+ // 3) atomic switch
+ rm.mainSnapshot.Store(model.ToSnapshot(cfg))
+}
+
+func buildRouteConfiguration(routes []*model.Router) *model.RouteConfiguration
{
+ cfg := &model.RouteConfiguration{
+ RouteTrie: trie.NewTrie(),
+ Routes: make([]*model.Router, 0, len(routes)),
+ Dynamic: false,
Review Comment:
这个 cfg,我看别的地方也没有用到,除了model.ToSnapshot 里,model.ToSnapshot和这个
buildRouteConfiguration 都创建了树,但是 cfg 的树好像没有在其他地方使用过,所以是不是这个地方可以直接删去,直接在
ToSnapshot(cfg *RouteConfiguration) 修改入参为 []*Router 会不会好点。
##########
pkg/common/router/router.go:
##########
@@ -133,40 +189,44 @@ func (rm *RouterCoordinator) initRegex() {
// OnAddRouter add router
func (rm *RouterCoordinator) OnAddRouter(r *model.Router) {
- //TODO: lock move to trie node
- rm.rw.Lock()
- defer rm.rw.Unlock()
- if r.Match.Methods == nil {
- r.Match.Methods = []string{constant.Get, constant.Put,
constant.Delete, constant.Post, constant.Options}
- }
- isPrefix := r.Match.Prefix != ""
- for _, method := range r.Match.Methods {
- var key string
- if isPrefix {
- key = getTrieKey(method, r.Match.Prefix, isPrefix)
- } else {
- key = getTrieKey(method, r.Match.Path, isPrefix)
+ rm.mu.Lock()
+ defer rm.mu.Unlock()
+ rm.nextSnapshot[r.ID] = r
+ rm.schedulePublishLocked()
+}
+
+func fillTrieFromRoutes(cfg *model.RouteConfiguration) {
+ for _, r := range cfg.Routes {
+ methods := r.Match.Methods
+ if len(methods) == 0 {
+ methods = []string{"GET", "POST", "PUT", "DELETE",
"PATCH", "OPTIONS", "HEAD"}
+ }
+ for _, m := range methods {
+ key := stringutil.GetTrieKeyWithPrefix(m, r.Match.Path,
r.Match.Prefix, r.Match.Prefix != "")
+ _, _ = cfg.RouteTrie.Put(key, r.Route)
}
- _, _ = rm.activeConfig.RouteTrie.Put(key, r.Route)
}
}
// OnDeleteRouter delete router
func (rm *RouterCoordinator) OnDeleteRouter(r *model.Router) {
- rm.rw.Lock()
- defer rm.rw.Unlock()
-
- if r.Match.Methods == nil {
- r.Match.Methods = []string{constant.Get, constant.Put,
constant.Delete, constant.Post}
- }
- isPrefix := r.Match.Prefix != ""
- for _, method := range r.Match.Methods {
- var key string
- if isPrefix {
- key = getTrieKey(method, r.Match.Prefix, isPrefix)
- } else {
- key = getTrieKey(method, r.Match.Path, isPrefix)
+ rm.mu.Lock()
+ defer rm.mu.Unlock()
+ delete(rm.nextSnapshot, r.ID)
+ rm.schedulePublishLocked()
+}
+
+func matchHeaders(chs []model.CompiledHeader, r *stdHttp.Request) bool {
Review Comment:
这里的逻辑是只要 header 有一个匹配就算匹配上了?是 AND 逻辑还是就是 OR 逻辑
##########
pkg/common/router/router.go:
##########
@@ -37,94 +37,148 @@ import (
"github.com/apache/dubbo-go-pixiu/pkg/server"
)
-type (
- // RouterCoordinator the router coordinator for http connection manager
- RouterCoordinator struct {
- activeConfig *model.RouteConfiguration
- rw sync.RWMutex
- }
-)
+// RouterCoordinator the router coordinator for http connection manager
+type RouterCoordinator struct {
+ active snapshotHolder // atomic snapshot
+ mu sync.Mutex
+ store map[string]*model.Router // temp store for dynamic update, DO
NOT read directly
+ timer *time.Timer // debounce timer
+ debounce time.Duration // merge window, default 50ms
+}
// CreateRouterCoordinator create coordinator for http connection manager
func CreateRouterCoordinator(routeConfig *model.RouteConfiguration)
*RouterCoordinator {
- rc := &RouterCoordinator{activeConfig: routeConfig}
+ rc := &RouterCoordinator{
+ store: make(map[string]*model.Router),
+ debounce: 50 * time.Millisecond, // merge window
+ }
if routeConfig.Dynamic {
server.GetRouterManager().AddRouterListener(rc)
}
- rc.initTrie()
- rc.initRegex()
+ // build initial config and store snapshot
+ first := buildConfig(routeConfig.Routes)
+ rc.active.store(model.ToSnapshot(first))
+ // copy initial routes to store
+ for _, r := range routeConfig.Routes {
+ rc.store[r.ID] = r
+ }
return rc
}
-// Route find routeAction for request
func (rm *RouterCoordinator) Route(hc *http.HttpContext) (*model.RouteAction,
error) {
- rm.rw.RLock()
- defer rm.rw.RUnlock()
-
return rm.route(hc.Request)
}
func (rm *RouterCoordinator) RouteByPathAndName(path, method string)
(*model.RouteAction, error) {
- rm.rw.RLock()
- defer rm.rw.RUnlock()
-
- return rm.activeConfig.RouteByPathAndMethod(path, method)
+ s := rm.active.load()
+ if s == nil {
+ return nil, errors.New("router configuration is empty")
+ }
+ t := s.MethodTries[method]
+ if t == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(method, path))
+ }
+ node, _, ok := t.Match(stringutil.GetTrieKey(method, path))
+ if !ok || node == nil || node.GetBizInfo() == nil {
+ return nil, errors.Errorf("route failed for %s, no rules
matched.", stringutil.GetTrieKey(method, path))
+ }
+ act := node.GetBizInfo().(model.RouteAction)
Review Comment:
recommend
```go
act, ok := node.GetBizInfo().(model.RouteAction)
if !ok {
return nil, errors.Errorf("invalid route action type for %s", ...)
}
```
##########
pkg/model/router_snapshot.go:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "regexp"
+ "sync"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/router/trie"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/util/stringutil"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+var (
+ constMethods = []string{"GET", "POST", "PUT", "DELETE", "PATCH",
"OPTIONS", "HEAD"}
+)
+
+// RouteSnapshot Read-only snapshot for routing
+type RouteSnapshot struct {
+ // multi-trie for each method, built once and read-only
+ MethodTries map[string]*trie.Trie
+
+ // precompiled regex for header-only routes
+ HeaderOnly []HeaderRoute
+}
+
+type HeaderRoute struct {
+ Methods []string
+ Headers []CompiledHeader
+ Action RouteAction
+}
+
+type CompiledHeader struct {
+ Name string
+ Regex *regexp.Regexp
+ Values []string
+}
+
+func MethodAllowed(methods []string, m string) bool {
+ if len(methods) == 0 {
+ return true
+ }
+ for _, x := range methods {
+ if x == m {
+ return true
+ }
+ }
+ return false
+}
+
+var regexCache sync.Map // map[string]*regexp.Regexp
+
+func getRegexpWithCache(pat string) *regexp.Regexp {
+ if v, ok := regexCache.Load(pat); ok {
+ return v.(*regexp.Regexp)
+ }
+ // Compile fail return nil (caller will ignore this regex)
+ re, err := regexp.Compile(pat)
+ if err != nil {
+ return nil
+ }
+ if v, ok := regexCache.LoadOrStore(pat, re); ok {
+ return v.(*regexp.Regexp)
+ }
+ return re
+}
+
+// compiledHeaderSlicePool is a pool for temporary []CompiledHeader slices
during snapshot building
+var compiledHeaderSlicePool = sync.Pool{
+ New: func() any {
+ s := make([]CompiledHeader, 0, 4) // start with small capacity,
grow as needed
+ return &s
+ },
+}
+
+func ToSnapshot(cfg *RouteConfiguration) *RouteSnapshot {
+ s := &RouteSnapshot{
+ MethodTries: make(map[string]*trie.Trie, 8),
+ }
+
+ // pre-scan header-only routes count
+ headerOnlyCount := 0
+ for _, r := range cfg.Routes {
+ if r.Match.Path == "" && r.Match.Prefix == "" &&
len(r.Match.Headers) > 0 {
+ headerOnlyCount++
+ }
+ }
+
+ if headerOnlyCount > 0 {
+ s.HeaderOnly = make([]HeaderRoute, 0, headerOnlyCount)
+ }
+
+ // part to get or create trie for a method
+ getTrie := func(m string) *trie.Trie {
+ if t := s.MethodTries[m]; t != nil {
+ return t
+ }
+ nt := trie.NewTrie()
+ s.MethodTries[m] = &nt
+ return &nt
+ }
+
+ for _, r := range cfg.Routes {
+ // A) header-only:with Headers, without Path / Prefix
+ if r.Match.Path == "" && r.Match.Prefix == "" &&
len(r.Match.Headers) > 0 {
+ hr := HeaderRoute{
+ Methods: r.Match.Methods,
+ Action: r.Route,
+ }
+
+ // use temporary slice from pool to build compiled
headers
+ chPtr :=
compiledHeaderSlicePool.Get().(*[]CompiledHeader)
+ ch := (*chPtr)[:0] // reset
+
+ for _, h := range r.Match.Headers {
+ c := CompiledHeader{Name: h.Name}
+ if h.Regex {
+ // 1) the model already has compiled
regex (if any) → use it directly
+ if h.valueRE != nil {
+ c.Regex = h.valueRE
+ } else if len(h.Values) > 0 &&
h.Values[0] != "" {
+ // 2) else use global
cache/compile (cross-snapshot reuse)
+ if re :=
getRegexpWithCache(h.Values[0]); re != nil {
+ c.Regex = re
+ } else {
+ // invalid regex → skip
this header matcher
+ logger.Errorf("Header
regex compiled fail for %v", h.Values[0])
+ continue
+ }
+ }
+ } else {
+ // not regex → copy values directly (if
any)
+ if len(h.Values) > 0 {
+ // direct assignment is ok here
(string slice)
+ c.Values = append(c.Values,
h.Values...)
+ }
+ }
+ ch = append(ch, c)
+ }
+
+ // move the temporary slice content to snapshot
(ownership transferred)
+ hr.Headers = make([]CompiledHeader, len(ch))
+ copy(hr.Headers, ch)
+
+ // reset and put back the temporary slice to pool
+ *chPtr = (*chPtr)[:0]
+ compiledHeaderSlicePool.Put(chPtr)
+
+ s.HeaderOnly = append(s.HeaderOnly, hr)
+ continue
+ }
+
+ // B) Trie
+ methods := r.Match.Methods
+ if len(methods) == 0 {
+ methods = constMethods
+ }
+ for _, m := range methods {
Review Comment:
这里,你的 trie 已经按照 method 隔离了,是不是拼接的时候不需要 method 了
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]