Author: gsim Date: Thu Nov 12 10:30:53 2009 New Revision: 835323 URL: http://svn.apache.org/viewvc?rev=835323&view=rev Log: Merge branch 'next_receiver_changes' into trunk
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h?rev=835323&r1=835322&r2=835323&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h (original) +++ qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h Thu Nov 12 10:30:53 2009 @@ -88,13 +88,14 @@ QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN bool nextReceiver(Receiver&, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN Receiver nextReceiver(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN Sender createSender(const Address& address); QPID_CLIENT_EXTERN Sender createSender(const std::string& address); QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address); QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address); - - QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string()); private: friend class qpid::client::PrivateImplRef<Session>; }; Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=835323&r1=835322&r2=835323&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Thu Nov 12 10:30:53 2009 @@ -123,6 +123,15 @@ return process(&handler, timeout); } +bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout) +{ + //if there is not already a received message, we must wait for one + if (received.empty() && !wait(timeout)) return false; + //else we have a message in received; return the corresponding destination + destination = received.front()->as<MessageTransferBody>()->getDestination(); + return true; +} + void IncomingMessages::accept() { acceptTracker.accept(session); @@ -155,11 +164,11 @@ } /** - * Get a frameset from session queue, waiting for up to the specified - * duration and returning true if this could be achieved, false - * otherwise. If a destination is supplied, only return a message for - * that destination. In this case messages from other destinations - * will be held on a received queue. + * Get a frameset that is accepted by the specified handler from + * session queue, waiting for up to the specified duration and + * returning true if this could be achieved, false otherwise. Messages + * that are not accepted by the handler are pushed onto received queue + * for later retrieval. */ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) { @@ -183,6 +192,22 @@ return false; } +bool IncomingMessages::wait(qpid::sys::Duration duration) +{ + AbsTime deadline(AbsTime::now(), duration); + FrameSet::shared_ptr content; + for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { + if (content->isA<MessageTransferBody>()) { + QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + received.push_back(content); + return true; + } else { + //TODO: handle other types of commands (e.g. message-accept, message-flow etc) + } + } + return false; +} + uint32_t IncomingMessages::pendingAccept() { return acceptTracker.acceptsPending(); Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=835323&r1=835322&r2=835323&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Thu Nov 12 10:30:53 2009 @@ -70,8 +70,7 @@ void setSession(qpid::client::AsyncSession session); bool get(Handler& handler, qpid::sys::Duration timeout); - //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); - //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout); + bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); void accept(); void releaseAll(); void releasePending(const std::string& destination); @@ -90,6 +89,7 @@ AcceptTracker acceptTracker; bool process(Handler*, qpid::sys::Duration); + bool wait(qpid::sys::Duration); void retrieve(FrameSetPtr, qpid::messaging::Message*); }; Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=835323&r1=835322&r2=835323&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu Nov 12 10:30:53 2009 @@ -210,6 +210,19 @@ } + +bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer) +{ + Receivers::const_iterator i = receivers.find(transfer.getDestination()); + if (i == receivers.end()) { + QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination()); + return false; + } else { + *receiver = i->second; + return true; + } +} + bool SessionImpl::accept(ReceiverImpl* receiver, qpid::messaging::Message* message, bool isDispatch, @@ -279,6 +292,37 @@ } } +bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout) +{ + qpid::sys::Mutex::ScopedLock l(lock); + while (true) { + try { + std::string destination; + if (incoming.getNextDestination(destination, timeout)) { + Receivers::const_iterator i = receivers.find(destination); + if (i == receivers.end()) { + throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination)); + } else { + receiver = i->second; + } + return true; + } else { + return false; + } + } catch (TransportFailure&) { + reconnect(); + } + } +} + +qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::sys::Duration timeout) +{ + qpid::messaging::Receiver receiver; + if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable(); + if (!receiver) throw qpid::Exception("Bad receiver returned!"); + return receiver; +} + uint32_t SessionImpl::available() { return get1<Available, uint32_t>((const std::string*) 0); Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=835323&r1=835322&r2=835323&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Thu Nov 12 10:30:53 2009 @@ -73,6 +73,10 @@ qpid::messaging::Message fetch(qpid::sys::Duration timeout); bool dispatch(qpid::sys::Duration timeout); + bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout); + qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout); + + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout); void receiverCancelled(const std::string& name); @@ -115,6 +119,7 @@ bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout); + bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); void commitImpl(); Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp?rev=835323&r1=835322&r2=835323&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp Thu Nov 12 10:30:53 2009 @@ -65,11 +65,6 @@ return impl->createReceiver(Address(address)); } -Address Session::createTempQueue(const std::string& baseName) -{ - return impl->createTempQueue(baseName); -} - void Session::sync() { impl->sync(); @@ -94,6 +89,18 @@ { return impl->dispatch(timeout); } + +bool Session::nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) +{ + return impl->nextReceiver(receiver, timeout); +} + + +Receiver Session::nextReceiver(qpid::sys::Duration timeout) +{ + return impl->nextReceiver(timeout); +} + uint32_t Session::available() { return impl->available(); } uint32_t Session::pendingAck() { return impl->pendingAck(); } Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=835323&r1=835322&r2=835323&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h Thu Nov 12 10:30:53 2009 @@ -51,9 +51,10 @@ virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0; virtual Message fetch(qpid::sys::Duration timeout) = 0; virtual bool dispatch(qpid::sys::Duration timeout) = 0; - virtual Address createTempQueue(const std::string& baseName) = 0; virtual Sender createSender(const Address& address) = 0; virtual Receiver createReceiver(const Address& address) = 0; + virtual bool nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) = 0; + virtual Receiver nextReceiver(qpid::sys::Duration timeout) = 0; virtual uint32_t available() = 0; virtual uint32_t pendingAck() = 0; private: Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=835323&r1=835322&r2=835323&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Thu Nov 12 10:30:53 2009 @@ -354,6 +354,28 @@ BOOST_CHECK_EQUAL(collector.messageData, boost::assign::list_of<std::string>("Message_1")("Message_2")("Message_3")); } +QPID_AUTO_TEST_CASE(testNextReceiver) +{ + MultiQueueFixture fix; + + for (uint i = 0; i < fix.queues.size(); i++) { + Receiver r = fix.session.createReceiver(fix.queues[i]); + r.setCapacity(10u); + r.start();//TODO: add Session::start + } + + for (uint i = 0; i < fix.queues.size(); i++) { + Sender s = fix.session.createSender(fix.queues[i]); + Message msg((boost::format("Message_%1%") % (i+1)).str()); + s.send(msg); + } + + for (uint i = 0; i < fix.queues.size(); i++) { + Message msg; + BOOST_CHECK(fix.session.nextReceiver().fetch(msg, qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + } +} QPID_AUTO_TEST_CASE(testMapMessage) { --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org