Similarityoung commented on code in PR #692:
URL: https://github.com/apache/dubbo-go-pixiu/pull/692#discussion_r2259251008


##########
pkg/cluster/retry/countbased/count_based.go:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package countbased
+
+import (
+       "fmt"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+func init() {
+       retry.RegisterRetryPolicy(model.RetryerCountBased, newCountBasedRetry)
+}
+
+type CountBasedRetry struct {
+       MaxAttempts uint
+       retryTimes  uint
+}
+
+func (r *CountBasedRetry) Attempt(err error) bool {
+       if r.retryTimes < r.MaxAttempts {
+               r.retryTimes++
+               return true
+       }
+       return false
+}
+
+func (r *CountBasedRetry) Reset() {
+       r.retryTimes = 0
+}
+
+func newCountBasedRetry(config map[string]any) (retry.RetryPolicy, error) {
+       timesValue, exists := config["times"]
+       if !exists {
+               return nil, fmt.Errorf("'times' field is missing in retry 
configuration")
+       }
+
+       timesUint, ok := timesValue.(int)
+       if !ok {
+               return nil, fmt.Errorf("invalid type for 
'retry.count_based.times', expected int but got %T", timesValue)
+       }
+
+       // Total attempts = 1 initial try plus number of retries.
+       return &CountBasedRetry{MaxAttempts: uint(timesUint) + 1}, nil

Review Comment:
   ```go
   timesValue, exists := config["times"]
        if !exists {
                return nil, fmt.Errorf("'times' field is missing in retry 
configuration")
        }
   ```
   Maybe we can set a default value (e.g., 3) and log a warning (`Warnf`) when
   the 'times' field is missing.
   
   By the way, does `times` need to be hard-coded?
   



##########
pkg/cluster/retry/exponentialbackoff/exponential_backoff.go:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package exponentialbackoff
+
+import (
+       "fmt"
+       "math"
+       "math/rand"
+       "time"
+)
+
+import (
+       "github.com/mitchellh/mapstructure"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/cluster/retry"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+func init() {
+       retry.RegisterRetryPolicy(model.RetryerExponentialBackoff, 
newExponentialBackoffRetry)
+}
+
+type ExponentialBackoffRetry struct {
+       MaxAttempts     uint
+       InitialInterval time.Duration
+       MaxInterval     time.Duration
+       Multiplier      float64
+       retryTimes      uint
+}
+
+type ExponentialBackoffConfig struct {
+       Times           uint    `mapstructure:"times" default:"3"`
+       InitialInterval string  `mapstructure:"initialInterval" default:"100ms"`
+       MaxInterval     string  `mapstructure:"maxInterval" default:"5s"`
+       Multiplier      float64 `mapstructure:"multiplier" default:"2.0"`
+}
+
+func (e *ExponentialBackoffRetry) Attempt(err error) bool {
+       if e.retryTimes >= e.MaxAttempts {
+               return false
+       }
+
+       // Don't wait before the first try
+       if e.retryTimes > 0 {
+               backoff := float64(e.InitialInterval) * math.Pow(e.Multiplier, 
float64(e.retryTimes-1))
+               cappedBackoff := time.Duration(math.Min(backoff, 
float64(e.MaxInterval)))
+               // Add jitter to prevent thundering herd
+               jitter := time.Duration(rand.Intn(100)) * time.Millisecond // 
NOSONAR
+               time.Sleep(cappedBackoff + jitter)
+       }
+
+       e.retryTimes++
+       return true
+}
+
+func (e *ExponentialBackoffRetry) Reset() {
+       e.retryTimes = 0
+}
+
+func newExponentialBackoffRetry(config map[string]any) (retry.RetryPolicy, 
error) {
+       var cfg ExponentialBackoffConfig
+       if err := mapstructure.Decode(config, &cfg); err != nil {

Review Comment:
   `mapstructure.Decode` doesn't seem to process `default` tags on its own;
   maybe you can add `defaults.Set(f.cfg)`.



##########
pkg/cluster/retry/retry.go:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package retry
+
+import (
+       "fmt"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+type (
+       // RetryPolicy defines the interface for retry logic.
+       RetryPolicy interface {
+               // Attempt checks if a retry should be performed and 
potentially waits.
+               // It returns true if the request should be attempted, false 
otherwise.
+               // The `err` parameter can be used for policies that act on 
specific errors.
+               Attempt(err error) bool

Review Comment:
   I can't find any usage of `err`; do we need it?



##########
pkg/filter/llm/proxy/filter.go:
##########
@@ -48,188 +49,271 @@ func init() {
 }
 
 type (
-       // Plugin is http filter plugin.
-       Plugin struct {
-       }
-       // FilterFactory is http filter instance
+       // Plugin is the main plugin entrypoint.
+       Plugin struct{}
+
+       // FilterFactory creates filter instances.
        FilterFactory struct {
                cfg    *Config
                client http.Client
        }
+
+       // Filter is the processing entity for each request.
        Filter struct {
-               client http.Client
-               scheme string
+               client         http.Client
+               scheme         string
+               strategy       *Strategy
+               clusterManager *server.ClusterManager
        }
-       // Config describe the config of FilterFactory
+
+       // Config describes the top-level configuration for the filter.
+       // Note: Strategy-specific configurations are now defined on the 
endpoints.
        Config struct {
                Timeout             time.Duration `yaml:"timeout" 
json:"timeout,omitempty"`
                MaxIdleConns        int           `yaml:"maxIdleConns" 
json:"maxIdleConns,omitempty"`
                MaxIdleConnsPerHost int           `yaml:"maxIdleConnsPerHost" 
json:"maxIdleConnsPerHost,omitempty"`
                MaxConnsPerHost     int           `yaml:"maxConnsPerHost" 
json:"maxConnsPerHost,omitempty"`
                Scheme              string        `yaml:"scheme" 
json:"scheme,omitempty" default:"http"`
        }
+
+       RequestExecutor struct {
+               hc             *contexthttp.HttpContext
+               filter         *Filter
+               clusterName    string
+               clusterManager *server.ClusterManager
+       }
 )
 
+// Kind returns the unique name of this filter.
 func (p *Plugin) Kind() string {
        return Kind
 }
 
+// CreateFilterFactory creates a new factory instance for this filter.
 func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) {
        return &FilterFactory{cfg: &Config{}}, nil
 }
 
+// Config returns the configuration struct for the factory.
 func (factory *FilterFactory) Config() any {
        return factory.cfg
 }
 
+// Apply initializes the factory from its configuration.
 func (factory *FilterFactory) Apply() error {
        scheme := strings.TrimSpace(strings.ToLower(factory.cfg.Scheme))
-
        if scheme != "http" && scheme != "https" {
                return fmt.Errorf("%s: scheme must be http or https", Kind)
        }
-
        factory.cfg.Scheme = scheme
 
        cfg := factory.cfg
-       client := http.Client{
+       factory.client = http.Client{
                Timeout: cfg.Timeout,

Review Comment:
   Nice work! Could you also change the corresponding part of
   `pkg/filter/http/httpproxy/routerfilter.go` to this more elegant style?



##########
pkg/filter/llm/proxy/filter.go:
##########
@@ -48,188 +49,271 @@ func init() {
 }
 
 type (
-       // Plugin is http filter plugin.
-       Plugin struct {
-       }
-       // FilterFactory is http filter instance
+       // Plugin is the main plugin entrypoint.
+       Plugin struct{}
+
+       // FilterFactory creates filter instances.
        FilterFactory struct {
                cfg    *Config
                client http.Client
        }
+
+       // Filter is the processing entity for each request.
        Filter struct {
-               client http.Client
-               scheme string
+               client         http.Client
+               scheme         string
+               strategy       *Strategy
+               clusterManager *server.ClusterManager
        }
-       // Config describe the config of FilterFactory
+
+       // Config describes the top-level configuration for the filter.
+       // Note: Strategy-specific configurations are now defined on the 
endpoints.
        Config struct {
                Timeout             time.Duration `yaml:"timeout" 
json:"timeout,omitempty"`
                MaxIdleConns        int           `yaml:"maxIdleConns" 
json:"maxIdleConns,omitempty"`
                MaxIdleConnsPerHost int           `yaml:"maxIdleConnsPerHost" 
json:"maxIdleConnsPerHost,omitempty"`
                MaxConnsPerHost     int           `yaml:"maxConnsPerHost" 
json:"maxConnsPerHost,omitempty"`
                Scheme              string        `yaml:"scheme" 
json:"scheme,omitempty" default:"http"`
        }
+
+       RequestExecutor struct {
+               hc             *contexthttp.HttpContext
+               filter         *Filter
+               clusterName    string
+               clusterManager *server.ClusterManager
+       }
 )
 
+// Kind returns the unique name of this filter.
 func (p *Plugin) Kind() string {
        return Kind
 }
 
+// CreateFilterFactory creates a new factory instance for this filter.
 func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) {
        return &FilterFactory{cfg: &Config{}}, nil
 }
 
+// Config returns the configuration struct for the factory.
 func (factory *FilterFactory) Config() any {
        return factory.cfg
 }
 
+// Apply initializes the factory from its configuration.
 func (factory *FilterFactory) Apply() error {
        scheme := strings.TrimSpace(strings.ToLower(factory.cfg.Scheme))
-
        if scheme != "http" && scheme != "https" {
                return fmt.Errorf("%s: scheme must be http or https", Kind)
        }
-
        factory.cfg.Scheme = scheme
 
        cfg := factory.cfg
-       client := http.Client{
+       factory.client = http.Client{
                Timeout: cfg.Timeout,
-               Transport: http.RoundTripper(&http.Transport{
+               Transport: &http.Transport{
                        MaxIdleConns:        cfg.MaxIdleConns,
                        MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
                        MaxConnsPerHost:     cfg.MaxConnsPerHost,
-               }),
+               },
        }
-       factory.client = client
        return nil
 }
 
+// PrepareFilterChain creates a new Filter instance for a request chain.
 func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, 
chain filter.FilterChain) error {
-       //reuse http client
-       f := &Filter{factory.client, factory.cfg.Scheme}
+       f := &Filter{
+               client:         factory.client,
+               scheme:         factory.cfg.Scheme,
+               strategy:       &Strategy{},
+               clusterManager: server.GetClusterManager(),
+       }
        chain.AppendDecodeFilters(f)
        return nil
 }
 
+// Decode is the main entry point for processing an incoming request.
 func (f *Filter) Decode(hc *contexthttp.HttpContext) filter.FilterStatus {
        rEntry := hc.GetRouteEntry()
        if rEntry == nil {
-               bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "no 
route entry"})
-               hc.SendLocalReply(http.StatusBadRequest, bt)
+               sendJSONError(hc, http.StatusBadRequest, "no route entry found 
for request")
                return filter.Stop
        }
-
        logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster: 
%v", rEntry.Cluster)
 
-       var (
-               clusterName    = rEntry.Cluster
-               clusterManager = server.GetClusterManager()
-               endpoint       = clusterManager.PickEndpoint(clusterName, hc)
-       )
-
-       if endpoint == nil {
-               logger.Debugf("[dubbo-go-pixiu] cluster not found endpoint")
-               bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "cluster 
not found endpoint"})
-               hc.SendLocalReply(http.StatusServiceUnavailable, bt)
+       // Ensure the request body can be re-read for retries
+       if err := f.prepareRequestBody(hc); err != nil {
+               sendJSONError(hc, http.StatusInternalServerError, 
fmt.Sprintf("failed to read request body: %v", err))
                return filter.Stop
        }
+       defer hc.Request.Body.Close()

Review Comment:
   In `prepareRequestBody` (line 192) you already close the original body, so
   why close it again here?
   
   That said, it is generally considered good practice to keep it for
   defensive programming and robustness.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to