On 02/04/2011 05:56 AM, Damon Rolfs wrote:
I'm trying to write a simple Java client (using qpid-java-client-0.8) to
demonstrate communicating with a C++ QPID broker (AMQP 0.10). I would
appreciate any example code demonstrating a Java qpid client connecting to a
C++ qpid broker. I've found several examples of Java JMS and native clients
connecting to Java brokers, but none for Java -> C++.
Have you tried the Drain/Spout examples? They are useful for
experimenting with different types of addresses. There are equivalents
in other languages also.
I've been working to adapt the pure Java examples, however I've run into
issues I've been unable to resolve. I've had best luck using the AMQ*
classes rather than the JMS classes. I've been able to connect to the broker
and send a message into the exchange. The message gets dropped however
because my routing key is not set, despite my code setting it by explicitly
calling the AMQDestination.setRoutingKey() method. When I print the
Destination, I get: JMS Destination: :////?routingkey=''.
Using another (C++) client, I place a message for the Java client in a
queue. I have not been able to receive that message using the Java client.
Thank you in advance for your guidance.
Damon
Here's a snippet from my client sample (I removed some of the log code that
shows up below):
AMQConnection connection = new AMQConnection( host, port, "root", "r00tpw",
"10.1.3.3", "/" );
Session session = connection.createSession( true, Session.AUTO_ACKNOWLEDGE
);
AMQAnyDestination outboundQ = new AMQAnyDestination(
"ADDR:"+sendToExchange+"; {create: always}" );
That address will create a *queue* with whatever name the string
sendToExchange contains. If you want to create an exchange you need to
specify the node type. E.g. "my-exchange; {create: always, node:
{type:topic}}".
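To make the difference concrete, here is a minimal sketch that builds the two address strings side by side. The class and method names (AddressStrings, queueAddress, topicAddress) are made up for illustration and are not part of the Qpid API; only the address syntax is taken from the text above.

```java
/** Hypothetical helper for illustration; not part of the Qpid client API. */
final class AddressStrings {
    private AddressStrings() {}

    /** No node type given: with "create: always" the node is created as a queue. */
    static String queueAddress(String name) {
        return name + "; {create: always}";
    }

    /** Explicit node type "topic": the node is created as an exchange. */
    static String topicAddress(String name) {
        return name + "; {create: always, node: {type:topic}}";
    }

    public static void main(String[] args) {
        // Same node name, two different kinds of node on the broker.
        System.out.println(queueAddress("my-exchange"));
        System.out.println(topicAddress("my-exchange"));
    }
}
```

The only difference between the two strings is the node option, which is easy to miss when the address is assembled by concatenation.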
However, my advice would be not to create your exchange on demand as part
of your application, but to preconfigure the broker. This is simpler and
less error-prone in my opinion.
E.g. get the management scripts and then run:
qpid-config add exchange topic my-exchange
outboundQ.setRoutingKey( new AMQShortString(routingKey) ); // routing key is
a string: B.RRAA.AMQPApp2Key.Something
AMQAnyDestination inboundQ = new AMQAnyDestination(
"ADDR:"+receivingQueue+"; {create: always}" );
You don't have the receiving queue bound to the exchange you are sending
to here. (Though, as noted above, you aren't actually creating an exchange;
you are creating a queue.)
If you want this queue to exist independent of the lifecycle of the
receiver then I would create and bind it using qpid-config rather than
doing so on-demand.
If however you simply want a subscription queue for the lifetime of the
consumer, bound with a given routing-key, then you don't need to
explicitly create that queue. Just create a consumer from an address
that resolves to the exchange and specifies a subject containing the
desired routing-key.
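As a rough sketch of that last option, the JNDI properties for such a consumer could be assembled as below. It assumes the exchange already exists on the broker and mirrors the property keys used in the attached example; the class name SubscriptionConfig, the method forSubscription, and the JNDI name "inbound" are placeholders I picked for illustration.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of the Qpid client API. */
final class SubscriptionConfig {
    private SubscriptionConfig() {}

    /**
     * Builds JNDI properties whose "inbound" destination is an address of the
     * form exchange/subject, so the subject carries the desired routing key.
     */
    static Properties forSubscription(String exchange, String routingKey) {
        Properties props = new Properties();
        props.setProperty("java.naming.factory.initial",
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        // No create option here: the exchange is assumed to be preconfigured.
        props.setProperty("destination.inbound", exchange + "/" + routingKey);
        return props;
    }

    public static void main(String[] args) {
        Properties props = forSubscription("my-exchange", "B.RRAA.AMQPApp2Key.Something");
        System.out.println(props.getProperty("destination.inbound"));
    }
}
```

These properties would also need a connectionfactory entry before being passed to new InitialContext(props). The destination looked up under "inbound" can then be handed to session.createConsumer(), with no explicit queue declared by the application.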
There is a short description of addresses at
http://qpid.apache.org/books/0.8/Programming-In-Apache-Qpid/html/ch02s04.html
which might be worth a look if you have not done so already.
I've attached a very simple example that I think demonstrates what you
are attempting here. Hope that helps.
MessageProducer producer = session.createProducer( outboundQ );
MessageConsumer consumer = session.createConsumer( inboundQ );
BytesMessage outboundMessage = createEmpMessage( session, empDataString );
// creates and populates a JMS BytesMessage
producer.send( outboundMessage );
Message inboundMessage = consumer.receive( 10 * 1000 ); // I've also tried
setting up a listener to no avail
String msgType = (inboundMessage != null) ? inboundMessage.getJMSType() :
"###NULL###";
Logger.info( "RECEIVED:type:"+msgType+"\tcontent:"+inboundMessage );
connection.close();
import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class Example {
    public static void main(String[] args) {
        try {
            String url = "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
            String address = "my-exchange/B.RRAA.AMQPApp2Key.Something; {create: always, node: {type:topic}}";

            Properties props = new Properties();
            props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
            props.setProperty("connectionfactory.host", url);
            props.setProperty("destination.address", address);

            Context ctx = new InitialContext(props);
            ConnectionFactory factory = (ConnectionFactory) ctx.lookup("host");
            Destination target = (Destination) ctx.lookup("address");

            Connection connection = factory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = session.createProducer(target);
            MessageConsumer receiver = session.createConsumer(target);
            connection.start();

            sender.send(session.createTextMessage("Hello World!"));
            Message message = receiver.receive(10*1000);
            if (message != null) {
                if (message instanceof TextMessage) {
                    System.out.println("Received text message: " + ((TextMessage) message).getText());
                } else {
                    System.out.println("Received message: " + message);
                }
            } else {
                System.out.println("No message received");
            }
            connection.close();
        } catch (Exception e) {
            System.out.println("Exception occurred: " + e.toString());
            e.printStackTrace();
        }
    }
}
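
The connection URL in the example above packs several settings into one string: user, password, client id, virtual host, and a broker list with a tcp:// host and port. If you need to point the client at a different broker, a small helper along these lines may be handy. This is only a sketch: ConnectionUrls and brokerUrl are names I made up, and it does no escaping of special characters.

```java
/** Hypothetical helper for illustration; not part of the Qpid client API. */
final class ConnectionUrls {
    private ConnectionUrls() {}

    /**
     * Assembles a URL of the form
     * amqp://user:password@clientId/virtualHost?brokerlist='tcp://host:port'
     * with no escaping applied to any of the parts.
     */
    static String brokerUrl(String user, String password, String clientId,
                            String virtualHost, String host, int port) {
        return "amqp://" + user + ":" + password + "@" + clientId + "/" + virtualHost
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    public static void main(String[] args) {
        // Rebuilds the URL used in the example above.
        System.out.println(brokerUrl("guest", "guest", "clientid", "test", "localhost", 5672));
    }
}
```

The resulting string can be set as the connectionfactory.host property in place of the hard-coded URL.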