BewareMyPower commented on issue #8846:
URL: https://github.com/apache/pulsar/issues/8846#issuecomment-739970491


   Firstly it's true, not like the producer's send operation, consumer's 
receive operation has no timeout. So calling `receiveAsync` infinitely without 
any limit while no messages could be received will make pending receives in 
memory growing infinitely.
   
   However, you can add the limit for the number of pending receives from a 
client side:
   
   ```c++
   #include <pulsar/Client.h>
   #include <mutex>
   #include <condition_variable>
   
   using namespace pulsar;
   
   struct PendingReceiveTracker {
       std::mutex mutex_;
       std::condition_variable cond_;
   
       size_t numPendingReceives_ = 0;
       const size_t maxNumPendingReceives_;
   
       PendingReceiveTracker(size_t maxNumPendingReceives) : 
maxNumPendingReceives_(maxNumPendingReceives) {}
   
       void beforeReceiveAsync() {
           std::unique_lock<std::mutex> lock(mutex_);
           while (numPendingReceives_ >= maxNumPendingReceives_) {
               // TODO: you can add other handler instead of wait here
               cond_.wait(lock);
           }
           numPendingReceives_++;
       }
   
       void completeReceiveAsync() {
           std::unique_lock<std::mutex> lock(mutex_);
           if (numPendingReceives_ == 0) {
               return;
           }
           numPendingReceives_--;
           cond_.notify_all();
       }
   };
   
   int main() {
       Client client("pulsar://localhost:6650");
       Consumer consumer;
       client.subscribe("my-topic", "consumer-1", consumer);  // ignore the 
returned value check here
   
       PendingReceiveTracker tracker(1000);
       for (int i = 0; i < 100000; i++) {
           // When the pending receives achieve 1000, this method blocks here 
until any message is received.
           // You can also change the strategy, see the TODO in 
beforeReceiveAsync()
           tracker.beforeReceiveAsync();
           consumer.receiveAsync([&tracker](Result result, const Message& msg) {
               if (result == ResultOk) {
                   tracker.completeReceiveAsync();
                   // TODO: process `msg`
               } else {
                   // TODO: handle error
               }
           });
           // do other things here...
       }
   
       client.close();
   }
   ```
   
   As for a timeout limit, it's a little more complicated that a timer is 
needed.
   
   Let's go back to the timeout case. Send timeout means some specific 
messages' associated responses are not received during the timeout. Users can 
choose to reproduce **the same messages** again if they don't care message 
duplication.
   
   However, unlike the send timeout, receive timeout is not related to any 
message. So when you encounter the timeout with N pending receives, what will 
you do? Clear these pending receives? OK, then you'll add more pending receives 
again.  Clear and then re-add the pending receives is not much different with 
make these receives continue pending. The only behavior that make much 
difference is stop receiving, like triggering an alarm.
   
   A simpler way is just calling `receive()` with a timeout. IMO, 
`receiveAsync()` doesn't has any more significant benefit than 
`receive(Message& msg, int timeout)` except waiting for multiple messages at 
the same time. C++ client doesn't provide something like Java's 
`CompletableFuture`, so you need to play tricks with the callback.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to