On 09/18/2014 12:56 AM, Spencer.Doak wrote:
> Hello,
>
> I'm faced with a bit of an issue. For an application I'm working on, I'm in
> need of several abilities:
>
> 1. Create, destroy, and clear queues on an exchange.
> 2. Create and destroy exchanges on a broker.
> 3. Fetch a list of all existing connections on a broker.
> 4. If possible, send a management message telling the broker to shutdown.
> 5. Verify the existence of a queue. (I have a workaround for this)
>
> Based on what I've been reading, I believe QMF would be the way to go for
> this. The issue is that I cannot seem to find much information about QMF
> despite all of the digging I've been doing.

Yes, sorry about that, it is indeed not documented very well at all. Fortunately, it's quite simple.

All you do is send specially structured map messages to the broker of interest, and then process the replies that come back.

There are two categories of request. The first is a method request, the two methods of most interest being 'create' and 'delete'. With these you can create or delete queues, exchanges and bindings. The second category is a query request. With these you get back the objects of a particular type (e.g. connection, queue, exchange) or indeed a specific object by id.

All QMF request messages are sent to 'qmf.default.direct/broker' (i.e. to the exchange named 'qmf.default.direct', with a routing key or subject of 'broker'). Each message should carry a reply-to address, to which the broker sends its responses.

There are two message properties that must also be set: 'x-amqp-0-10.app-id', which should always have the value 'qmf2', and 'qmf.opcode', which should have the value '_method_request' for method requests and '_query_request' for query requests.

For a method request the content is a map with three top-level entries. The first has key '_object_id' and its value is a nested map identifying the target of the command. For the commands considered here (create and delete) the target is always the broker itself, so the '_object_id' map contains a single entry with key '_object_name' and value 'org.apache.qpid.broker:broker:amqp-broker'. The second has key '_method_name' and its value is the method name (i.e. 'create' or 'delete'). The third has key '_arguments' and its value is a nested map representing the arguments to the method. For create and delete, the 'name' and 'type' arguments are required. For create you will often want an additional 'properties' entry alongside them, whose value is a nested map containing the properties of the object you want to create (e.g. 'qpid.max_count' or similar for a queue).

For a query request the content is a map with two entries. The first, '_schema_id', usually contains a nested map with one entry, '_class_name', naming the type of object to query (e.g. queue, exchange etc). The second has key '_what' and value 'OBJECT'.

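The expected response for a method request will have the 'qmf.opcode' property set to '_method_response'. For a query request it will be '_query_response'. If a method request fails, the response has the opcode '_exception' instead, and the '_values' map in its content holds an 'error_text' entry describing the problem (the attached example checks for this).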
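
As a rough sketch of how a client might check a reply against the request it sent: the opcode strings below are the ones described above plus the '_exception' opcode that the attached example tests for, while the enum and function names are made up for illustration and are not part of any Qpid API.

```cpp
#include <string>

// Outcome of comparing a reply's qmf.opcode with the request that was sent.
// These names are illustrative only.
enum class ReplyKind { Ok, Error, Unexpected };

// Returns the opcode a successful reply to the given request should carry,
// or an empty string if the request opcode is not one of the two known here.
inline std::string expectedReplyOpcode(const std::string& requestOpcode)
{
    if (requestOpcode == "_method_request") return "_method_response";
    if (requestOpcode == "_query_request") return "_query_response";
    return std::string();
}

// Classifies a reply: '_exception' is an error, the matching response opcode
// is success, and anything else is treated as unexpected.
inline ReplyKind classifyReply(const std::string& requestOpcode,
                               const std::string& replyOpcode)
{
    if (replyOpcode == "_exception") return ReplyKind::Error;
    const std::string expected = expectedReplyOpcode(requestOpcode);
    if (!expected.empty() && replyOpcode == expected) return ReplyKind::Ok;
    return ReplyKind::Unexpected;
}
```

With the messaging API, the reply opcode would be read from the 'qmf.opcode' message property, as the attached example does.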

I've attached a very rudimentary example that shows create, delete and querying, which I suspect is easier to follow than the textual description of the structure. To use it, specify one of create, delete or list and a type (queue, exchange, binding etc). For create and delete you also need a 'name'. The 'name' for a binding is a concatenation of exchange name, queue name and key name, with '/' separating them. E.g.

   qmf_ctrl create queue my-queue
   qmf_ctrl create exchange my-exchange
   qmf_ctrl create binding my-exchange/my-queue/my-key

   qmf_ctrl create queue another-queue
   qmf_ctrl list queue
   qmf_ctrl delete queue another-queue
   qmf_ctrl list queue

   qmf_ctrl list connection
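
If your application keeps the exchange, queue and key separately, a small helper can build and take apart the binding name described above. This is only a sketch: the struct and function names are hypothetical, and treating everything after the second '/' as the key is an assumption of mine, not something the broker is known to guarantee.

```cpp
#include <stdexcept>
#include <string>

// The three parts of a binding name. Illustrative type, not a Qpid API.
struct BindingName {
    std::string exchange;
    std::string queue;
    std::string key;
};

// Joins the parts with '/' to give "exchange/queue/key".
inline std::string formatBindingName(const BindingName& b)
{
    return b.exchange + "/" + b.queue + "/" + b.key;
}

// Splits "exchange/queue/key" at the first two '/' characters.
// Assumption: anything after the second '/' (including further slashes)
// belongs to the key; exchange or queue names containing '/' are not handled.
inline BindingName parseBindingName(const std::string& name)
{
    const std::string::size_type first = name.find('/');
    if (first == std::string::npos)
        throw std::invalid_argument("binding name needs exchange/queue/key: " + name);
    const std::string::size_type second = name.find('/', first + 1);
    if (second == std::string::npos)
        throw std::invalid_argument("binding name needs exchange/queue/key: " + name);
    BindingName b;
    b.exchange = name.substr(0, first);
    b.queue = name.substr(first + 1, second - first - 1);
    b.key = name.substr(second + 1);
    return b;
}
```

The result of formatBindingName() is what you would pass as the 'name' argument when the type is binding.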

Hopefully this is enough to get you started. If you have further questions please don't hesitate to ask.

Regarding point 4 in your mail, there is at present no way to shut down the broker via QMF. You could perhaps create a special 'agent' program to do that, i.e. one that runs colocated with the broker and listens on a special address for a shutdown message, on receiving which it sends a signal to the broker. You might want some sort of security around that, either message-based or ACL controls on the address in question for the broker.
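
The signalling half of such an agent could look roughly like the following. It is a POSIX-only sketch under two assumptions that you would need to verify: that the agent already knows the broker's process id (how it finds that out is left open here), and that the broker shuts down cleanly on SIGTERM. The function names are made up, and the part that receives the shutdown message is not shown.

```cpp
#include <signal.h>
#include <sys/types.h>

// Sends the given signal to the given process. Returns true if kill()
// reported success. Signal number 0 only checks that the process exists
// and that we are allowed to signal it; nothing is delivered.
inline bool sendSignal(pid_t pid, int signalNumber)
{
    return ::kill(pid, signalNumber) == 0;
}

// Asks the broker process to terminate.
// Assumption: the broker treats SIGTERM as a request for a clean shutdown.
inline bool requestBrokerShutdown(pid_t brokerPid)
{
    return sendSignal(brokerPid, SIGTERM);
}
```

The agent would call requestBrokerShutdown() only after it has accepted a shutdown message from a sender it trusts.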

--Gordon.



/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */


#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <exception>
#include <iostream>
#include <string>

using namespace qpid::messaging;
using namespace qpid::types;

using std::string;

int main(int argc, char** argv)
{
    if (argc < 3) {
        std::cout << "Usage: " << argv[0] << " command type [name]" << std::endl << std::endl;
        std::cout << "  commands: list, create or delete" << std::endl;
        std::cout << "  type:     queue, exchange, connection, binding etc" << std::endl << std::endl;
        std::cout << "  delete and create commands require name in addition to type" << std::endl;
        return 1;
    }
    std::string command = argv[1];
    std::string type = argv[2];

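    // Connects to a broker on localhost using the default port.
    Connection c("localhost");
    try {
        c.open();
        Session session = c.createSession();
        // "#" creates a temporary queue on which to receive the replies.
        Receiver r = session.createReceiver("#");
        // All QMF requests go to this exchange with the subject 'broker'.
        Sender s = session.createSender("qmf.default.direct/broker");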

        if (command == "list") {
            Message request;
            request.setReplyTo(r.getAddress());
            request.setProperty("x-amqp-0-10.app-id", "qmf2");
            request.setProperty("qmf.opcode", "_query_request");
            Variant::Map schemaId;
            schemaId["_class_name"] = type;
            Variant::Map content;
            content["_what"] = "OBJECT";
            content["_schema_id"] = schemaId;
            request.setContentObject(content);
            s.send(request);

            // The reply content is a list of maps, one per object found.
            Message response = r.fetch();
            Variant::List contentIn = response.getContentObject().asList();
            for (Variant::List::const_iterator i = contentIn.begin(); i != contentIn.end(); ++i) {
                Variant::Map item = i->asMap();
                // The object's properties are held in its '_values' map.
                Variant::Map properties = item["_values"].asMap();
                if (properties.find("name") != properties.end()) {
                    std::cout << properties["name"] << std::endl;
                } else {
                    std::cout << properties << std::endl;
                }
            }
            session.acknowledge();
        } else if (command == "create" || command == "delete") {
            if (argc < 4) {
                std::cerr << "command: " << command << " requires object name in addition to type"  << std::endl;
                c.close();
                return 1;
            }
            Variant::Map args;
            args["type"] = type;
            args["name"] = argv[3];

            Message request;
            request.setReplyTo(r.getAddress());
            request.setProperty("x-amqp-0-10.app-id", "qmf2");
            request.setProperty("qmf.opcode", "_method_request");
            Variant::Map content;
            Variant::Map objectId;
            objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
            content["_object_id"] = objectId;
            content["_method_name"] = command;
            content["_arguments"] = args;
            request.setContentObject(content);
            s.send(request);

            Message response = r.fetch();
            if (response.getProperties()["qmf.opcode"] == "_exception") {
                Variant::Map result = response.getContentObject().asMap();
                std::cerr << "Error on method: " << result["_values"].asMap()["error_text"] << std::endl;
            } else if (response.getProperties()["qmf.opcode"] == "_method_response") {
                std::cout << "OK." << std::endl;
            } else {
                std::cerr << "Unexpected opcode: "  << response.getProperties()["qmf.opcode"] << std::endl;
            }
        } else {
            std::cout << "invalid command: " << command << " (use list, create or delete)"  << std::endl;
        }
    } catch(const std::exception& error) {
        // Report the failure on stderr and exit with a non-zero status.
        std::cerr << "ERROR: " << error.what() << std::endl;
        c.close();
        return 1;
    }
    c.close();
    return 0;
}


