This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6efc27c5c fix(filter):solve side effects caused by 3184 refactor 
(#3230)
6efc27c5c is described below

commit 6efc27c5ce47c83c2b3a07dc707e17f29504ad83
Author: 花国栋 <[email protected]>
AuthorDate: Wed Mar 4 02:43:45 2026 +0800

    fix(filter):solve side effects caused by 3184 refactor (#3230)
    
    * fix(filter):solve side effects caused by 3184 refactor
    
    * fix(filter): switch URL storage from attachment to attribute
---
 filter/active/filter.go      | 36 +++++++++++++++++++++++++++++++-----
 filter/active/filter_test.go | 32 ++++++++++++++++++--------------
 2 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/filter/active/filter.go b/filter/active/filter.go
index ab070fbf5..7e6c17e63 100644
--- a/filter/active/filter.go
+++ b/filter/active/filter.go
@@ -28,6 +28,7 @@ import (
 )
 
 import (
+       "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/filter"
@@ -38,6 +39,7 @@ import (
 
 const (
        dubboInvokeStartTime = "dubboInvokeStartTime"
+       dubboInvokeURL       = "dubboInvokeURL"
 )
 
 var (
@@ -63,22 +65,46 @@ func newActiveFilter() filter.Filter {
 
 // Invoke starts to record the requests status
 func (f *activeFilter) Invoke(ctx context.Context, invoker base.Invoker, inv 
base.Invocation) result.Result {
-       inv.(*invocation.RPCInvocation).SetAttachment(dubboInvokeStartTime, 
strconv.FormatInt(base.CurrentTimeMillis(), 10))
-       base.BeginCount(invoker.GetURL(), inv.MethodName())
+       rpcInv := inv.(*invocation.RPCInvocation)
+
+       // Record the start time
+       rpcInv.SetAttachment(dubboInvokeStartTime, 
strconv.FormatInt(base.CurrentTimeMillis(), 10))
+
+       // Cache the URL object to ensure BeginCount and EndCount use the same 
URL instance
+       // This prevents statistics inconsistency when the invoker is destroyed 
concurrently
+       url := invoker.GetURL()
+       rpcInv.SetAttribute(dubboInvokeURL, url)
+       base.BeginCount(url, inv.MethodName())
        return invoker.Invoke(ctx, inv)
 }
 
 // OnResponse update the active count base on the request result.
 func (f *activeFilter) OnResponse(ctx context.Context, result result.Result, 
invoker base.Invoker, inv base.Invocation) result.Result {
-       startTime, err := 
strconv.ParseInt(inv.(*invocation.RPCInvocation).GetAttachmentWithDefaultValue(dubboInvokeStartTime,
 "0"), 10, 64)
+       rpcInv := inv.(*invocation.RPCInvocation)
+
+       /// Retrieve the cached URL from Invoke phase with a nil default value
+       rawUrl := rpcInv.GetAttributeWithDefaultValue(dubboInvokeURL, nil)
+
+       // Safely assert the type. If rawUrl is nil, ok will be false and panic 
is avoided.
+       url, ok := rawUrl.(*common.URL)
+
+       // If the URL is missing or invalid, it means the Invoke phase was 
likely interrupted.
+       // Skip EndCount to prevent statistic inconsistency or panics.
+       if !ok || url == nil {
+               logger.Warnf("activeFilter cannot get cached URL from 
attribute, skip EndCount. Invoker may not have passed Invoke phase.")
+               return result
+       }
+
+       startTime, err := 
strconv.ParseInt(rpcInv.GetAttachmentWithDefaultValue(dubboInvokeStartTime, 
"0"), 10, 64)
        if err != nil {
                result.SetError(err)
                logger.Errorf("parse dubbo_invoke_start_time to int64 failed")
                // When err is not nil, use default elapsed value of 1
-               base.EndCount(invoker.GetURL(), inv.MethodName(), 1, false)
+               base.EndCount(url, inv.MethodName(), 1, false)
                return result
        }
+
        elapsed := base.CurrentTimeMillis() - startTime
-       base.EndCount(invoker.GetURL(), inv.MethodName(), elapsed, 
result.Error() == nil)
+       base.EndCount(url, inv.MethodName(), elapsed, result.Error() == nil)
        return result
 }
diff --git a/filter/active/filter_test.go b/filter/active/filter_test.go
index f70bee173..b0a34b804 100644
--- a/filter/active/filter_test.go
+++ b/filter/active/filter_test.go
@@ -52,20 +52,24 @@ func TestFilterInvoke(t *testing.T) {
        invoker.EXPECT().GetURL().Return(url).Times(1)
        filter.Invoke(context.Background(), invoker, invoc)
        assert.NotEmpty(t, 
invoc.GetAttachmentWithDefaultValue(dubboInvokeStartTime, ""))
+       urlFromAttribute, ok := invoc.GetAttribute(dubboInvokeURL)
+       assert.True(t, ok, "dubboInvokeURL should be cached in attribute")
+       assert.Equal(t, url.Key(), urlFromAttribute.(*common.URL).Key(), 
"Cached URL should match original URL")
+
 }
 
 func TestFilterOnResponse(t *testing.T) {
        c := base.CurrentTimeMillis()
        elapsed := 100
+       url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.alibaba.user.UserProvider", 
constant.LocalHostValue, constant.DefaultPort))
        invoc := invocation.NewRPCInvocation("test", []any{"OK"}, 
map[string]any{
                dubboInvokeStartTime: strconv.FormatInt(c-int64(elapsed), 10),
        })
-       url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", 
constant.LocalHostValue, constant.DefaultPort))
+       invoc.SetAttribute(dubboInvokeURL, url)
        filter := activeFilter{}
        ctrl := gomock.NewController(t)
        defer ctrl.Finish()
        invoker := mock.NewMockInvoker(ctrl)
-       invoker.EXPECT().GetURL().Return(url).Times(1)
        rpcResult := &result.RPCResult{
                Err: errors.New("test"),
        }
@@ -88,20 +92,20 @@ func TestFilterOnResponse(t *testing.T) {
 func TestFilterOnResponseWithDefer(t *testing.T) {
        base.CleanAllStatus()
 
-       // Test scenario 1: dubboInvokeStartTime is parsed successfully and the 
result is correct.
+       // Test scenario 1: dubboInvokeStartTime is parsed successfully and 
result is correct.
        t.Run("ParseSuccessAndResultSuccess", func(t *testing.T) {
                defer base.CleanAllStatus()
 
                c := base.CurrentTimeMillis()
+               url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.alibaba.user.UserProvider", 
constant.LocalHostValue, constant.DefaultPort))
                invoc := invocation.NewRPCInvocation("test1", []any{"OK"}, 
map[string]any{
                        dubboInvokeStartTime: strconv.FormatInt(c, 10),
                })
-               url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", 
constant.LocalHostValue, constant.DefaultPort))
+               invoc.SetAttribute(dubboInvokeURL, url)
                filter := activeFilter{}
                ctrl := gomock.NewController(t)
                defer ctrl.Finish()
                invoker := mock.NewMockInvoker(ctrl)
-               invoker.EXPECT().GetURL().Return(url).Times(1)
                rpcResult := &result.RPCResult{}
 
                filter.OnResponse(context.TODO(), rpcResult, invoker, invoc)
@@ -119,20 +123,20 @@ func TestFilterOnResponseWithDefer(t *testing.T) {
                assert.GreaterOrEqual(t, urlStatus.GetTotalElapsed(), int64(0))
        })
 
-       // Test scenario 2: dubboInvokeStartTime is parsed successfully, but 
the result is incorrect
+       // Test scenario 2: dubboInvokeStartTime is parsed successfully, but 
result is incorrect
        t.Run("ParseSuccessAndResultFailed", func(t *testing.T) {
                defer base.CleanAllStatus()
 
                c := base.CurrentTimeMillis()
+               url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", 
constant.LocalHostValue, constant.DefaultPort))
                invoc := invocation.NewRPCInvocation("test2", []any{"OK"}, 
map[string]any{
                        dubboInvokeStartTime: strconv.FormatInt(c, 10),
                })
-               url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", 
constant.LocalHostValue, constant.DefaultPort))
+               invoc.SetAttribute(dubboInvokeURL, url)
                filter := activeFilter{}
                ctrl := gomock.NewController(t)
                defer ctrl.Finish()
                invoker := mock.NewMockInvoker(ctrl)
-               invoker.EXPECT().GetURL().Return(url).Times(1)
                rpcResult := &result.RPCResult{
                        Err: errors.New("test error"),
                }
@@ -158,15 +162,15 @@ func TestFilterOnResponseWithDefer(t *testing.T) {
        t.Run("ParseFailedWithInvalidString", func(t *testing.T) {
                defer base.CleanAllStatus()
 
+               url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", 
constant.LocalHostValue, constant.DefaultPort))
                invoc := invocation.NewRPCInvocation("test3", []any{"OK"}, 
map[string]any{
                        dubboInvokeStartTime: "invalid-time",
                })
-               url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", 
constant.LocalHostValue, constant.DefaultPort))
+               invoc.SetAttribute(dubboInvokeURL, url)
                filter := activeFilter{}
                ctrl := gomock.NewController(t)
                defer ctrl.Finish()
                invoker := mock.NewMockInvoker(ctrl)
-               invoker.EXPECT().GetURL().Return(url).Times(1)
                rpcResult := &result.RPCResult{}
 
                result := filter.OnResponse(context.TODO(), rpcResult, invoker, 
invoc)
@@ -175,7 +179,7 @@ func TestFilterOnResponseWithDefer(t *testing.T) {
                methodStatus := base.GetMethodStatus(url, "test3")
                urlStatus := base.GetURLStatus(url)
 
-               // Verification count and status - should use the default 
duration of 1 and be marked as failed
+               // Verification count and status - should use default duration 
of 1 and be marked as failed
                assert.Equal(t, int32(1), methodStatus.GetTotal())
                assert.Equal(t, int32(1), urlStatus.GetTotal())
                assert.Equal(t, int32(1), methodStatus.GetFailed())
@@ -186,17 +190,17 @@ func TestFilterOnResponseWithDefer(t *testing.T) {
                assert.GreaterOrEqual(t, urlStatus.GetFailedElapsed(), int64(1))
        })
 
-       // Test scenario 4: dubboInvokeStartTime does not exist (use the 
default value 0)
+       // Test scenario 4: dubboInvokeStartTime does not exist (use default 
value 0)
        t.Run("ParseFailedWithDefaultValue", func(t *testing.T) {
                defer base.CleanAllStatus()
 
-               invoc := invocation.NewRPCInvocation("test4", []any{"OK"}, 
make(map[string]any))
                url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", 
constant.LocalHostValue, constant.DefaultPort))
+               invoc := invocation.NewRPCInvocation("test4", []any{"OK"}, 
map[string]any{})
+               invoc.SetAttribute(dubboInvokeURL, url)
                filter := activeFilter{}
                ctrl := gomock.NewController(t)
                defer ctrl.Finish()
                invoker := mock.NewMockInvoker(ctrl)
-               invoker.EXPECT().GetURL().Return(url).Times(1)
                rpcResult := &result.RPCResult{}
 
                filter.OnResponse(context.TODO(), rpcResult, invoker, invoc)

Reply via email to