This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 06da7ce fix: fix adaptive service issues (#1718)
06da7ce is described below
commit 06da7cefeb5d64251c75482697b3d74477150608
Author: Xuewei Niu <[email protected]>
AuthorDate: Tue Jan 18 18:35:32 2022 +0800
fix: fix adaptive service issues (#1718)
* fix: fix adaptive service issues
* fix: add license
* fix: fix logging message
---
cluster/cluster/adaptivesvc/cluster_invoker.go | 15 ++++++++-
cluster/utils/adaptivesvc.go | 46 ++++++++++++++++++++++++++
filter/adaptivesvc/filter.go | 41 +++++++++++++++++------
3 files changed, 91 insertions(+), 11 deletions(-)
diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go
b/cluster/cluster/adaptivesvc/cluster_invoker.go
index d21c8d7..e4fb479 100644
--- a/cluster/cluster/adaptivesvc/cluster_invoker.go
+++ b/cluster/cluster/adaptivesvc/cluster_invoker.go
@@ -30,6 +30,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/cluster/cluster/base"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
+ clsutils "dubbo.apache.org/dubbo-go/v3/cluster/utils"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
@@ -67,8 +68,20 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx
context.Context, invocation
// invoke
result := invoker.Invoke(ctx, invocation)
+ // if the adaptive service encounters an error, DO NOT
+ // update the metrics.
+ if clsutils.IsAdaptiveServiceFailed(result.Error()) {
+ return result
+ }
+
// update metrics
- remainingStr := result.Attachment(constant.AdaptiveServiceRemainingKey,
"").(string)
+ remainingIface :=
result.Attachment(constant.AdaptiveServiceRemainingKey, "")
+ remainingStr, ok := remainingIface.(string)
+ if !ok {
+ logger.Errorf("[adasvc cluster] The %s field type of value %v
should be string.",
+ constant.AdaptiveServiceRemainingKey, remainingIface)
+ return result
+ }
remaining, err := strconv.Atoi(remainingStr)
if err != nil {
logger.Warnf("the remaining is unexpected, we need a int type,
but we got %s, err: %v.", remainingStr, err)
diff --git a/cluster/utils/adaptivesvc.go b/cluster/utils/adaptivesvc.go
new file mode 100644
index 0000000..e92f549
--- /dev/null
+++ b/cluster/utils/adaptivesvc.go
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "fmt"
+ "strings"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc"
+ adasvcfilter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter"
+)
+
+var ReachLimitationErrorString = fmt.Sprintf("%s: %s",
+ adaptivesvc.ErrAdaptiveSvcInterrupted.Error(),
+ adasvcfilter.ErrReachLimitation.Error())
+
+func DoesAdaptiveServiceReachLimitation(err error) bool {
+ if err == nil {
+ return false
+ }
+ return err.Error() == ReachLimitationErrorString
+}
+
+func IsAdaptiveServiceFailed(err error) bool {
+ if err == nil {
+ return false
+ }
+ return strings.HasPrefix(err.Error(),
adaptivesvc.ErrAdaptiveSvcInterrupted.Error())
+}
diff --git a/filter/adaptivesvc/filter.go b/filter/adaptivesvc/filter.go
index 17472ba..1365208 100644
--- a/filter/adaptivesvc/filter.go
+++ b/filter/adaptivesvc/filter.go
@@ -20,6 +20,7 @@ package adaptivesvc
import (
"context"
"fmt"
+ "strings"
"sync"
)
@@ -40,8 +41,9 @@ var (
adaptiveServiceProviderFilterOnce sync.Once
instance filter.Filter
- ErrUpdaterNotFound = fmt.Errorf("updater not found")
- ErrUnexpectedUpdaterType = fmt.Errorf("unexpected updater type")
+ ErrAdaptiveSvcInterrupted = fmt.Errorf("adaptive service interrupted")
+ ErrUpdaterNotFound = fmt.Errorf("updater not found")
+ ErrUnexpectedUpdaterType = fmt.Errorf("unexpected updater type")
)
func init() {
@@ -70,47 +72,55 @@ func (f *adaptiveServiceProviderFilter) Invoke(ctx
context.Context, invoker prot
// a new limiter
if l, err =
limiterMapperSingleton.newAndSetMethodLimiter(invoker.GetURL(),
invocation.MethodName(),
limiter.HillClimbingLimiter); err != nil {
- return &protocol.RPCResult{Err: err}
+ return &protocol.RPCResult{Err:
wrapErrAdaptiveSvcInterrupted(err)}
}
} else {
// unexpected errors
- return &protocol.RPCResult{Err: err}
+ return &protocol.RPCResult{Err:
wrapErrAdaptiveSvcInterrupted(err)}
}
}
updater, err := l.Acquire()
if err != nil {
- return &protocol.RPCResult{Err: err}
+ return &protocol.RPCResult{Err:
wrapErrAdaptiveSvcInterrupted(err)}
}
- invocation.Attributes()[constant.AdaptiveServiceUpdaterKey] = updater
-
+ invocation.SetAttribute(constant.AdaptiveServiceUpdaterKey, updater)
return invoker.Invoke(ctx, invocation)
}
func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result
protocol.Result, invoker protocol.Invoker,
invocation protocol.Invocation) protocol.Result {
+
+ if isErrAdaptiveSvcInterrupted(result.Error()) {
+ // If the Invoke method of the adaptiveServiceProviderFilter
returns an error,
+ // the OnResponse of the adaptiveServiceProviderFilter should
not be performed.
+ return result
+ }
+
// get updater from the attributes
updaterIface, _ :=
invocation.GetAttribute(constant.AdaptiveServiceUpdaterKey)
if updaterIface == nil {
+ logger.Errorf("[adasvc filter] The updater is not found on the
attributes: %#v",
+ invocation.Attributes())
return &protocol.RPCResult{Err: ErrUpdaterNotFound}
}
updater, ok := updaterIface.(limiter.Updater)
if !ok {
+ logger.Errorf("[adasvc filter] The type of the updater is not
unexpected, we got %#v", updaterIface)
return &protocol.RPCResult{Err: ErrUnexpectedUpdaterType}
}
err := updater.DoUpdate()
if err != nil {
- // DoUpdate was failed, but the invocation is not failed.
- // Printing the error to logs is better than returning a
- // result with an error.
logger.Errorf("[adasvc filter] The DoUpdate method was failed,
err: %s.", err)
+ return &protocol.RPCResult{Err: err}
}
// get limiter for the mapper
l, err := limiterMapperSingleton.getMethodLimiter(invoker.GetURL(),
invocation.MethodName())
if err != nil {
+ logger.Errorf("[adasvc filter] The method limiter for \"%s\" is
not found.", invocation.MethodName())
return &protocol.RPCResult{Err: err}
}
@@ -123,3 +133,14 @@ func (f *adaptiveServiceProviderFilter) OnResponse(_
context.Context, result pro
return result
}
+
+func wrapErrAdaptiveSvcInterrupted(customizedErr interface{}) error {
+ return fmt.Errorf("%w: %v", ErrAdaptiveSvcInterrupted, customizedErr)
+}
+
+func isErrAdaptiveSvcInterrupted(err error) bool {
+ if err == nil {
+ return false
+ }
+ return strings.HasPrefix(err.Error(), ErrAdaptiveSvcInterrupted.Error())
+}