maishivamhoo123 commented on issue #1427:
URL:
https://github.com/apache/pulsar-client-go/issues/1427#issuecomment-3615207474
Hi @danielchang-Z, @thomas-bousquet, @jaysonsantos, and team,
I was able to reproduce the issue, but only after modifying the example to
allow the producer to actually send messages in batches. In the current example
code, the producer is not batching messages:
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "topic-1",
    })
To trigger the memory-reservation path in consumer_partition.go, I enabled
batching and added a short publish delay:
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:                   "topic-1",
        DisableBatching:         false, // ProducerOptions has no Batching field; batching is on unless disabled
        BatchingMaxMessages:     10,
        BatchingMaxPublishDelay: 2 * time.Millisecond,
    })
After enabling batching, the consumer receives batched messages and the
problematic code path becomes reachable.
Inside consumer_partition.go, the memory reservation happens here:
    pc.options.interceptors.BeforeConsume(ConsumerMessage{
        Consumer: pc.parentConsumer,
        Message:  msg,
    })
    messages = append(messages, msg)
    bytesReceived += msg.size()
    if pc.options.autoReceiverQueueSize {
        pc.client.memLimit.ForceReserveMemory(int64(msg.size()))
        pc.incomingMessages.Add(1)
        pc.markScaleIfNeed()
    }
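To make the accounting in that loop easier to reason about, here is a minimal standalone sketch. It is not the client's code: toyMemLimit, ForceReserve, Release, Used, and receiveBatch are hypothetical names I made up for illustration, and I am assuming that a forced reservation is always counted, even when it pushes usage past the limit. It only shows that each message in a batch adds its size to a shared counter, so every reservation needs a matching release somewhere else.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toyMemLimit is a hypothetical stand-in for a memory limit controller.
// It is NOT the pulsar-client-go implementation.
type toyMemLimit struct {
	limit int64 // configured budget in bytes
	used  int64 // bytes currently reserved, updated atomically
}

// ForceReserve always counts size against the budget, even past the limit
// (assumed behavior for this sketch).
func (m *toyMemLimit) ForceReserve(size int64) { atomic.AddInt64(&m.used, size) }

// Release hands size bytes back to the budget.
func (m *toyMemLimit) Release(size int64) { atomic.AddInt64(&m.used, -size) }

// Used reports the bytes currently reserved.
func (m *toyMemLimit) Used() int64 { return atomic.LoadInt64(&m.used) }

// receiveBatch mimics the per-message loop: it sums the message sizes and
// force-reserves each one individually.
func receiveBatch(m *toyMemLimit, sizes []int64) int64 {
	var bytesReceived int64
	for _, s := range sizes {
		bytesReceived += s
		m.ForceReserve(s)
	}
	return bytesReceived
}

func main() {
	m := &toyMemLimit{limit: 100}
	sizes := []int64{40, 40, 40} // one batch of three messages

	total := receiveBatch(m, sizes)
	fmt.Println(total, m.Used(), m.Used() > m.limit) // 120 120 true

	// Each reservation needs a matching release once the message is consumed.
	for _, s := range sizes {
		m.Release(s)
	}
	fmt.Println(m.Used()) // 0
}
```

If any of the releases in the second loop were skipped, Used() would stay above zero indefinitely, which is the kind of drift worth checking for around this path.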
Please let me know if my understanding is correct.
If this approach is appropriate, I will prepare a PR.
If I am missing anything or should adjust the method, please guide me. I'd be
happy to refine the PR accordingly.
Thank you!