Copilot commented on code in PR #692:
URL: https://github.com/apache/dubbo-go-pixiu/pull/692#discussion_r2187157584


##########
pkg/cluster/retry/count_based/count_based.go:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package count_based
+
+import (
+       "fmt"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+func init() {
+       retry.RegisterRetryPolicy(model.RetryerCountBased, newCountBasedRetry)
+}
+
+type CountBasedRetry struct {
+       MaxAttempts uint
+       currentTry  uint
+}
+
+func (r *CountBasedRetry) Attempt(err error) bool {
+       if r.currentTry < r.MaxAttempts {
+               r.currentTry++
+               return true
+       }
+       return false
+}
+
+func (r *CountBasedRetry) Reset() {
+       r.currentTry = 0
+}
+
+func newCountBasedRetry(config map[string]any) (retry.Retryer, error) {
+       timesValue, exists := config["times"]
+       if !exists {
+               return nil, fmt.Errorf("'times' field is missing in retry 
configuration")
+       }
+
+       timesUint, ok := timesValue.(int)
+       if !ok {
+               return nil, fmt.Errorf("invalid type for 
'retry.count_based.times', expected int but got %T", timesValue)
+       }
+
+       // Total attempts = 1 initial try plus number of retries.
+       return &CountBasedRetry{MaxAttempts: uint(timesUint) + 1}, nil

Review Comment:
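   Directly asserting `config["times"]` to `int` may fail if YAML/JSON decoding 
yields `float64` (for example, `encoding/json` decodes every number into 
`float64` when the target is `map[string]any`). Consider supporting both `int` 
and `float64` (and other numeric types) before converting to `uint`. Also reject 
negative values, which would otherwise wrap around to a very large `uint`.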
   ```suggestion
        var timesUint uint
        switch v := timesValue.(type) {
        case int:
                if v < 0 {
                        return nil, fmt.Errorf("'retry.count_based.times' must 
be non-negative, got %d", v)
                }
                timesUint = uint(v)
        case float64:
                if v < 0 || v != float64(uint(v)) {
                        return nil, fmt.Errorf("'retry.count_based.times' must 
be a non-negative integer, got %f", v)
                }
                timesUint = uint(v)
        default:
                return nil, fmt.Errorf("invalid type for 
'retry.count_based.times', expected int or float64 but got %T", timesValue)
        }
   
        // Total attempts = 1 initial try plus number of retries.
        return &CountBasedRetry{MaxAttempts: timesUint + 1}, nil
   ```



##########
pkg/filter/llm/proxy/filter.go:
##########
@@ -48,188 +49,271 @@ func init() {
 }
 
 type (
-       // Plugin is http filter plugin.
-       Plugin struct {
-       }
-       // FilterFactory is http filter instance
+       // Plugin is the main plugin entrypoint.
+       Plugin struct{}
+
+       // FilterFactory creates filter instances.
        FilterFactory struct {
                cfg    *Config
                client http.Client
        }
+
+       // Filter is the processing entity for each request.
        Filter struct {
-               client http.Client
-               scheme string
+               client         http.Client
+               scheme         string
+               strategy       *Strategy
+               clusterManager *server.ClusterManager
        }
-       // Config describe the config of FilterFactory
+
+       // Config describes the top-level configuration for the filter.
+       // Note: Strategy-specific configurations are now defined on the 
endpoints.
        Config struct {
                Timeout             time.Duration `yaml:"timeout" 
json:"timeout,omitempty"`
                MaxIdleConns        int           `yaml:"maxIdleConns" 
json:"maxIdleConns,omitempty"`
                MaxIdleConnsPerHost int           `yaml:"maxIdleConnsPerHost" 
json:"maxIdleConnsPerHost,omitempty"`
                MaxConnsPerHost     int           `yaml:"maxConnsPerHost" 
json:"maxConnsPerHost,omitempty"`
                Scheme              string        `yaml:"scheme" 
json:"scheme,omitempty" default:"http"`
        }
+
+       RequestExecutor struct {
+               hc             *contexthttp.HttpContext
+               filter         *Filter
+               clusterName    string
+               clusterManager *server.ClusterManager
+       }
 )
 
+// Kind returns the unique name of this filter.
 func (p *Plugin) Kind() string {
        return Kind
 }
 
+// CreateFilterFactory creates a new factory instance for this filter.
 func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) {
        return &FilterFactory{cfg: &Config{}}, nil
 }
 
+// Config returns the configuration struct for the factory.
 func (factory *FilterFactory) Config() any {
        return factory.cfg
 }
 
+// Apply initializes the factory from its configuration.
 func (factory *FilterFactory) Apply() error {
        scheme := strings.TrimSpace(strings.ToLower(factory.cfg.Scheme))
-
        if scheme != "http" && scheme != "https" {
                return fmt.Errorf("%s: scheme must be http or https", Kind)
        }
-
        factory.cfg.Scheme = scheme
 
        cfg := factory.cfg
-       client := http.Client{
+       factory.client = http.Client{
                Timeout: cfg.Timeout,
-               Transport: http.RoundTripper(&http.Transport{
+               Transport: &http.Transport{
                        MaxIdleConns:        cfg.MaxIdleConns,
                        MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
                        MaxConnsPerHost:     cfg.MaxConnsPerHost,
-               }),
+               },
        }
-       factory.client = client
        return nil
 }
 
+// PrepareFilterChain creates a new Filter instance for a request chain.
 func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, 
chain filter.FilterChain) error {
-       //reuse http client
-       f := &Filter{factory.client, factory.cfg.Scheme}
+       f := &Filter{
+               client:         factory.client,
+               scheme:         factory.cfg.Scheme,
+               strategy:       &Strategy{},
+               clusterManager: server.GetClusterManager(),
+       }
        chain.AppendDecodeFilters(f)
        return nil
 }
 
+// Decode is the main entry point for processing an incoming request.
 func (f *Filter) Decode(hc *contexthttp.HttpContext) filter.FilterStatus {
        rEntry := hc.GetRouteEntry()
        if rEntry == nil {
-               bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "no 
route entry"})
-               hc.SendLocalReply(http.StatusBadRequest, bt)
+               sendJSONError(hc, http.StatusBadRequest, "no route entry found 
for request")
                return filter.Stop
        }
-
        logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster: 
%v", rEntry.Cluster)
 
-       var (
-               clusterName    = rEntry.Cluster
-               clusterManager = server.GetClusterManager()
-               endpoint       = clusterManager.PickEndpoint(clusterName, hc)
-       )
-
-       if endpoint == nil {
-               logger.Debugf("[dubbo-go-pixiu] cluster not found endpoint")
-               bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "cluster 
not found endpoint"})
-               hc.SendLocalReply(http.StatusServiceUnavailable, bt)
+       // Ensure the request body can be re-read for retries
+       if err := f.prepareRequestBody(hc); err != nil {
+               sendJSONError(hc, http.StatusInternalServerError, 
fmt.Sprintf("failed to read request body: %v", err))
                return filter.Stop
        }
+       defer hc.Request.Body.Close()
+
+       // Set up the context for our strategy executor
+       executor := &RequestExecutor{
+               hc:             hc,
+               filter:         f,
+               clusterName:    rEntry.Cluster,
+               clusterManager: f.clusterManager,
+       }
+
+       // Delegate the complex execution logic to the strategy
+       resp, err := f.strategy.Execute(executor)
+
+       // Handle the outcome
+       if err != nil {
+               logger.Infof("[dubbo-go-pixiu] request execution failed after 
all attempts: %v", err)
+               var urlErr *url.Error
+               if errors.As(err, &urlErr) && urlErr.Timeout() {
+                       sendJSONError(hc, http.StatusGatewayTimeout, 
err.Error())
+               } else if resp == nil {
+                       // This handles errors where no response was ever 
received (e.g., DNS error, connection refused)
+                       sendJSONError(hc, http.StatusServiceUnavailable, 
err.Error())
+               } else {
+                       // A response was received, but it was a failure. Pass 
it along.
+                       hc.SourceResp = resp
+               }
+               return filter.Continue // Let the response writer handle the 
failed response
+       }
+
+       logger.Debugf("[dubbo-go-pixiu] client call successful, resp status: 
%s", resp.Status)
+       hc.SourceResp = resp
+       return filter.Continue
+}
+
+// prepareRequestBody ensures the request body can be read multiple times.
+func (f *Filter) prepareRequestBody(hc *contexthttp.HttpContext) error {
+       if hc.Request.Body == nil || hc.Request.GetBody != nil {
+               return nil // Nothing to do
+       }
+
+       bodyBytes, err := io.ReadAll(hc.Request.Body)
+       if err != nil {
+               return err
+       }
+       hc.Request.Body.Close() // Close the original body
+
+       // Set the body to a new reader and provide a function to get a new 
reader for later reads
+       hc.Request.Body = io.NopCloser(bytes.NewReader(bodyBytes))
+       hc.Request.GetBody = func() (io.ReadCloser, error) {
+               return io.NopCloser(bytes.NewReader(bodyBytes)), nil
+       }
+       return nil
+}
+
+// assembleRequest creates a new http.Request for a specific endpoint.
+func (f *Filter) assembleRequest(endpoint *model.Endpoint, r *http.Request) 
(*http.Request, error) {
+       // Reset the body to the beginning for each new request attempt
+       if r.GetBody != nil {
+               var err error
+               r.Body, err = r.GetBody()
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       parsedURL := url.URL{
+               Host:     endpoint.Address.GetAddress(),
+               Scheme:   f.scheme,
+               Path:     r.URL.Path,
+               RawQuery: r.URL.RawQuery,
+       }
+
+       req, err := http.NewRequest(r.Method, parsedURL.String(), r.Body)
+       if err != nil {
+               return nil, err
+       }
+       // Copy headers from original request
+       req.Header = r.Header
+
+       return req, nil
+}
 
-       r := hc.Request
-       defer r.Body.Close()
+// Strategy is a stateless executor that orchestrates the request lifecycle.
+type Strategy struct{}
 
+// Execute orchestrates the request lifecycle using dynamic policies from 
endpoints.
+func (s *Strategy) Execute(executor *RequestExecutor) (*http.Response, error) {
        var (
-               req  *http.Request
                resp *http.Response
                err  error
        )
 
-       if hc.Request.Body != nil && hc.Request.GetBody == nil {
-               bodyBytes, err := io.ReadAll(hc.Request.Body)
-               hc.Request.Body.Close()
+       // 1. Pick initial endpoint from the cluster
+       endpoint := executor.clusterManager.PickEndpoint(executor.clusterName, 
executor.hc)
 
-               if err != nil {
-                       bt, _ := json.Marshal(contexthttp.ErrResponse{Message: 
fmt.Sprintf("failed to read request body: %v", err)})
-                       hc.SendLocalReply(http.StatusInternalServerError, bt)
-                       return filter.Stop
-               }
+       // 2. The main fallback loop. It continues as long as we have a valid 
endpoint to try.
+       for endpoint != nil {
+               logger.Debugf("[dubbo-go-pixiu] client attempting endpoint [%s: 
%v]", endpoint.ID, endpoint.Address.GetAddress())
 
-               hc.Request.Body = io.NopCloser(bytes.NewReader(bodyBytes))
-               hc.Request.GetBody = func() (io.ReadCloser, error) {
-                       return io.NopCloser(bytes.NewReader(bodyBytes)), nil
+               // 3. Dynamically load the retry policy for the current endpoint
+               var retryPolicy retry.Retryer
+               retryPolicy, err = retry.GetRetryPolicy(endpoint)
+               if err != nil {
+                       logger.Errorf("could not load retry policy for endpoint 
%s: %v. Skipping to next endpoint.", endpoint.ID, err)
+                       endpoint = getNextFallbackEndpoint(endpoint, executor)
+                       continue
                }
-       }
-
-       logger.Debugf("[dubbo-go-pixiu] client choose endpoint [%s: %v]", 
endpoint.ID, endpoint.Address.GetAddress())
+               retryPolicy.Reset()
 
-       // make request
-FALLBACK:
-       for {
-       RETRY:
-               for retry := uint(0); retry <= endpoint.LLMMeta.RetryTimes; 
retry++ {
-                       req, err = f.assembleRequest(endpoint, r)
+               // 4. The retry loop for the current endpoint.
+               for retryPolicy.Attempt(err) {
+                       var req *http.Request
+                       req, err = executor.filter.assembleRequest(endpoint, 
executor.hc.Request)
                        if err != nil {
-                               logger.Warnf("[dubbo-go-pixiu] client assemble 
request failed: %v", err)
-                               break RETRY
+                               // Request assembly error is fatal for this 
endpoint, break retry loop to go to fallback
+                               logger.Warnf("[dubbo-go-pixiu] failed to 
assemble request for endpoint [%s: %v]: %v. Skipping to next endpoint.")

Review Comment:
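   The `Warnf` call has three format verbs (`%s`, `%v`, `%v`) but no arguments 
are passed. With fmt-style formatting, the log line would show `%!s(MISSING)` / 
`%!v(MISSING)` instead of the endpoint and error. Include `endpoint.ID`, 
`endpoint.Address.GetAddress()`, and `err` to match the format string.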
   ```suggestion
                                logger.Warnf("[dubbo-go-pixiu] failed to 
assemble request for endpoint [%s: %v]: %v. Skipping to next endpoint.", 
endpoint.ID, endpoint.Address.GetAddress(), err)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to