BewareMyPower commented on issue #8846:
URL: https://github.com/apache/pulsar/issues/8846#issuecomment-739970491
First, it's true: unlike the producer's send operation, the consumer's
receive operation has no timeout. So calling `receiveAsync` repeatedly without
any limit while no messages arrive makes the number of pending receives in
memory grow without bound.
However, you can limit the number of pending receives on the client side:
```c++
#include <pulsar/Client.h>
#include <mutex>
#include <condition_variable>
using namespace pulsar;
struct PendingReceiveTracker {
    std::mutex mutex_;
    std::condition_variable cond_;
    size_t numPendingReceives_ = 0;
    const size_t maxNumPendingReceives_;

    PendingReceiveTracker(size_t maxNumPendingReceives)
        : maxNumPendingReceives_(maxNumPendingReceives) {}

    void beforeReceiveAsync() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (numPendingReceives_ >= maxNumPendingReceives_) {
            // TODO: you can add other handler instead of wait here
            cond_.wait(lock);
        }
        numPendingReceives_++;
    }

    void completeReceiveAsync() {
        std::unique_lock<std::mutex> lock(mutex_);
        if (numPendingReceives_ == 0) {
            return;
        }
        numPendingReceives_--;
        cond_.notify_all();
    }
};

int main() {
    Client client("pulsar://localhost:6650");
    Consumer consumer;
    // The returned Result is not checked here, for brevity
    client.subscribe("my-topic", "consumer-1", consumer);
    PendingReceiveTracker tracker(1000);
    for (int i = 0; i < 100000; i++) {
        // Once there are 1000 pending receives, this call blocks until one of
        // them completes.
        // You can also change the strategy, see the TODO in beforeReceiveAsync()
        tracker.beforeReceiveAsync();
        consumer.receiveAsync([&tracker](Result result, const Message& msg) {
            // The receive is no longer pending whether it succeeded or failed,
            // so release the slot in both cases
            tracker.completeReceiveAsync();
            if (result == ResultOk) {
                // TODO: process `msg`
            } else {
                // TODO: handle error
            }
        });
        // do other things here...
    }
    client.close();
}
```
As for a timeout limit, it's a little more complicated because a timer is
needed.
Let's go back to the send timeout case. A send timeout means that the
responses for some specific messages were not received within the timeout.
Users can choose to send **the same messages** again if they don't care about
message duplication.
However, unlike a send timeout, a receive timeout is not tied to any
message. So when you hit the timeout with N pending receives, what will
you do? Clear these pending receives? OK, but then you'll add more pending
receives again. Clearing and re-adding the pending receives is not much
different from leaving them pending. The only response that makes a real
difference is to stop receiving, for example by triggering an alarm.
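One way to get that "stop and raise an alarm" behavior without a separate timer is to bound the wait on the condition variable. The following is only a sketch: `TimedPendingReceiveTracker`, `tryAcquire`, `release` and `numPending` are hypothetical names that are not part of the Pulsar client, and the class uses only the standard library.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical variant of PendingReceiveTracker: tryAcquire() gives up after
// `timeout` instead of blocking forever.
class TimedPendingReceiveTracker {
   public:
    explicit TimedPendingReceiveTracker(std::size_t maxNumPendingReceives)
        : maxNumPendingReceives_(maxNumPendingReceives) {}

    // Returns true if a slot was reserved, false if no slot was freed
    // within `timeout`.
    template <typename Rep, typename Period>
    bool tryAcquire(const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool hasSlot = cond_.wait_for(lock, timeout, [this] {
            return numPendingReceives_ < maxNumPendingReceives_;
        });
        if (!hasSlot) {
            return false;
        }
        ++numPendingReceives_;
        return true;
    }

    // Intended to be called from the receive callback, on success or failure.
    void release() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (numPendingReceives_ == 0) {
            return;
        }
        --numPendingReceives_;
        cond_.notify_all();
    }

    std::size_t numPending() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return numPendingReceives_;
    }

   private:
    mutable std::mutex mutex_;
    std::condition_variable cond_;
    std::size_t numPendingReceives_ = 0;
    const std::size_t maxNumPendingReceives_;
};
```

In the receive loop, the caller would check the result of something like `tracker.tryAcquire(std::chrono::seconds(30))` before each `receiveAsync` call and, on `false`, stop issuing new receives and report the problem. The receives that are already pending are left alone, which matches the point above that clearing them gains little.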
A simpler way is just calling `receive()` with a timeout. IMO,
`receiveAsync()` doesn't have any significant benefit over
`receive(Message& msg, int timeout)` except waiting for multiple messages at
the same time. The C++ client doesn't provide something like Java's
`CompletableFuture`, so you need to play tricks with the callback.
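As an illustration of the kind of callback trick meant here, a callback-style receive can be bridged to a `std::future` through a shared `std::promise`. This is a sketch under assumptions, not the client's API: `FakeResult`, `FakeMessage` and `FakeCallback` are hypothetical stand-ins for `Result`, `Message` and the receive callback type, and `receiveAsFuture` is a made-up helper name.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for pulsar::Result and pulsar::Message so that the
// sketch compiles without the client library.
enum class FakeResult { Ok, Error };
using FakeMessage = std::string;
using FakeCallback = std::function<void(FakeResult, const FakeMessage&)>;
using ReceiveOutcome = std::pair<FakeResult, FakeMessage>;

// Starts one asynchronous receive and returns a future for its outcome.
// `receiveAsync` is any callable that accepts a FakeCallback and invokes it
// exactly once, now or later, possibly on another thread.
template <typename ReceiveAsyncFn>
std::future<ReceiveOutcome> receiveAsFuture(ReceiveAsyncFn&& receiveAsync) {
    // The promise is shared so that it outlives this function until the
    // callback fires.
    auto promise = std::make_shared<std::promise<ReceiveOutcome>>();
    std::future<ReceiveOutcome> future = promise->get_future();
    receiveAsync([promise](FakeResult result, const FakeMessage& msg) {
        promise->set_value(std::make_pair(result, msg));
    });
    return future;
}
```

With the real client, the callable passed in would presumably be a lambda that forwards the callback to `consumer.receiveAsync`. The caller can then use `wait_for` on the returned future to wait with a timeout. Note that a timed-out `wait_for` does not cancel anything: the underlying receive stays pending, which is the same situation described above.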