Copilot commented on code in PR #1422:
URL: https://github.com/apache/pulsar-client-go/pull/1422#discussion_r2342923681


##########
pulsar/producer_test.go:
##########
@@ -2741,3 +2789,112 @@ func TestSendBufferRetainWhenConnectionStuck(t *testing.T) {
        b := conn.buffers[0]
        assert.Equal(t, int64(1), b.RefCnt(), "Expected buffer to have a reference count of 1 after sending")
 }
+
+func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
+       testSendAsyncCouldTimeoutWhileReconnecting(t, false)
+       testSendAsyncCouldTimeoutWhileReconnecting(t, true)
+}
+
+func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching bool) {
+       t.Helper()
+
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       defer func() {
+               err := c.Terminate(context.Background())
+               if err != nil {
+                       t.Fatal("Failed to terminate the pulsar container", err)
+               }
+       }()
+
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       client, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 5 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer client.Close()
+
+       var testProducer Producer
+       require.Eventually(t, func() bool {
+               testProducer, err = client.CreateProducer(ProducerOptions{
+                       Topic:               newTopicName(),
+                       Schema:              NewBytesSchema(nil),
+                       SendTimeout:         3 * time.Second,
+                       DisableBatching:     isDisableBatching,
+                       BatchingMaxMessages: 5,
+                       MaxPendingMessages:  10,
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+
+       numMessages := 10
+       // Send 10 messages synchronously
+       for i := 0; i < numMessages; i++ {
+               send, err := testProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("test")})
+               require.NoError(t, err)
+               require.NotNil(t, send)
+       }
+
+       // stop pulsar server
+       timeout := 10 * time.Second
+       err = c.Stop(context.Background(), &timeout)
+       require.NoError(t, err)
+
+       // Test that SendAsync can time out while the producer is reconnecting
+
+       finalErr := make(chan error, 1)
+       testProducer.SendAsync(context.Background(), &ProducerMessage{
+               Payload: []byte("test"),
+       }, func(_ MessageID, _ *ProducerMessage, err error) {
+               finalErr <- err
+       })
+       select {
+       case <-time.After(10 * time.Second):
+               t.Fatal("test timeout")
+       case err = <-finalErr:
+               // should get a timeout error
+               require.ErrorIs(t, err, ErrSendTimeout)
+       }
+       close(finalErr)

Review Comment:
   [nitpick] Reusing and closing the same channel variable 'finalErr' for both test scenarios in this function could be confusing. Consider using a different variable name for each scenario to improve code clarity.
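   A minimal sketch of one way to apply this, with a distinct channel per scenario (the names `reconnectErr` and `queueFullErr` are illustrative, not from the PR):
   ```go
        // First scenario: timeout while the producer is reconnecting.
        reconnectErr := make(chan error, 1)
        testProducer.SendAsync(context.Background(), &ProducerMessage{
                Payload: []byte("test"),
        }, func(_ MessageID, _ *ProducerMessage, err error) {
                reconnectErr <- err
        })
        // ... select on reconnectErr, require.ErrorIs(t, err, ErrSendTimeout), then close(reconnectErr)

        // Second scenario: timeout when the pending queue is full, with its own channel.
        queueFullErr := make(chan error, 1)
        // ... same SendAsync/assert pattern against queueFullErr, then close(queueFullErr)
   ```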



##########
pulsar/producer_partition.go:
##########
@@ -385,31 +391,20 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
                "epoch": atomic.LoadUint64(&p.epoch),
        }).Info("Connected producer")
 
-       pendingItems := p.pendingQueue.ReadableSlice()
-       viewSize := len(pendingItems)
-       if viewSize > 0 {
-               p.log.Infof("Resending %d pending batches", viewSize)
-               lastViewItem := pendingItems[viewSize-1].(*pendingItem)
-
-               // iterate at most pending items
-               for i := 0; i < viewSize; i++ {
-                       item := p.pendingQueue.Poll()
-                       if item == nil {
-                               continue
-                       }
-                       pi := item.(*pendingItem)
-                       // when resending pending batches, we update the sendAt timestamp to record the metric.
-                       pi.Lock()
-                       pi.sentAt = time.Now()
-                       pi.Unlock()
-                       pi.buffer.Retain() // Retain for writing to the connection
-                       p.pendingQueue.Put(pi)
-                       p._getConn().WriteData(pi.ctx, pi.buffer)
+       p.pendingQueue.Lock()
+       defer p.pendingQueue.Unlock()
+       p.pendingQueue.IterateUnsafe(func(item any) {
+               pi := item.(*pendingItem)
+               // when resending pending batches, we update the sendAt timestamp to record the metric.
+               pi.Lock()
+               pi.sentAt = time.Now()
+               pi.Unlock()
+               pi.buffer.Retain() // Retain for writing to the connection
+               p._getConn().WriteData(pi.ctx, pi.buffer)
+       })
 
-                       if pi == lastViewItem {
-                               break
-                       }
-               }
+       if !p.casProducerState(producerConnecting, producerReady) && p.isClosingOrClosed() {

Review Comment:
   The logic combines two conditions with AND, but should return ErrProducerClosed if EITHER the CAS fails OR the producer is closing/closed. The current logic only returns the error when both conditions are true, which may not handle all error cases correctly.
   ```suggestion
        if !p.casProducerState(producerConnecting, producerReady) || p.isClosingOrClosed() {
   ```
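   To make the distinction concrete, a sketch of the two behaviors, using the state helpers already present in this file (the return placeholder is illustrative):
   ```go
        // Current (&&): the ErrProducerClosed path is taken only when the CAS fails
        // AND the producer is already closing or closed.
        if !p.casProducerState(producerConnecting, producerReady) && p.isClosingOrClosed() {
                // ... return ErrProducerClosed
        }

        // Suggested (||): the path is taken whenever the CAS fails, or whenever the
        // producer is closing or closed, regardless of the other condition.
        if !p.casProducerState(producerConnecting, producerReady) || p.isClosingOrClosed() {
                // ... return ErrProducerClosed
        }
   ```
   Note that these are alternatives for the same line, not sequential checks.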



##########
pulsar/producer_test.go:
##########
@@ -2741,3 +2789,112 @@ func TestSendBufferRetainWhenConnectionStuck(t *testing.T) {
        b := conn.buffers[0]
        assert.Equal(t, int64(1), b.RefCnt(), "Expected buffer to have a reference count of 1 after sending")
 }
+
+func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
+       testSendAsyncCouldTimeoutWhileReconnecting(t, false)
+       testSendAsyncCouldTimeoutWhileReconnecting(t, true)
+}
+
+func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching bool) {
+       t.Helper()
+
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       defer func() {
+               err := c.Terminate(context.Background())
+               if err != nil {
+                       t.Fatal("Failed to terminate the pulsar container", err)
+               }
+       }()
+
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       client, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 5 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer client.Close()
+
+       var testProducer Producer
+       require.Eventually(t, func() bool {
+               testProducer, err = client.CreateProducer(ProducerOptions{
+                       Topic:               newTopicName(),
+                       Schema:              NewBytesSchema(nil),
+                       SendTimeout:         3 * time.Second,
+                       DisableBatching:     isDisableBatching,
+                       BatchingMaxMessages: 5,
+                       MaxPendingMessages:  10,
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+
+       numMessages := 10
+       // Send 10 messages synchronously
+       for i := 0; i < numMessages; i++ {
+               send, err := testProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("test")})
+               require.NoError(t, err)
+               require.NotNil(t, send)
+       }
+
+       // stop pulsar server
+       timeout := 10 * time.Second
+       err = c.Stop(context.Background(), &timeout)
+       require.NoError(t, err)
+
+       // Test that SendAsync can time out while the producer is reconnecting
+
+       finalErr := make(chan error, 1)
+       testProducer.SendAsync(context.Background(), &ProducerMessage{
+               Payload: []byte("test"),
+       }, func(_ MessageID, _ *ProducerMessage, err error) {
+               finalErr <- err
+       })
+       select {
+       case <-time.After(10 * time.Second):
+               t.Fatal("test timeout")
+       case err = <-finalErr:
+               // should get a timeout error
+               require.ErrorIs(t, err, ErrSendTimeout)
+       }
+       close(finalErr)
+
+       // Test that SendAsync can time out when the pending queue is full
+
+       go func() {
+               // Send 10 messages asynchronously to make the pending queue full
+               for i := 0; i < numMessages; i++ {
+                       testProducer.SendAsync(context.Background(), &ProducerMessage{
+                               Payload: []byte("test"),
+                       }, func(_ MessageID, _ *ProducerMessage, _ error) {
+                       })
+               }
+       }()
+
+       time.Sleep(3 * time.Second)
+       finalErr = make(chan error, 1)
+       testProducer.SendAsync(context.Background(), &ProducerMessage{
+               Payload: []byte("test"),
+       }, func(_ MessageID, _ *ProducerMessage, err error) {
+               finalErr <- err
+       })
+       select {
+       case <-time.After(10 * time.Second):
+               t.Fatal("test timeout")
+       case err = <-finalErr:
+               // should get a timeout error
+               require.ErrorIs(t, err, ErrSendTimeout)
+       }
+       close(finalErr)

Review Comment:
   [nitpick] Reusing and closing the same channel variable 'finalErr' for both test scenarios in this function could be confusing. Consider using a different variable name for each scenario to improve code clarity.
   ```suggestion
        finalErr2 := make(chan error, 1)
        testProducer.SendAsync(context.Background(), &ProducerMessage{
                Payload: []byte("test"),
        }, func(_ MessageID, _ *ProducerMessage, err error) {
                finalErr2 <- err
        })
        select {
        case <-time.After(10 * time.Second):
                t.Fatal("test timeout")
        case err = <-finalErr2:
                // should get a timeout error
                require.ErrorIs(t, err, ErrSendTimeout)
        }
        close(finalErr2)
   ```
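   An alternative that avoids the naming question entirely (not part of the suggestion above, just a sketch) is to scope each scenario in its own subtest so the channel cannot be reused:
   ```go
        t.Run("timeout while reconnecting", func(t *testing.T) {
                finalErr := make(chan error, 1)
                // ... SendAsync, select on finalErr, require.ErrorIs(t, err, ErrSendTimeout)
                close(finalErr)
        })
        t.Run("timeout when pending queue is full", func(t *testing.T) {
                finalErr := make(chan error, 1)
                // ... same pattern for the queue-full scenario
                close(finalErr)
        })
   ```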



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
