This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new edd5c71 NackBackoffPolicy.Next return time.Duration (#834) edd5c71 is described below commit edd5c71651b79bd35358a51ae3925905ed9f17e1 Author: Huanghy <h...@h-hy.com> AuthorDate: Thu Sep 15 06:36:48 2022 +0800 NackBackoffPolicy.Next return time.Duration (#834) Co-authored-by: tevinhuang <tevinhu...@tencent.com> --- pulsar/negative_acks_tracker.go | 4 ++-- pulsar/negative_acks_tracker_test.go | 14 +++++++++----- pulsar/negative_backoff_policy.go | 17 ++++++++++------- pulsar/negative_backoff_policy_test.go | 5 +++-- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 3485e1b..79ed694 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -54,7 +54,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, if nackBackoffPolicy != nil { firstDelayForNackBackoff := nackBackoffPolicy.Next(1) - t.delay = time.Duration(firstDelayForNackBackoff) + t.delay = firstDelayForNackBackoff } else { t.delay = delay } @@ -109,7 +109,7 @@ func (t *negativeAcksTracker) AddMessage(msg Message) { return } - targetTime := time.Now().Add(time.Duration(nackBackoffDelay)) + targetTime := time.Now().Add(nackBackoffDelay) t.negativeAcks[batchMsgID] = targetTime } diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index 537f0da..5faa947 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -35,14 +35,18 @@ type nackMockedConsumer struct { lock sync.Mutex } -func newNackMockedConsumer() *nackMockedConsumer { +func newNackMockedConsumer(nackBackoffPolicy NackBackoffPolicy) *nackMockedConsumer { t := &nackMockedConsumer{ ch: make(chan messageID, 10), } go func() { // since the client ticks at an interval of delay / 3 // wait another interval to ensure we get all messages - time.Sleep(testNackDelay + 101*time.Millisecond) + if nackBackoffPolicy == nil { + time.Sleep(testNackDelay + 101*time.Millisecond) + } else { + time.Sleep(nackBackoffPolicy.Next(1) + 101*time.Millisecond) + } t.lock.Lock() defer t.lock.Unlock() t.closed = true @@ -74,7 +78,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID { } func TestNacksTracker(t *testing.T) { - nmc := newNackMockedConsumer() + nmc := newNackMockedConsumer(nil) nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger()) nacks.Add(messageID{ @@ -107,7 +111,7 @@ func TestNacksTracker(t *testing.T) { } func TestNacksWithBatchesTracker(t *testing.T) { - nmc := newNackMockedConsumer() + nmc := newNackMockedConsumer(nil) nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger()) nacks.Add(messageID{ @@ -150,7 +154,7 @@ func TestNacksWithBatchesTracker(t *testing.T) { } func TestNackBackoffTracker(t *testing.T) { - nmc := newNackMockedConsumer() + nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy)) nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger()) nacks.AddMessage(new(mockMessage1)) diff --git a/pulsar/negative_backoff_policy.go b/pulsar/negative_backoff_policy.go index cf080ad..be72bfa 100644 --- a/pulsar/negative_backoff_policy.go +++ b/pulsar/negative_backoff_policy.go @@ -17,7 +17,10 @@ package pulsar -import "math" +import ( + "math" + "time" +) // NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy // for a consumer. @@ -28,19 +31,19 @@ import "math" type NackBackoffPolicy interface { // The redeliveryCount indicates the number of times the message was redelivered. // We can get the redeliveryCount from the CommandMessage. - Next(redeliveryCount uint32) int64 + Next(redeliveryCount uint32) time.Duration } // defaultNackBackoffPolicy is default impl for NackBackoffPolicy. type defaultNackBackoffPolicy struct{} -func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 { - minNackTimeMs := int64(1000 * 30) // 30sec - maxNackTimeMs := 1000 * 60 * 10 // 10min +func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) time.Duration { + minNackTime := 1 * time.Second // 1sec + maxNackTime := 10 * time.Minute // 10min if redeliveryCount < 0 { - return minNackTimeMs + return minNackTime } - return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs))) + return time.Duration(math.Min(math.Abs(float64(minNackTime<<redeliveryCount)), float64(maxNackTime))) } diff --git a/pulsar/negative_backoff_policy_test.go b/pulsar/negative_backoff_policy_test.go index bfbb6a8..f37cf7c 100644 --- a/pulsar/negative_backoff_policy_test.go +++ b/pulsar/negative_backoff_policy_test.go @@ -19,6 +19,7 @@ package pulsar import ( "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -27,8 +28,8 @@ func TestDefaultNackBackoffPolicy_Next(t *testing.T) { defaultNackBackoff := new(defaultNackBackoffPolicy) res0 := defaultNackBackoff.Next(0) - assert.Equal(t, int64(1000*30), res0) + assert.Equal(t, 1*time.Second, res0) res5 := defaultNackBackoff.Next(5) - assert.Equal(t, int64(600000), res5) + assert.Equal(t, 32*time.Second, res5) }