Similarityoung commented on code in PR #692: URL: https://github.com/apache/dubbo-go-pixiu/pull/692#discussion_r2259251008
########## pkg/cluster/retry/countbased/count_based.go: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package countbased + +import ( + "fmt" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +func init() { + retry.RegisterRetryPolicy(model.RetryerCountBased, newCountBasedRetry) +} + +type CountBasedRetry struct { + MaxAttempts uint + retryTimes uint +} + +func (r *CountBasedRetry) Attempt(err error) bool { + if r.retryTimes < r.MaxAttempts { + r.retryTimes++ + return true + } + return false +} + +func (r *CountBasedRetry) Reset() { + r.retryTimes = 0 +} + +func newCountBasedRetry(config map[string]any) (retry.RetryPolicy, error) { + timesValue, exists := config["times"] + if !exists { + return nil, fmt.Errorf("'times' field is missing in retry configuration") + } + + timesUint, ok := timesValue.(int) + if !ok { + return nil, fmt.Errorf("invalid type for 'retry.count_based.times', expected int but got %T", timesValue) + } + + // Total attempts = 1 initial try plus number of retries. 
+ return &CountBasedRetry{MaxAttempts: uint(timesUint) + 1}, nil Review Comment: ```go timesValue, exists := config["times"] if !exists { return nil, fmt.Errorf("'times' field is missing in retry configuration") } ``` Maybe we can fall back to a default value (e.g., 3) and log a warning (Warnf) when the 'times' field is missing. By the way, does 'times' need to be hard-coded? ########## pkg/cluster/retry/exponentialbackoff/exponential_backoff.go: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package exponentialbackoff + +import ( + "fmt" + "math" + "math/rand" + "time" +) + +import ( + "github.com/mitchellh/mapstructure" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +func init() { + retry.RegisterRetryPolicy(model.RetryerExponentialBackoff, newExponentialBackoffRetry) +} + +type ExponentialBackoffRetry struct { + MaxAttempts uint + InitialInterval time.Duration + MaxInterval time.Duration + Multiplier float64 + retryTimes uint +} + +type ExponentialBackoffConfig struct { + Times uint `mapstructure:"times" default:"3"` + InitialInterval string `mapstructure:"initialInterval" default:"100ms"` + MaxInterval string `mapstructure:"maxInterval" default:"5s"` + Multiplier float64 `mapstructure:"multiplier" default:"2.0"` +} + +func (e *ExponentialBackoffRetry) Attempt(err error) bool { + if e.retryTimes >= e.MaxAttempts { + return false + } + + // Don't wait before the first try + if e.retryTimes > 0 { + backoff := float64(e.InitialInterval) * math.Pow(e.Multiplier, float64(e.retryTimes-1)) + cappedBackoff := time.Duration(math.Min(backoff, float64(e.MaxInterval))) + // Add jitter to prevent thundering herd + jitter := time.Duration(rand.Intn(100)) * time.Millisecond // NOSONAR + time.Sleep(cappedBackoff + jitter) + } + + e.retryTimes++ + return true +} + +func (e *ExponentialBackoffRetry) Reset() { + e.retryTimes = 0 +} + +func newExponentialBackoffRetry(config map[string]any) (retry.RetryPolicy, error) { + var cfg ExponentialBackoffConfig + if err := mapstructure.Decode(config, &cfg); err != nil { Review Comment: `mapstructure.Decode` doesn't seem to process `default` tags on its own; maybe you can add a `defaults.Set(f.cfg)` call. ########## pkg/cluster/retry/retry.go: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package retry + +import ( + "fmt" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +type ( + // RetryPolicy defines the interface for retry logic. + RetryPolicy interface { + // Attempt checks if a retry should be performed and potentially waits. + // It returns true if the request should be attempted, false otherwise. + // The `err` parameter can be used for policies that act on specific errors. + Attempt(err error) bool Review Comment: i do not find the usage of err, do we need it? ########## pkg/filter/llm/proxy/filter.go: ########## @@ -48,188 +49,271 @@ func init() { } type ( - // Plugin is http filter plugin. - Plugin struct { - } - // FilterFactory is http filter instance + // Plugin is the main plugin entrypoint. + Plugin struct{} + + // FilterFactory creates filter instances. FilterFactory struct { cfg *Config client http.Client } + + // Filter is the processing entity for each request. Filter struct { - client http.Client - scheme string + client http.Client + scheme string + strategy *Strategy + clusterManager *server.ClusterManager } - // Config describe the config of FilterFactory + + // Config describes the top-level configuration for the filter. 
+ // Note: Strategy-specific configurations are now defined on the endpoints. Config struct { Timeout time.Duration `yaml:"timeout" json:"timeout,omitempty"` MaxIdleConns int `yaml:"maxIdleConns" json:"maxIdleConns,omitempty"` MaxIdleConnsPerHost int `yaml:"maxIdleConnsPerHost" json:"maxIdleConnsPerHost,omitempty"` MaxConnsPerHost int `yaml:"maxConnsPerHost" json:"maxConnsPerHost,omitempty"` Scheme string `yaml:"scheme" json:"scheme,omitempty" default:"http"` } + + RequestExecutor struct { + hc *contexthttp.HttpContext + filter *Filter + clusterName string + clusterManager *server.ClusterManager + } ) +// Kind returns the unique name of this filter. func (p *Plugin) Kind() string { return Kind } +// CreateFilterFactory creates a new factory instance for this filter. func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) { return &FilterFactory{cfg: &Config{}}, nil } +// Config returns the configuration struct for the factory. func (factory *FilterFactory) Config() any { return factory.cfg } +// Apply initializes the factory from its configuration. func (factory *FilterFactory) Apply() error { scheme := strings.TrimSpace(strings.ToLower(factory.cfg.Scheme)) - if scheme != "http" && scheme != "https" { return fmt.Errorf("%s: scheme must be http or https", Kind) } - factory.cfg.Scheme = scheme cfg := factory.cfg - client := http.Client{ + factory.client = http.Client{ Timeout: cfg.Timeout, Review Comment: nice try, could you also modify this part in `pkg/filter/http/httpproxy/routerfilter.go` to this more elegant way? ########## pkg/filter/llm/proxy/filter.go: ########## @@ -48,188 +49,271 @@ func init() { } type ( - // Plugin is http filter plugin. - Plugin struct { - } - // FilterFactory is http filter instance + // Plugin is the main plugin entrypoint. + Plugin struct{} + + // FilterFactory creates filter instances. FilterFactory struct { cfg *Config client http.Client } + + // Filter is the processing entity for each request. 
Filter struct { - client http.Client - scheme string + client http.Client + scheme string + strategy *Strategy + clusterManager *server.ClusterManager } - // Config describe the config of FilterFactory + + // Config describes the top-level configuration for the filter. + // Note: Strategy-specific configurations are now defined on the endpoints. Config struct { Timeout time.Duration `yaml:"timeout" json:"timeout,omitempty"` MaxIdleConns int `yaml:"maxIdleConns" json:"maxIdleConns,omitempty"` MaxIdleConnsPerHost int `yaml:"maxIdleConnsPerHost" json:"maxIdleConnsPerHost,omitempty"` MaxConnsPerHost int `yaml:"maxConnsPerHost" json:"maxConnsPerHost,omitempty"` Scheme string `yaml:"scheme" json:"scheme,omitempty" default:"http"` } + + RequestExecutor struct { + hc *contexthttp.HttpContext + filter *Filter + clusterName string + clusterManager *server.ClusterManager + } ) +// Kind returns the unique name of this filter. func (p *Plugin) Kind() string { return Kind } +// CreateFilterFactory creates a new factory instance for this filter. func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) { return &FilterFactory{cfg: &Config{}}, nil } +// Config returns the configuration struct for the factory. func (factory *FilterFactory) Config() any { return factory.cfg } +// Apply initializes the factory from its configuration. 
func (factory *FilterFactory) Apply() error { scheme := strings.TrimSpace(strings.ToLower(factory.cfg.Scheme)) - if scheme != "http" && scheme != "https" { return fmt.Errorf("%s: scheme must be http or https", Kind) } - factory.cfg.Scheme = scheme cfg := factory.cfg - client := http.Client{ + factory.client = http.Client{ Timeout: cfg.Timeout, - Transport: http.RoundTripper(&http.Transport{ + Transport: &http.Transport{ MaxIdleConns: cfg.MaxIdleConns, MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost, MaxConnsPerHost: cfg.MaxConnsPerHost, - }), + }, } - factory.client = client return nil } +// PrepareFilterChain creates a new Filter instance for a request chain. func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, chain filter.FilterChain) error { - //reuse http client - f := &Filter{factory.client, factory.cfg.Scheme} + f := &Filter{ + client: factory.client, + scheme: factory.cfg.Scheme, + strategy: &Strategy{}, + clusterManager: server.GetClusterManager(), + } chain.AppendDecodeFilters(f) return nil } +// Decode is the main entry point for processing an incoming request. 
func (f *Filter) Decode(hc *contexthttp.HttpContext) filter.FilterStatus { rEntry := hc.GetRouteEntry() if rEntry == nil { - bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "no route entry"}) - hc.SendLocalReply(http.StatusBadRequest, bt) + sendJSONError(hc, http.StatusBadRequest, "no route entry found for request") return filter.Stop } - logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster: %v", rEntry.Cluster) - var ( - clusterName = rEntry.Cluster - clusterManager = server.GetClusterManager() - endpoint = clusterManager.PickEndpoint(clusterName, hc) - ) - - if endpoint == nil { - logger.Debugf("[dubbo-go-pixiu] cluster not found endpoint") - bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "cluster not found endpoint"}) - hc.SendLocalReply(http.StatusServiceUnavailable, bt) + // Ensure the request body can be re-read for retries + if err := f.prepareRequestBody(hc); err != nil { + sendJSONError(hc, http.StatusInternalServerError, fmt.Sprintf("failed to read request body: %v", err)) return filter.Stop } + defer hc.Request.Body.Close() Review Comment: In prepareRequestBody (line 192) you already close the original body, so why close it again here? That said, keeping the deferred close is generally considered good practice for defensive programming and robustness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
