Copilot commented on code in PR #692: URL: https://github.com/apache/dubbo-go-pixiu/pull/692#discussion_r2187157584
########## pkg/cluster/retry/count_based/count_based.go: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package count_based + +import ( + "fmt" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +func init() { + retry.RegisterRetryPolicy(model.RetryerCountBased, newCountBasedRetry) +} + +type CountBasedRetry struct { + MaxAttempts uint + currentTry uint +} + +func (r *CountBasedRetry) Attempt(err error) bool { + if r.currentTry < r.MaxAttempts { + r.currentTry++ + return true + } + return false +} + +func (r *CountBasedRetry) Reset() { + r.currentTry = 0 +} + +func newCountBasedRetry(config map[string]any) (retry.Retryer, error) { + timesValue, exists := config["times"] + if !exists { + return nil, fmt.Errorf("'times' field is missing in retry configuration") + } + + timesUint, ok := timesValue.(int) + if !ok { + return nil, fmt.Errorf("invalid type for 'retry.count_based.times', expected int but got %T", timesValue) + } + + // Total attempts = 1 initial try plus number of retries. 
+ return &CountBasedRetry{MaxAttempts: uint(timesUint) + 1}, nil Review Comment: Directly asserting `config["times"]` to `int` fails when the config decoder yields a different numeric type; for example, `encoding/json` decodes numbers into `float64` when the target is `any`. Consider supporting both `int` and `float64` (and other numeric types), and reject negative or non-integral values before converting to `uint`, because a negative `int` would otherwise wrap around to a huge attempt count. ```suggestion var timesUint uint switch v := timesValue.(type) { case int: if v < 0 { return nil, fmt.Errorf("'retry.count_based.times' must be non-negative, got %d", v) } timesUint = uint(v) case float64: if v < 0 || v != float64(uint(v)) { return nil, fmt.Errorf("'retry.count_based.times' must be a non-negative integer, got %f", v) } timesUint = uint(v) default: return nil, fmt.Errorf("invalid type for 'retry.count_based.times', expected int or float64 but got %T", timesValue) } // Total attempts = 1 initial try plus number of retries. return &CountBasedRetry{MaxAttempts: timesUint + 1}, nil ``` ########## pkg/filter/llm/proxy/filter.go: ########## @@ -48,188 +49,271 @@ func init() { } type ( - // Plugin is http filter plugin. - Plugin struct { - } - // FilterFactory is http filter instance + // Plugin is the main plugin entrypoint. + Plugin struct{} + + // FilterFactory creates filter instances. FilterFactory struct { cfg *Config client http.Client } + + // Filter is the processing entity for each request. Filter struct { - client http.Client - scheme string + client http.Client + scheme string + strategy *Strategy + clusterManager *server.ClusterManager } - // Config describe the config of FilterFactory + + // Config describes the top-level configuration for the filter. + // Note: Strategy-specific configurations are now defined on the endpoints. 
Config struct { Timeout time.Duration `yaml:"timeout" json:"timeout,omitempty"` MaxIdleConns int `yaml:"maxIdleConns" json:"maxIdleConns,omitempty"` MaxIdleConnsPerHost int `yaml:"maxIdleConnsPerHost" json:"maxIdleConnsPerHost,omitempty"` MaxConnsPerHost int `yaml:"maxConnsPerHost" json:"maxConnsPerHost,omitempty"` Scheme string `yaml:"scheme" json:"scheme,omitempty" default:"http"` } + + RequestExecutor struct { + hc *contexthttp.HttpContext + filter *Filter + clusterName string + clusterManager *server.ClusterManager + } ) +// Kind returns the unique name of this filter. func (p *Plugin) Kind() string { return Kind } +// CreateFilterFactory creates a new factory instance for this filter. func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) { return &FilterFactory{cfg: &Config{}}, nil } +// Config returns the configuration struct for the factory. func (factory *FilterFactory) Config() any { return factory.cfg } +// Apply initializes the factory from its configuration. func (factory *FilterFactory) Apply() error { scheme := strings.TrimSpace(strings.ToLower(factory.cfg.Scheme)) - if scheme != "http" && scheme != "https" { return fmt.Errorf("%s: scheme must be http or https", Kind) } - factory.cfg.Scheme = scheme cfg := factory.cfg - client := http.Client{ + factory.client = http.Client{ Timeout: cfg.Timeout, - Transport: http.RoundTripper(&http.Transport{ + Transport: &http.Transport{ MaxIdleConns: cfg.MaxIdleConns, MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost, MaxConnsPerHost: cfg.MaxConnsPerHost, - }), + }, } - factory.client = client return nil } +// PrepareFilterChain creates a new Filter instance for a request chain. 
func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, chain filter.FilterChain) error { - //reuse http client - f := &Filter{factory.client, factory.cfg.Scheme} + f := &Filter{ + client: factory.client, + scheme: factory.cfg.Scheme, + strategy: &Strategy{}, + clusterManager: server.GetClusterManager(), + } chain.AppendDecodeFilters(f) return nil } +// Decode is the main entry point for processing an incoming request. func (f *Filter) Decode(hc *contexthttp.HttpContext) filter.FilterStatus { rEntry := hc.GetRouteEntry() if rEntry == nil { - bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "no route entry"}) - hc.SendLocalReply(http.StatusBadRequest, bt) + sendJSONError(hc, http.StatusBadRequest, "no route entry found for request") return filter.Stop } - logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster: %v", rEntry.Cluster) - var ( - clusterName = rEntry.Cluster - clusterManager = server.GetClusterManager() - endpoint = clusterManager.PickEndpoint(clusterName, hc) - ) - - if endpoint == nil { - logger.Debugf("[dubbo-go-pixiu] cluster not found endpoint") - bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "cluster not found endpoint"}) - hc.SendLocalReply(http.StatusServiceUnavailable, bt) + // Ensure the request body can be re-read for retries + if err := f.prepareRequestBody(hc); err != nil { + sendJSONError(hc, http.StatusInternalServerError, fmt.Sprintf("failed to read request body: %v", err)) return filter.Stop } + defer hc.Request.Body.Close() + + // Set up the context for our strategy executor + executor := &RequestExecutor{ + hc: hc, + filter: f, + clusterName: rEntry.Cluster, + clusterManager: f.clusterManager, + } + + // Delegate the complex execution logic to the strategy + resp, err := f.strategy.Execute(executor) + + // Handle the outcome + if err != nil { + logger.Infof("[dubbo-go-pixiu] request execution failed after all attempts: %v", err) + var urlErr *url.Error + if errors.As(err, &urlErr) && 
urlErr.Timeout() { + sendJSONError(hc, http.StatusGatewayTimeout, err.Error()) + } else if resp == nil { + // This handles errors where no response was ever received (e.g., DNS error, connection refused) + sendJSONError(hc, http.StatusServiceUnavailable, err.Error()) + } else { + // A response was received, but it was a failure. Pass it along. + hc.SourceResp = resp + } + return filter.Continue // Let the response writer handle the failed response + } + + logger.Debugf("[dubbo-go-pixiu] client call successful, resp status: %s", resp.Status) + hc.SourceResp = resp + return filter.Continue +} + +// prepareRequestBody ensures the request body can be read multiple times. +func (f *Filter) prepareRequestBody(hc *contexthttp.HttpContext) error { + if hc.Request.Body == nil || hc.Request.GetBody != nil { + return nil // Nothing to do + } + + bodyBytes, err := io.ReadAll(hc.Request.Body) + if err != nil { + return err + } + hc.Request.Body.Close() // Close the original body + + // Set the body to a new reader and provide a function to get a new reader for later reads + hc.Request.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + hc.Request.GetBody = func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(bodyBytes)), nil + } + return nil +} + +// assembleRequest creates a new http.Request for a specific endpoint. 
+func (f *Filter) assembleRequest(endpoint *model.Endpoint, r *http.Request) (*http.Request, error) { + // Reset the body to the beginning for each new request attempt + if r.GetBody != nil { + var err error + r.Body, err = r.GetBody() + if err != nil { + return nil, err + } + } + + parsedURL := url.URL{ + Host: endpoint.Address.GetAddress(), + Scheme: f.scheme, + Path: r.URL.Path, + RawQuery: r.URL.RawQuery, + } + + req, err := http.NewRequest(r.Method, parsedURL.String(), r.Body) + if err != nil { + return nil, err + } + // Copy headers from original request + req.Header = r.Header + + return req, nil +} - r := hc.Request - defer r.Body.Close() +// Strategy is a stateless executor that orchestrates the request lifecycle. +type Strategy struct{} +// Execute orchestrates the request lifecycle using dynamic policies from endpoints. +func (s *Strategy) Execute(executor *RequestExecutor) (*http.Response, error) { var ( - req *http.Request resp *http.Response err error ) - if hc.Request.Body != nil && hc.Request.GetBody == nil { - bodyBytes, err := io.ReadAll(hc.Request.Body) - hc.Request.Body.Close() + // 1. Pick initial endpoint from the cluster + endpoint := executor.clusterManager.PickEndpoint(executor.clusterName, executor.hc) - if err != nil { - bt, _ := json.Marshal(contexthttp.ErrResponse{Message: fmt.Sprintf("failed to read request body: %v", err)}) - hc.SendLocalReply(http.StatusInternalServerError, bt) - return filter.Stop - } + // 2. The main fallback loop. It continues as long as we have a valid endpoint to try. + for endpoint != nil { + logger.Debugf("[dubbo-go-pixiu] client attempting endpoint [%s: %v]", endpoint.ID, endpoint.Address.GetAddress()) - hc.Request.Body = io.NopCloser(bytes.NewReader(bodyBytes)) - hc.Request.GetBody = func() (io.ReadCloser, error) { - return io.NopCloser(bytes.NewReader(bodyBytes)), nil + // 3. 
Dynamically load the retry policy for the current endpoint + var retryPolicy retry.Retryer + retryPolicy, err = retry.GetRetryPolicy(endpoint) + if err != nil { + logger.Errorf("could not load retry policy for endpoint %s: %v. Skipping to next endpoint.", endpoint.ID, err) + endpoint = getNextFallbackEndpoint(endpoint, executor) + continue } - } - - logger.Debugf("[dubbo-go-pixiu] client choose endpoint [%s: %v]", endpoint.ID, endpoint.Address.GetAddress()) + retryPolicy.Reset() - // make request -FALLBACK: - for { - RETRY: - for retry := uint(0); retry <= endpoint.LLMMeta.RetryTimes; retry++ { - req, err = f.assembleRequest(endpoint, r) + // 4. The retry loop for the current endpoint. + for retryPolicy.Attempt(err) { + var req *http.Request + req, err = executor.filter.assembleRequest(endpoint, executor.hc.Request) if err != nil { - logger.Warnf("[dubbo-go-pixiu] client assemble request failed: %v", err) - break RETRY + // Request assembly error is fatal for this endpoint, break retry loop to go to fallback + logger.Warnf("[dubbo-go-pixiu] failed to assemble request for endpoint [%s: %v]: %v. Skipping to next endpoint.") Review Comment: The `Warnf` call has format verbs but no arguments passed. Include `endpoint.ID`, `endpoint.Address.GetAddress()`, and `err` to match the format string. ```suggestion logger.Warnf("[dubbo-go-pixiu] failed to assemble request for endpoint [%s: %v]: %v. Skipping to next endpoint.", endpoint.ID, endpoint.Address.GetAddress(), err) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
