Pavel Moravec created QPID-6397:
-----------------------------------

             Summary: [C++ broker] segfault when processing QMF method during 
periodic processing
                 Key: QPID-6397
                 URL: https://issues.apache.org/jira/browse/QPID-6397
             Project: Qpid
          Issue Type: Bug
          Components: C++ Broker
    Affects Versions: 0.31
            Reporter: Pavel Moravec


There is a race condition that causes a segfault when:
- one thread executes periodic processing with trace logging enabled (at least for 
qpid::management::ManagementAgent::debugSnapshot)
- a second thread is concurrently processing a QMF method request from a client

The root cause is that the first thread iterates over the managementObjects map in 
dumpMap (or over newManagementObjects in dumpVector) while the second thread 
is executing moveNewObjects, which moves entries from newManagementObjects to 
managementObjects.

See the backtraces that were hit (the dumpMap case is shown; the same was also hit in dumpVector):
(gdb) bt # of thread 1
#0  0x0000003f0c632885 in raise (sig=6) at 
../nptl/sysdeps/unix/sysv/linux/raise.c:64
#1  0x0000003f0c634065 in abort () at abort.c:92
#2  0x0000003f0c62b9fe in __assert_fail_base (fmt=<value optimized out>, 
assertion=0x7ff117bd5c94 "px != 0", 
    file=0x7ff117bd5c68 "/usr/include/boost/smart_ptr/shared_ptr.hpp", 
line=<value optimized out>, function=<value optimized out>)
    at assert.c:96
#3  0x0000003f0c62bac0 in __assert_fail (assertion=0x7ff117bd5c94 "px != 0", 
    file=0x7ff117bd5c68 "/usr/include/boost/smart_ptr/shared_ptr.hpp", 
line=418, 
    function=0x7ff117bd5e80 "T* boost::shared_ptr< <template-parameter-1-1> 
>::operator->() const [with T = qpid::management::ManagementObject]") at 
assert.c:105
#4  0x00007ff117947139 in 
boost::shared_ptr<qpid::management::ManagementObject>::operator->() const ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#5  0x00007ff117bafcd9 in qpid::management::(anonymous 
namespace)::dumpMap(std::map<qpid::management::ObjectId, 
boost::shared_ptr<qpid::management::ManagementObject>, 
std::less<qpid::management::ObjectId>, 
std::allocator<std::pair<qpid::management::ObjectId const, 
boost::shared_ptr<qpid::management::ManagementObject> > > > const&) () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#6  0x00007ff117bb06ba in qpid::management::ManagementAgent::debugSnapshot(char 
const*) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#7  0x00007ff117b954ed in 
qpid::management::ManagementAgent::periodicProcessing() ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#8  0x00007ff117bc5c1f in boost::_mfi::mf0<void, 
qpid::management::ManagementAgent>::operator()(qpid::management::ManagementAgent*)
 const () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#9  0x00007ff117bc4384 in void 
boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*> 
>::operator()<boost::_mfi::mf0<void, qpid::management::ManagementAgent>, 
boost::_bi::list0>(boost::_bi::type<void>, boost::_mfi::mf0<void, 
qpid::management::ManagementAgent>&, boost::_bi::list0&, int) () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#10 0x00007ff117bc1269 in boost::_bi::bind_t<void, boost::_mfi::mf0<void, 
qpid::management::ManagementAgent>, 
boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*> > 
>::operator()() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#11 0x00007ff117bbc1e0 in 
boost::detail::function::void_function_obj_invoker0<boost::_bi::bind_t<void, 
boost::_mfi::mf0<void, qpid::management::ManagementAgent>, 
boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*> > >, 
void>::invoke(boost::detail::function::function_buffer&) () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#12 0x00007ff117a5e2af in boost::function0<void>::operator()() const () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#13 0x00007ff117b8e400 in qpid::management::(anonymous 
namespace)::Periodic::fire() ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#14 0x00007ff1173b518f in qpid::sys::TimerTask::fireTask() () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
---Type <return> to continue, or q <return> to quit---
#15 0x00007ff1173b63e9 in 
qpid::sys::Timer::fire(boost::intrusive_ptr<qpid::sys::TimerTask>) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#16 0x00007ff1173b5d1d in qpid::sys::Timer::run() () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#17 0x00007ff1173280ef in qpid::sys::(anonymous namespace)::runRunnable(void*) 
()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#18 0x0000003f0ce077f1 in start_thread (arg=0x7ff116c8f700) at 
pthread_create.c:301
#19 0x0000003f0c6e570d in clone () at 
../sysdeps/unix/sysv/linux/x86_64/clone.S:115

(gdb) bt  # of thread 2
#0  0x00007ff116ca99c7 in qpid::types::VariantImpl::~VariantImpl() () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidtypes.so.1
#1  0x00007ff116caf2e9 in qpid::types::Variant::~Variant() () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidtypes.so.1
#2  0x00007ff117967876 in 
qmf::org::apache::qpid::broker::Connection::mapEncodeValues(std::map<std::basic_string<char,
 std::char_traits<char>, std::allocator<char> >, qpid::types::Variant, 
std::less<std::basic_string<char, std::char_traits<char>, std::allocator<char> 
> >, std::allocator<std::pair<std::basic_string<char, std::char_traits<char>, 
std::allocator<char> > const, qpid::types::Variant> > >&, bool, bool) () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#3  0x00007ff117bb118d in 
qpid::management::ManagementAgent::DeletedObject::DeletedObject(boost::shared_ptr<qpid::management::ManagementObject>,
 bool, bool) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#4  0x00007ff117b94ec1 in qpid::management::ManagementAgent::moveNewObjects() ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#5  0x00007ff117b9e79a in 
qpid::management::ManagementAgent::handleMethodRequest(std::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, bool) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#6  0x00007ff117bacb70 in 
qpid::management::ManagementAgent::dispatchAgentCommand(qpid::broker::Message&, 
bool) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#7  0x00007ff117b9ccc3 in 
qpid::management::ManagementAgent::dispatchCommand(qpid::broker::Deliverable&, 
std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, 
qpid::framing::FieldTable const*, bool, int) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#8  0x00007ff117bca08a in 
qpid::broker::ManagementDirectExchange::route(qpid::broker::Deliverable&) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#9  0x00007ff117b49d65 in 
qpid::broker::SemanticState::route(qpid::broker::Message&, 
qpid::broker::Deliverable&) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#10 0x00007ff117b6a8f3 in 
qpid::broker::SessionState::handleContent(qpid::framing::AMQFrame&) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#11 0x00007ff117b6af31 in 
qpid::broker::SessionState::handleIn(qpid::framing::AMQFrame&) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#12 0x00007ff117b72dc1 in 
qpid::framing::Handler<qpid::framing::AMQFrame&>::MemFunRef<qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface,
 
&(qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface::handleIn(qpid::framing::AMQFrame&))>::handle(qpid::framing::AMQFrame&)
 () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#13 0x00007ff117376ba3 in 
qpid::amqp_0_10::SessionHandler::handleIn(qpid::framing::AMQFrame&) ()
---Type <return> to continue, or q <return> to quit---
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#14 0x00007ff117b72dc1 in 
qpid::framing::Handler<qpid::framing::AMQFrame&>::MemFunRef<qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface,
 
&(qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface::handleIn(qpid::framing::AMQFrame&))>::handle(qpid::framing::AMQFrame&)
 () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#15 0x00007ff117ac640a in 
qpid::framing::Handler<qpid::framing::AMQFrame&>::operator()(qpid::framing::AMQFrame&)
 ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#16 0x00007ff117ac1a8c in 
qpid::broker::ConnectionHandler::handle(qpid::framing::AMQFrame&) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#17 0x00007ff117ab8444 in 
qpid::broker::amqp_0_10::Connection::received(qpid::framing::AMQFrame&) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#18 0x00007ff117a23408 in qpid::amqp_0_10::Connection::decode(char const*, 
unsigned long) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#19 0x00007ff117b34581 in qpid::broker::SecureConnection::decode(char const*, 
unsigned long) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#20 0x00007ff1173abc2f in 
qpid::sys::AsynchIOHandler::readbuff(qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#21 0x00007ff117bd2ce6 in boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, 
qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*>::operator()(qpid::sys::AsynchIOHandler*, 
qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*) const ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#22 0x00007ff117bd1a24 in void 
boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>, 
boost::arg<1>, boost::arg<2> >::operator()<boost::_mfi::mf2<void, 
qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*>, boost::_bi::list2<qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*&> >(boost::_bi::type<void>, 
boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*>&, boost::_bi::list2<qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*&>&, int) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#23 0x00007ff117bd0eb6 in void boost::_bi::bind_t<void, boost::_mfi::mf2<void, 
qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*>, 
boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>, 
boost::arg<1>, boost::arg<2> > >::operator()<qpid::sys::AsynchIO, 
qpid::sys::AsynchIOBufferBase*>(qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*&) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#24 0x00007ff117bd018f in 
boost::detail::function::void_function_obj_invoker2<boost::_bi::bind_t<void, 
boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*>, 
boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>, 
boost::arg<1>, boost::arg<2> > >, void, qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*>::invoke(boost::detail::function::function_buffer&,
 qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*) () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
---Type <return> to continue, or q <return> to quit---
#25 0x00007ff11730f204 in boost::function2<void, qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*>::operator()(qpid::sys::AsynchIO&, 
qpid::sys::AsynchIOBufferBase*) const () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#26 0x00007ff11730c28b in 
qpid::sys::posix::AsynchIO::readable(qpid::sys::DispatchHandle&) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#27 0x00007ff117314202 in boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, 
qpid::sys::DispatchHandle&>::operator()(qpid::sys::posix::AsynchIO*, 
qpid::sys::DispatchHandle&) const () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#28 0x00007ff117313553 in void 
boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>, boost::arg<1> 
>::operator()<boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, 
qpid::sys::DispatchHandle&>, boost::_bi::list1<qpid::sys::DispatchHandle&> 
>(boost::_bi::type<void>, boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, 
qpid::sys::DispatchHandle&>&, boost::_bi::list1<qpid::sys::DispatchHandle&>&, 
int) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#29 0x00007ff117312844 in void boost::_bi::bind_t<void, boost::_mfi::mf1<void, 
qpid::sys::posix::AsynchIO, qpid::sys::DispatchHandle&>, 
boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>, boost::arg<1> 
> >::operator()<qpid::sys::DispatchHandle>(qpid::sys::DispatchHandle&) () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#30 0x00007ff117311821 in 
boost::detail::function::void_function_obj_invoker1<boost::_bi::bind_t<void, 
boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, qpid::sys::DispatchHandle&>, 
boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>, boost::arg<1> 
> >, void, 
qpid::sys::DispatchHandle&>::invoke(boost::detail::function::function_buffer&, 
qpid::sys::DispatchHandle&) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#31 0x00007ff1173afc7a in boost::function1<void, 
qpid::sys::DispatchHandle&>::operator()(qpid::sys::DispatchHandle&) const ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#32 0x00007ff1173af240 in 
qpid::sys::DispatchHandle::processEvent(qpid::sys::Poller::EventType) ()
   from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#33 0x00007ff1173355d6 in qpid::sys::Poller::Event::process() () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#34 0x00007ff1173348d4 in qpid::sys::Poller::run() () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#35 0x00007ff1173ade21 in qpid::sys::Dispatcher::run() () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
#36 0x00007ff117a30177 in qpid::broker::Broker::run() () from 
/root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
#37 0x000000000040760c in 
qpid::broker::QpiddBroker::execute(qpid::broker::QpiddOptions*) ()
#38 0x000000000040b461 in qpid::broker::run_broker(int, char**, bool) ()
#39 0x00000000004076a6 in main ()


Reproducer: a little bit tricky, but basically:
1) create as many objects as possible (and keep creating them, so that there is 
always something in the newManagementObjects map)
2) invoke some QMF method just when periodic processing is starting

In particular, I used this repro:
1) populate_provisioning.sh:

# populate_provisioning.sh: keeps creating queues and bindings on the broker.
#
# Step 1: create 50 fanout exchanges (ex_1 .. ex_50). All qpid-config calls are
# started in the background and then awaited together.
for i in $(seq 1 50); do qpid-config add exchange fanout ex_${i} & done
wait

# Step 2: loop forever, creating queues in batches of 200.
while true; do
        # A fresh UUID per batch makes every queue name unique.
        j=$(uuidgen)
        echo "$(date): starting with queue ${j}_q_1"
        # Start 200 qpid-receive clients in the background, 0.1s apart. Each
        # address creates a durable queue ${j}_q_${i} bound to amq.direct,
        # amq.fanout and every exchange ex_1 .. ex_50.
        for i in $(seq 1 200); do qpid-receive -a "${j}_q_${i}; {create:always, 
node:{durable:true, x-bindings:[{exchange:'amq.direct', queue:'${j}_q_${i}', 
key:''}, {exchange:'amq.fanout', queue:'${j}_q_${i}'}, {exchange:'ex_1', 
queue:'${j}_q_${i}'}, {exchange:'ex_2', queue:'${j}_q_${i}'}, {exchange:'ex_3', 
queue:'${j}_q_${i}'}, {exchange:'ex_4', queue:'${j}_q_${i}'}, {exchange:'ex_5', 
queue:'${j}_q_${i}'}, {exchange:'ex_6', queue:'${j}_q_${i}'}, {exchange:'ex_7', 
queue:'${j}_q_${i}'}, {exchange:'ex_8', queue:'${j}_q_${i}'}, {exchange:'ex_9', 
queue:'${j}_q_${i}'}, {exchange:'ex_10', queue:'${j}_q_${i}'}, 
{exchange:'ex_11', queue:'${j}_q_${i}'}, {exchange:'ex_12', 
queue:'${j}_q_${i}'}, {exchange:'ex_13', queue:'${j}_q_${i}'}, 
{exchange:'ex_14', queue:'${j}_q_${i}'}, {exchange:'ex_15', 
queue:'${j}_q_${i}'}, {exchange:'ex_16', queue:'${j}_q_${i}'}, 
{exchange:'ex_17', queue:'${j}_q_${i}'}, {exchange:'ex_18', 
queue:'${j}_q_${i}'}, {exchange:'ex_19', queue:'${j}_q_${i}'}, 
{exchange:'ex_20', queue:'${j}_q_${i}'}, {exchange:'ex_21', 
queue:'${j}_q_${i}'}, {exchange:'ex_22', queue:'${j}_q_${i}'}, 
{exchange:'ex_23', queue:'${j}_q_${i}'}, {exchange:'ex_24', 
queue:'${j}_q_${i}'}, {exchange:'ex_25', queue:'${j}_q_${i}'}, 
{exchange:'ex_26', queue:'${j}_q_${i}'}, {exchange:'ex_27', 
queue:'${j}_q_${i}'}, {exchange:'ex_28', queue:'${j}_q_${i}'}, 
{exchange:'ex_29', queue:'${j}_q_${i}'}, {exchange:'ex_30', 
queue:'${j}_q_${i}'}, {exchange:'ex_31', queue:'${j}_q_${i}'}, 
{exchange:'ex_32', queue:'${j}_q_${i}'}, {exchange:'ex_33', 
queue:'${j}_q_${i}'}, {exchange:'ex_34', queue:'${j}_q_${i}'}, 
{exchange:'ex_35', queue:'${j}_q_${i}'}, {exchange:'ex_36', 
queue:'${j}_q_${i}'}, {exchange:'ex_37', queue:'${j}_q_${i}'}, 
{exchange:'ex_38', queue:'${j}_q_${i}'}, {exchange:'ex_39', 
queue:'${j}_q_${i}'}, {exchange:'ex_40', queue:'${j}_q_${i}'}, 
{exchange:'ex_41', queue:'${j}_q_${i}'}, {exchange:'ex_42', 
queue:'${j}_q_${i}'}, {exchange:'ex_43', queue:'${j}_q_${i}'}, 
{exchange:'ex_44', queue:'${j}_q_${i}'}, {exchange:'ex_45', 
queue:'${j}_q_${i}'}, {exchange:'ex_46', queue:'${j}_q_${i}'}, 
{exchange:'ex_47', queue:'${j}_q_${i}'}, {exchange:'ex_48', 
queue:'${j}_q_${i}'}, {exchange:'ex_49', queue:'${j}_q_${i}'}, 
{exchange:'ex_50', queue:'${j}_q_${i}'}] }}" & sleep 0.1; done
        # Let the whole batch of background clients finish before the next one.
        wait
done


2) while true; do date "+%T.%N"; ./purge_queue whateverQueue 0 > /dev/null 
2>&1; sleep 9.83; done

purge_queue.cpp:
/* To the extent possible under law, Red Hat, Inc. has dedicated all copyright
 * to this software to the public domain worldwide, pursuant to the CC0 Public
 * Domain Dedication. This software is distributed without any warranty.  See
 *  <http://creativecommons.org/publicdomain/zero/1.0/>.
*/

#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Address.h>

#include <cstdlib>
#include <exception>
#include <iostream>
#include <string>

using namespace std;
using namespace qpid::messaging;
using namespace qpid::types;

int main(int argc, char** argv) {
  if (argc < 3) {
    cerr << "Invalid number of parameters, expecting: queue_name, quantity." << 
endl;
    return 1;
  }
  string queue_name = argv[1];
  uint32_t qty = atoi(argv[2]);

  Connection connection(argc>3?argv[3]:"localhost:5672");
  connection.open();
  Session session = connection.createSession();
  Sender sender = session.createSender("qmf.default.direct/broker");
  Address responseQueue("#reply-queue; {create:always, 
node:{x-declare:{auto-delete:true}}}");
  Receiver receiver = session.createReceiver(responseQueue);

  Message message;
  Variant::Map content;
  Variant::Map OID;
  Variant::Map arguments;
  string object_name = "org.apache.qpid.broker:queue:" + queue_name;
  OID["_object_name"] = object_name;

  arguments["request"] = qty;

  content["_object_id"] = OID;
  content["_method_name"] = "purge";
  content["_arguments"] = arguments;

  encode(content, message);
  message.setReplyTo(responseQueue);
  message.setProperty("x-amqp-0-10.app-id", "qmf2");
  message.setProperty("qmf.opcode", "_method_request");
  message.setContentType("amqp/map");
  Variant::Map map;
  decode(message, map);
  std::cout << map << std::endl;

  sender.send(message, true);

  Message response;
  if (receiver.fetch(response,qpid::messaging::Duration(30000)) == true) {
    qpid::types::Variant::Map recv_props = response.getProperties();
    if (recv_props["x-amqp-0-10.app-id"] == "qmf2")
      if (recv_props["qmf.opcode"] == "_method_response")
        std::cout << "Response: OK" << std::endl;
      else if (recv_props["qmf.opcode"] == "_exception")
        std::cerr << "Error: " << response.getContent() << std::endl;
      else
        std::cerr << "Invalid response received!" << std::endl;
    else
      std::cerr << "Invalid response not of qmf2 type received!" << std::endl;
  }
  else
    std::cout << "Timeout: No response received within 30 seconds!" << 
std::endl;

  receiver.close();
  sender.close();
  session.close();
  connection.close();
  return 0;
}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to