Copilot commented on code in PR #1462:
URL: https://github.com/apache/pulsar-client-go/pull/1462#discussion_r2745257498
##########
pulsar/producer_partition.go:
##########
@@ -1302,6 +1307,20 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
}
+ // Since a single message first generates all chunks and then acquires the semaphore at once,
+ // we need to ensure the number of chunks generated by a single message is less than the size of the pendingQueue.
Review Comment:
The comment here says the number of chunks must be "less than" the size of
the pending queue, but the actual condition allows `totalChunks ==
MaxPermits()` (it only rejects when `totalChunks > MaxPermits()`). To avoid
confusion for future maintainers, please either update the wording to "less
than or equal to" or tighten the condition to match the current wording.
```suggestion
// we need to ensure the number of chunks generated by a single message is less than or equal to the size of the pendingQueue.
```
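If the maintainers prefer the other direction instead, tightening the check so the "less than" wording stays true, a minimal sketch of what that could look like (`sr.totalChunks` and `MaxPermits()` come from the diff above; `errTooManyChunks` is a hypothetical name for illustration, not from the PR):
```go
// Reject up front if a single message's chunks could never all fit in the
// pending queue; `>=` (rather than `>`) matches the "less than" comment.
if sr.totalChunks >= p.publishSemaphore.MaxPermits() {
	return errTooManyChunks // hypothetical error value, named here for illustration
}
```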
##########
pulsar/message_chunking_test.go:
##########
@@ -530,6 +530,39 @@ func TestChunkBlockIfQueueFull(t *testing.T) {
assert.Error(t, err)
}
+func TestChunkBlockIfQueueFullWithoutTimeout(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
Review Comment:
`NewClient` allocates network resources but this test never calls
`client.Close()`, which can leave goroutines and connections open across tests.
To keep resource usage consistent with other tests in this file, consider
deferring `client.Close()` after successful client creation.
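For example, mirroring the pattern the other tests in this file already use:
```go
client, err := NewClient(ClientOptions{
	URL: lookupURL,
})
if err != nil {
	t.Fatal(err)
}
// Release the client's connections and goroutines when the test ends.
defer client.Close()
```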
##########
pulsar/message_chunking_test.go:
##########
@@ -578,3 +611,106 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
uint32(internal.MaxMessageSize),
)
}
+
+func TestSemaphoreState(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: adminURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ _producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ MaxPendingMessages: 1000,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, _producer)
+ defer _producer.Close()
+
+ producerImpl := _producer.(*producer).getProducer(0).(*partitionProducer)
+ producerSemaphore := producerImpl.publishSemaphore
+ assert.Equal(t, 1000, producerSemaphore.MaxPermits())
+ for i := 1; i < 500; i++ {
+ msg := createTestMessagePayload(1000)
+ ID, err := _producer.Send(context.Background(), &ProducerMessage{
+ Payload: msg,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+ }
+ assert.Equal(t, 1000, producerSemaphore.MaxPermits())
+ assert.Equal(t, 0, producerSemaphore.CurrentPermits())
+}
+
+func TestSemaphoreStateWithChunk(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: adminURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ _producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ EnableChunking: true,
+ ChunkMaxMessageSize: 10,
+ MaxPendingMessages: 1000,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, _producer)
+ defer _producer.Close()
+
+ // test semaphore is recycled properly when using chunk messages
+ producerImpl := _producer.(*producer).getProducer(0).(*partitionProducer)
+ producerSemaphore := producerImpl.publishSemaphore
+ assert.Equal(t, 1000, producerSemaphore.MaxPermits())
+ for i := 1; i < 500; i++ {
+ msg := createTestMessagePayload(1000)
+ ID, err := _producer.Send(context.Background(), &ProducerMessage{
+ Payload: msg,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+ }
+ assert.Equal(t, 1000, producerSemaphore.MaxPermits())
+ assert.Equal(t, 0, producerSemaphore.CurrentPermits())
+}
+
+func TestSemaphoreStateWithChunkAndTimeout(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
Review Comment:
Similarly to `TestSemaphoreState` and `TestSemaphoreStateWithChunk`, this
test creates a client but never calls `client.Close()`, which can leak
resources across tests. Please add a `defer client.Close()` after client
creation for consistency and to avoid lingering connections.
```suggestion
}
defer client.Close()
```
##########
pulsar/message_chunking_test.go:
##########
@@ -578,3 +611,106 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
uint32(internal.MaxMessageSize),
)
}
+
+func TestSemaphoreState(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: adminURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ _producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ MaxPendingMessages: 1000,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, _producer)
+ defer _producer.Close()
+
+ producerImpl := _producer.(*producer).getProducer(0).(*partitionProducer)
+ producerSemaphore := producerImpl.publishSemaphore
+ assert.Equal(t, 1000, producerSemaphore.MaxPermits())
+ for i := 1; i < 500; i++ {
+ msg := createTestMessagePayload(1000)
+ ID, err := _producer.Send(context.Background(), &ProducerMessage{
+ Payload: msg,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+ }
+ assert.Equal(t, 1000, producerSemaphore.MaxPermits())
+ assert.Equal(t, 0, producerSemaphore.CurrentPermits())
+}
+
+func TestSemaphoreStateWithChunk(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: adminURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ _producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ EnableChunking: true,
+ ChunkMaxMessageSize: 10,
+ MaxPendingMessages: 1000,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, _producer)
+ defer _producer.Close()
+
+ // test semaphore is recycled properly when using chunk messages
+ producerImpl := _producer.(*producer).getProducer(0).(*partitionProducer)
+ producerSemaphore := producerImpl.publishSemaphore
+ assert.Equal(t, 1000, producerSemaphore.MaxPermits())
+ for i := 1; i < 500; i++ {
+ msg := createTestMessagePayload(1000)
+ ID, err := _producer.Send(context.Background(), &ProducerMessage{
+ Payload: msg,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+ }
+ assert.Equal(t, 1000, producerSemaphore.MaxPermits())
+ assert.Equal(t, 0, producerSemaphore.CurrentPermits())
+}
+
+func TestSemaphoreStateWithChunkAndTimeout(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ topic := newTopicName()
+
+ _producer, err := client.CreateProducer(ProducerOptions{
+ Name: "test",
+ Topic: topic,
+ EnableChunking: true,
+ DisableBatching: true,
+ MaxPendingMessages: 5,
+ ChunkMaxMessageSize: 10,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, _producer)
+ defer _producer.Close()
+ producerImpl := _producer.(*producer).getProducer(0).(*partitionProducer)
+ producerSemaphore := producerImpl.publishSemaphore
+ assert.Equal(t, 5, producerSemaphore.MaxPermits())
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ // Large messages will be split into 11 chunks, exceeding the length of pending queue
Review Comment:
The comment says this payload will be split into 11 chunks, but with
`ChunkMaxMessageSize` set to 10 and a payload size of 100, the chunking logic
computes `totalChunks = ceil(100 / 10) = 10`. To prevent confusion when someone
adjusts these tests or debugs chunking behavior, please update the comment
to reflect the actual number of chunks (or rephrase it generically as "more
chunks than the pending queue capacity").
```suggestion
// Large messages will be split into more chunks than the length of the pending queue
```
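For reference, the chunk count follows the formula in `updateChunkInfo` quoted at the top of this review. A standalone sketch of the arithmetic (the payload size of 100 is this review's reading of the test; the payload line itself falls outside the quoted diff):
```go
package main

import (
	"fmt"
	"math"
)

func main() {
	compressedSize := 100.0  // assumed payload size; not visible in the quoted diff
	payloadChunkSize := 10.0 // ChunkMaxMessageSize from the test's ProducerOptions
	// Mirrors: sr.totalChunks = int(math.Max(1, math.Ceil(compressedSize/payloadChunkSize)))
	totalChunks := int(math.Max(1, math.Ceil(compressedSize/payloadChunkSize)))
	fmt.Println(totalChunks) // prints 10 — still more than MaxPendingMessages: 5
}
```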
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]