RobertIndie commented on code in PR #1345:
URL: https://github.com/apache/pulsar-client-go/pull/1345#discussion_r2048704853
##########
pulsar/producer_test.go:
##########
@@ -2605,3 +2605,82 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
client.Close()
}
+
+func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
+ testProducerKeepReconnectingAndThenCallSendAsync(t, false)
+ testProducerKeepReconnectingAndThenCallSendAsync(t, true)
+}
+
+func testProducerKeepReconnectingAndThenCallSendAsync(t *testing.T,
isEnabledBatching bool) {
+ t.Helper()
+
+ req := testcontainers.ContainerRequest{
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(),
testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ })
+ require.NoError(t, err, "Failed to start the pulsar container")
+ defer c.Terminate(context.Background())
+
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+ client, err := NewClient(ClientOptions{
+ URL: endpoint,
+ ConnectionTimeout: 5 * time.Second,
+ OperationTimeout: 5 * time.Second,
+ })
+ require.NoError(t, err)
+ defer client.Close()
+
+ var testProducer Producer
+ require.Eventually(t, func() bool {
+ testProducer, err = client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ Schema: NewBytesSchema(nil),
+ SendTimeout: 3 * time.Second,
+ DisableBatching: isEnabledBatching,
Review Comment:
```suggestion
DisableBatching: isDisableBatching,
```
##########
pulsar/producer_partition.go:
##########
@@ -898,16 +911,17 @@ func (p *partitionProducer) writeData(buffer
internal.Buffer, sequenceID uint64,
default:
now := time.Now()
ctx, cancel := context.WithCancel(context.Background())
- p.pendingQueue.Put(&pendingItem{
+ item := pendingItem{
ctx: ctx,
cancel: cancel,
createdAt: now,
sentAt: now,
buffer: buffer,
sequenceID: sequenceID,
sendRequests: callbacks,
- })
- p._getConn().WriteData(ctx, buffer)
+ }
+ p.pendingQueue.Put(&item)
+ p.writeChan <- &item
Review Comment:
The `writeChan` is an unbuffered channel, so if the client is reconnecting,
the goroutine at line 564 will block here on the send. While it is blocked, it
cannot time out the messages in the `dataChan`.
##########
pulsar/producer_test.go:
##########
@@ -2605,3 +2605,82 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
client.Close()
}
+
+func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
+ testProducerKeepReconnectingAndThenCallSendAsync(t, false)
+ testProducerKeepReconnectingAndThenCallSendAsync(t, true)
+}
+
+func testProducerKeepReconnectingAndThenCallSendAsync(t *testing.T,
isEnabledBatching bool) {
+ t.Helper()
+
+ req := testcontainers.ContainerRequest{
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(),
testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ })
+ require.NoError(t, err, "Failed to start the pulsar container")
+ defer c.Terminate(context.Background())
+
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+ client, err := NewClient(ClientOptions{
+ URL: endpoint,
+ ConnectionTimeout: 5 * time.Second,
+ OperationTimeout: 5 * time.Second,
+ })
+ require.NoError(t, err)
+ defer client.Close()
+
+ var testProducer Producer
+ require.Eventually(t, func() bool {
+ testProducer, err = client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ Schema: NewBytesSchema(nil),
+ SendTimeout: 3 * time.Second,
+ DisableBatching: isEnabledBatching,
+ })
+ return err == nil
+ }, 30*time.Second, 1*time.Second)
+
+ // send a message
+ errChan := make(chan error)
+ defer close(errChan)
+
+ testProducer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: []byte("test"),
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
+ errChan <- err
+ })
+ select {
+ case <-time.After(10 * time.Second):
+ t.Fatal("test timeout")
+ case err := <-errChan:
+ require.NoError(t, err)
+ }
+
+ // stop pulsar server
+ timeout := 10 * time.Second
+ err = c.Stop(context.Background(), &timeout)
+ require.NoError(t, err)
+
+ // send again
+ testProducer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: []byte("test"),
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
+ errChan <- err
+ })
Review Comment:
```suggestion
for i := 0; i < 10; i++ {
testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("test"),
}, func(id MessageID, producerMessage *ProducerMessage, err
error) {
fmt.Println("send async callback", id, producerMessage,
err)
})
}
// send again
testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
errChan <- err
})
```
You can easily reproduce the issue by sending some messages before this
step and by setting the `BatchingMaxMessages` of the producer to a smaller
value, such as `5`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]