Hello,

We are using proton cpp 0.27.1 with visual studio 2013.

We have the attached Main.cpp that is hanging on line 58 on windows
randomly.
As if line 57 was not executed.

For the working case we have the following logs :
broker on_container_start
client on_container_start
[000000000260A1C0]:  -> AMQP
[000000000260A1C0]:0 -> @open(16)
[container-id="c77d9e59-2061-4804-850c-0d039a0df414", hostname="127.0.0.1",
channel-max=32767]
[000000000261FB50]:  <- AMQP
[000000000261FB50]:0 <- @open(16)
[container-id="c77d9e59-2061-4804-850c-0d039a0df414", hostname="127.0.0.1",
channel-max=32767]
[000000000261FB50]:  -> AMQP
b[000000000260A1C0]:  <- AMQP
roker on_connection_open
[000000000261FB50]:0 -> @open(16)
[container-id="292348be-1884-47e1-8a6c-3dd62cae7252", channel-max=32767]
[000000000260A1C0]:0 <- @open(16)
[container-id="292348be-1884-47e1-8a6c-3dd62cae7252", channel-max=32767]
client on_connection_open
[000000000260A1C0]:0 -> @close(24) [error=@error(29)
[condition=:"amqp:connection:framing-error", description="connection
aborted"]]
[000000000260A1C0]:  <- EOS
[000000000261FB50]:0 -> @close(24) [error=@error(29)
[condition=:"amqp:connection:framing-error", description="connection
aborted"]]
[000000000261FB50]:  <- EOS
[000000000261FB50]:  -> EOS
broker on_transport_error
Broker::on_transport_error: amqp:connection:framing-error: connection
aborted
client on_container_stop

For the error case:
broker on_container_start
client on_container_start

Is it a bug? on linux we do not have this problem.

Best regards,
Rabih & Jeremy
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/connection_options.hpp>

#include <Broker.hpp>

#include <iostream>
#include <thread>
#include <future>
#include <string>

struct ContainerHandler : public proton::messaging_handler
{
   void on_container_start(proton::container& c) override
   {
      std::cout << "client on_container_start" << std::endl;
      containerStarted.set_value();
   }

   void on_container_stop(proton::container&) override
   {
      std::cout << "client on_container_stop" << std::endl;
   }

   std::promise<void> containerStarted;
};

struct ConnectionHandler : public proton::messaging_handler
{
   void on_connection_open(proton::connection&) override
   {
      std::cout << "client on_connection_open" << std::endl;
      connectionStarted.set_value();
   }

   std::promise<void> connectionStarted;
};

int main()
{
   try {
      Broker brk("//127.0.0.1:5672", "examples");

      ContainerHandler containerHandler;
      proton::container cont(containerHandler);
      auto containerStopped = std::async(std::launch::async, [&]() {
         try {
            cont.run();
         }
         catch (const std::exception& e) {
            std::cout << "std::exception caught, message:" << e.what() << 
std::endl;
         }});
      containerHandler.containerStarted.get_future().get();

      ConnectionHandler connectionHandler;
      cont.connect("//127.0.0.1:5672", 
proton::connection_options().handler(connectionHandler));
      connectionHandler.connectionStarted.get_future().get();

      cont.stop();
      containerStopped.get();
   }
   catch (std::exception& e) {
      std::cerr << "Expected exception: " << e.what() << std::endl;
   }
   return 0;
}

#ifndef TEST_BROKER_HPP
#define TEST_BROKER_HPP

#include <proton/listener.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/listen_handler.hpp>

#include <proton/imperative/ThreadRAII.hpp>

#include <string>
#include <iostream>
#include <thread>
#include <future>

class listenerHandler : public proton::listen_handler
{
public:
   std::future<void> getStartedFuture()
   {
      return m_containerStarted.get_future();
   }

private:
   void on_open(proton::listener&) override
   {
      m_containerStarted.set_value();
   }

   void on_error(proton::listener&, const std::string& s) override
   {
      
m_containerStarted.set_exception(std::make_exception_ptr(std::runtime_error(s)));
   }

   std::promise<void> m_containerStarted;
};

class Broker : public proton::messaging_handler
{
public:
   Broker(const std::string& url, const std::string& destination)
      :m_url(url + "/" + destination)
   {
      m_brokerThread = std::thread([&]() {
         try
         {
            proton::container(*this).run();
         }
         catch (const std::exception& e) {
            std::cerr << "Broker threw exception: " << e.what() << std::endl;
         }});

      // Wait for the container to start
      m_listenerHandler.getStartedFuture().get();
   }

   ~Broker()
   {
      if (!m_isClosed) {
         m_listener.stop();
      }
   }

   void injectMessages(std::vector<proton::message> messages)
   {
      m_messages.insert(m_messages.end(), messages.begin(), messages.end());
   }

   int m_acceptedMsgs = 0;
   int m_rejectedMsgs = 0;
   int m_releasedMsgs = 0;

private:
   void on_container_start(proton::container &c) override
   {
      std::cout << "broker on_container_start" << std::endl;
      c.receiver_options(proton::receiver_options());
      m_listener = c.listen(m_url, m_listenerHandler);
   }

   void on_connection_open(proton::connection &c) override
   {
      std::cout << "broker on_connection_open" << std::endl;
      m_connection = c;
      c.open();
   }

   void on_connection_close(proton::connection&) override
   {
      std::cout << "broker on_connection_close" << std::endl;
      m_listener.stop();
   }

   void on_transport_error(proton::transport &t) override
   {
      std::cout << "broker on_transport_error" << std::endl;
      std::cerr << "Broker::on_transport_error: " << t.error().what() << 
std::endl;
      m_listener.stop();
   }

   void on_error(const proton::error_condition &c) override
   {
      std::cout << "broker on_error" << std::endl;
      std::cerr << "Broker::on_error: " << c.what() << std::endl;

      m_isClosed = true;
      m_listener.stop();
   }

   void on_message(proton::delivery& delivery, proton::message& message) 
override
   {
      std::cout << "broker on_message" << std::endl;
      m_currentDelivery = { message, delivery };
   }

   void sendMessages(proton::sender& sender)
   {
      size_t numberOfMessagesToSend = 
std::min(static_cast<size_t>(sender.credit()), m_messages.size());

      for (auto count = numberOfMessagesToSend; count > 0; --count)
      {
         auto message = m_messages.front();
         m_messages.pop_front();
         sender.send(message);
      }
   }

   void on_sendable(proton::sender& sender) override
   {
      std::cout << "broker on_sendable" << std::endl;
      sendMessages(sender);
   }

   void on_tracker_accept(proton::tracker& ) override
   {
      ++m_acceptedMsgs;
   }

   void on_tracker_reject(proton::tracker&) override
   {
      ++m_rejectedMsgs;
   }

   void on_tracker_release(proton::tracker&) override
   {
      ++m_releasedMsgs;
   }

private:
   std::string m_url;
   bool m_isClosed = false;
   proton::ThreadRAII m_brokerThread;
   std::deque<proton::message> m_messages;
   std::pair<proton::message, proton::delivery> m_currentDelivery;
   listenerHandler m_listenerHandler;

   proton::listener m_listener;
   proton::connection m_connection;
};

#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to