On 02/14/2011 09:45 PM, Gregory Bayard wrote:
Hi Gordon,

I was originally working with 0.6 and the double-quoting was a quick
attempt to find some problem with my exchange and queue specification.
  When I updated my addressing string for Qpid 0.8, I still get the
same result.  The message is picked up by topic listeners listening to
the masterserver.summary.queue' topic on the 'masterserver.direct'
exchange.  The string, as you specified, is now:

"masterserver.direct/masterserver.summary.queue; {create: always,
node: {type:topic, x-declare:{type:direct}}}"

To be clear, I don't think you need the "; {create: always, node: {type:topic, x-declare:{type:direct}}}" part, that was just as an example of the correct syntax.

Keep in mind this queue will essentially always have been created by
the C++ server which is essentially guaranteed to start before any
Python clients.  I just can't figure out anyway to get Python to send
messages to the 'masterserver.summary.queue' on the
'masterserver.direct' exchange.

Just to be clear, are you saying that creating a sender in python for the address "masterserver.direct/masterserver.summary.queue" and then sending a message with that, the messages do not get to the queue in question?

That works fine for me, are you sure you don't have older python libs on your path?

However, I have no problem sending
messages to the 'amq.direct' direct exchange from Python.  This issue
seems to be limited to custom direct exchanges.  Any suggestions on
how I could get a message via the custom direct exchange created by
the C++ from Python to the C++ server's queue would be greatly
appreciated.

Are you getting errors or the messages just appear to be dropped? As I say it works fine for me with 0.8 packages. See attached client and server. The server is a modified version of the old example that I think matches what you describe, the client is a simple python program that sends to "masterserver.direct/masterserver.summary.queue". Start the server first, then run the client and that all works for me, does it work for you?

Btw, there is a messaging API in c++ as well that I think is a lot simpler than the old qpid/client API. It is conceptually similar to the python API you are using.
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
from qpid.messaging import *

if len(sys.argv)<2:
  broker =  "localhost:5672" 
else:
  broker = sys.argv[1]

connection = Connection(broker)

try:
  connection.open()
  session = connection.session()

  sender = session.sender("masterserver.direct/masterserver.summary.queue")
  receiver = session.receiver("response-queue-#; {create: always, 
node:{x-declare:{auto_delete:True}}}")

  msg = Message("Hello world!")
  msg.reply_to = receiver.source
  sender.send(msg);

  message = receiver.fetch()
  print message.content
  session.acknowledge()

except MessagingError,m:
  print m

connection.close()
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>

#include <qpid/client/AsyncSession.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>

#include <cstdlib>
#include <iostream>

#include <sstream>
#include <string>

using namespace qpid::client;
using namespace qpid::framing;
using std::stringstream;
using std::string;

class Listener : public MessageListener{
  private:
    SubscriptionManager& subscriptions;
    AsyncSession asyncSession;
  public:
    Listener(SubscriptionManager& subscriptions, Session& session);
    virtual void received(Message& message);
};

Listener::Listener(SubscriptionManager& subs, Session& session)
    : subscriptions(subs), asyncSession(session)
{}

void Listener::received(Message& request) {
    Message response;

    if (!request.getMessageProperties().hasReplyTo()) {
        std::cout << "Error: " << "No reply-to for request (" << request.getData() << ")" << std::endl;
        return;
    } 

    std::cout << "Request: " << request.getData() << " (" << request.getMessageProperties().getReplyTo() << ")" << std::endl;

    // Transform message content to upper case
    std::string s = request.getData();
    std::transform (s.begin(), s.end(), s.begin(), toupper);
    response.setData(s);

    // Send it back to the user
    response.getDeliveryProperties().setRoutingKey(request.getMessageProperties().getReplyTo().getRoutingKey());

    // Asynchronous transfer sends messages as quickly as
    // possible without waiting for confirmation.
    asyncSession.messageTransfer(arg::content=response,
                                 arg::destination=request.getMessageProperties().getReplyTo().getExchange());
}


int main(int argc, char** argv) {
    const char* host = argc>1 ? argv[1] : "127.0.0.1";
    int port = argc>2 ? atoi(argv[2]) : 5672;
    Connection connection;

    try {
        connection.open(host, port);
        Session session =  connection.newSession();

	session.exchangeDeclare(arg::exchange="masterserver.direct", arg::type="direct");
	session.queueDeclare(arg::queue="masterserver.summary.queue");
        session.exchangeBind(arg::exchange="masterserver.direct",
                             arg::queue="masterserver.summary.queue",
                             arg::bindingKey="masterserver.summary.queue");

        SubscriptionManager subscriptions(session);
        Listener listener(subscriptions, session);
        subscriptions.subscribe(listener, "masterserver.summary.queue");
        std::cout << "Waiting for requests" << std::endl;
        subscriptions.run();

        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to