Alanxtl commented on code in PR #777:
URL: https://github.com/apache/dubbo-go-pixiu/pull/777#discussion_r2553759553
##########
pkg/common/router/router.go:
##########
@@ -37,94 +38,149 @@ import (
"github.com/apache/dubbo-go-pixiu/pkg/server"
)
-type (
- // RouterCoordinator the router coordinator for http connection manager
- RouterCoordinator struct {
- activeConfig *model.RouteConfiguration
- rw sync.RWMutex
- }
-)
+// RouterCoordinator the router coordinator for http connection manager
+type RouterCoordinator struct {
+ mainSnapshot atomic.Pointer[model.RouteSnapshot] // atomic snapshot
+ mu sync.Mutex
+
+ nextSnapshot map[string]*model.Router // temp store for dynamic update, DO NOT read directly
+
+ timer *time.Timer // debounce timer
+ debounce time.Duration // merge window, default 50ms
+}
// CreateRouterCoordinator create coordinator for http connection manager
func CreateRouterCoordinator(routeConfig *model.RouteConfiguration) *RouterCoordinator {
- rc := &RouterCoordinator{activeConfig: routeConfig}
+ rc := &RouterCoordinator{
+ nextSnapshot: make(map[string]*model.Router),
+ debounce: 50 * time.Millisecond, // merge window
+ }
if routeConfig.Dynamic {
server.GetRouterManager().AddRouterListener(rc)
}
- rc.initTrie()
- rc.initRegex()
+ // build initial config and store snapshot
+ rc.mainSnapshot.Store(model.ToSnapshot(buildRouteConfiguration(routeConfig.Routes)))
+ // copy initial routes to store
+ for _, r := range routeConfig.Routes {
+ rc.nextSnapshot[r.ID] = r
+ }
return rc
}
-// Route find routeAction for request
func (rm *RouterCoordinator) Route(hc *http.HttpContext) (*model.RouteAction, error) {
- rm.rw.RLock()
- defer rm.rw.RUnlock()
-
return rm.route(hc.Request)
}
func (rm *RouterCoordinator) RouteByPathAndName(path, method string) (*model.RouteAction, error) {
- rm.rw.RLock()
- defer rm.rw.RUnlock()
-
- return rm.activeConfig.RouteByPathAndMethod(path, method)
+ s := rm.mainSnapshot.Load()
+ if s == nil {
+ return nil, errors.New("router configuration is empty")
+ }
+ t := s.MethodTries[method]
+ if t == nil {
+ return nil, errors.Errorf("route failed for %s, no rules matched.", stringutil.GetTrieKey(method, path))
+ }
+ node, _, ok := t.Match(stringutil.GetTrieKey(method, path))
+ if !ok || node == nil || node.GetBizInfo() == nil {
+ return nil, errors.Errorf("route failed for %s, no rules matched.", stringutil.GetTrieKey(method, path))
+ }
+ act := node.GetBizInfo().(model.RouteAction)
+ return &act, nil
}
func (rm *RouterCoordinator) route(req *stdHttp.Request) (*model.RouteAction, error) {
- // match those route that only contains headers first
- var matched []*model.Router
- for _, route := range rm.activeConfig.Routes {
- if len(route.Match.Prefix) > 0 {
+ s := rm.mainSnapshot.Load()
+ if s == nil {
+ return nil, errors.New("router configuration is empty")
+ }
+
+ // header-only first
+ for _, hr := range s.HeaderOnly {
+ if !model.MethodAllowed(hr.Methods, req.Method) {
continue
}
- if route.Match.MatchHeader(req) {
- matched = append(matched, route)
+ if matchHeaders(hr.Headers, req) {
+ return &hr.Action, nil
}
}
+ // Trie
+ t := s.MethodTries[req.Method]
+ if t == nil {
+ return nil, errors.Errorf("route failed for %s, no rules matched.", stringutil.GetTrieKey(req.Method, req.URL.Path))
- // always return the first match of header if got any
- if len(matched) > 0 {
- if len(matched[0].Route.Cluster) == 0 {
- return nil, errors.New("action is nil. please check your configuration.")
- }
- return &matched[0].Route, nil
}
- // match those route that only contains prefix
- // TODO: may consider implementing both prefix and header in the future
- return rm.activeConfig.Route(req)
+ node, _, ok := t.Match(stringutil.GetTrieKey(req.Method, req.URL.Path))
+ if !ok || node == nil || node.GetBizInfo() == nil {
+ return nil, errors.Errorf("route failed for %s, no rules matched.", stringutil.GetTrieKey(req.Method, req.URL.Path))
+ }
+ act := node.GetBizInfo().(model.RouteAction)
+ return &act, nil
}
-func getTrieKey(method string, path string, isPrefix bool) string {
- if isPrefix {
- if !strings.HasSuffix(path, constant.PathSlash) {
- path = path + constant.PathSlash
+// reset timer or publish directly
+func (rm *RouterCoordinator) schedulePublishLocked() {
+ if rm.debounce <= 0 {
+ // fallback: immediate
+ rm.publishLocked()
+ return
+ }
+ if rm.timer == nil {
+ rm.timer = time.NewTimer(rm.debounce)
+ go rm.awaitAndPublish()
+ return
+ }
+ // clear timer channel
+ if !rm.timer.Stop() {
+ select {
+ case <-rm.timer.C:
+ default:
}
- path = path + "**"
}
- return stringutil.GetTrieKey(method, path)
+ rm.timer.Reset(rm.debounce)
+}
+
+// wait for timer and publish
+func (rm *RouterCoordinator) awaitAndPublish() {
+ <-rm.timer.C
+ rm.mu.Lock()
+ defer rm.mu.Unlock()
+ rm.publishLocked()
+ rm.timer = nil
}
-func (rm *RouterCoordinator) initTrie() {
- if rm.activeConfig.RouteTrie.IsEmpty() {
- rm.activeConfig.RouteTrie = trie.NewTrie()
+// publish: clone from store -> build new config -> atomic switch
+func (rm *RouterCoordinator) publishLocked() {
+ // 1) clone routes
+ next := make([]*model.Router, 0, len(rm.nextSnapshot))
+ for _, r := range rm.nextSnapshot {
+ next = append(next, r)
}
- for _, router := range rm.activeConfig.Routes {
- rm.OnAddRouter(router)
+ // 2) build new config
+ cfg := buildRouteConfiguration(next)
+ // 3) atomic switch
+ rm.mainSnapshot.Store(model.ToSnapshot(cfg))
+}
+
+func buildRouteConfiguration(routes []*model.Router) *model.RouteConfiguration {
+ cfg := &model.RouteConfiguration{
+ RouteTrie: trie.NewTrie(),
+ Routes: make([]*model.Router, 0, len(routes)),
+ Dynamic: false,
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]