On 02/14/2011 09:45 PM, Gregory Bayard wrote:
Hi Gordon,I was originally working with 0.6 and the double-quoting was a quick attempt to find some problem with my exchange and queue specification. When I updated my addressing string for Qpid 0.8, I still get the same result. The message is picked up by topic listeners listening to the masterserver.summary.queue' topic on the 'masterserver.direct' exchange. The string, as you specified, is now: "masterserver.direct/masterserver.summary.queue; {create: always, node: {type:topic, x-declare:{type:direct}}}"
To be clear, I don't think you need the "; {create: always, node: {type:topic, x-declare:{type:direct}}}" part, that was just as an example of the correct syntax.
Keep in mind this queue will essentially always have been created by the C++ server which is essentially guaranteed to start before any Python clients. I just can't figure out anyway to get Python to send messages to the 'masterserver.summary.queue' on the 'masterserver.direct' exchange.
Just to be clear, are you saying that creating a sender in python for the address "masterserver.direct/masterserver.summary.queue" and then sending a message with that, the messages do not get to the queue in question?
That works fine for me, are you sure you don't have older python libs on your path?
However, I have no problem sending messages to the 'amq.direct' direct exchange from Python. This issue seems to be limited to custom direct exchanges. Any suggestions on how I could get a message via the custom direct exchange created by the C++ from Python to the C++ server's queue would be greatly appreciated.
Are you getting errors or the messages just appear to be dropped? As I say it works fine for me with 0.8 packages. See attached client and server. The server is a modified version of the old example that I think matches what you describe, the client is a simple python program that sends to "masterserver.direct/masterserver.summary.queue". Start the server first, then run the client and that all works for me, does it work for you?
Btw, there is a messaging API in c++ as well that I think is a lot simpler than the old qpid/client API. It is conceptually similar to the python API you are using.
#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import sys from qpid.messaging import * if len(sys.argv)<2: broker = "localhost:5672" else: broker = sys.argv[1] connection = Connection(broker) try: connection.open() session = connection.session() sender = session.sender("masterserver.direct/masterserver.summary.queue") receiver = session.receiver("response-queue-#; {create: always, node:{x-declare:{auto_delete:True}}}") msg = Message("Hello world!") msg.reply_to = receiver.source sender.send(msg); message = receiver.fetch() print message.content session.acknowledge() except MessagingError,m: print m connection.close()
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include <qpid/client/Connection.h> #include <qpid/client/Session.h> #include <qpid/client/AsyncSession.h> #include <qpid/client/Message.h> #include <qpid/client/MessageListener.h> #include <qpid/client/SubscriptionManager.h> #include <cstdlib> #include <iostream> #include <sstream> #include <string> using namespace qpid::client; using namespace qpid::framing; using std::stringstream; using std::string; class Listener : public MessageListener{ private: SubscriptionManager& subscriptions; AsyncSession asyncSession; public: Listener(SubscriptionManager& subscriptions, Session& session); virtual void received(Message& message); }; Listener::Listener(SubscriptionManager& subs, Session& session) : subscriptions(subs), asyncSession(session) {} void Listener::received(Message& request) { Message response; if (!request.getMessageProperties().hasReplyTo()) { std::cout << "Error: " << "No reply-to for request (" << request.getData() << ")" << std::endl; return; } std::cout << "Request: " << request.getData() << " (" << request.getMessageProperties().getReplyTo() << ")" << std::endl; // Transform message content to upper case std::string s = request.getData(); std::transform (s.begin(), s.end(), s.begin(), toupper); response.setData(s); // Send it back to the user response.getDeliveryProperties().setRoutingKey(request.getMessageProperties().getReplyTo().getRoutingKey()); // Asynchronous transfer sends messages as quickly as // possible without waiting for confirmation. asyncSession.messageTransfer(arg::content=response, arg::destination=request.getMessageProperties().getReplyTo().getExchange()); } int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; Connection connection; try { connection.open(host, port); Session session = connection.newSession(); session.exchangeDeclare(arg::exchange="masterserver.direct", arg::type="direct"); session.queueDeclare(arg::queue="masterserver.summary.queue"); session.exchangeBind(arg::exchange="masterserver.direct", arg::queue="masterserver.summary.queue", arg::bindingKey="masterserver.summary.queue"); SubscriptionManager subscriptions(session); Listener listener(subscriptions, session); subscriptions.subscribe(listener, "masterserver.summary.queue"); std::cout << "Waiting for requests" << std::endl; subscriptions.run(); connection.close(); return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; } return 1; }
--------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
