RobertIndie commented on code in PR #1345:
URL: https://github.com/apache/pulsar-client-go/pull/1345#discussion_r2048704853


##########
pulsar/producer_test.go:
##########
@@ -2605,3 +2605,82 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
 
        client.Close()
 }
+
+func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
+       testProducerKeepReconnectingAndThenCallSendAsync(t, false)
+       testProducerKeepReconnectingAndThenCallSendAsync(t, true)
+}
+
+func testProducerKeepReconnectingAndThenCallSendAsync(t *testing.T, 
isEnabledBatching bool) {
+       t.Helper()
+
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), 
testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       defer c.Terminate(context.Background())
+
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       client, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 5 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer client.Close()
+
+       var testProducer Producer
+       require.Eventually(t, func() bool {
+               testProducer, err = client.CreateProducer(ProducerOptions{
+                       Topic:           newTopicName(),
+                       Schema:          NewBytesSchema(nil),
+                       SendTimeout:     3 * time.Second,
+                       DisableBatching: isEnabledBatching,

Review Comment:
   ```suggestion
                        DisableBatching: isDisableBatching,
   ```
   
   



##########
pulsar/producer_partition.go:
##########
@@ -898,16 +911,17 @@ func (p *partitionProducer) writeData(buffer 
internal.Buffer, sequenceID uint64,
        default:
                now := time.Now()
                ctx, cancel := context.WithCancel(context.Background())
-               p.pendingQueue.Put(&pendingItem{
+               item := pendingItem{
                        ctx:          ctx,
                        cancel:       cancel,
                        createdAt:    now,
                        sentAt:       now,
                        buffer:       buffer,
                        sequenceID:   sequenceID,
                        sendRequests: callbacks,
-               })
-               p._getConn().WriteData(ctx, buffer)
+               }
+               p.pendingQueue.Put(&item)
+               p.writeChan <- &item

Review Comment:
   The `writeChan` is an unbuffered channel, so while the client is reconnecting,
the goroutine at line 564 will block here. While it is blocked, it cannot time out
the messages waiting in `dataChan`.



##########
pulsar/producer_test.go:
##########
@@ -2605,3 +2605,82 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
 
        client.Close()
 }
+
+func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
+       testProducerKeepReconnectingAndThenCallSendAsync(t, false)
+       testProducerKeepReconnectingAndThenCallSendAsync(t, true)
+}
+
+func testProducerKeepReconnectingAndThenCallSendAsync(t *testing.T, 
isEnabledBatching bool) {
+       t.Helper()
+
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), 
testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       defer c.Terminate(context.Background())
+
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       client, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 5 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer client.Close()
+
+       var testProducer Producer
+       require.Eventually(t, func() bool {
+               testProducer, err = client.CreateProducer(ProducerOptions{
+                       Topic:           newTopicName(),
+                       Schema:          NewBytesSchema(nil),
+                       SendTimeout:     3 * time.Second,
+                       DisableBatching: isEnabledBatching,
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+
+       // send a message
+       errChan := make(chan error)
+       defer close(errChan)
+
+       testProducer.SendAsync(context.Background(), &ProducerMessage{
+               Payload: []byte("test"),
+       }, func(_ MessageID, _ *ProducerMessage, err error) {
+               errChan <- err
+       })
+       select {
+       case <-time.After(10 * time.Second):
+               t.Fatal("test timeout")
+       case err := <-errChan:
+               require.NoError(t, err)
+       }
+
+       // stop pulsar server
+       timeout := 10 * time.Second
+       err = c.Stop(context.Background(), &timeout)
+       require.NoError(t, err)
+
+       // send again
+       testProducer.SendAsync(context.Background(), &ProducerMessage{
+               Payload: []byte("test"),
+       }, func(_ MessageID, _ *ProducerMessage, err error) {
+               errChan <- err
+       })

Review Comment:
   ```suggestion
        for i := 0; i < 10; i++ {
                testProducer.SendAsync(context.Background(), &ProducerMessage{
                        Payload: []byte("test"),
                }, func(id MessageID, producerMessage *ProducerMessage, err 
error) {
                        fmt.Println("send async callback", id, producerMessage, 
err)
                })
        }
        // send again
        testProducer.SendAsync(context.Background(), &ProducerMessage{
                Payload: []byte("test"),
        }, func(_ MessageID, _ *ProducerMessage, err error) {
                errChan <- err
        })
   ```
   
   You can easily reproduce the issue by sending some messages before this
step. Also set the producer's `BatchingMaxMessages` to a small value,
such as `5`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to