Copilot commented on code in PR #1422:
URL: https://github.com/apache/pulsar-client-go/pull/1422#discussion_r2342923681
##########
pulsar/producer_test.go:
##########
@@ -2741,3 +2789,112 @@ func TestSendBufferRetainWhenConnectionStuck(t *testing.T) {
     b := conn.buffers[0]
     assert.Equal(t, int64(1), b.RefCnt(), "Expected buffer to have a reference count of 1 after sending")
 }
+
+func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
+    testSendAsyncCouldTimeoutWhileReconnecting(t, false)
+    testSendAsyncCouldTimeoutWhileReconnecting(t, true)
+}
+
+func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching bool) {
+    t.Helper()
+
+    req := testcontainers.ContainerRequest{
+        Image:        getPulsarTestImage(),
+        ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+        WaitingFor:   wait.ForExposedPort(),
+        Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+    }
+    c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+        ContainerRequest: req,
+        Started:          true,
+    })
+    require.NoError(t, err, "Failed to start the pulsar container")
+    defer func() {
+        err := c.Terminate(context.Background())
+        if err != nil {
+            t.Fatal("Failed to terminate the pulsar container", err)
+        }
+    }()
+
+    endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+    require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+    client, err := NewClient(ClientOptions{
+        URL:               endpoint,
+        ConnectionTimeout: 5 * time.Second,
+        OperationTimeout:  5 * time.Second,
+    })
+    require.NoError(t, err)
+    defer client.Close()
+
+    var testProducer Producer
+    require.Eventually(t, func() bool {
+        testProducer, err = client.CreateProducer(ProducerOptions{
+            Topic:               newTopicName(),
+            Schema:              NewBytesSchema(nil),
+            SendTimeout:         3 * time.Second,
+            DisableBatching:     isDisableBatching,
+            BatchingMaxMessages: 5,
+            MaxPendingMessages:  10,
+        })
+        return err == nil
+    }, 30*time.Second, 1*time.Second)
+
+    numMessages := 10
+    // Send 10 messages synchronously
+    for i := 0; i < numMessages; i++ {
+        send, err := testProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("test")})
+        require.NoError(t, err)
+        require.NotNil(t, send)
+    }
+
+    // stop pulsar server
+    timeout := 10 * time.Second
+    err = c.Stop(context.Background(), &timeout)
+    require.NoError(t, err)
+
+    // Test the SendAsync could be timeout if the producer is reconnecting
+
+    finalErr := make(chan error, 1)
+    testProducer.SendAsync(context.Background(), &ProducerMessage{
+        Payload: []byte("test"),
+    }, func(_ MessageID, _ *ProducerMessage, err error) {
+        finalErr <- err
+    })
+    select {
+    case <-time.After(10 * time.Second):
+        t.Fatal("test timeout")
+    case err = <-finalErr:
+        // should get a timeout error
+        require.ErrorIs(t, err, ErrSendTimeout)
+    }
+    close(finalErr)

Review Comment:
   [nitpick] Closing the same channel variable name 'finalErr' twice in the same test function could be confusing. Consider using different variable names for the two test scenarios to improve code clarity.
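One way to resolve this nitpick, sketched below as a standalone illustration rather than as a change to the PR (the names `reconnectErr` and `queueFullErr` are hypothetical), is to give each scenario its own channel so that each channel is created and closed exactly once:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// Scenario 1 ("producer is reconnecting") gets its own channel,
	// created and closed exactly once.
	reconnectErr := make(chan error, 1)
	reconnectErr <- errors.New("send timeout while reconnecting")
	fmt.Println(<-reconnectErr)
	close(reconnectErr)

	// Scenario 2 ("pending queue is full") uses a separately named channel
	// instead of reusing and re-closing the first one.
	queueFullErr := make(chan error, 1)
	queueFullErr <- errors.New("send timeout while pending queue is full")
	fmt.Println(<-queueFullErr)
	close(queueFullErr)
}
```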
##########
pulsar/producer_partition.go:
##########
@@ -385,31 +391,20 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
         "epoch": atomic.LoadUint64(&p.epoch),
     }).Info("Connected producer")
-    pendingItems := p.pendingQueue.ReadableSlice()
-    viewSize := len(pendingItems)
-    if viewSize > 0 {
-        p.log.Infof("Resending %d pending batches", viewSize)
-        lastViewItem := pendingItems[viewSize-1].(*pendingItem)
-
-        // iterate at most pending items
-        for i := 0; i < viewSize; i++ {
-            item := p.pendingQueue.Poll()
-            if item == nil {
-                continue
-            }
-            pi := item.(*pendingItem)
-            // when resending pending batches, we update the sendAt timestamp to record the metric.
-            pi.Lock()
-            pi.sentAt = time.Now()
-            pi.Unlock()
-            pi.buffer.Retain() // Retain for writing to the connection
-            p.pendingQueue.Put(pi)
-            p._getConn().WriteData(pi.ctx, pi.buffer)
+    p.pendingQueue.Lock()
+    defer p.pendingQueue.Unlock()
+    p.pendingQueue.IterateUnsafe(func(item any) {
+        pi := item.(*pendingItem)
+        // when resending pending batches, we update the sendAt timestamp to record the metric.
+        pi.Lock()
+        pi.sentAt = time.Now()
+        pi.Unlock()
+        pi.buffer.Retain() // Retain for writing to the connection
+        p._getConn().WriteData(pi.ctx, pi.buffer)
+    })
-            if pi == lastViewItem {
-                break
-            }
-        }
+    if !p.casProducerState(producerConnecting, producerReady) && p.isClosingOrClosed() {

Review Comment:
   The logic combines two conditions with AND, but should return ErrProducerClosed if EITHER the CAS fails OR the producer is closing/closed. The current logic only returns the error when both conditions are true, which may not handle all error cases correctly.
```suggestion
	if !p.casProducerState(producerConnecting, producerReady) || p.isClosingOrClosed() {
```
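To make the reviewer's point concrete, the following standalone sketch enumerates the four cases; `casFailed` and `closing` are stand-ins for `!p.casProducerState(producerConnecting, producerReady)` and `p.isClosingOrClosed()`, not actual producer fields:

```go
package main

import "fmt"

func main() {
	// casFailed stands in for !p.casProducerState(producerConnecting, producerReady),
	// closing stands in for p.isClosingOrClosed().
	for _, casFailed := range []bool{false, true} {
		for _, closing := range []bool{false, true} {
			andGuard := casFailed && closing // guard as written in the PR
			orGuard := casFailed || closing  // guard proposed in the suggestion
			fmt.Printf("casFailed=%-5t closing=%-5t  &&=%-5t ||=%-5t\n",
				casFailed, closing, andGuard, orGuard)
		}
	}
}
```

The two guards differ only when exactly one condition is true, for example when the CAS fails but the producer has not yet been marked as closing.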
##########
pulsar/producer_test.go:
##########
@@ -2741,3 +2789,112 @@ func TestSendBufferRetainWhenConnectionStuck(t *testing.T) {
     b := conn.buffers[0]
     assert.Equal(t, int64(1), b.RefCnt(), "Expected buffer to have a reference count of 1 after sending")
 }
+
+func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
+    testSendAsyncCouldTimeoutWhileReconnecting(t, false)
+    testSendAsyncCouldTimeoutWhileReconnecting(t, true)
+}
+
+func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching bool) {
+    t.Helper()
+
+    req := testcontainers.ContainerRequest{
+        Image:        getPulsarTestImage(),
+        ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+        WaitingFor:   wait.ForExposedPort(),
+        Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+    }
+    c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+        ContainerRequest: req,
+        Started:          true,
+    })
+    require.NoError(t, err, "Failed to start the pulsar container")
+    defer func() {
+        err := c.Terminate(context.Background())
+        if err != nil {
+            t.Fatal("Failed to terminate the pulsar container", err)
+        }
+    }()
+
+    endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+    require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+    client, err := NewClient(ClientOptions{
+        URL:               endpoint,
+        ConnectionTimeout: 5 * time.Second,
+        OperationTimeout:  5 * time.Second,
+    })
+    require.NoError(t, err)
+    defer client.Close()
+
+    var testProducer Producer
+    require.Eventually(t, func() bool {
+        testProducer, err = client.CreateProducer(ProducerOptions{
+            Topic:               newTopicName(),
+            Schema:              NewBytesSchema(nil),
+            SendTimeout:         3 * time.Second,
+            DisableBatching:     isDisableBatching,
+            BatchingMaxMessages: 5,
+            MaxPendingMessages:  10,
+        })
+        return err == nil
+    }, 30*time.Second, 1*time.Second)
+
+    numMessages := 10
+    // Send 10 messages synchronously
+    for i := 0; i < numMessages; i++ {
+        send, err := testProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("test")})
+        require.NoError(t, err)
+        require.NotNil(t, send)
+    }
+
+    // stop pulsar server
+    timeout := 10 * time.Second
+    err = c.Stop(context.Background(), &timeout)
+    require.NoError(t, err)
+
+    // Test the SendAsync could be timeout if the producer is reconnecting
+
+    finalErr := make(chan error, 1)
+    testProducer.SendAsync(context.Background(), &ProducerMessage{
+        Payload: []byte("test"),
+    }, func(_ MessageID, _ *ProducerMessage, err error) {
+        finalErr <- err
+    })
+    select {
+    case <-time.After(10 * time.Second):
+        t.Fatal("test timeout")
+    case err = <-finalErr:
+        // should get a timeout error
+        require.ErrorIs(t, err, ErrSendTimeout)
+    }
+    close(finalErr)
+
+    // Test that the SendAsync could be timeout if the pending queue is full
+
+    go func() {
+        // Send 10 messages asynchronously to make the pending queue full
+        for i := 0; i < numMessages; i++ {
+            testProducer.SendAsync(context.Background(), &ProducerMessage{
+                Payload: []byte("test"),
+            }, func(_ MessageID, _ *ProducerMessage, _ error) {
+            })
+        }
+    }()
+
+    time.Sleep(3 * time.Second)
+    finalErr = make(chan error, 1)
+    testProducer.SendAsync(context.Background(), &ProducerMessage{
+        Payload: []byte("test"),
+    }, func(_ MessageID, _ *ProducerMessage, err error) {
+        finalErr <- err
+    })
+    select {
+    case <-time.After(10 * time.Second):
+        t.Fatal("test timeout")
+    case err = <-finalErr:
+        // should get a timeout error
+        require.ErrorIs(t, err, ErrSendTimeout)
+    }
+    close(finalErr)

Review Comment:
   [nitpick] Closing the same channel variable name 'finalErr' twice in the same test function could be confusing. Consider using different variable names for the two test scenarios to improve code clarity.
```suggestion
	finalErr2 := make(chan error, 1)
	testProducer.SendAsync(context.Background(), &ProducerMessage{
		Payload: []byte("test"),
	}, func(_ MessageID, _ *ProducerMessage, err error) {
		finalErr2 <- err
	})
	select {
	case <-time.After(10 * time.Second):
		t.Fatal("test timeout")
	case err = <-finalErr2:
		// should get a timeout error
		require.ErrorIs(t, err, ErrSendTimeout)
	}
	close(finalErr2)
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org