This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 06da7ce  fix: fix adaptive service issues (#1718)
06da7ce is described below

commit 06da7cefeb5d64251c75482697b3d74477150608
Author: Xuewei Niu <[email protected]>
AuthorDate: Tue Jan 18 18:35:32 2022 +0800

    fix: fix adaptive service issues (#1718)
    
    * fix: fix adaptive service issues
    
    * fix: add license
    
    * fix: fix logging message
---
 cluster/cluster/adaptivesvc/cluster_invoker.go | 15 ++++++++-
 cluster/utils/adaptivesvc.go                   | 46 ++++++++++++++++++++++++++
 filter/adaptivesvc/filter.go                   | 41 +++++++++++++++++------
 3 files changed, 91 insertions(+), 11 deletions(-)

diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go b/cluster/cluster/adaptivesvc/cluster_invoker.go
index d21c8d7..e4fb479 100644
--- a/cluster/cluster/adaptivesvc/cluster_invoker.go
+++ b/cluster/cluster/adaptivesvc/cluster_invoker.go
@@ -30,6 +30,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base"
        "dubbo.apache.org/dubbo-go/v3/cluster/directory"
        "dubbo.apache.org/dubbo-go/v3/cluster/metrics"
+       clsutils "dubbo.apache.org/dubbo-go/v3/cluster/utils"
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/common/logger"
@@ -67,8 +68,20 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation
        // invoke
        result := invoker.Invoke(ctx, invocation)
 
+       // if the adaptive service encounters an error, DO NOT
+       // update the metrics.
+       if clsutils.IsAdaptiveServiceFailed(result.Error()) {
+               return result
+       }
+
        // update metrics
-       remainingStr := result.Attachment(constant.AdaptiveServiceRemainingKey, 
"").(string)
+       remainingIface := 
result.Attachment(constant.AdaptiveServiceRemainingKey, "")
+       remainingStr, ok := remainingIface.(string)
+       if !ok {
+               logger.Errorf("[adasvc cluster] The %s field type of value %v 
should be string.",
+                       constant.AdaptiveServiceRemainingKey, remainingIface)
+               return result
+       }
        remaining, err := strconv.Atoi(remainingStr)
        if err != nil {
                logger.Warnf("the remaining is unexpected, we need a int type, 
but we got %s, err: %v.", remainingStr, err)
diff --git a/cluster/utils/adaptivesvc.go b/cluster/utils/adaptivesvc.go
new file mode 100644
index 0000000..e92f549
--- /dev/null
+++ b/cluster/utils/adaptivesvc.go
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+       "fmt"
+       "strings"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc"
+       adasvcfilter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter"
+)
+
+var ReachLimitationErrorString = fmt.Sprintf("%s: %s",
+       adaptivesvc.ErrAdaptiveSvcInterrupted.Error(),
+       adasvcfilter.ErrReachLimitation.Error())
+
+func DoesAdaptiveServiceReachLimitation(err error) bool {
+       if err == nil {
+               return false
+       }
+       return err.Error() == ReachLimitationErrorString
+}
+
+func IsAdaptiveServiceFailed(err error) bool {
+       if err == nil {
+               return false
+       }
+       return strings.HasPrefix(err.Error(), 
adaptivesvc.ErrAdaptiveSvcInterrupted.Error())
+}
diff --git a/filter/adaptivesvc/filter.go b/filter/adaptivesvc/filter.go
index 17472ba..1365208 100644
--- a/filter/adaptivesvc/filter.go
+++ b/filter/adaptivesvc/filter.go
@@ -20,6 +20,7 @@ package adaptivesvc
 import (
        "context"
        "fmt"
+       "strings"
        "sync"
 )
 
@@ -40,8 +41,9 @@ var (
        adaptiveServiceProviderFilterOnce sync.Once
        instance                          filter.Filter
 
-       ErrUpdaterNotFound       = fmt.Errorf("updater not found")
-       ErrUnexpectedUpdaterType = fmt.Errorf("unexpected updater type")
+       ErrAdaptiveSvcInterrupted = fmt.Errorf("adaptive service interrupted")
+       ErrUpdaterNotFound        = fmt.Errorf("updater not found")
+       ErrUnexpectedUpdaterType  = fmt.Errorf("unexpected updater type")
 )
 
 func init() {
@@ -70,47 +72,55 @@ func (f *adaptiveServiceProviderFilter) Invoke(ctx context.Context, invoker prot
                        // a new limiter
                        if l, err = 
limiterMapperSingleton.newAndSetMethodLimiter(invoker.GetURL(),
                                invocation.MethodName(), 
limiter.HillClimbingLimiter); err != nil {
-                               return &protocol.RPCResult{Err: err}
+                               return &protocol.RPCResult{Err: 
wrapErrAdaptiveSvcInterrupted(err)}
                        }
                } else {
                        // unexpected errors
-                       return &protocol.RPCResult{Err: err}
+                       return &protocol.RPCResult{Err: 
wrapErrAdaptiveSvcInterrupted(err)}
                }
        }
 
        updater, err := l.Acquire()
        if err != nil {
-               return &protocol.RPCResult{Err: err}
+               return &protocol.RPCResult{Err: 
wrapErrAdaptiveSvcInterrupted(err)}
        }
 
-       invocation.Attributes()[constant.AdaptiveServiceUpdaterKey] = updater
-
+       invocation.SetAttribute(constant.AdaptiveServiceUpdaterKey, updater)
        return invoker.Invoke(ctx, invocation)
 }
 
 func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result 
protocol.Result, invoker protocol.Invoker,
        invocation protocol.Invocation) protocol.Result {
+
+       if isErrAdaptiveSvcInterrupted(result.Error()) {
+               // If the Invoke method of the adaptiveServiceProviderFilter 
returns an error,
+               // the OnResponse of the adaptiveServiceProviderFilter should 
not be performed.
+               return result
+       }
+
        // get updater from the attributes
        updaterIface, _ := 
invocation.GetAttribute(constant.AdaptiveServiceUpdaterKey)
        if updaterIface == nil {
+               logger.Errorf("[adasvc filter] The updater is not found on the 
attributes: %#v",
+                       invocation.Attributes())
                return &protocol.RPCResult{Err: ErrUpdaterNotFound}
        }
        updater, ok := updaterIface.(limiter.Updater)
        if !ok {
+               logger.Errorf("[adasvc filter] The type of the updater is not 
unexpected, we got %#v", updaterIface)
                return &protocol.RPCResult{Err: ErrUnexpectedUpdaterType}
        }
 
        err := updater.DoUpdate()
        if err != nil {
-               // DoUpdate was failed, but the invocation is not failed.
-               // Printing the error to logs is better than returning a
-               // result with an error.
                logger.Errorf("[adasvc filter] The DoUpdate method was failed, 
err: %s.", err)
+               return &protocol.RPCResult{Err: err}
        }
 
        // get limiter for the mapper
        l, err := limiterMapperSingleton.getMethodLimiter(invoker.GetURL(), 
invocation.MethodName())
        if err != nil {
+               logger.Errorf("[adasvc filter] The method limiter for \"%s\" is 
not found.", invocation.MethodName())
                return &protocol.RPCResult{Err: err}
        }
 
@@ -123,3 +133,14 @@ func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result pro
 
        return result
 }
+
+func wrapErrAdaptiveSvcInterrupted(customizedErr interface{}) error {
+       return fmt.Errorf("%w: %v", ErrAdaptiveSvcInterrupted, customizedErr)
+}
+
+func isErrAdaptiveSvcInterrupted(err error) bool {
+       if err == nil {
+               return false
+       }
+       return strings.HasPrefix(err.Error(), ErrAdaptiveSvcInterrupted.Error())
+}

Reply via email to