Hello,

I am using proton cpp 0.27.1.

While working on the C++ imperative API, I encountered a problem while
using container.stop.
Please find the test code attached (a simplified version of the current
implementation).
I have also attached the core dump I am getting.

The problem:
In the example I attached, the container.stop call in the ImperativeContainer
destructor might race with the closing of the connection.
Is everything related to this connection freed by the time on_transport_close
is called? I ask because in on_transport_close I signal the main thread
to continue and execute container.stop, which might create a race condition
if some actions are still taken after on_transport_close is called...

A probable solution:
I think we need a reliable way to know when a handler can be released:
it could be an "unregister handler" function, or a callback invoked after
on_transport_close that tells us that we can release the memory of the
handler.

Best regards,
Rabih

struct ContainerHandler : public proton::messaging_handler {
   void on_container_start(proton::container&) override {
      std::cout << "client on_container_start" << std::endl;
      m_onOpenPromise.set_value();
   }

   void on_container_stop(proton::container&) override {
      std::cout << "client on_container_stop" << std::endl;
      m_onStopPromise.set_value();
   }

   std::promise<void> m_onOpenPromise;
   std::promise<void> m_onStopPromise;
};

/// Owns a proton::container together with the thread that runs it.
///
/// The constructor returns only after the handler has seen
/// on_container_start; the destructor stops the container, waits for
/// on_container_stop and then joins the worker thread.
struct ImperativeContainer {
   ImperativeContainer()
   : m_container(m_containerHand)
   {
      // Take the future before the thread exists so the start signal cannot be missed.
      std::future<void> started = m_containerHand.m_onOpenPromise.get_future();

      m_thread = std::thread([this]() {
         try {
            m_container.auto_stop(false);
            m_container.run();
         }
         catch (const std::exception& ex) {
            // Exceptions must not escape the thread entry point; report and exit the thread.
            std::cout << "std::exception caught on pn container, message:" << ex.what() << std::endl;
         }
      });

      // Block the constructing thread until on_container_start has fired.
      started.get();
   }

   ~ImperativeContainer() {
      std::future<void> stopped = m_containerHand.m_onStopPromise.get_future();
      m_container.stop();
      // Wait for on_container_stop before joining the thread that runs the container.
      stopped.get();
      if (!m_thread.joinable()) {
         return;
      }
      m_thread.join();
   }

   // Declaration order matters: m_containerHand is passed to m_container's
   // constructor, so it must be declared (and therefore constructed) first.
   ContainerHandler m_containerHand;
   proton::container m_container;
   std::thread m_thread;
};

/// Per-connection handler that records the connection and its work queue
/// when the connection opens, and exposes open/close events as one-shot
/// promises. The remaining callbacks only log.
struct ConnectionHandler : public proton::messaging_handler {
   /// Stores the connection and a pointer to its work queue, then fulfils
   /// m_onOpenPromise.
   void on_connection_open(proton::connection& conn) override {
      std::cout << "client on_connection_open" << std::endl;
      m_connection = conn;
      m_work = &m_connection.work_queue();
      m_onOpenPromise.set_value();
   }

   /// Drops the stored connection and fulfils m_onClosePromise.
   void on_transport_close(proton::transport&) override {
      std::cout << "client on_transport_close" << std::endl;
      // Release our reference to the connection here, to avoid a race
      // condition when it is freed.
      m_connection = proton::connection();
      // Once the promise is fulfilled, the thread waiting on it may destroy
      // this handler, so nothing below this line may touch members.
      m_onClosePromise.set_value();
      std::cout << "client on_transport_close finished" << std::endl;
   }

   /// Logs the connection close callback.
   void on_connection_close(proton::connection&) override {
      std::cout << "client on_connection_close" << std::endl;
   }

   /// Logs the connection error reported by conn.
   void on_connection_error(proton::connection& conn) override {
      std::cout << "client on_connection_error: " << conn.error().what() << std::endl;
   }

   /// Logs the transport open callback.
   void on_transport_open(proton::transport&) override {
      std::cout << "client on_transport_open" << std::endl;
   }

   /// Logs the transport error reported by t.
   void on_transport_error(proton::transport& t) override {
      std::cout << "client on_transport_error: " << t.error().what() << std::endl;
   }

   /// Connection captured in on_connection_open; reset to an empty
   /// connection in on_transport_close.
   proton::connection m_connection;
   /// Work queue of m_connection; null until on_connection_open has run.
   /// NOTE(review): not cleared in on_transport_close, so it presumably
   /// must not be used after that callback - confirm against Proton docs.
   proton::work_queue* m_work = nullptr;

   /// Fulfilled by on_connection_open().
   std::promise<void> m_onOpenPromise;
   /// Fulfilled by on_transport_close().
   std::promise<void> m_onClosePromise;
};

class ImperativeConnection {
public:
   ImperativeConnection(proton::container& myContainer, const std::strind& url) 
{
      auto openConnectionFuture = m_connectionHand.m_onOpenPromise.get_future();
      myContainer.connect(url, m_connectionHand);
      openConnectionFuture.get();
   }

   ~ImperativeConnection() {
      auto closeConnectionFuture = 
m_connectionHand.m_onClosePromise.get_future();
      m_connectionHand.m_work->add([=]() 
{m_connectionHand.m_connection.close(); });
      closeConnectionFuture.wait();
   }

private:
   ConnectionHandler m_connectionHand;
};

/// Reproducer entry point: starts a container, opens one connection to the
/// given URL, then tears both down when the locals go out of scope.
int main(int argc, char **argv) {
    // Default connection URL; the first command-line argument overrides it.
    std::string url = "//127.0.0.1:5672";
    if (argc > 1) {
        url = argv[1];
    }

    // conn is declared after cont, so it is destroyed first: the connection
    // is closed before the container is stopped.
    ImperativeContainer cont;
    ImperativeConnection conn(cont.m_container, url);
}
#0  pn_error_free (error=0xffffffff00000000)
    at /data/home/proton/qpid-proton-0.27.1/c/src/core/error.c:50
#1  0x00007f5439a301b7 in pn_data_finalize (object=0x1a22420)
    at /data/home/proton/qpid-proton-0.27.1/c/src/core/codec.c:90
#2  0x00007f5439a2d218 in pn_class_decref (clazz=0x7f5439c54880 <clazz>, 
object=0x1a22420)
    at /data/home/proton/qpid-proton-0.27.1/c/src/core/object/object.c:95
#3  0x00007f5439a36f1d in pn_condition_tini (condition=0x1a24990)
    at /data/home/proton/qpid-proton-0.27.1/c/src/core/engine.c:219
#4  0x00007f5439a396a6 in pn_condition_free (c=0x1a24990)
    at /data/home/proton/qpid-proton-0.27.1/c/src/core/engine.c:227
#5  0x00007f5439c59728 in pconnection_final_free (pc=0x1a2e4a0)
    at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:888
#6  0x00007f5439c5a384 in pconnection_cleanup (pc=<optimized out>)
    at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:905
#7  0x00007f5439c5b21a in pconnection_process (pc=0x1a2e4a0, 
events=events@entry=0, timeout=timeout@entry=false, topup=topup@entry=false, 
is_io_2=is_io_2@entry=false)
    at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:1272
#8  0x00007f5439c5bcf2 in process_inbound_wake (ee=0x1a24510, p=0x1a24410)
    at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:2092
#9  proactor_do_epoll (p=0x1a24410, can_block=true)
    at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:2129
#10 0x00007f543a9c3c1c in proton::container::impl::thread 
(this=this@entry=0x1a238a0)
    at 
/data/home/proton/qpid-proton-0.27.1/cpp/src/proactor_container_impl.cpp:736
#11 0x00007f543a9c4250 in proton::container::impl::run (this=0x1a238a0, 
threads=<optimized out>)
    at 
/data/home/proton/qpid-proton-0.27.1/cpp/src/proactor_container_impl.cpp:788
#12 0x00007f543ac32906 in 
proton::Container::Container()::{lambda()#1}::operator()() const () at 
/src/new/rmourad/qpid-imperative-proton/src/Container.cpp:17
#13 0x00007f543ac341e2 in 
std::_Bind_simple<proton::Container::Container()::<lambda()>()>::_M_invoke<>(std::_Index_tuple<>)
 (this=0x1a21178)
    at /opt/rh/devtoolset-3/root/usr/include/c++/4.9.2/functional:1700
#14 0x00007f543ac34127 in 
std::_Bind_simple<proton::Container::Container()::<lambda()>()>::operator()(void)
 (this=0x1a21178) at 
/opt/rh/devtoolset-3/root/usr/include/c++/4.9.2/functional:1688
#15 0x00007f543ac340a4 in 
std::thread::_Impl<std::_Bind_simple<proton::Container::Container()::<lambda()>()>
 >::_M_run(void) (this=0x1a21160)
    at /opt/rh/devtoolset-3/root/usr/include/c++/4.9.2/thread:115
#16 0x00007f543a744470 in ?? () from /usr/lib64/libstdc++.so.6
#17 0x00007f543ae66aa1 in start_thread () from /lib64/libpthread.so.0
#18 0x00007f5439f48c4d in clone () from /lib64/libc.so.6
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to