danielchang-Z opened a new issue, #1427:
URL: https://github.com/apache/pulsar-client-go/issues/1427

   #### Expected behavior
   
   When the consumer receives messages, the memory reserved by 
`memLimit.ForceReserveMemory` should be fully released by 
`memLimit.ReleaseMemory`, ensuring the memory counter remains balanced and does 
not exceed the configured limit.
   
   #### Actual behavior
   
   In `consumer_partition.go`, the logic is inconsistent:
   
   * At [consumer_partition.go#L1181-L1436](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_partition.go#L1181-L1436), `memLimit.ForceReserveMemory` is called inside the per-message loop with the running total `bytesReceived`, i.e. the accumulated size of all messages seen so far in the batch.
   * At [consumer_partition.go#L1587-L1708](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_partition.go#L1587-L1708), `memLimit.ReleaseMemory` is called with only the size of the single message being dispatched (`nextMessageSize`).
   
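   This leads to a mismatch. For example, if a batch of 3 messages (each 1 KiB) is received:
   
   * `memLimit.ForceReserveMemory` reserves **6 KiB** in total (1 KiB + 2 KiB + 3 KiB, because the running total is reserved again on every iteration).
   * Later, `memLimit.ReleaseMemory` releases only **3 KiB** (1 KiB per dispatched message).
   
   As a result, 3 KiB from this batch is never released, so the memory usage counter drifts upward with every batch and can grow beyond the configured limit.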
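
The arithmetic above can be reproduced with a plain counter. This is only a sketch of the accounting, not the client's code: `simulateBatch` and its `cumulative` flag are hypothetical names introduced here for illustration, and no Pulsar API is used.

```go
package main

import "fmt"

// simulateBatch is a hypothetical helper that mimics the accounting for one
// batch. With cumulative=true it reserves the running total on every
// iteration (the behavior reported in this issue); with cumulative=false it
// reserves each message's own size. Release is always per message.
func simulateBatch(sizes []int64, cumulative bool) (reserved, released int64) {
	var bytesReceived int64
	for _, size := range sizes {
		bytesReceived += size
		if cumulative {
			reserved += bytesReceived
		} else {
			reserved += size
		}
	}
	for _, size := range sizes {
		released += size
	}
	return reserved, released
}

func main() {
	sizes := []int64{1024, 1024, 1024} // three 1 KiB messages

	reserved, released := simulateBatch(sizes, true)
	fmt.Printf("cumulative:  reserved=%d released=%d leaked=%d\n",
		reserved, released, reserved-released)

	reserved, released = simulateBatch(sizes, false)
	fmt.Printf("per-message: reserved=%d released=%d leaked=%d\n",
		reserved, released, reserved-released)
}
```

With the cumulative variant the leaked amount grows quadratically with the number of messages in a batch, so larger batches make the drift worse.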
   
   #### Steps to reproduce
   
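   1. Run a Pulsar Go consumer with `memLimit` configured and `autoReceiverQueueSize` enabled (both calls are guarded by `pc.options.autoReceiverQueueSize`).
   2. Consume batched messages where the aggregated size differs from the raw size (e.g., batches containing multiple small messages).
   3. Observe that the reserved memory keeps growing even though all messages are consumed, and eventually exceeds the expected limit.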
   
   #### System configuration
   
   **Pulsar Go Client version**: v0.14.0, v0.15.1, v0.16.0 (and possibly others)
   
   #### Suggested fix
   
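   Ensure that `memLimit.ReleaseMemory` is called with the same size used by `memLimit.ForceReserveMemory`. Concretely, reserve per message with `int64(msg.size())` instead of the running total `int64(bytesReceived)`, so that each reservation matches the later release of `int64(nextMessageSize)`.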
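
A minimal sketch of the balanced accounting this fix aims for. The `usageCounter` type and the `receiveBatch` / `dispatchAll` functions are hypothetical stand-ins for the client's memory limiter and the receive/dispatch paths; they are not the library's API.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// usageCounter is a hypothetical stand-in for the client's memory limiter.
// It only tracks current usage and enforces no limit.
type usageCounter struct{ used int64 }

func (c *usageCounter) reserve(n int64) { atomic.AddInt64(&c.used, n) }
func (c *usageCounter) release(n int64) { atomic.AddInt64(&c.used, -n) }
func (c *usageCounter) current() int64  { return atomic.LoadInt64(&c.used) }

// receiveBatch reserves memory once per message, using that message's own
// size rather than the running total of the batch.
func receiveBatch(c *usageCounter, sizes []int64) []int64 {
	queued := make([]int64, 0, len(sizes))
	for _, size := range sizes {
		c.reserve(size)
		queued = append(queued, size)
	}
	return queued
}

// dispatchAll releases each queued message's size as it is handed off,
// mirroring the per-message release in the dispatcher.
func dispatchAll(c *usageCounter, queued []int64) {
	for _, size := range queued {
		c.release(size)
	}
}

func main() {
	c := &usageCounter{}
	queued := receiveBatch(c, []int64{1024, 1024, 1024})
	fmt.Println("after receive: ", c.current())
	dispatchAll(c, queued)
	fmt.Println("after dispatch:", c.current())
}
```

Because every reserve has a release of the same size, the counter returns to zero once the queue drains, which is the invariant described under "Expected behavior".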
   
   #### Relevant code
   
   - [consumer_partition.go#L1181-L1436](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_partition.go#L1181-L1436)
   
   ```go
   func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
       ...
        var (
                bytesReceived   int
                skippedMessages int32
        )
        for i := 0; i < numMsgs; i++ {
           ...
                messages = append(messages, msg)
                bytesReceived += msg.size()
                if pc.options.autoReceiverQueueSize {
                        pc.client.memLimit.ForceReserveMemory(int64(bytesReceived)) // should be int64(msg.size())
                        pc.incomingMessages.Add(int32(1))
                        pc.markScaleIfNeed()
                }
       }
       ...
   }
   ```
   
   - [consumer_partition.go#L1587-L1708](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_partition.go#L1587-L1708)
   
   ```go
   func (pc *partitionConsumer) dispatcher() {
        ...
        for {
                ...
                select {
                ...
                // if the messageCh is nil or the messageCh is full this will not be selected
                case messageCh <- nextMessage:
                        // allow this message to be garbage collected
                        messages[0] = nil
                        messages = messages[1:]
   
                        // for the zeroQueueConsumer, the permits controlled by itself
                        if pc.options.receiverQueueSize > 0 {
                                pc.availablePermits.inc()
                        }
   
                        if pc.options.autoReceiverQueueSize {
                                pc.incomingMessages.Dec()
                                pc.client.memLimit.ReleaseMemory(int64(nextMessageSize)) // this is right
                                pc.expectMoreIncomingMessages()
                        }
                ...
                }
                ...
        }
        ...
   }
   ```
   

