Copilot commented on code in PR #1457:
URL: https://github.com/apache/pulsar-client-go/pull/1457#discussion_r2745067307


##########
pulsar/producer_partition.go:
##########
@@ -523,9 +523,10 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
                }
 
                if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
+                       //      ProducerBlockedQuotaExceededException is a retryable exception,
+                       //      we only fail pending messages but continue trying to reconnect
                        p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")

Review Comment:
   The log message states "stop reconnecting" but the code change actually 
allows reconnection to continue (which is the intent of this PR). The log 
message should be updated to accurately reflect that reconnection will 
continue, for example: "Producer was blocked by quota exceed exception, failing 
pending messages, will retry reconnecting"



##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
                time.Sleep(time.Second * 1)
        }
 }
+
+type testReconnectBackoffPolicy struct {
+       curBackoff, minBackoff, maxBackoff time.Duration
+       retryTime                          int
+       lock                               sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+       return &testReconnectBackoffPolicy{
+               curBackoff: 0,
+               minBackoff: minBackoff,
+               maxBackoff: maxBackoff,
+       }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+
+       // Double the delay each time
+       b.curBackoff += b.curBackoff
+       if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+               b.curBackoff = b.minBackoff
+       } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+               b.curBackoff = b.maxBackoff
+       }
+       b.retryTime++
+       return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+       return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {
+}
+
+func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom() bool {
+       return true

Review Comment:
   The IsExpectedIntervalFrom method signature is incorrect. It should accept a 
time.Time parameter like the testBackoffPolicy in reader_test.go (line 862). 
The current implementation returns a constant true value and doesn't validate 
timing at all, making it useless for testing.
   ```suggestion
   func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom(start time.Time) bool {
        // Verify that the elapsed time since 'start' is close to the current backoff interval.
        b.lock.Lock()
        expected := b.curBackoff
        b.lock.Unlock()
   
        // If no backoff is expected yet, do not enforce any interval.
        if expected <= 0 {
                return true
        }
   
        elapsed := time.Since(start)
   
        // Allow some tolerance around the expected backoff interval (50%–150%).
        lowerBound := expected / 2
        upperBound := expected + expected/2
   
        return elapsed >= lowerBound && elapsed <= upperBound
   ```



##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
                time.Sleep(time.Second * 1)
        }
 }
+
+type testReconnectBackoffPolicy struct {
+       curBackoff, minBackoff, maxBackoff time.Duration
+       retryTime                          int
+       lock                               sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+       return &testReconnectBackoffPolicy{
+               curBackoff: 0,
+               minBackoff: minBackoff,
+               maxBackoff: maxBackoff,
+       }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+
+       // Double the delay each time
+       b.curBackoff += b.curBackoff
+       if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+               b.curBackoff = b.minBackoff
+       } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+               b.curBackoff = b.maxBackoff
+       }
+       b.retryTime++
+       return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+       return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {

Review Comment:
   The Reset method is empty but should reset curBackoff to 0 to match the 
interface contract and the behavior of DefaultBackoff. While this may not 
affect the test since Reset isn't called in the test scenario, it makes the 
implementation incomplete and could lead to unexpected behavior if the backoff 
policy is reused.
   ```suggestion
   func (b *testReconnectBackoffPolicy) Reset() {
        b.lock.Lock()
        defer b.lock.Unlock()
        b.curBackoff = 0
        b.retryTime = 0
   ```



##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
                time.Sleep(time.Second * 1)
        }
 }
+
+type testReconnectBackoffPolicy struct {
+       curBackoff, minBackoff, maxBackoff time.Duration
+       retryTime                          int
+       lock                               sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+       return &testReconnectBackoffPolicy{
+               curBackoff: 0,
+               minBackoff: minBackoff,
+               maxBackoff: maxBackoff,
+       }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+
+       // Double the delay each time
+       b.curBackoff += b.curBackoff
+       if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+               b.curBackoff = b.minBackoff
+       } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+               b.curBackoff = b.maxBackoff
+       }
+       b.retryTime++
+       return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+       return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {
+}
+
+func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom() bool {
+       return true
+}
+
+func TestProducerReconnectWhenBacklogQuotaExceed(t *testing.T) {
+       logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
+       slog.SetDefault(logger)
+       client, err := NewClient(ClientOptions{
+               URL:    serviceURL,
+               Logger: plog.NewLoggerWithSlog(logger),
+       })
+       defer client.Close()
+       namespace := "public/" + generateRandomName()
+       assert.NoError(t, err)
+       admin, err := pulsaradmin.NewClient(&config.Config{
+               WebServiceURL: adminURL,
+       })
+       assert.NoError(t, err)
+       // Step 1: Create namespace and configure 512KB backlog quota with producer_exception policy
+       // When subscription backlog stats refresh and reach the limit, producer will encounter BlockQuotaExceed exception
+       err = admin.Namespaces().CreateNamespace(namespace)
+       assert.NoError(t, err)
+       err = admin.Namespaces().SetBacklogQuota(
+               namespace,
+               utils.NewBacklogQuota(512*1024, -1, utils.ProducerException),
+               utils.DestinationStorage,
+       )
+       assert.NoError(t, err)
+
+       // Verify backlog quota configuration
+       quotaMap, err := admin.Namespaces().GetBacklogQuotaMap(namespace)
+       assert.NoError(t, err)
+       logger.Info(fmt.Sprintf("quotaMap = %v", quotaMap))
+
+       // Create test topic
+       topicName := namespace + "/test-topic"
+       tn, err := utils.GetTopicName(topicName)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*tn, 1)
+       assert.NoError(t, err)
+
+       // Step 2: Create consumer with small receiver queue size and earliest subscription position
+       // This ensures that once 512KB message is sent, producer will quickly reach backlog quota limit
+       _consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topicName,
+               SubscriptionName:            "my-sub",
+               Type:                        Exclusive,
+               ReceiverQueueSize:           1,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer _consumer.Close()
+
+       // Step 3: Create producer with custom backoff policy to reduce retry interval
+       bo := newTestReconnectBackoffPolicy(100*time.Millisecond, 1*time.Second)
+       _producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: true,
+               SendTimeout:     5 * time.Minute,
+               BackOffPolicyFunc: func() backoff.Policy {
+                       return bo
+               },
+       })
+       assert.NoError(t, err)
+       defer _producer.Close()
+
+       // Step 4: Start goroutine to continuously send 512KB messages and monitor statistics
+       go func() {
+               for {
+                       // Producer sends 512KB message every 10 seconds
+                       _producer.SendAsync(context.Background(), &ProducerMessage{
+                               Payload: make([]byte, 512*1024),
+                       }, func(msgId MessageID, _ *ProducerMessage, err error) {
+                               if err != nil {
+                                       logger.Error("sendAsync fail", "time", time.Now().String(), "err", err.Error())
+                                       return
+                               }
+                               logger.Info("sendAsync success", "msgId", msgId.String(), "time", time.Now().String())
+                       })
+
+                       // Get topic statistics for debugging
+                       stats, err := admin.Topics().GetPartitionedStats(*tn, false)
+                       assert.NoError(t, err)

Review Comment:
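   The goroutine calls assert.NoError, which may not fail the test reliably 
here. Reporting methods such as t.Errorf can be called from other goroutines, 
but only while the test is still running, and this goroutine is never stopped, 
so a late failure can panic after the test has completed. Do not switch to 
require.NoError inside the goroutine, because FailNow must be called from the 
goroutine running the test. Instead, send errors through a channel to be 
checked in the main test goroutine.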
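
   One way to route background errors back to the test goroutine is sketched 
below. It is a minimal standalone example, not code from the PR: `fetchStats` 
is a hypothetical stand-in for the admin stats call, and `pollInBackground` is 
an illustrative helper name.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchStats is a hypothetical stand-in for the stats call made inside the
// test goroutine. It fails on the third call so there is an error to report.
func fetchStats(call int) error {
	if call == 3 {
		return errors.New("stats unavailable")
	}
	return nil
}

// pollInBackground runs up to `rounds` polls in a goroutine and reports the
// first failure on the returned channel. The channel is closed when the
// goroutine exits, so a receive yields nil when every poll succeeded.
func pollInBackground(rounds int, poll func(int) error) <-chan error {
	errCh := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() {
		defer close(errCh)
		for i := 1; i <= rounds; i++ {
			if err := poll(i); err != nil {
				errCh <- fmt.Errorf("poll %d: %w", i, err)
				return
			}
		}
	}()
	return errCh
}

func main() {
	// In a test, this receive would run on the test goroutine and be followed
	// by an assertion on err.
	err := <-pollInBackground(5, fetchStats)
	fmt.Println(err) // prints "poll 3: stats unavailable"
}
```

   Because the assertion happens after the receive, it always runs on the test 
goroutine and always before the test function returns.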



##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
                time.Sleep(time.Second * 1)
        }
 }
+
+type testReconnectBackoffPolicy struct {
+       curBackoff, minBackoff, maxBackoff time.Duration
+       retryTime                          int
+       lock                               sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+       return &testReconnectBackoffPolicy{
+               curBackoff: 0,
+               minBackoff: minBackoff,
+               maxBackoff: maxBackoff,
+       }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+
+       // Double the delay each time
+       b.curBackoff += b.curBackoff
+       if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+               b.curBackoff = b.minBackoff
+       } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+               b.curBackoff = b.maxBackoff
+       }
+       b.retryTime++
+       return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+       return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {
+}
+
+func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom() bool {
+       return true
+}
+
+func TestProducerReconnectWhenBacklogQuotaExceed(t *testing.T) {
+       logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
+       slog.SetDefault(logger)
+       client, err := NewClient(ClientOptions{
+               URL:    serviceURL,
+               Logger: plog.NewLoggerWithSlog(logger),
+       })
+       defer client.Close()
+       namespace := "public/" + generateRandomName()
+       assert.NoError(t, err)
+       admin, err := pulsaradmin.NewClient(&config.Config{
+               WebServiceURL: adminURL,
+       })
+       assert.NoError(t, err)
+       // Step 1: Create namespace and configure 512KB backlog quota with producer_exception policy
+       // When subscription backlog stats refresh and reach the limit, producer will encounter BlockQuotaExceed exception
+       err = admin.Namespaces().CreateNamespace(namespace)
+       assert.NoError(t, err)
+       err = admin.Namespaces().SetBacklogQuota(
+               namespace,
+               utils.NewBacklogQuota(512*1024, -1, utils.ProducerException),
+               utils.DestinationStorage,
+       )
+       assert.NoError(t, err)
+
+       // Verify backlog quota configuration
+       quotaMap, err := admin.Namespaces().GetBacklogQuotaMap(namespace)
+       assert.NoError(t, err)
+       logger.Info(fmt.Sprintf("quotaMap = %v", quotaMap))
+
+       // Create test topic
+       topicName := namespace + "/test-topic"
+       tn, err := utils.GetTopicName(topicName)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*tn, 1)
+       assert.NoError(t, err)
+
+       // Step 2: Create consumer with small receiver queue size and earliest subscription position
+       // This ensures that once 512KB message is sent, producer will quickly reach backlog quota limit
+       _consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topicName,
+               SubscriptionName:            "my-sub",
+               Type:                        Exclusive,
+               ReceiverQueueSize:           1,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer _consumer.Close()
+
+       // Step 3: Create producer with custom backoff policy to reduce retry interval
+       bo := newTestReconnectBackoffPolicy(100*time.Millisecond, 1*time.Second)
+       _producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: true,
+               SendTimeout:     5 * time.Minute,
+               BackOffPolicyFunc: func() backoff.Policy {
+                       return bo
+               },
+       })
+       assert.NoError(t, err)
+       defer _producer.Close()
+
+       // Step 4: Start goroutine to continuously send 512KB messages and monitor statistics
+       go func() {
+               for {
+                       // Producer sends 512KB message every 10 seconds
+                       _producer.SendAsync(context.Background(), &ProducerMessage{
+                               Payload: make([]byte, 512*1024),
+                       }, func(msgId MessageID, _ *ProducerMessage, err error) {
+                               if err != nil {
+                                       logger.Error("sendAsync fail", "time", time.Now().String(), "err", err.Error())
+                                       return
+                               }
+                               logger.Info("sendAsync success", "msgId", msgId.String(), "time", time.Now().String())
+                       })
+
+                       // Get topic statistics for debugging
+                       stats, err := admin.Topics().GetPartitionedStats(*tn, false)
+                       assert.NoError(t, err)
+                       logger.Info("current stats", "stats", stats)
+                       time.Sleep(10 * time.Second)
+               }
+       }()

Review Comment:
   The goroutine started at line 3061 runs an infinite loop and is never 
stopped. This creates a goroutine leak that will continue running even after 
the test completes. The goroutine should be controlled with a context, channel, 
or some other mechanism to ensure it stops when the test ends. Additionally, 
the infinite loop can continue sending messages after the test assertion 
passes, potentially interfering with other tests.



##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
                time.Sleep(time.Second * 1)
        }
 }
+
+type testReconnectBackoffPolicy struct {
+       curBackoff, minBackoff, maxBackoff time.Duration
+       retryTime                          int
+       lock                               sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+       return &testReconnectBackoffPolicy{
+               curBackoff: 0,
+               minBackoff: minBackoff,
+               maxBackoff: maxBackoff,
+       }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+       b.lock.Lock()
+       defer b.lock.Unlock()
+
+       // Double the delay each time
+       b.curBackoff += b.curBackoff

Review Comment:
   The doubling step relies on the clamp to get started. When curBackoff is 0, 
adding it to itself (curBackoff += curBackoff) keeps it at 0, so the first call 
only becomes non-zero because the minBackoff check raises it to minBackoff. 
Later calls then double the value until it is capped at maxBackoff, so the 
backoff does grow. The intent would be clearer if curBackoff were initialized 
to minBackoff or the zero case were handled explicitly, or if the code stated 
that it mirrors the DefaultBackoff implementation from backoff.go.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.