On 09/26/2013 05:03 AM, Mitsuru Oka wrote:
Another question on Dispatch router is there.

Could you show me an example of request-reply/response pattern via
Dispatch router using proton messenger based sender and receiver? I
tried proton client/server python example, but the client couldn't
receive reply message from server.

$ dispatch-router -c qpid-dispatch.conf
$ ./server.py amqp://0.0.0.0/topic
$ ./client.py amqp://0.0.0.0/topic subject

server output:
amqp://b0f56a1c-573c-42e0-8c6f-46d5735b2301/replies
Dispatched subject None

client output:
<nothing and blocking>

I know this is not what you asked for, but just in case its interesting to you or anyone else reading the thread, attached is a request-response example using qpid::messaging that works with the dispatch router.

It does contain a few modifications compared with the example shipped with the qpid::messaging client (see changes, also attached) to get around https://issues.apache.org/jira/browse/QPID-5175 and to avoid requesting a dynamic reply queue.

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>

#include <cstdlib>
#include <iostream>

#include <sstream>

using namespace qpid::messaging;

using std::stringstream;
using std::string;

int main(int argc, char** argv) {
    const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
    std::string connectionOptions = argc > 2 ? argv[2] : "{protocol:amqp1.0}";

    Connection connection(url, connectionOptions);
     try {
        connection.open();
        Session session = connection.createSession();

        Sender sender = session.createSender("service_queue");

        Receiver receiver = session.createReceiver(qpid::types::Uuid(true).str());

	// Now send some messages ...
	string s[] = {
            "Twas brillig, and the slithy toves",
            "Did gire and gymble in the wabe.",
            "All mimsy were the borogroves,",
            "And the mome raths outgrabe."
        };

    	Message request;
        request.setProperty("x-amqp-to", "service_queue");
        request.setReplyTo(receiver.getAddress());
	for (int i=0; i<4; i++) {
            request.setContent(s[i]);
            sender.send(request);
            Message response = receiver.fetch();
            std::cout << request.getContent() << " -> " << response.getContent() << std::endl;
	}
        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
        connection.close();
    }
    return 1;
}


/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>

#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <sstream>

using namespace qpid::messaging;

using std::stringstream;
using std::string;

int main(int argc, char** argv) {
    const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
    std::string connectionOptions = argc > 2 ? argv[2] : "{protocol:amqp1.0}";

    Connection connection(url, connectionOptions);
    try {
        connection.open();
        Session session = connection.createSession();
        Receiver receiver = session.createReceiver("service_queue; {create: always}");
        Sender sender = session.createSender("ignored");

        while (true) {
            Message request = receiver.fetch();
            const Address& address = request.getReplyTo();
            if (address) {
                std::string s = request.getContent();
                std::transform(s.begin(), s.end(), s.begin(), toupper);
                Message response(s);
                response.setProperty("x-amqp-to", address.getName());
                sender.send(response);
                std::cout << "Processed request: "
                          << request.getContent()
                          << " -> "
                          << response.getContent() << std::endl;
                session.acknowledge();
            } else {
                std::cerr << "Error: no reply address specified for request: " << request.getContent() << std::endl;
                session.reject(request);
            }
        }
        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
        connection.close();
    }
    return 1;
}


diff -up ./client.cpp.original ./client.cpp
--- ./client.cpp.original	2013-09-26 10:03:17.598255123 +0100
+++ ./client.cpp	2013-09-26 09:47:53.357205025 +0100
@@ -38,8 +38,8 @@ using std::string;
 
 int main(int argc, char** argv) {
     const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
-    std::string connectionOptions = argc > 2 ? argv[2] : "";
-    
+    std::string connectionOptions = argc > 2 ? argv[2] : "{protocol:amqp1.0}";
+
     Connection connection(url, connectionOptions);
      try {
         connection.open();
@@ -47,9 +47,7 @@ int main(int argc, char** argv) {
 
         Sender sender = session.createSender("service_queue");
 
-        //create temp queue & receiver...
-        Receiver receiver = session.createReceiver("#");
-        Address responseQueue = receiver.getAddress();
+        Receiver receiver = session.createReceiver(qpid::types::Uuid(true).str());
 
 	// Now send some messages ...
 	string s[] = {
@@ -60,7 +58,8 @@ int main(int argc, char** argv) {
         };
 
     	Message request;
-        request.setReplyTo(responseQueue);
+        request.setProperty("x-amqp-to", "service_queue");
+        request.setReplyTo(receiver.getAddress());
 	for (int i=0; i<4; i++) {
             request.setContent(s[i]);
             sender.send(request);
diff -up ./server.cpp.original ./server.cpp
--- ./server.cpp.original	2013-09-26 10:03:08.869254650 +0100
+++ ./server.cpp	2013-09-26 10:02:17.799251882 +0100
@@ -39,26 +39,27 @@ using std::string;
 
 int main(int argc, char** argv) {
     const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
-    std::string connectionOptions = argc > 2 ? argv[2] : "";
+    std::string connectionOptions = argc > 2 ? argv[2] : "{protocol:amqp1.0}";
 
     Connection connection(url, connectionOptions);
     try {
         connection.open();
         Session session = connection.createSession();
         Receiver receiver = session.createReceiver("service_queue; {create: always}");
+        Sender sender = session.createSender("ignored");
 
         while (true) {
             Message request = receiver.fetch();
             const Address& address = request.getReplyTo();
             if (address) {
-                Sender sender = session.createSender(address);
                 std::string s = request.getContent();
                 std::transform(s.begin(), s.end(), s.begin(), toupper);
                 Message response(s);
+                response.setProperty("x-amqp-to", address.getName());
                 sender.send(response);
-                std::cout << "Processed request: " 
-                          << request.getContent() 
-                          << " -> " 
+                std::cout << "Processed request: "
+                          << request.getContent()
+                          << " -> "
                           << response.getContent() << std::endl;
                 session.acknowledge();
             } else {

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to