Hi Filip,

I think I might have had the same problem as you in the past because the
methods (I think) are required are not available on the JMS interface. My
solution was a rather hacky approach of casting the JMS instance to the
actual Qpid type, as in the following examples:

  private void method(String method, Map arguments, String address)
throws JMSException {        Destination reply_to =
session.createTemporaryQueue();        MessageConsumer receiver =
session.createConsumer(reply_to);        JmsMapMessage request =
(JmsMapMessage) session.createMapMessage();
request.setValidatePropertyNames(false);
request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
request.setStringProperty("qmf.opcode", "_method_request");
request.setStringProperty("method", "request");        Map
object_id_map = new HashMap();
object_id_map.put("_object_name", address);
request.setString("_method_name", method);        // Can't set map
directly on object, so we'll hack it here        JmsMapMessageFacade
facade = (JmsMapMessageFacade)request.getFacade();
facade.put("_object_id", object_id_map);
facade.put("_arguments", arguments);        int correlation_id =
sendMessage(request, reply_to);        List<Message> response =
awaitResponse(receiver, correlation_id, 10 * 1000);        if
(response != null && response.size() > 0) {            new
ResponseDecoder().decodeResponse(response);        } else {
throw new JMSException("No response received");        }    }
private int sendRequest(String opcode, Map<String, ?> query,
Destination replyAddress) throws JMSException {        JmsMapMessage
request = (JmsMapMessage) session.createMapMessage();
request.setValidatePropertyNames(false);
request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
request.setStringProperty("qmf.opcode", opcode);        // Can't set
map directly on object, so we'll hack it here
JmsMapMessageFacade facade = (JmsMapMessageFacade)request.getFacade();
       for (Map.Entry<String, ?> entry : query.entrySet()) {
 facade.put(entry.getKey(), entry.getValue());        }        return
sendMessage(request, replyAddress);    }


Unfortunately I haven't structured our Java implementation of the QMF2
interface as well as I could have done and it's rather entrenched in
another of our libraries but I'm sure no one will mind if I share the
attached implementation (based on the Qpid Python implementation) for
reference.

/Chris


On 19 December 2016 at 14:14, Filip Nguyen <fngu...@redhat.com> wrote:

> We currently use Java JMS qpid-client 0.32. It is quite easy to use QMF
> [1] just by using MapMessage etc.
>
> With new Java Qpid JMS client org.apache.qpid:qpid-jms-client:jar:0.10.0
> I couldn't find a way how to use Qpid QMF. Is there any existing usage or
> example how to do that?
>
> [1]
>
>             MapMessage request = session.createMapMessage();
>             request.setJMSReplyTo(responseQueue);
>             request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
>             request.setStringProperty("qmf.opcode", "_query_request");
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> For additional commands, e-mail: users-h...@qpid.apache.org
>
>


-- 

*Chris Richardson*, System Architect
c...@fourc.eu


*FourC AS, Vestre Rosten 81, Trekanten, NO-7075 Tiller, Norwaywww.fourc.eu
<http://www.fourc.eu/>*

*Follow us on LinkedIn <http://bit.ly/fourcli>, Facebook
<http://bit.ly/fourcfb>, Google+ <http://bit.ly/fourcgp> and Twitter
<http://bit.ly/fourctw>!*
/*
 * Copyright (C) 2015 FourC AS, http://www.fourc.eu/
 * All Rights Reserved.
 */
package eu.fourc.messenger.fmf.impl.java;

import com.google.common.base.MoreObjects;
import eu.fourc.messenger.fmf.Exchange;
import eu.fourc.messenger.fmf.Node;
import eu.fourc.messenger.fmf.Queue;
import eu.fourc.messenger.impl.ConnectionImpl;
import org.apache.qpid.jms.JmsTopic;
import org.apache.qpid.jms.message.JmsMapMessage;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.facade.JmsMapMessageFacade;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;

import javax.jms.*;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class BrokerAgent {
    private static final String TARGET_ADDRESS = "qmf.default.direct";
    private static final String TARGET_SUBJECT = "broker";

    private Session session;

    private int correlator = 1; // Maybe use atomic int?

    public BrokerAgent(Connection connection) throws JMSException {
        Connection conn = connection instanceof ConnectionImpl ? ((ConnectionImpl)connection).getConnection() : connection;
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    Session getSession() {
        return session;
    }

    private int sendMessage(Message message, Destination replyAddress) throws JMSException {
        Destination target = new JmsTopic(TARGET_ADDRESS);
        MessageProducer sender = session.createProducer(target);

        message.setJMSReplyTo(replyAddress);

        int local_correlator = correlator++;
        message.setJMSCorrelationID("ID:" + String.valueOf(local_correlator));

        // Routing key
        ((AmqpJmsMessageFacade)((JmsMessage) message).getFacade()).getAmqpMessage().setSubject(TARGET_SUBJECT);

        sender.send(message);

        return local_correlator;
    }

    private List<Message> awaitResponse(MessageConsumer receiver, int correlation_id, long timeout) throws JMSException {
        String local_correlation_id = "ID:" + String.valueOf(correlation_id);
        List<Message> messages = new LinkedList<>();
        long start = System.currentTimeMillis();
        boolean finished = false;
        while (!finished) {
            // Make sure we don't wait longer than "timeout" in total, even when receiving multiple messages
            long local_timeout = timeout - System.currentTimeMillis() + start;
            if (local_timeout < 0) {
                break;
            }
            Message message = receiver.receive(local_timeout);
            if (message != null) {
                if (message.getJMSCorrelationID().equals(local_correlation_id)) {
                    messages.add(message);

                    // If the message contains the "partial" flag, there should be more message to read.
                    // Empirical data suggests the broker transmits a maximum of 100 records per message.
                    finished = !message.propertyExists("partial");
                }
                // Don't acknowledge the message here: it could be a message we want (in which case
                // we can acknowledge it later, when it's been processed) or there was a correlation mismatch
                // (in which case it has nothing to do with us).
            }
        }
        return messages;
    }

    private void method(String method, Map arguments) throws JMSException {
        method(method, arguments, "org.apache.qpid.broker:broker:amqp-broker");
    }

    private void method(String method, Map arguments, String address) throws JMSException {
        Destination reply_to = session.createTemporaryQueue();
        MessageConsumer receiver = session.createConsumer(reply_to);

        JmsMapMessage request = (JmsMapMessage) session.createMapMessage();
        request.setValidatePropertyNames(false);
        request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
        request.setStringProperty("qmf.opcode", "_method_request");
        request.setStringProperty("method", "request");

        Map object_id_map = new HashMap();
        object_id_map.put("_object_name", address);

        request.setString("_method_name", method);

        // Can't set map directly on object, so we'll hack it here
        JmsMapMessageFacade facade = (JmsMapMessageFacade)request.getFacade();
        facade.put("_object_id", object_id_map);
        facade.put("_arguments", arguments);

        int correlation_id = sendMessage(request, reply_to);

        List<Message> response = awaitResponse(receiver, correlation_id, 10 * 1000);
        if (response != null && response.size() > 0) {
            new ResponseDecoder().decodeResponse(response);
        } else {
            throw new JMSException("No response received");
        }
    }

    private int sendRequest(String opcode, Map<String, ?> query, Destination replyAddress) throws JMSException {
        JmsMapMessage request = (JmsMapMessage) session.createMapMessage();
        request.setValidatePropertyNames(false);
        request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
        request.setStringProperty("qmf.opcode", opcode);

        // Can't set map directly on object, so we'll hack it here
        JmsMapMessageFacade facade = (JmsMapMessageFacade)request.getFacade();
        for (Map.Entry<String, ?> entry : query.entrySet()) {
            facade.put(entry.getKey(), entry.getValue());
        }

        return sendMessage(request, replyAddress);
    }

    private List objectQuery(String schemaMetaId, String objectMetaId, String objectId) throws JMSException {
        Destination reply_to = session.createTemporaryQueue();
        MessageConsumer receiver = session.createConsumer(reply_to);

        Map schemaId = new HashMap();
        schemaId.put(objectMetaId, objectId);
        Map query = new HashMap();
        query.put("_what", "OBJECT");
        query.put(schemaMetaId, schemaId);

        int correlation_id = sendRequest("_query_request", query, reply_to);

        List<Message> response = awaitResponse(receiver, correlation_id, 10 * 1000);
        return new ResponseDecoder().decodeResponse(response);
    }

    private List classQuery(String className) throws JMSException {
        return objectQuery("_schema_id", "_class_name", className);
    }

    private List nameQuery(String objectId) throws JMSException {
        return objectQuery("_object_id", "_object_name", objectId);
    }

    private Node getNode(String nodeType, String name) throws JMSException {
        String qpidName = String.format("org.apache.qpid.broker:%s:%s", nodeType, name);

        List nodes = nameQuery(qpidName);

        return nodes == null || nodes.isEmpty() ? null : (Node) nodes.get(0);
    }

    public Queue getQueue(String name) throws JMSException {
        return (Queue) getNode("queue", name);
    }

    public Exchange getExchange(String name) throws JMSException {
        return (Exchange) getNode("exchange", name);
    }

    public List getQueues() throws JMSException {
        return classQuery("queue");
    }

    public List getExchanges() throws JMSException {
        return classQuery("exchange");
    }

    public void createExchange(String type, String name, Map<String, String> properties) throws JMSException {
        Map<String, String> props = new HashMap<>();
        props.put("exchange-type", type);
        if (properties != null) {
            props.putAll(properties);
        }

        Map<String, Object> args = new HashMap();
        args.put("type", "exchange");
        args.put("name", name);
        args.put("properties", props);
        args.put("strict", true);

        method("create", args);
    }

    public void deleteExchange(String name) throws JMSException {
        Map<String, Object> args = new HashMap<>();
        args.put("type", "exchange");
        args.put("name", name);

        method("delete", args);
    }

    public void createQueue(String name, Map<String, String> properties) throws JMSException {
        Map<String, String> props = new HashMap<>();
        if (properties != null) {
            props.putAll(properties);
        }

        Map<String, Object> args = new HashMap();
        args.put("type", "queue");
        args.put("name", name);
        args.put("properties", props);
        args.put("strict", true);

        method("create", args);
    }

    public void deleteQueue(String name, boolean if_empty, boolean if_unused) throws JMSException {
        Map<String, String> options = new HashMap<>();
        options.put("if_empty", String.valueOf(if_empty));
        options.put("if_unused", String.valueOf(if_unused));

        Map<String, Object> args = new HashMap<>();
        args.put("type", "queue");
        args.put("name", name);
        args.put("options", options);

        method("delete", args);
    }

    public void bind(String exchange, String queue, String key, Map<String, String> options) throws JMSException {
        Map<String, String> properties = options == null ? new HashMap<String, String>() : new HashMap<>(options);

        Map<String, Object> args = new HashMap<>();
        args.put("type", "binding");
        args.put("name", String.format("%s/%s/%s", exchange, queue, key));
        args.put("properties", properties);
        args.put("strict", true);

        method("create", args);
    }

    public void unbind(String exchange, String queue, String key) throws JMSException {
        Map<String, Object> args = new HashMap<>();
        args.put("type", "binding");
        args.put("name", String.format("%s/%s/%s", exchange, queue, key));
        args.put("strict", true);

        method("delete", args);
    }

    public void reloadACLFile() throws JMSException {
        Map arguments = new HashMap();
        String address = "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker";

        method("reloadACLFile", arguments, address);
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("session", session)
                .add("correlator", correlator)
                .toString();
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to