Hello,


We are currently facing some issues with the drain function in Proton C++
0.22.0 .



We are trying to do synchronous fetch with timeout by doing a drain after
the timeout expires.

However after the first drain request is done, the drain flag is not reset
to false causing all the following actions to also request a drain from the
sender.



We couldn’t find a way to change the drain flag in the
on_receiver_drain_finish function. Should I create a jira issue ?



You can find attached an example with a simple receiver trying to consume
from a Qpid Java Broker 7.0.3 with no messages, and the log file with the
 PN_TRACE_FRM activated.



Regards,

Ali
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/work_queue.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/connection_options.hpp>
#include <proton/receiver_options.hpp>
#include <proton/delivery.hpp>
#include <proton/transport.hpp>
#include <proton/error.hpp>
#include <proton/error_condition.hpp>

#include <future>

#include <iostream>
#include <string>
#include <chrono>
#include <thread>

class ReceiverClient : public proton::messaging_handler {
public:
   ReceiverClient(const std::string &s, std::promise<void> &promise) : 
m_url(s), m_promise(std::move(promise)) {
   }

   void add_credit(){
      m_receiver.work_queue().add([=](){
         m_receiver.add_credit(1);
      });
   }

   void drain(){
      m_receiver.work_queue().add([=](){
         m_receiver.drain();
      });
   }

   void close(){
      m_receiver.work_queue().add([=](){
         m_receiver.connection().close();
      });
   }

   void on_container_start(proton::container &c) override {
      std::cout << "ReceiverClient on_container_start" << std::endl;
      m_receiver = c.open_receiver(m_url, 
proton::receiver_options().credit_window(0));
   }

   void on_connection_open(proton::connection& c) {
      std::cout << "ReceiverClient on_connection_open" << std::endl;
      c.open();
   }

   void on_session_open(proton::session& s) {
      std::cout << "ReceiverClient on_session_open" << std::endl;
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      s.open();
   }

   void on_receiver_open(proton::receiver& r) {
      std::cout << "ReceiverClient on_receiver_open" << std::endl;
      r.open();
      m_promise.set_value();
   }

   void on_receiver_drain_finish(proton::receiver&) {
      std::cout << "ReceiverClient on_receiver_drain_finish" << std::endl;
   }

   void on_message(proton::delivery& , proton::message&) override {
      std::cout << "ReceiverClient::on_message" << std::endl;
   }

   void on_error(const proton::error_condition& error) override {
      std::cout << "ReceiverClient: on_error: " << error.what() << std::endl;
   }

private:
   std::string m_url;
   std::promise<void> m_promise;
   proton::receiver m_receiver;
};

int main(){
   std::promise<void> promise;
   auto future = promise.get_future();

   auto receiver_client = 
ReceiverClient("localhost:5672/myQueue",std::move(promise));
   std::thread ReceiverClientThread([&receiver_client]{
      try {
         proton::container(receiver_client).run();
      }
      catch (const std::exception& cause) {
         std::cout << "server_receiver threw: " << cause.what() << std::endl;
      }
   });

   future.get();

   std::cout << "first receive" << std::endl;
   receiver_client.add_credit();
   std::this_thread::sleep_for(std::chrono::milliseconds(300));

   std::cout << "drain" << std::endl;
   receiver_client.drain();
   std::this_thread::sleep_for(std::chrono::milliseconds(300));

   std::cout << "second receive" << std::endl;
   receiver_client.add_credit();
   std::this_thread::sleep_for(std::chrono::milliseconds(300));

   std::cout << "close" << std::endl;
   receiver_client.close();
   std::this_thread::sleep_for(std::chrono::milliseconds(300));

   return 0;
}
ReceiverClient on_container_start
[000001B1E5D22560]:  -> AMQP
[000001B1E5D22560]:0 -> @open(16) 
[container-id="292348be-1884-47e1-8a6c-3dd62cae7252", hostname="localhost", 
channel-max=32767]
[000001B1E5D22560]:0 -> @begin(17) [next-outgoing-id=0, 
incoming-window=2147483647, outgoing-window=2147483647]
[000001B1E5D22560]:0 -> @attach(18) 
[name="905f4916-f16d-415a-bb41-e926eb01b30b", handle=0, role=true, 
snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="myQueue", 
durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, 
dynamic=false], initial-delivery-count=0, max-message-size=0]
[000001B1E5D22560]:  <- AMQP
[000001B1E5D22560]:0 <- @open(16) 
[container-id="a51c297e-c548-42df-a42c-a3e8e0ae2356", max-frame-size=262144, 
channel-max=255, idle-time-out=0, 
offered-capabilities=@PN_SYMBOL[:"ANONYMOUS-RELAY", :"SHARED-SUBS", 
:"sole-connection-for-container"], properties={:product="qpid", 
:version="7.0.3", :"qpid.build"="62d7ae692b47746c42949c2e646f33a966e6a1a1", 
:"qpid.instance_name"="Broker", 
:"qpid.virtualhost_properties_supported"="true"}]
[000001B1E5D22560]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, 
incoming-window=8192, outgoing-window=2048]
[000001B1E5D22560]:0 <- @attach(18) 
[name="905f4916-f16d-415a-bb41-e926eb01b30b", handle=0, role=false, 
snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="myQueue", 
durable=0, dynamic=false, default-outcome=@modified(39) [delivery-failed=true], 
outcomes=@PN_SYMBOL[:"amqp:accepted:list", :"amqp:released:list", 
:"amqp:rejected:list"], capabilities=@PN_SYMBOL[:queue]], target=@target(41) 
[durable=0, timeout=0, dynamic=false], unsettled={}, initial-delivery-count=0, 
offered-capabilities=@PN_SYMBOL[:"SHARED-SUBS"], properties={}]
ReceiverClient on_connection_open
ReceiverClient on_session_open
ReceiverClient on_receiver_open
first receive
[000001B1E5D22560]:0 -> @flow(19) [next-incoming-id=0, 
incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, 
handle=0, delivery-count=0, link-credit=1, drain=false]
drain
[000001B1E5D22560]:0 -> @flow(19) [next-incoming-id=0, 
incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, 
handle=0, delivery-count=0, link-credit=1, drain=true]
[000001B1E5D22560]:0 <- @flow(19) [next-incoming-id=0, incoming-window=8192, 
next-outgoing-id=0, outgoing-window=2048, handle=0, delivery-count=1, 
link-credit=0, available=0, drain=true, echo=false]
ReceiverClient on_receiver_drain_finish
second receive
[000001B1E5D22560]:0 -> @flow(19) [next-incoming-id=0, 
incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, 
handle=0, delivery-count=1, link-credit=1, drain=true]
[000001B1E5D22560]:0 <- @flow(19) [next-incoming-id=0, incoming-window=8192, 
next-outgoing-id=0, outgoing-window=2048, handle=0, delivery-count=2, 
link-credit=0, available=0, drain=true, echo=false]
close
[000001B1E5D22560]:0 -> @close(24) []
[000001B1E5D22560]:  -> EOS
[000001B1E5D22560]:0 <- @close(24) []
[000001B1E5D22560]:  <- EOS

==============================================

Notes :

Ligne 20 : the drain flag should be false because the on_receiver_drain_finish 
is already called.
This implicates that the next add credit will consume a message if and only if 
a message is immediately available.  
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to