This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch pub/perf
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 971f583ee0dfb1ae3e3d00aacf0f260f6c443e30
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 4 17:04:56 2025 +0800

    Refactor circuit breaker tests and enhance probe logic
    
    - Introduced a constant for the test node name to improve code readability and maintainability.
    - Updated circuit breaker tests to use the new constant, ensuring consistency across test cases.
---
 banyand/queue/pub/circuitbreaker_test.go | 236 ++++++++++++++++++++++++++++---
 banyand/queue/pub/client.go              |  22 ++-
 2 files changed, 235 insertions(+), 23 deletions(-)

diff --git a/banyand/queue/pub/circuitbreaker_test.go b/banyand/queue/pub/circuitbreaker_test.go
index 9f1976b1..73181b96 100644
--- a/banyand/queue/pub/circuitbreaker_test.go
+++ b/banyand/queue/pub/circuitbreaker_test.go
@@ -28,6 +28,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+const testNodeName = "test-node"
+
 func TestCircuitBreakerStateTransitions(t *testing.T) {
        tests := []struct {
                name           string
@@ -39,7 +41,7 @@ func TestCircuitBreakerStateTransitions(t *testing.T) {
                {
                        name: "closed_to_open_after_failures",
                        setup: func(p *pub) {
-                               p.recordSuccess("test-node")
+                               p.recordSuccess(testNodeName)
                        },
                        actions: []func(*pub, string){
                                func(p *pub, node string) { p.recordFailure(node) },
@@ -54,7 +56,7 @@ func TestCircuitBreakerStateTransitions(t *testing.T) {
                {
                        name: "closed_remains_closed_below_threshold",
                        setup: func(p *pub) {
-                               p.recordSuccess("test-node")
+                               p.recordSuccess(testNodeName)
                        },
                        actions: []func(*pub, string){
                                func(p *pub, node string) { p.recordFailure(node) },
@@ -67,14 +69,14 @@ func TestCircuitBreakerStateTransitions(t *testing.T) {
                {
                        name: "open_to_half_open_after_cooldown",
                        setup: func(p *pub) {
-                               p.recordSuccess("test-node")
+                               p.recordSuccess(testNodeName)
                                // Trip the circuit breaker
                                for i := 0; i < defaultCBThreshold; i++ {
-                                       p.recordFailure("test-node")
+                                       p.recordFailure(testNodeName)
                                }
                                // Simulate cooldown period has passed
                                p.cbMu.Lock()
-                               cb := p.cbStates["test-node"]
+                               cb := p.cbStates[testNodeName]
                                cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second)
                                p.cbMu.Unlock()
                        },
@@ -85,14 +87,14 @@ func TestCircuitBreakerStateTransitions(t *testing.T) {
                {
                        name: "half_open_to_closed_on_success",
                        setup: func(p *pub) {
-                               p.recordSuccess("test-node")
+                               p.recordSuccess(testNodeName)
                                // Trip the circuit breaker
                                for i := 0; i < defaultCBThreshold; i++ {
-                                       p.recordFailure("test-node")
+                                       p.recordFailure(testNodeName)
                                }
                                // Set to half-open state
                                p.cbMu.Lock()
-                               cb := p.cbStates["test-node"]
+                               cb := p.cbStates[testNodeName]
                                cb.state = StateHalfOpen
                                p.cbMu.Unlock()
                        },
@@ -105,14 +107,14 @@ func TestCircuitBreakerStateTransitions(t *testing.T) {
                {
                        name: "half_open_to_open_on_failure",
                        setup: func(p *pub) {
-                               p.recordSuccess("test-node")
+                               p.recordSuccess(testNodeName)
                                // Trip the circuit breaker
                                for i := 0; i < defaultCBThreshold; i++ {
-                                       p.recordFailure("test-node")
+                                       p.recordFailure(testNodeName)
                                }
                                // Set to half-open state
                                p.cbMu.Lock()
-                               cb := p.cbStates["test-node"]
+                               cb := p.cbStates[testNodeName]
                                cb.state = StateHalfOpen
                                p.cbMu.Unlock()
                        },
@@ -139,19 +141,19 @@ func TestCircuitBreakerStateTransitions(t *testing.T) {
 
                        // Execute actions
                        for _, action := range tt.actions {
-                               action(p, "test-node")
+                               action(p, testNodeName)
                        }
 
                        // Check final state
                        p.cbMu.RLock()
-                       cb, exists := p.cbStates["test-node"]
+                       cb, exists := p.cbStates[testNodeName]
                        p.cbMu.RUnlock()
 
                        require.True(t, exists, "circuit breaker state should exist")
                        assert.Equal(t, tt.expectedState, cb.state, "circuit breaker state mismatch")
 
                        // Check if requests are allowed
-                       allowed := p.isRequestAllowed("test-node")
+                       allowed := p.isRequestAllowed(testNodeName)
                        assert.Equal(t, tt.allowsRequests, allowed, "request allowance mismatch")
                })
        }
@@ -166,7 +168,7 @@ func TestCircuitBreakerConcurrency(t *testing.T) {
 
        const numGoroutines = 100
        const numOperations = 50
-       node := "test-node"
+       node := testNodeName
 
        var wg sync.WaitGroup
        wg.Add(numGoroutines * 2) // Half for success, half for failure
@@ -260,7 +262,7 @@ func TestCircuitBreakerRecoveryAfterCooldown(t *testing.T) {
                log:      logger.GetLogger("test"),
        }
 
-       node := "test-node"
+       node := testNodeName
 
        // Initialize node
        p.recordSuccess(node)
@@ -329,7 +331,7 @@ func TestCircuitBreakerFailureThresholdEdgeCase(t *testing.T) {
                log:      logger.GetLogger("test"),
        }
 
-       node := "test-node"
+       node := testNodeName
 
        // Initialize node
        p.recordSuccess(node)
@@ -354,3 +356,203 @@ func TestCircuitBreakerFailureThresholdEdgeCase(t *testing.T) {
        assert.Equal(t, defaultCBThreshold, cb.consecutiveFailures, "failure count should be at threshold")
        p.cbMu.RUnlock()
 }
+
+func TestCircuitBreakerSingleProbeEnforcement(t *testing.T) {
+       p := &pub{
+               cbStates: make(map[string]*circuitState),
+               cbMu:     sync.RWMutex{},
+               log:      logger.GetLogger("test"),
+       }
+
+       node := testNodeName
+
+       // Initialize and trip circuit breaker
+       p.recordSuccess(node)
+       for i := 0; i < defaultCBThreshold; i++ {
+               p.recordFailure(node)
+       }
+
+       // Verify circuit is open
+       assert.False(t, p.isRequestAllowed(node), "circuit should be open")
+
+       // Simulate cooldown period passage
+       p.cbMu.Lock()
+       cb := p.cbStates[node]
+       cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second)
+       p.cbMu.Unlock()
+
+       // First request should transition to half-open and be allowed
+       allowed1 := p.isRequestAllowed(node)
+       assert.True(t, allowed1, "first request after cooldown should be allowed and transition to half-open")
+
+       // Verify state is half-open with probe in flight
+       p.cbMu.RLock()
+       assert.Equal(t, StateHalfOpen, cb.state, "circuit should be in half-open state")
+       assert.True(t, cb.halfOpenProbeInFlight, "probe should be marked as in flight")
+       p.cbMu.RUnlock()
+
+       // Second request should be denied while probe is in flight
+       allowed2 := p.isRequestAllowed(node)
+       assert.False(t, allowed2, "second request should be denied while probe is in flight")
+
+       // Third request should also be denied
+       allowed3 := p.isRequestAllowed(node)
+       assert.False(t, allowed3, "third request should also be denied while probe is in flight")
+}
+
+func TestCircuitBreakerSingleProbeSuccess(t *testing.T) {
+       p := &pub{
+               cbStates: make(map[string]*circuitState),
+               cbMu:     sync.RWMutex{},
+               log:      logger.GetLogger("test"),
+       }
+
+       node := testNodeName
+
+       // Setup half-open state with probe in flight
+       p.recordSuccess(node)
+       for i := 0; i < defaultCBThreshold; i++ {
+               p.recordFailure(node)
+       }
+
+       p.cbMu.Lock()
+       cb := p.cbStates[node]
+       cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second)
+       p.cbMu.Unlock()
+
+       // Transition to half-open
+       allowed := p.isRequestAllowed(node)
+       assert.True(t, allowed, "first request should be allowed")
+
+       // Verify probe is in flight
+       p.cbMu.RLock()
+       assert.True(t, cb.halfOpenProbeInFlight, "probe should be in flight")
+       p.cbMu.RUnlock()
+
+       // Record success (probe succeeds)
+       p.recordSuccess(node)
+
+       // Verify circuit is closed and probe token is cleared
+       p.cbMu.RLock()
+       assert.Equal(t, StateClosed, cb.state, "circuit should be closed after successful probe")
+       assert.False(t, cb.halfOpenProbeInFlight, "probe token should be cleared")
+       p.cbMu.RUnlock()
+
+       // Subsequent requests should be allowed in closed state
+       assert.True(t, p.isRequestAllowed(node), "requests should be allowed in closed state")
+}
+
+func TestCircuitBreakerSingleProbeFailure(t *testing.T) {
+       p := &pub{
+               cbStates: make(map[string]*circuitState),
+               cbMu:     sync.RWMutex{},
+               log:      logger.GetLogger("test"),
+       }
+
+       node := testNodeName
+
+       // Setup half-open state with probe in flight
+       p.recordSuccess(node)
+       for i := 0; i < defaultCBThreshold; i++ {
+               p.recordFailure(node)
+       }
+
+       p.cbMu.Lock()
+       cb := p.cbStates[node]
+       cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second)
+       p.cbMu.Unlock()
+
+       // Transition to half-open
+       allowed := p.isRequestAllowed(node)
+       assert.True(t, allowed, "first request should be allowed")
+
+       // Verify probe is in flight
+       p.cbMu.RLock()
+       assert.True(t, cb.halfOpenProbeInFlight, "probe should be in flight")
+       p.cbMu.RUnlock()
+
+       // Record failure (probe fails)
+       p.recordFailure(node)
+
+       // Verify circuit is back to open and probe token is cleared
+       p.cbMu.RLock()
+       assert.Equal(t, StateOpen, cb.state, "circuit should be back to open after failed probe")
+       assert.False(t, cb.halfOpenProbeInFlight, "probe token should be cleared")
+       p.cbMu.RUnlock()
+
+       // Subsequent requests should be denied in open state
+       assert.False(t, p.isRequestAllowed(node), "requests should be denied in open state")
+}
+
+func TestCircuitBreakerConcurrentProbeAttempts(t *testing.T) {
+       p := &pub{
+               cbStates: make(map[string]*circuitState),
+               cbMu:     sync.RWMutex{},
+               log:      logger.GetLogger("test"),
+       }
+
+       node := testNodeName
+
+       // Setup open state ready for half-open transition
+       p.recordSuccess(node)
+       for i := 0; i < defaultCBThreshold; i++ {
+               p.recordFailure(node)
+       }
+
+       p.cbMu.Lock()
+       cb := p.cbStates[node]
+       cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second)
+       p.cbMu.Unlock()
+
+       const numGoroutines = 10
+       var wg sync.WaitGroup
+       results := make(chan bool, numGoroutines)
+
+       // Simulate concurrent requests attempting to probe
+       for i := 0; i < numGoroutines; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       allowed := p.isRequestAllowed(node)
+                       results <- allowed
+               }()
+       }
+
+       wg.Wait()
+       close(results)
+
+       // Count allowed requests
+       allowedCount := 0
+       for result := range results {
+               if result {
+                       allowedCount++
+               }
+       }
+
+       // Only one request should be allowed (the one that set the probe token)
+       assert.Equal(t, 1, allowedCount, "exactly one request should be allowed in half-open state")
+
+       // Verify circuit is in half-open with probe in flight
+       p.cbMu.RLock()
+       assert.Equal(t, StateHalfOpen, cb.state, "circuit should be in half-open state")
+       assert.True(t, cb.halfOpenProbeInFlight, "probe should be marked as in flight")
+       p.cbMu.RUnlock()
+}
+
+func TestCircuitBreakerProbeTokenInitialization(t *testing.T) {
+       p := &pub{
+               cbStates: make(map[string]*circuitState),
+               cbMu:     sync.RWMutex{},
+               log:      logger.GetLogger("test"),
+       }
+
+       node := testNodeName
+
+       // Test that new circuit breaker states have probe token cleared
+       p.recordSuccess(node)
+
+       p.cbMu.RLock()
+       cb := p.cbStates[node]
+       assert.False(t, cb.halfOpenProbeInFlight, "new circuit breaker should have probe token cleared")
+       p.cbMu.RUnlock()
+}
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 73f44491..cf665dc3 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -160,10 +160,11 @@ const (
 
 // circuitState holds circuit breaker metadata; it does NOT duplicate gRPC clients/conns.
 type circuitState struct {
-       lastFailureTime     time.Time
-       openTime            time.Time
-       state               CircuitState
-       consecutiveFailures int
+       lastFailureTime       time.Time
+       openTime              time.Time
+       state                 CircuitState
+       consecutiveFailures   int
+       halfOpenProbeInFlight bool
 }
 
 type client struct {
@@ -654,14 +655,21 @@ func (p *pub) isRequestAllowed(node string) bool {
        case StateOpen:
                // Check if cooldown period has expired
                if time.Since(cb.openTime) >= defaultCBResetTimeout {
-                       // Transition to Half-Open to allow probe requests
+                       // Transition to Half-Open to allow a single probe request
                        cb.state = StateHalfOpen
+                       cb.halfOpenProbeInFlight = true // Set token for the probe
                        p.log.Info().Str("node", node).Msg("circuit breaker transitioned to half-open")
                        return true
                }
                return false // Still in cooldown period
        case StateHalfOpen:
-               // Allow requests in half-open state (probe requests)
+               // In half-open state, deny requests if probe is already in flight
+               if cb.halfOpenProbeInFlight {
+                       return false // Probe already in progress, deny additional requests
+               }
+               // This case should not normally happen since we set the token on transition,
+               // but handle it defensively by allowing the request and setting the token
+               cb.halfOpenProbeInFlight = true
                return true
        default:
                return true
@@ -689,6 +697,7 @@ func (p *pub) recordSuccess(node string) {
        cb.consecutiveFailures = 0
        cb.lastFailureTime = time.Time{}
        cb.openTime = time.Time{}
+       cb.halfOpenProbeInFlight = false // Clear probe token
 }
 
 // recordFailure updates the circuit breaker state on failed operation.
@@ -720,6 +729,7 @@ func (p *pub) recordFailure(node string) {
                // Failed during half-open, go back to open
                cb.state = StateOpen
                cb.openTime = time.Now()
+               cb.halfOpenProbeInFlight = false // Clear probe token
                p.log.Warn().Str("node", node).Msg("circuit breaker reopened after half-open failure")
        }
 }

Reply via email to