danielchang-Z opened a new issue, #1427: URL: https://github.com/apache/pulsar-client-go/issues/1427
#### Expected behavior

When the consumer receives messages, the memory reserved by `memLimit.ForceReserveMemory` should be fully released by `memLimit.ReleaseMemory`, so that the memory counter stays balanced and does not exceed the configured limit.

#### Actual behavior

In `consumer_partition.go`, the logic is inconsistent:

* At [consumer_partition.go#L1181-L1436](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_partition.go#L1181-L1436), `memLimit.ForceReserveMemory` is called inside the per-message loop with the accumulated size of all messages received so far.
* At [consumer_partition.go#L1587-L1708](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_partition.go#L1587-L1708), `memLimit.ReleaseMemory` is called with only the size of the single message being dispatched.

This leads to a mismatch. For example, if a batch of 3 messages (each 1 KiB) is received:

* `memLimit.ForceReserveMemory` reserves **6 KiB** in total (1 KiB + 2 KiB + 3 KiB, because the running total is reserved on every iteration).
* Later, `memLimit.ReleaseMemory` releases only **3 KiB** (1 KiB per message).

As a result, 3 KiB is never released for this batch, and the memory usage counter can grow beyond the configured limit.

#### Steps to reproduce

1. Run a Pulsar Go consumer with `memLimit` configured.
2. Consume batched messages where the aggregated size differs from the raw size (e.g., multiple small messages).
3. Observe that memory usage grows inconsistently and eventually exceeds the expected limit.

#### System configuration

**Pulsar Go Client version**: v0.14.0, v0.15.1, v0.16.0 (and possibly others)

#### Suggested fix

Ensure that `memLimit.ReleaseMemory` is called with the same size used by `memLimit.ForceReserveMemory`.

#### Relevant code

- [consumer_partition.go#L1181-L1436](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_partition.go#L1181-L1436)

```go
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
	...
	var (
		bytesReceived   int
		skippedMessages int32
	)
	for i := 0; i < numMsgs; i++ {
		...
		messages = append(messages, msg)
		bytesReceived += msg.size()
		if pc.options.autoReceiverQueueSize {
			pc.client.memLimit.ForceReserveMemory(int64(bytesReceived)) // should be int64(msg.size())
			pc.incomingMessages.Add(int32(1))
			pc.markScaleIfNeed()
		}
	}
	...
}
```

- [consumer_partition.go#L1587-L1708](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_partition.go#L1587-L1708)

```go
func (pc *partitionConsumer) dispatcher() {
	...
	for {
		...
		select {
		...
		// if the messageCh is nil or the messageCh is full this will not be selected
		case messageCh <- nextMessage:
			// allow this message to be garbage collected
			messages[0] = nil
			messages = messages[1:]

			// for the zeroQueueConsumer, the permits controlled by itself
			if pc.options.receiverQueueSize > 0 {
				pc.availablePermits.inc()
			}

			if pc.options.autoReceiverQueueSize {
				pc.incomingMessages.Dec()
				pc.client.memLimit.ReleaseMemory(int64(nextMessageSize)) // this is right
				pc.expectMoreIncomingMessages()
			}
			...
		}
		...
	}
	...
}
```
