Hi Filip, I think I might have had the same problem as you in the past because the methods (I think) are required are not available on the JMS interface. My solution was a rather hacky approach of casting the JMS instance to the actual Qpid type, as in the following examples:
private void method(String method, Map arguments, String address) throws JMSException { Destination reply_to = session.createTemporaryQueue(); MessageConsumer receiver = session.createConsumer(reply_to); JmsMapMessage request = (JmsMapMessage) session.createMapMessage(); request.setValidatePropertyNames(false); request.setStringProperty("x-amqp-0-10.app-id", "qmf2"); request.setStringProperty("qmf.opcode", "_method_request"); request.setStringProperty("method", "request"); Map object_id_map = new HashMap(); object_id_map.put("_object_name", address); request.setString("_method_name", method); // Can't set map directly on object, so we'll hack it here JmsMapMessageFacade facade = (JmsMapMessageFacade)request.getFacade(); facade.put("_object_id", object_id_map); facade.put("_arguments", arguments); int correlation_id = sendMessage(request, reply_to); List<Message> response = awaitResponse(receiver, correlation_id, 10 * 1000); if (response != null && response.size() > 0) { new ResponseDecoder().decodeResponse(response); } else { throw new JMSException("No response received"); } } private int sendRequest(String opcode, Map<String, ?> query, Destination replyAddress) throws JMSException { JmsMapMessage request = (JmsMapMessage) session.createMapMessage(); request.setValidatePropertyNames(false); request.setStringProperty("x-amqp-0-10.app-id", "qmf2"); request.setStringProperty("qmf.opcode", opcode); // Can't set map directly on object, so we'll hack it here JmsMapMessageFacade facade = (JmsMapMessageFacade)request.getFacade(); for (Map.Entry<String, ?> entry : query.entrySet()) { facade.put(entry.getKey(), entry.getValue()); } return sendMessage(request, replyAddress); } Unfortunately I haven't structured our Java implementation of the QMF2 interface as well as I could have done and it's rather entrenched in another of our libraries but I'm sure no one will mind if I share the attached implementation (based on the Qpid Python implementation) for reference. /Chris On 19 December 2016 at 14:14, Filip Nguyen <fngu...@redhat.com> wrote: > We currently use Java JMS qpid-client 0.32. It is quite easy to use QMF > [1] just by using MapMessage etc. > > With new Java Qpid JMS client org.apache.qpid:qpid-jms-client:jar:0.10.0 > I couldn't find a way how to use Qpid QMF. Is there any existing usage or > example how to do that? > > [1] > > MapMessage request = session.createMapMessage(); > request.setJMSReplyTo(responseQueue); > request.setStringProperty("x-amqp-0-10.app-id", "qmf2"); > request.setStringProperty("qmf.opcode", "_query_request"); > > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org > For additional commands, e-mail: users-h...@qpid.apache.org > > -- *Chris Richardson*, System Architect c...@fourc.eu *FourC AS, Vestre Rosten 81, Trekanten, NO-7075 Tiller, Norwaywww.fourc.eu <http://www.fourc.eu/>* *Follow us on LinkedIn <http://bit.ly/fourcli>, Facebook <http://bit.ly/fourcfb>, Google+ <http://bit.ly/fourcgp> and Twitter <http://bit.ly/fourctw>!*
/* * Copyright (C) 2015 FourC AS, http://www.fourc.eu/ * All Rights Reserved. */ package eu.fourc.messenger.fmf.impl.java; import com.google.common.base.MoreObjects; import eu.fourc.messenger.fmf.Exchange; import eu.fourc.messenger.fmf.Node; import eu.fourc.messenger.fmf.Queue; import eu.fourc.messenger.impl.ConnectionImpl; import org.apache.qpid.jms.JmsTopic; import org.apache.qpid.jms.message.JmsMapMessage; import org.apache.qpid.jms.message.JmsMessage; import org.apache.qpid.jms.message.facade.JmsMapMessageFacade; import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade; import javax.jms.*; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; public class BrokerAgent { private static final String TARGET_ADDRESS = "qmf.default.direct"; private static final String TARGET_SUBJECT = "broker"; private Session session; private int correlator = 1; // Maybe use atomic int? public BrokerAgent(Connection connection) throws JMSException { Connection conn = connection instanceof ConnectionImpl ? ((ConnectionImpl)connection).getConnection() : connection; session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); } Session getSession() { return session; } private int sendMessage(Message message, Destination replyAddress) throws JMSException { Destination target = new JmsTopic(TARGET_ADDRESS); MessageProducer sender = session.createProducer(target); message.setJMSReplyTo(replyAddress); int local_correlator = correlator++; message.setJMSCorrelationID("ID:" + String.valueOf(local_correlator)); // Routing key ((AmqpJmsMessageFacade)((JmsMessage) message).getFacade()).getAmqpMessage().setSubject(TARGET_SUBJECT); sender.send(message); return local_correlator; } private List<Message> awaitResponse(MessageConsumer receiver, int correlation_id, long timeout) throws JMSException { String local_correlation_id = "ID:" + String.valueOf(correlation_id); List<Message> messages = new LinkedList<>(); long start = System.currentTimeMillis(); boolean finished = false; while (!finished) { // Make sure we don't wait longer than "timeout" in total, even when receiving multiple messages long local_timeout = timeout - System.currentTimeMillis() + start; if (local_timeout < 0) { break; } Message message = receiver.receive(local_timeout); if (message != null) { if (message.getJMSCorrelationID().equals(local_correlation_id)) { messages.add(message); // If the message contains the "partial" flag, there should be more message to read. // Empirical data suggests the broker transmits a maximum of 100 records per message. finished = !message.propertyExists("partial"); } // Don't acknowledge the message here: it could be a message we want (in which case // we can acknowledge it later, when it's been processed) or there was a correlation mismatch // (in which case it has nothing to do with us). } } return messages; } private void method(String method, Map arguments) throws JMSException { method(method, arguments, "org.apache.qpid.broker:broker:amqp-broker"); } private void method(String method, Map arguments, String address) throws JMSException { Destination reply_to = session.createTemporaryQueue(); MessageConsumer receiver = session.createConsumer(reply_to); JmsMapMessage request = (JmsMapMessage) session.createMapMessage(); request.setValidatePropertyNames(false); request.setStringProperty("x-amqp-0-10.app-id", "qmf2"); request.setStringProperty("qmf.opcode", "_method_request"); request.setStringProperty("method", "request"); Map object_id_map = new HashMap(); object_id_map.put("_object_name", address); request.setString("_method_name", method); // Can't set map directly on object, so we'll hack it here JmsMapMessageFacade facade = (JmsMapMessageFacade)request.getFacade(); facade.put("_object_id", object_id_map); facade.put("_arguments", arguments); int correlation_id = sendMessage(request, reply_to); List<Message> response = awaitResponse(receiver, correlation_id, 10 * 1000); if (response != null && response.size() > 0) { new ResponseDecoder().decodeResponse(response); } else { throw new JMSException("No response received"); } } private int sendRequest(String opcode, Map<String, ?> query, Destination replyAddress) throws JMSException { JmsMapMessage request = (JmsMapMessage) session.createMapMessage(); request.setValidatePropertyNames(false); request.setStringProperty("x-amqp-0-10.app-id", "qmf2"); request.setStringProperty("qmf.opcode", opcode); // Can't set map directly on object, so we'll hack it here JmsMapMessageFacade facade = (JmsMapMessageFacade)request.getFacade(); for (Map.Entry<String, ?> entry : query.entrySet()) { facade.put(entry.getKey(), entry.getValue()); } return sendMessage(request, replyAddress); } private List objectQuery(String schemaMetaId, String objectMetaId, String objectId) throws JMSException { Destination reply_to = session.createTemporaryQueue(); MessageConsumer receiver = session.createConsumer(reply_to); Map schemaId = new HashMap(); schemaId.put(objectMetaId, objectId); Map query = new HashMap(); query.put("_what", "OBJECT"); query.put(schemaMetaId, schemaId); int correlation_id = sendRequest("_query_request", query, reply_to); List<Message> response = awaitResponse(receiver, correlation_id, 10 * 1000); return new ResponseDecoder().decodeResponse(response); } private List classQuery(String className) throws JMSException { return objectQuery("_schema_id", "_class_name", className); } private List nameQuery(String objectId) throws JMSException { return objectQuery("_object_id", "_object_name", objectId); } private Node getNode(String nodeType, String name) throws JMSException { String qpidName = String.format("org.apache.qpid.broker:%s:%s", nodeType, name); List nodes = nameQuery(qpidName); return nodes == null || nodes.isEmpty() ? null : (Node) nodes.get(0); } public Queue getQueue(String name) throws JMSException { return (Queue) getNode("queue", name); } public Exchange getExchange(String name) throws JMSException { return (Exchange) getNode("exchange", name); } public List getQueues() throws JMSException { return classQuery("queue"); } public List getExchanges() throws JMSException { return classQuery("exchange"); } public void createExchange(String type, String name, Map<String, String> properties) throws JMSException { Map<String, String> props = new HashMap<>(); props.put("exchange-type", type); if (properties != null) { props.putAll(properties); } Map<String, Object> args = new HashMap(); args.put("type", "exchange"); args.put("name", name); args.put("properties", props); args.put("strict", true); method("create", args); } public void deleteExchange(String name) throws JMSException { Map<String, Object> args = new HashMap<>(); args.put("type", "exchange"); args.put("name", name); method("delete", args); } public void createQueue(String name, Map<String, String> properties) throws JMSException { Map<String, String> props = new HashMap<>(); if (properties != null) { props.putAll(properties); } Map<String, Object> args = new HashMap(); args.put("type", "queue"); args.put("name", name); args.put("properties", props); args.put("strict", true); method("create", args); } public void deleteQueue(String name, boolean if_empty, boolean if_unused) throws JMSException { Map<String, String> options = new HashMap<>(); options.put("if_empty", String.valueOf(if_empty)); options.put("if_unused", String.valueOf(if_unused)); Map<String, Object> args = new HashMap<>(); args.put("type", "queue"); args.put("name", name); args.put("options", options); method("delete", args); } public void bind(String exchange, String queue, String key, Map<String, String> options) throws JMSException { Map<String, String> properties = options == null ? new HashMap<String, String>() : new HashMap<>(options); Map<String, Object> args = new HashMap<>(); args.put("type", "binding"); args.put("name", String.format("%s/%s/%s", exchange, queue, key)); args.put("properties", properties); args.put("strict", true); method("create", args); } public void unbind(String exchange, String queue, String key) throws JMSException { Map<String, Object> args = new HashMap<>(); args.put("type", "binding"); args.put("name", String.format("%s/%s/%s", exchange, queue, key)); args.put("strict", true); method("delete", args); } public void reloadACLFile() throws JMSException { Map arguments = new HashMap(); String address = "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker"; method("reloadACLFile", arguments, address); } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("session", session) .add("correlator", correlator) .toString(); } }
--------------------------------------------------------------------- To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org For additional commands, e-mail: users-h...@qpid.apache.org