This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch pub/perf in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 971f583ee0dfb1ae3e3d00aacf0f260f6c443e30 Author: Gao Hongtao <[email protected]> AuthorDate: Thu Sep 4 17:04:56 2025 +0800 Refactor circuit breaker tests and enhance probe logic - Introduced a constant for the test node name to improve code readability and maintainability. - Updated circuit breaker tests to use the new constant, ensuring consistency across test cases. --- banyand/queue/pub/circuitbreaker_test.go | 236 ++++++++++++++++++++++++++++--- banyand/queue/pub/client.go | 22 ++- 2 files changed, 235 insertions(+), 23 deletions(-) diff --git a/banyand/queue/pub/circuitbreaker_test.go b/banyand/queue/pub/circuitbreaker_test.go index 9f1976b1..73181b96 100644 --- a/banyand/queue/pub/circuitbreaker_test.go +++ b/banyand/queue/pub/circuitbreaker_test.go @@ -28,6 +28,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" ) +const testNodeName = "test-node" + func TestCircuitBreakerStateTransitions(t *testing.T) { tests := []struct { name string @@ -39,7 +41,7 @@ func TestCircuitBreakerStateTransitions(t *testing.T) { { name: "closed_to_open_after_failures", setup: func(p *pub) { - p.recordSuccess("test-node") + p.recordSuccess(testNodeName) }, actions: []func(*pub, string){ func(p *pub, node string) { p.recordFailure(node) }, @@ -54,7 +56,7 @@ func TestCircuitBreakerStateTransitions(t *testing.T) { { name: "closed_remains_closed_below_threshold", setup: func(p *pub) { - p.recordSuccess("test-node") + p.recordSuccess(testNodeName) }, actions: []func(*pub, string){ func(p *pub, node string) { p.recordFailure(node) }, @@ -67,14 +69,14 @@ func TestCircuitBreakerStateTransitions(t *testing.T) { { name: "open_to_half_open_after_cooldown", setup: func(p *pub) { - p.recordSuccess("test-node") + p.recordSuccess(testNodeName) // Trip the circuit breaker for i := 0; i < defaultCBThreshold; i++ { - p.recordFailure("test-node") + p.recordFailure(testNodeName) } // Simulate cooldown period has passed p.cbMu.Lock() - cb := p.cbStates["test-node"] + cb := p.cbStates[testNodeName] cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second) p.cbMu.Unlock() }, @@ -85,14 +87,14 @@ func TestCircuitBreakerStateTransitions(t *testing.T) { { name: "half_open_to_closed_on_success", setup: func(p *pub) { - p.recordSuccess("test-node") + p.recordSuccess(testNodeName) // Trip the circuit breaker for i := 0; i < defaultCBThreshold; i++ { - p.recordFailure("test-node") + p.recordFailure(testNodeName) } // Set to half-open state p.cbMu.Lock() - cb := p.cbStates["test-node"] + cb := p.cbStates[testNodeName] cb.state = StateHalfOpen p.cbMu.Unlock() }, @@ -105,14 +107,14 @@ func TestCircuitBreakerStateTransitions(t *testing.T) { { name: "half_open_to_open_on_failure", setup: func(p *pub) { - p.recordSuccess("test-node") + p.recordSuccess(testNodeName) // Trip the circuit breaker for i := 0; i < defaultCBThreshold; i++ { - p.recordFailure("test-node") + p.recordFailure(testNodeName) } // Set to half-open state p.cbMu.Lock() - cb := p.cbStates["test-node"] + cb := p.cbStates[testNodeName] cb.state = StateHalfOpen p.cbMu.Unlock() }, @@ -139,19 +141,19 @@ func TestCircuitBreakerStateTransitions(t *testing.T) { // Execute actions for _, action := range tt.actions { - action(p, "test-node") + action(p, testNodeName) } // Check final state p.cbMu.RLock() - cb, exists := p.cbStates["test-node"] + cb, exists := p.cbStates[testNodeName] p.cbMu.RUnlock() require.True(t, exists, "circuit breaker state should exist") assert.Equal(t, tt.expectedState, cb.state, "circuit breaker state mismatch") // Check if requests are allowed - allowed := p.isRequestAllowed("test-node") + allowed := p.isRequestAllowed(testNodeName) assert.Equal(t, tt.allowsRequests, allowed, "request allowance mismatch") }) } @@ -166,7 +168,7 @@ func TestCircuitBreakerConcurrency(t *testing.T) { const numGoroutines = 100 const numOperations = 50 - node := "test-node" + node := testNodeName var wg sync.WaitGroup wg.Add(numGoroutines * 2) // Half for success, half for failure @@ -260,7 +262,7 @@ func TestCircuitBreakerRecoveryAfterCooldown(t *testing.T) { log: logger.GetLogger("test"), } - node := "test-node" + node := testNodeName // Initialize node p.recordSuccess(node) @@ -329,7 +331,7 @@ func TestCircuitBreakerFailureThresholdEdgeCase(t *testing.T) { log: logger.GetLogger("test"), } - node := "test-node" + node := testNodeName // Initialize node p.recordSuccess(node) @@ -354,3 +356,203 @@ func TestCircuitBreakerFailureThresholdEdgeCase(t *testing.T) { assert.Equal(t, defaultCBThreshold, cb.consecutiveFailures, "failure count should be at threshold") p.cbMu.RUnlock() } + +func TestCircuitBreakerSingleProbeEnforcement(t *testing.T) { + p := &pub{ + cbStates: make(map[string]*circuitState), + cbMu: sync.RWMutex{}, + log: logger.GetLogger("test"), + } + + node := testNodeName + + // Initialize and trip circuit breaker + p.recordSuccess(node) + for i := 0; i < defaultCBThreshold; i++ { + p.recordFailure(node) + } + + // Verify circuit is open + assert.False(t, p.isRequestAllowed(node), "circuit should be open") + + // Simulate cooldown period passage + p.cbMu.Lock() + cb := p.cbStates[node] + cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second) + p.cbMu.Unlock() + + // First request should transition to half-open and be allowed + allowed1 := p.isRequestAllowed(node) + assert.True(t, allowed1, "first request after cooldown should be allowed and transition to half-open") + + // Verify state is half-open with probe in flight + p.cbMu.RLock() + assert.Equal(t, StateHalfOpen, cb.state, "circuit should be in half-open state") + assert.True(t, cb.halfOpenProbeInFlight, "probe should be marked as in flight") + p.cbMu.RUnlock() + + // Second request should be denied while probe is in flight + allowed2 := p.isRequestAllowed(node) + assert.False(t, allowed2, "second request should be denied while probe is in flight") + + // Third request should also be denied + allowed3 := p.isRequestAllowed(node) + assert.False(t, allowed3, "third request should also be denied while probe is in flight") +} + +func TestCircuitBreakerSingleProbeSuccess(t *testing.T) { + p := &pub{ + cbStates: make(map[string]*circuitState), + cbMu: sync.RWMutex{}, + log: logger.GetLogger("test"), + } + + node := testNodeName + + // Setup half-open state with probe in flight + p.recordSuccess(node) + for i := 0; i < defaultCBThreshold; i++ { + p.recordFailure(node) + } + + p.cbMu.Lock() + cb := p.cbStates[node] + cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second) + p.cbMu.Unlock() + + // Transition to half-open + allowed := p.isRequestAllowed(node) + assert.True(t, allowed, "first request should be allowed") + + // Verify probe is in flight + p.cbMu.RLock() + assert.True(t, cb.halfOpenProbeInFlight, "probe should be in flight") + p.cbMu.RUnlock() + + // Record success (probe succeeds) + p.recordSuccess(node) + + // Verify circuit is closed and probe token is cleared + p.cbMu.RLock() + assert.Equal(t, StateClosed, cb.state, "circuit should be closed after successful probe") + assert.False(t, cb.halfOpenProbeInFlight, "probe token should be cleared") + p.cbMu.RUnlock() + + // Subsequent requests should be allowed in closed state + assert.True(t, p.isRequestAllowed(node), "requests should be allowed in closed state") +} + +func TestCircuitBreakerSingleProbeFailure(t *testing.T) { + p := &pub{ + cbStates: make(map[string]*circuitState), + cbMu: sync.RWMutex{}, + log: logger.GetLogger("test"), + } + + node := testNodeName + + // Setup half-open state with probe in flight + p.recordSuccess(node) + for i := 0; i < defaultCBThreshold; i++ { + p.recordFailure(node) + } + + p.cbMu.Lock() + cb := p.cbStates[node] + cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second) + p.cbMu.Unlock() + + // Transition to half-open + allowed := p.isRequestAllowed(node) + assert.True(t, allowed, "first request should be allowed") + + // Verify probe is in flight + p.cbMu.RLock() + assert.True(t, cb.halfOpenProbeInFlight, "probe should be in flight") + p.cbMu.RUnlock() + + // Record failure (probe fails) + p.recordFailure(node) + + // Verify circuit is back to open and probe token is cleared + p.cbMu.RLock() + assert.Equal(t, StateOpen, cb.state, "circuit should be back to open after failed probe") + assert.False(t, cb.halfOpenProbeInFlight, "probe token should be cleared") + p.cbMu.RUnlock() + + // Subsequent requests should be denied in open state + assert.False(t, p.isRequestAllowed(node), "requests should be denied in open state") +} + +func TestCircuitBreakerConcurrentProbeAttempts(t *testing.T) { + p := &pub{ + cbStates: make(map[string]*circuitState), + cbMu: sync.RWMutex{}, + log: logger.GetLogger("test"), + } + + node := testNodeName + + // Setup open state ready for half-open transition + p.recordSuccess(node) + for i := 0; i < defaultCBThreshold; i++ { + p.recordFailure(node) + } + + p.cbMu.Lock() + cb := p.cbStates[node] + cb.openTime = time.Now().Add(-defaultCBResetTimeout - time.Second) + p.cbMu.Unlock() + + const numGoroutines = 10 + var wg sync.WaitGroup + results := make(chan bool, numGoroutines) + + // Simulate concurrent requests attempting to probe + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + allowed := p.isRequestAllowed(node) + results <- allowed + }() + } + + wg.Wait() + close(results) + + // Count allowed requests + allowedCount := 0 + for result := range results { + if result { + allowedCount++ + } + } + + // Only one request should be allowed (the one that set the probe token) + assert.Equal(t, 1, allowedCount, "exactly one request should be allowed in half-open state") + + // Verify circuit is in half-open with probe in flight + p.cbMu.RLock() + assert.Equal(t, StateHalfOpen, cb.state, "circuit should be in half-open state") + assert.True(t, cb.halfOpenProbeInFlight, "probe should be marked as in flight") + p.cbMu.RUnlock() +} + +func TestCircuitBreakerProbeTokenInitialization(t *testing.T) { + p := &pub{ + cbStates: make(map[string]*circuitState), + cbMu: sync.RWMutex{}, + log: logger.GetLogger("test"), + } + + node := testNodeName + + // Test that new circuit breaker states have probe token cleared + p.recordSuccess(node) + + p.cbMu.RLock() + cb := p.cbStates[node] + assert.False(t, cb.halfOpenProbeInFlight, "new circuit breaker should have probe token cleared") + p.cbMu.RUnlock() +} diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go index 73f44491..cf665dc3 100644 --- a/banyand/queue/pub/client.go +++ b/banyand/queue/pub/client.go @@ -160,10 +160,11 @@ const ( // circuitState holds circuit breaker metadata; it does NOT duplicate gRPC clients/conns. type circuitState struct { - lastFailureTime time.Time - openTime time.Time - state CircuitState - consecutiveFailures int + lastFailureTime time.Time + openTime time.Time + state CircuitState + consecutiveFailures int + halfOpenProbeInFlight bool } type client struct { @@ -654,14 +655,21 @@ func (p *pub) isRequestAllowed(node string) bool { case StateOpen: // Check if cooldown period has expired if time.Since(cb.openTime) >= defaultCBResetTimeout { - // Transition to Half-Open to allow probe requests + // Transition to Half-Open to allow a single probe request cb.state = StateHalfOpen + cb.halfOpenProbeInFlight = true // Set token for the probe p.log.Info().Str("node", node).Msg("circuit breaker transitioned to half-open") return true } return false // Still in cooldown period case StateHalfOpen: - // Allow requests in half-open state (probe requests) + // In half-open state, deny requests if probe is already in flight + if cb.halfOpenProbeInFlight { + return false // Probe already in progress, deny additional requests + } + // This case should not normally happen since we set the token on transition, + // but handle it defensively by allowing the request and setting the token + cb.halfOpenProbeInFlight = true return true default: return true @@ -689,6 +697,7 @@ func (p *pub) recordSuccess(node string) { cb.consecutiveFailures = 0 cb.lastFailureTime = time.Time{} cb.openTime = time.Time{} + cb.halfOpenProbeInFlight = false // Clear probe token } // recordFailure updates the circuit breaker state on failed operation. @@ -720,6 +729,7 @@ func (p *pub) recordFailure(node string) { // Failed during half-open, go back to open cb.state = StateOpen cb.openTime = time.Now() + cb.halfOpenProbeInFlight = false // Clear probe token p.log.Warn().Str("node", node).Msg("circuit breaker reopened after half-open failure") } }
