Author: gsim
Date: Thu Nov 12 10:30:53 2009
New Revision: 835323

URL: http://svn.apache.org/viewvc?rev=835323&view=rev
Log:
Merge branch 'next_receiver_changes' into trunk

Modified:
    qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h?rev=835323&r1=835322&r2=835323&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h Thu Nov 12 10:30:53 
2009
@@ -88,13 +88,14 @@
     QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration 
timeout=qpid::sys::TIME_INFINITE);
     QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration 
timeout=qpid::sys::TIME_INFINITE);
     QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration 
timeout=qpid::sys::TIME_INFINITE);
+    QPID_CLIENT_EXTERN bool nextReceiver(Receiver&, qpid::sys::Duration 
timeout=qpid::sys::TIME_INFINITE);
+    QPID_CLIENT_EXTERN Receiver nextReceiver(qpid::sys::Duration 
timeout=qpid::sys::TIME_INFINITE);
+    
 
     QPID_CLIENT_EXTERN Sender createSender(const Address& address);
     QPID_CLIENT_EXTERN Sender createSender(const std::string& address);
     QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address);
     QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address);
-
-    QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = 
std::string());
   private:
   friend class qpid::client::PrivateImplRef<Session>;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=835323&r1=835322&r2=835323&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Thu Nov 
12 10:30:53 2009
@@ -123,6 +123,15 @@
     return process(&handler, timeout);
 }
 
+bool IncomingMessages::getNextDestination(std::string& destination, Duration 
timeout)
+{
+    //if there is not already a received message, we must wait for one
+    if (received.empty() && !wait(timeout)) return false;
+    //else we have a message in received; return the corresponding destination
+    destination = 
received.front()->as<MessageTransferBody>()->getDestination();
+    return true;
+}
+
 void IncomingMessages::accept()
 {
     acceptTracker.accept(session);
@@ -155,11 +164,11 @@
 }
 
 /**
- * Get a frameset from session queue, waiting for up to the specified
- * duration and returning true if this could be achieved, false
- * otherwise. If a destination is supplied, only return a message for
- * that destination. In this case messages from other destinations
- * will be held on a received queue.
+ * Get a frameset that is accepted by the specified handler from
+ * session queue, waiting for up to the specified duration and
+ * returning true if this could be achieved, false otherwise. Messages
+ * that are not accepted by the handler are pushed onto received queue
+ * for later retrieval.
  */
 bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
 {
@@ -183,6 +192,22 @@
     return false;
 }
 
+bool IncomingMessages::wait(qpid::sys::Duration duration)
+{
+    AbsTime deadline(AbsTime::now(), duration);
+    FrameSet::shared_ptr content;
+    for (Duration timeout = duration; incoming->pop(content, timeout); timeout 
= Duration(AbsTime::now(), deadline)) {
+        if (content->isA<MessageTransferBody>()) {
+            QPID_LOG(debug, "Pushed " << *content->getMethod() << " to 
received queue");
+            received.push_back(content);
+            return true;
+        } else {
+            //TODO: handle other types of commands (e.g. message-accept, 
message-flow etc)
+        }
+    }
+    return false;
+}
+
 uint32_t IncomingMessages::pendingAccept()
 {
     return acceptTracker.acceptsPending();

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=835323&r1=835322&r2=835323&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Thu Nov 12 
10:30:53 2009
@@ -70,8 +70,7 @@
 
     void setSession(qpid::client::AsyncSession session);
     bool get(Handler& handler, qpid::sys::Duration timeout);
-    //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
-    //bool get(const std::string& destination, qpid::messaging::Message& 
message, qpid::sys::Duration timeout);
+    bool getNextDestination(std::string& destination, qpid::sys::Duration 
timeout);
     void accept();
     void releaseAll();
     void releasePending(const std::string& destination);
@@ -90,6 +89,7 @@
     AcceptTracker acceptTracker;
 
     bool process(Handler*, qpid::sys::Duration);
+    bool wait(qpid::sys::Duration);
     void retrieve(FrameSetPtr, qpid::messaging::Message*);
 
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=835323&r1=835322&r2=835323&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu Nov 12 
10:30:53 2009
@@ -210,6 +210,19 @@
 
 }
 
+
+bool SessionImpl::getNextReceiver(Receiver* receiver, 
IncomingMessages::MessageTransfer& transfer)
+{
+    Receivers::const_iterator i = receivers.find(transfer.getDestination());
+    if (i == receivers.end()) {
+        QPID_LOG(error, "Received message for unknown destination " << 
transfer.getDestination());
+        return false;
+    } else {
+        *receiver = i->second;
+        return true;
+    }
+}
+
 bool SessionImpl::accept(ReceiverImpl* receiver, 
                          qpid::messaging::Message* message, 
                          bool isDispatch, 
@@ -279,6 +292,37 @@
     }
 }
 
+bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, 
qpid::sys::Duration timeout)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    while (true) {
+        try {
+            std::string destination;
+            if (incoming.getNextDestination(destination, timeout)) {
+                Receivers::const_iterator i = receivers.find(destination);
+                if (i == receivers.end()) {
+                    throw qpid::Exception(QPID_MSG("Received message for 
unknown destination " << destination));
+                } else {
+                    receiver = i->second;
+                }
+                return true;
+            } else {
+                return false;
+            }
+        } catch (TransportFailure&) {
+            reconnect();
+        }
+    }
+}
+
+qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::sys::Duration 
timeout)
+{
+    qpid::messaging::Receiver receiver;
+    if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable();
+    if (!receiver) throw qpid::Exception("Bad receiver returned!");
+    return receiver;
+}
+
 uint32_t SessionImpl::available()
 {
     return get1<Available, uint32_t>((const std::string*) 0);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=835323&r1=835322&r2=835323&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Thu Nov 12 
10:30:53 2009
@@ -73,6 +73,10 @@
     qpid::messaging::Message fetch(qpid::sys::Duration timeout);
     bool dispatch(qpid::sys::Duration timeout);
 
+    bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration 
timeout);
+    qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout);
+
+
     bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, 
qpid::sys::Duration timeout);    
 
     void receiverCancelled(const std::string& name);
@@ -115,6 +119,7 @@
     bool acceptAny(qpid::messaging::Message*, bool, 
IncomingMessages::MessageTransfer&);
     bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, 
IncomingMessages::MessageTransfer&);
     bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration 
timeout);
+    bool getNextReceiver(qpid::messaging::Receiver* receiver, 
IncomingMessages::MessageTransfer& transfer);
     void reconnect();
 
     void commitImpl();

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp?rev=835323&r1=835322&r2=835323&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp Thu Nov 12 10:30:53 2009
@@ -65,11 +65,6 @@
     return impl->createReceiver(Address(address)); 
 }
 
-Address Session::createTempQueue(const std::string& baseName)
-{ 
-    return impl->createTempQueue(baseName); 
-}
-
 void Session::sync()
 {
     impl->sync();
@@ -94,6 +89,18 @@
 {
     return impl->dispatch(timeout);
 }
+
+bool Session::nextReceiver(Receiver& receiver, qpid::sys::Duration timeout)
+{
+    return impl->nextReceiver(receiver, timeout);
+}
+
+
+Receiver Session::nextReceiver(qpid::sys::Duration timeout)
+{
+    return impl->nextReceiver(timeout);
+}
+
 uint32_t Session::available() { return impl->available(); }
 uint32_t Session::pendingAck() { return impl->pendingAck(); }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=835323&r1=835322&r2=835323&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h Thu Nov 12 10:30:53 
2009
@@ -51,9 +51,10 @@
     virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0;
     virtual Message fetch(qpid::sys::Duration timeout) = 0;
     virtual bool dispatch(qpid::sys::Duration timeout) = 0;
-    virtual Address createTempQueue(const std::string& baseName) = 0;
     virtual Sender createSender(const Address& address) = 0;
     virtual Receiver createReceiver(const Address& address) = 0;
+    virtual bool nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) 
= 0;
+    virtual Receiver nextReceiver(qpid::sys::Duration timeout) = 0;
     virtual uint32_t available() = 0;
     virtual uint32_t pendingAck() = 0;
   private:

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=835323&r1=835322&r2=835323&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Thu Nov 12 10:30:53 
2009
@@ -354,6 +354,28 @@
     BOOST_CHECK_EQUAL(collector.messageData, 
boost::assign::list_of<std::string>("Message_1")("Message_2")("Message_3"));
 }
 
+QPID_AUTO_TEST_CASE(testNextReceiver)
+{
+    MultiQueueFixture fix;
+
+    for (uint i = 0; i < fix.queues.size(); i++) {
+        Receiver r = fix.session.createReceiver(fix.queues[i]);
+        r.setCapacity(10u);
+        r.start();//TODO: add Session::start
+    }
+
+    for (uint i = 0; i < fix.queues.size(); i++) {
+        Sender s = fix.session.createSender(fix.queues[i]);
+        Message msg((boost::format("Message_%1%") % (i+1)).str());
+        s.send(msg);
+    }
+
+    for (uint i = 0; i < fix.queues.size(); i++) {
+        Message msg;
+        BOOST_CHECK(fix.session.nextReceiver().fetch(msg, 
qpid::sys::TIME_SEC));
+        BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % 
(i+1)).str());
+    }
+}
 
 QPID_AUTO_TEST_CASE(testMapMessage)
 {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to