This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new edd5c71  NackBackoffPolicy.Next return time.Duration (#834)
edd5c71 is described below

commit edd5c71651b79bd35358a51ae3925905ed9f17e1
Author: Huanghy <h...@h-hy.com>
AuthorDate: Thu Sep 15 06:36:48 2022 +0800

    NackBackoffPolicy.Next return time.Duration (#834)
    
    Co-authored-by: tevinhuang <tevinhu...@tencent.com>
---
 pulsar/negative_acks_tracker.go        |  4 ++--
 pulsar/negative_acks_tracker_test.go   | 14 +++++++++-----
 pulsar/negative_backoff_policy.go      | 17 ++++++++++-------
 pulsar/negative_backoff_policy_test.go |  5 +++--
 4 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index 3485e1b..79ed694 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -54,7 +54,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
 
        if nackBackoffPolicy != nil {
                firstDelayForNackBackoff := nackBackoffPolicy.Next(1)
-               t.delay = time.Duration(firstDelayForNackBackoff)
+               t.delay = firstDelayForNackBackoff
        } else {
                t.delay = delay
        }
@@ -109,7 +109,7 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
                return
        }
 
-       targetTime := time.Now().Add(time.Duration(nackBackoffDelay))
+       targetTime := time.Now().Add(nackBackoffDelay)
        t.negativeAcks[batchMsgID] = targetTime
 }
 
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 537f0da..5faa947 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -35,14 +35,18 @@ type nackMockedConsumer struct {
        lock   sync.Mutex
 }
 
-func newNackMockedConsumer() *nackMockedConsumer {
+func newNackMockedConsumer(nackBackoffPolicy NackBackoffPolicy) *nackMockedConsumer {
        t := &nackMockedConsumer{
                ch: make(chan messageID, 10),
        }
        go func() {
                // since the client ticks at an interval of delay / 3
                // wait another interval to ensure we get all messages
-               time.Sleep(testNackDelay + 101*time.Millisecond)
+               if nackBackoffPolicy == nil {
+                       time.Sleep(testNackDelay + 101*time.Millisecond)
+               } else {
+                       time.Sleep(nackBackoffPolicy.Next(1) + 101*time.Millisecond)
+               }
                t.lock.Lock()
                defer t.lock.Unlock()
                t.closed = true
@@ -74,7 +78,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
 }
 
 func TestNacksTracker(t *testing.T) {
-       nmc := newNackMockedConsumer()
+       nmc := newNackMockedConsumer(nil)
        nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())
 
        nacks.Add(messageID{
@@ -107,7 +111,7 @@ func TestNacksTracker(t *testing.T) {
 }
 
 func TestNacksWithBatchesTracker(t *testing.T) {
-       nmc := newNackMockedConsumer()
+       nmc := newNackMockedConsumer(nil)
        nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())
 
        nacks.Add(messageID{
@@ -150,7 +154,7 @@ func TestNacksWithBatchesTracker(t *testing.T) {
 }
 
 func TestNackBackoffTracker(t *testing.T) {
-       nmc := newNackMockedConsumer()
+       nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy))
        nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger())
 
        nacks.AddMessage(new(mockMessage1))
diff --git a/pulsar/negative_backoff_policy.go b/pulsar/negative_backoff_policy.go
index cf080ad..be72bfa 100644
--- a/pulsar/negative_backoff_policy.go
+++ b/pulsar/negative_backoff_policy.go
@@ -17,7 +17,10 @@
 
 package pulsar
 
-import "math"
+import (
+       "math"
+       "time"
+)
 
 // NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
 // for a consumer.
@@ -28,19 +31,19 @@ import "math"
 type NackBackoffPolicy interface {
        // The redeliveryCount indicates the number of times the message was redelivered.
        // We can get the redeliveryCount from the CommandMessage.
-       Next(redeliveryCount uint32) int64
+       Next(redeliveryCount uint32) time.Duration
 }
 
 // defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
 type defaultNackBackoffPolicy struct{}
 
-func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
-       minNackTimeMs := int64(1000 * 30) // 30sec
-       maxNackTimeMs := 1000 * 60 * 10   // 10min
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) time.Duration {
+       minNackTime := 1 * time.Second  // 1sec
+       maxNackTime := 10 * time.Minute // 10min
 
        if redeliveryCount < 0 {
-               return minNackTimeMs
+               return minNackTime
        }
 
-       return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))
+       return time.Duration(math.Min(math.Abs(float64(minNackTime<<redeliveryCount)), float64(maxNackTime)))
 }
diff --git a/pulsar/negative_backoff_policy_test.go b/pulsar/negative_backoff_policy_test.go
index bfbb6a8..f37cf7c 100644
--- a/pulsar/negative_backoff_policy_test.go
+++ b/pulsar/negative_backoff_policy_test.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "testing"
+       "time"
 
        "github.com/stretchr/testify/assert"
 )
@@ -27,8 +28,8 @@ func TestDefaultNackBackoffPolicy_Next(t *testing.T) {
        defaultNackBackoff := new(defaultNackBackoffPolicy)
 
        res0 := defaultNackBackoff.Next(0)
-       assert.Equal(t, int64(1000*30), res0)
+       assert.Equal(t, 1*time.Second, res0)
 
        res5 := defaultNackBackoff.Next(5)
-       assert.Equal(t, int64(600000), res5)
+       assert.Equal(t, 32*time.Second, res5)
 }

Reply via email to