User: hiram
Date: 00/12/12 13:30:52
Added: examples AckEquivExample.java DurableSubscriberExample.java
TextListener.java README AsynchTopicExample.java
MessageConversion.java AsynchQueueReceiver.java
TransactedExample.java RequestReplyQueue.java
ObjectMessages.java SynchTopicExample.java
SampleUtilities.java SynchQueueReceiver.java
MessageHeadersTopic.java run.bat BytesMessages.java
TopicSelectors.java TestClient.java
SenderToQueue.java run.sh MessageFormats.java
Removed: examples client.bat client.sh
Log:
More example clients using spyderMQ
Revision Changes Path
1.1 spyderMQ/examples/AckEquivExample.java
Index: AckEquivExample.java
===================================================================
/*
* @(#)AckEquivExample.java 1.6 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The AckEquivExample class shows how the following two scenarios both ensure
* that a message will not be acknowledged until processing of it is complete:
* <ul>
* <li> Using an asynchronous receiver (message listener) in an
* AUTO_ACKNOWLEDGE session
* <li> Using a synchronous receiver in a CLIENT_ACKNOWLEDGE session
* </ul>
* <p>
* With a message listener, the automatic acknowledgment happens when the
* onMessage method returns -- that is, after message processing has finished.
* <p>
* With a synchronous receive, the client acknowledges the message after
* processing is complete. (If you use AUTO_ACKNOWLEDGE with a synchronous
* receive, the acknowledgement happens immediately after the receive call; if
* any subsequent processing steps fail, the message cannot be redelivered.)
* <p>
* The program contains a SynchSender class, a SynchReceiver class, an
* AsynchSubscriber class with a TextListener class, a MultiplePublisher class,
* a main method, and a method that runs the other classes' threads.
* <p>
* Specify a queue name and a topic name on the command line when you run the
* program. The program also uses a queue named "controlQueue", which should be
* created before you run the program.
*
* @author Kim Haase
* @version 1.6, 08/18/00
*/
public class AckEquivExample {
// Name of the queue passed to the SampleUtilities synchronize-message helpers
// by the subscriber and publisher threads.
final String CONTROL_QUEUE = "controlQueue";
// Queue name taken from the first command-line argument (assigned in main).
String queueName = null;
// Topic name taken from the second command-line argument (assigned in main).
String topicName = null;
// Status passed to SampleUtilities.exit; the worker threads set it to 1 when
// a JMS operation fails.
int exitResult = 0;
/**
 * Thread that opens a queue session in CLIENT_ACKNOWLEDGE mode and sends one
 * text message to the queue named by queueName.
 *
 * @author Kim Haase
 * @version 1.6, 08/18/00
 */
public class SynchSender extends Thread {

    /**
     * Closes the connection on the setup-failure path. A failure to close is
     * ignored because the caller terminates the program right afterwards.
     *
     * @param connection the connection to close; may be null
     */
    private void closeIgnoringErrors(QueueConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ignored) {
        }
    }

    /**
     * Runs the thread: sets up the connection and session, sends the
     * message, and always closes the connection at the end.
     */
    public void run() {
        final String text = "Here is a client-acknowledge message";
        QueueConnection connection = null;
        QueueSession session = null;
        Queue destination = null;

        // Obtain the administered objects and open the session.
        try {
            QueueConnectionFactory factory =
                SampleUtilities.getQueueConnectionFactory();
            connection = factory.createQueueConnection();
            session = connection.createQueueSession(false,
                Session.CLIENT_ACKNOWLEDGE);
            destination = SampleUtilities.getQueue(queueName, session);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            closeIgnoringErrors(connection);
            System.exit(1);
        }

        // Create the sender, then build and send the message.
        try {
            System.out.println(" SENDER: Created client-acknowledge session");
            QueueSender sender = session.createSender(destination);
            TextMessage outgoing = session.createTextMessage();
            outgoing.setText(text);
            System.out.println(" SENDER: Sending message: "
                + outgoing.getText());
            sender.send(outgoing);
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    exitResult = 1;
                }
            }
        }
    }
}
/**
 * The SynchReceiver class creates a session in CLIENT_ACKNOWLEDGE mode and
 * receives the message sent by the SynchSender class.
 * <p>
 * The message is acknowledged only after it has been processed. If the
 * receive call does not yield a text message, the failure is reported and
 * recorded in exitResult instead of terminating the thread with an
 * uncaught exception.
 *
 * @author Kim Haase
 * @version 1.6, 08/18/00
 */
public class SynchReceiver extends Thread {
    /**
     * Runs the thread: sets up the connection and session, receives one
     * message synchronously, processes and acknowledges it, and always
     * closes the connection at the end.
     */
    public void run() {
        QueueConnectionFactory queueConnectionFactory = null;
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue queue = null;
        QueueReceiver queueReceiver = null;

        try {
            queueConnectionFactory =
                SampleUtilities.getQueueConnectionFactory();
            queueConnection =
                queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false,
                Session.CLIENT_ACKNOWLEDGE);
            queue = SampleUtilities.getQueue(queueName, queueSession);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException ee) {
                    // Ignored: the program exits immediately below.
                }
            }
            System.exit(1);
        }

        /*
         * Create client-acknowledge receiver.
         * Receive message and process it.
         * Acknowledge message.
         */
        try {
            System.out.println(" RECEIVER: Created client-acknowledge session");
            queueReceiver = queueSession.createReceiver(queue);
            queueConnection.start();
            Message received = queueReceiver.receive();
            if (received instanceof TextMessage) {
                TextMessage message = (TextMessage) received;
                System.out.println(" RECEIVER: Processing message: "
                    + message.getText());
                System.out.println(" RECEIVER: Now I'll acknowledge the message");
                message.acknowledge();
            } else {
                // receive() yields null when the consumer is closed while
                // waiting; anything else here is an unexpected message type.
                // Either way there is nothing to process or acknowledge.
                System.out.println(" RECEIVER: Expected a text message but received: "
                    + received);
                exitResult = 1;
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        } finally {
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException e) {
                    exitResult = 1;
                }
            }
        }
    }
}
/**
 * Thread that opens a topic session in AUTO_ACKNOWLEDGE mode and consumes
 * messages from the topic asynchronously through a message listener
 * (TextListener) registered on a durable subscriber.
 * <p>
 * In this mode each message is acknowledged once onMessage has returned.
 *
 * @author Kim Haase
 * @version 1.6, 08/18/00
 */
public class AsynchSubscriber extends Thread {

    /**
     * Message listener for the AsynchSubscriber class: prints text messages
     * and treats any other message as the end-of-stream marker.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    private class TextListener implements MessageListener {
        final SampleUtilities.DoneLatch monitor =
            new SampleUtilities.DoneLatch();

        /**
         * Displays the text of a TextMessage. A non-text message marks the
         * end of the message stream, so the monitor is told that all
         * messages have been processed.
         *
         * @param message the incoming message
         */
        public void onMessage(Message message) {
            if (!(message instanceof TextMessage)) {
                monitor.allDone();
                return;
            }
            try {
                String body = ((TextMessage) message).getText();
                System.out.println("SUBSCRIBER: Processing message: "
                    + body);
            } catch (JMSException e) {
                System.out.println("Exception in onMessage(): "
                    + e.toString());
            }
        }
    }

    /**
     * Closes the connection on a path that ends in System.exit. A failure
     * to close is ignored there.
     *
     * @param connection the connection to close; may be null
     */
    private void closeIgnoringErrors(TopicConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ignored) {
        }
    }

    /**
     * Runs the thread: sets up the connection and session, subscribes,
     * signals the publisher, waits for the end-of-stream message, and then
     * removes the durable subscription.
     */
    public void run() {
        final String subscriptionName = "AckEquivExampleSubscription";
        TopicConnection connection = null;
        TopicSession session = null;
        Topic destination = null;

        // Obtain the administered objects and open the session.
        try {
            TopicConnectionFactory factory =
                SampleUtilities.getTopicConnectionFactory();
            connection = factory.createTopicConnection();
            connection.setClientID("AckEquivExample");
            session = connection.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
            System.out.println("SUBSCRIBER: Created auto-acknowledge session");
            destination = SampleUtilities.getTopic(topicName, session);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            closeIgnoringErrors(connection);
            System.exit(1);
        }

        try {
            // Create the durable subscriber, attach the listener and start
            // message delivery.
            TopicSubscriber subscriber =
                session.createDurableSubscriber(destination, subscriptionName);
            TextListener listener = new TextListener();
            subscriber.setMessageListener(listener);
            connection.start();

            // Tell the publisher that the subscriber is ready.
            try {
                SampleUtilities.sendSynchronizeMessage("SUBSCRIBER: ",
                    CONTROL_QUEUE);
            } catch (Exception e) {
                System.out.println("Queue probably missing: " + e.toString());
                closeIgnoringErrors(connection);
                System.exit(1);
            }

            // The listener handles messages asynchronously; block here until
            // it has seen the publisher's end-of-stream control message.
            listener.monitor.waitTillDone();
            subscriber.close();
            session.unsubscribe(subscriptionName);
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    exitResult = 1;
                }
            }
        }
    }
}
/**
 * Thread that opens a topic session in AUTO_ACKNOWLEDGE mode and publishes
 * three text messages to the topic, followed by a non-text message that
 * marks the end of the stream.
 *
 * @author Kim Haase
 * @version 1.6, 08/18/00
 */
public class MultiplePublisher extends Thread {

    /**
     * Closes the connection on a path that ends in System.exit. A failure
     * to close is ignored there.
     *
     * @param connection the connection to close; may be null
     */
    private void closeIgnoringErrors(TopicConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ignored) {
        }
    }

    /**
     * Runs the thread: sets up the connection and session, waits for the
     * subscriber's synchronize message, publishes the messages, and always
     * closes the connection at the end.
     */
    public void run() {
        final int messageCount = 3;
        final String baseText = "Here is an auto-acknowledge message";
        TopicConnection connection = null;
        TopicSession session = null;
        Topic destination = null;

        // Obtain the administered objects and open the session.
        try {
            TopicConnectionFactory factory =
                SampleUtilities.getTopicConnectionFactory();
            connection = factory.createTopicConnection();
            session = connection.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
            System.out.println("PUBLISHER: Created auto-acknowledge session");
            destination = SampleUtilities.getTopic(topicName, session);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            closeIgnoringErrors(connection);
            System.exit(1);
        }

        try {
            // Do not publish until the subscriber has announced that it is
            // ready to receive messages.
            try {
                SampleUtilities.receiveSynchronizeMessages("PUBLISHER: ",
                    CONTROL_QUEUE, 1);
            } catch (Exception e) {
                System.out.println("Queue probably missing: " + e.toString());
                closeIgnoringErrors(connection);
                System.exit(1);
            }

            TopicPublisher publisher = session.createPublisher(destination);
            TextMessage outgoing = session.createTextMessage();

            // Publish the messages, numbering them from 1.
            for (int sequence = 1; sequence <= messageCount; sequence++) {
                outgoing.setText(baseText + " " + sequence);
                System.out.println("PUBLISHER: Publishing message: "
                    + outgoing.getText());
                publisher.publish(outgoing);
            }

            // A non-text control message signals the end of the messages.
            publisher.publish(session.createMessage());
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    exitResult = 1;
                }
            }
        }
    }
}
/**
 * Instantiates the sender, receiver, subscriber, and publisher classes and
 * starts their threads.
 * <p>
 * The sender/receiver pair is started and joined first; the
 * subscriber/publisher pair is started only afterwards. If the calling
 * thread is interrupted while waiting in join, the remaining steps still
 * run as before, and the thread's interrupt status is restored before this
 * method returns so that callers can observe the interruption.
 */
public void run_threads() {
    SynchSender synchSender = new SynchSender();
    SynchReceiver synchReceiver = new SynchReceiver();
    AsynchSubscriber asynchSubscriber = new AsynchSubscriber();
    MultiplePublisher multiplePublisher = new MultiplePublisher();
    // Remembered rather than re-asserted immediately, so that the second
    // pair of joins is not cut short by an already-set interrupt flag.
    boolean interrupted = false;

    synchSender.start();
    synchReceiver.start();
    try {
        synchSender.join();
        synchReceiver.join();
    } catch (InterruptedException e) {
        interrupted = true;
    }

    asynchSubscriber.start();
    multiplePublisher.start();
    try {
        asynchSubscriber.join();
        multiplePublisher.join();
    } catch (InterruptedException e) {
        interrupted = true;
    }

    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}
/**
* Reads the queue and topic names from the command line, then calls the
* run_threads method to execute the program threads.
*
* @param args the topic used by the example
*/
public static void main(String[] args) {
AckEquivExample aee = new AckEquivExample();
if (args.length != 2) {
System.out.println("Usage: java AckEquivExample <queue_name>
<topic_name>");
System.exit(1);
}
aee.queueName = new String(args[0]);
aee.topicName = new String(args[1]);
System.out.println("Queue name is " + aee.queueName);
System.out.println("Topic name is " + aee.topicName);
aee.run_threads();
SampleUtilities.exit(aee.exitResult);
}
}
1.1 spyderMQ/examples/DurableSubscriberExample.java
Index: DurableSubscriberExample.java
===================================================================
/*
* @(#)DurableSubscriberExample.java 1.6 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The DurableSubscriberExample class demonstrates that a durable subscription
* is active even when the subscriber is not active.
* <p>
* The program contains a DurableSubscriber class, a MultiplePublisher class,
* a main method, and a method that instantiates the classes and calls their
* methods in sequence.
* <p>
* The program begins like any publish/subscribe program: the subscriber starts,
* the publisher publishes some messages, and the subscriber receives them.
* <p>
* At this point the subscriber closes itself. The publisher then publishes
* some messages while the subscriber is not active. The subscriber then
* restarts and receives the messages.
* <p>
* Specify a topic name on the command line when you run the program.
*
* @author Kim Haase
* @version 1.6, 08/18/00
*/
public class DurableSubscriberExample {
// Topic name taken from the command line (assigned in main).
String topicName = null;
// Status passed to SampleUtilities.exit; set to 1 when a JMS operation fails.
int exitResult = 0;
// Index at which publishMessages starts numbering; updated after each
// successful call so that the next batch continues the sequence.
static int startindex = 0;
/**
 * The DurableSubscriber class contains a constructor, a startSubscriber
 * method, a closeSubscriber method, and a finish method.
 * <p>
 * The class fetches messages asynchronously, using a message listener,
 * TextListener. The durable subscription is named "MakeItLast".
 *
 * @author Kim Haase
 * @version 1.6, 08/18/00
 */
public class DurableSubscriber {
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    Topic topic = null;
    TopicSubscriber topicSubscriber = null;
    TextListener topicListener = null;

    /**
     * The TextListener class implements the MessageListener interface by
     * defining an onMessage method for the DurableSubscriber class.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    private class TextListener implements MessageListener {
        final SampleUtilities.DoneLatch monitor =
            new SampleUtilities.DoneLatch();

        /**
         * Casts the message to a TextMessage and displays its text.
         * A non-text message is interpreted as the end of the message
         * stream, and the message listener sets its monitor state to all
         * done processing messages.
         *
         * @param message the incoming message
         */
        public void onMessage(Message message) {
            if (message instanceof TextMessage) {
                TextMessage msg = (TextMessage) message;
                try {
                    System.out.println("SUBSCRIBER: Reading message: "
                        + msg.getText());
                } catch (JMSException e) {
                    System.out.println("Exception in onMessage(): "
                        + e.toString());
                }
            } else {
                monitor.allDone();
            }
        }
    }

    /**
     * Constructor: looks up a connection factory and topic and creates a
     * connection and session. On any failure the connection is closed (if
     * it was created) and the program exits with status 1.
     */
    public DurableSubscriber() {
        TopicConnectionFactory topicConnectionFactory = null;
        try {
            topicConnectionFactory =
                SampleUtilities.getTopicConnectionFactory();
            topicConnection =
                topicConnectionFactory.createTopicConnection();
            topicConnection.setClientID("DurableSubscriberExample");
            topicSession = topicConnection.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
            topic = SampleUtilities.getTopic(topicName, topicSession);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (topicConnection != null) {
                try {
                    topicConnection.close();
                } catch (JMSException ee) {
                    // Ignored: the program exits immediately below.
                }
            }
            System.exit(1);
        }
    }

    /**
     * Stops connection, then creates durable subscriber, registers message
     * listener (TextListener), and starts message delivery; listener
     * displays the messages obtained.
     * <p>
     * A new listener is created on every call.
     */
    public void startSubscriber() {
        try {
            System.out.println("Starting subscriber");
            topicConnection.stop();
            topicSubscriber = topicSession.createDurableSubscriber(topic,
                "MakeItLast");
            topicListener = new TextListener();
            topicSubscriber.setMessageListener(topicListener);
            topicConnection.start();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        }
    }

    /**
     * Blocks until publisher issues a control message indicating
     * end of publish stream, then closes subscriber.
     */
    public void closeSubscriber() {
        try {
            topicListener.monitor.waitTillDone();
            System.out.println("Closing subscriber");
            topicSubscriber.close();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        }
    }

    /**
     * Removes the durable subscription and closes the connection.
     * <p>
     * The connection is closed even when unsubscribing fails, so a failed
     * unsubscribe cannot leak the connection. Either failure sets
     * exitResult to 1.
     */
    public void finish() {
        if (topicConnection != null) {
            try {
                topicSession.unsubscribe("MakeItLast");
            } catch (JMSException e) {
                exitResult = 1;
            } finally {
                try {
                    topicConnection.close();
                } catch (JMSException e) {
                    exitResult = 1;
                }
            }
        }
    }
}
/**
 * Publishes batches of text messages to a topic. The class consists of a
 * constructor, a publishMessages method, and a finish method.
 *
 * @author Kim Haase
 * @version 1.6, 08/18/00
 */
public class MultiplePublisher {
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    Topic topic = null;
    TopicPublisher topicPublisher = null;

    /**
     * Constructor: obtains the connection factory and topic, opens a
     * connection and an AUTO_ACKNOWLEDGE session, and creates the
     * publisher. On any failure the connection is closed (if it exists)
     * and the program exits with status 1.
     */
    public MultiplePublisher() {
        try {
            TopicConnectionFactory factory =
                SampleUtilities.getTopicConnectionFactory();
            topicConnection = factory.createTopicConnection();
            topicSession = topicConnection.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
            topic = SampleUtilities.getTopic(topicName, topicSession);
            topicPublisher = topicSession.createPublisher(topic);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (topicConnection != null) {
                try {
                    topicConnection.close();
                } catch (JMSException ignored) {
                }
            }
            System.exit(1);
        }
    }

    /**
     * Publishes a batch of three text messages whose numbering continues
     * from the previous batch, then a non-text message that marks the end
     * of the batch. The running index is advanced only when the whole
     * batch was published without error.
     */
    public void publishMessages() {
        final int batchSize = 3;
        final String baseText = "Here is a message";
        try {
            TextMessage outgoing = topicSession.createTextMessage();
            int next = startindex;
            while (next < startindex + batchSize) {
                outgoing.setText(baseText + " " + (next + 1));
                System.out.println("PUBLISHER: Publishing message: "
                    + outgoing.getText());
                topicPublisher.publish(outgoing);
                next++;
            }

            // A non-text control message signals the end of the messages.
            topicPublisher.publish(topicSession.createMessage());
            startindex = next;
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        }
    }

    /**
     * Closes the connection; a failure to close sets exitResult to 1.
     */
    public void finish() {
        if (topicConnection == null) {
            return;
        }
        try {
            topicConnection.close();
        } catch (JMSException e) {
            exitResult = 1;
        }
    }
}
/**
 * Drives the example from start to finish:
 * <ol>
 * <li> creates the subscriber and the publisher;
 * <li> starts the subscriber, publishes a batch, and closes the subscriber
 *      once the batch has been read;
 * <li> publishes a second batch while the subscriber is closed;
 * <li> restarts the subscriber so that it fetches the second batch, then
 *      closes it again;
 * <li> closes both connections.
 * </ol>
 */
public void run_program() {
    DurableSubscriber subscriber = new DurableSubscriber();
    MultiplePublisher publisher = new MultiplePublisher();

    // First round: subscriber is active while the messages are published.
    subscriber.startSubscriber();
    publisher.publishMessages();
    subscriber.closeSubscriber();

    // Second round: messages are published while the subscriber is closed
    // and are picked up after it restarts.
    publisher.publishMessages();
    subscriber.startSubscriber();
    subscriber.closeSubscriber();

    // Release the connections.
    publisher.finish();
    subscriber.finish();
}
/**
 * Program entry point: takes the topic name from the command line, runs
 * the example through run_program, and exits with the recorded status.
 * <p>
 * Prints a usage message and exits with status 1 unless exactly one
 * argument is supplied.
 *
 * @param args the topic used by the example
 */
public static void main(String[] args) {
    if (args.length != 1) {
        System.out.println("Usage: java DurableSubscriberExample <topic_name>");
        System.exit(1);
    }

    DurableSubscriberExample example = new DurableSubscriberExample();
    example.topicName = args[0];
    System.out.println("Topic name is " + example.topicName);
    example.run_program();
    SampleUtilities.exit(example.exitResult);
}
}
1.1 spyderMQ/examples/TextListener.java
Index: TextListener.java
===================================================================
/*
* @(#)TextListener.java 1.5 00/08/09
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
 * A MessageListener whose onMessage method prints the contents of each
 * TextMessage it is given.
 * <p>
 * This class acts as the listener for the AsynchQueueReceiver class.
 *
 * @author Kim Haase
 * @version 1.5, 08/09/00
 */
public class TextListener implements MessageListener {
    final SampleUtilities.DoneLatch monitor =
        new SampleUtilities.DoneLatch();

    /**
     * Displays the text of a TextMessage. A non-text message marks the end
     * of the message stream, so the monitor is told that all messages have
     * been processed.
     *
     * @param message the incoming message
     */
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            monitor.allDone();
            return;
        }
        try {
            String body = ((TextMessage) message).getText();
            System.out.println("Reading message: " + body);
        } catch (JMSException e) {
            System.out.println("Exception in onMessage(): " + e.toString());
        }
    }
}
1.1 spyderMQ/examples/README
Index: README
===================================================================
These code examples are the ones provided by Sun.
They have been configured so you can run them with spyderMQ.
Most of them work!
----------------------------------------------------------------------------
JAVA(TM) MESSAGE SERVICE CODE EXAMPLES
The Java Message Service (JMS) code examples show how to write a simple
application using JMS. They demonstrate most of the important features of JMS.
The JMS examples are divided into three groups:
- Basic examples provide a simple introduction to JMS. They show how to send
and synchronously receive a single message using either a queue or a topic.
- Intermediate examples demonstrate somewhat more complex features of JMS:
- using message listeners for asynchronous receiving of messages
- using the five supported message formats
- Advanced examples demonstrate still more advanced features of JMS:
- using message headers
- using message selectors
- using durable subscriptions
- using acknowledgement modes
- using transactions
- using the request/reply facility
You can run these examples using any JMS implementation. Small changes
have been made to the file SampleUtilities.java so that they would work
with the default distribution of spyderMQ. By default, SampleUtilities.java is
configured to use the Java Naming and Directory Interface (JNDI) to look up
the JMS administered objects (connection factories and destinations). The
default JNDI connection factory names used by the application are
"TopicConnectionFactory" and "QueueConnectionFactory". In the
interest of not requiring JNDI in order to run the sample applications, you can
easily modify SampleUtilities.java to directly call the JMS-Provider-specific
method for creating a connection factory.
You can run the simpler queue examples in pairs, each program in a separate
terminal window. This allows you to simulate two separate applications, one
sending a message, the other receiving it.
For the other examples, the sender and receiver (or the publisher and the
subscriber, for topic examples) are each a separate class within the overall
program class. When you run these examples, the two classes use threads to
send and receive messages within the same program.
Before You Start
================
Before you begin, start the spyderMQ server. The queues and topics
needed to run the examples come pre-configured in the default spyderMQ
distribution. Most of the examples take either a queue name or a
topic name as an argument. You can use the 'testQueue' and 'testTopic'
names as those arguments, respectively.
The sample programs should already be compiled in the examples/
directory of the spyderMQ distribution. If you would like to compile
them from the source, obtain the spyderMQ source, and run the build
script.
To run the sample programs, you can use the included run.bat or run.sh
file that is in the examples/ directory. It will start the java interpreter
with the CLASSPATH set so that the examples work in the default spyderMQ
distribution. It is recommended that you use the provided 'run' script
in place of the 'java' command when starting the examples.
What All the Examples Have in Common
====================================
All the examples use the utility class SampleUtilities.java. It contains the
following methods:
- The methods getQueueConnectionFactory, getTopicConnectionFactory, getQueue,
and getTopic, which obtain a connection factory or destination either by
calling the method jndiLookup or, if you choose not to use JNDI, by a
call to createTopic or createQueue or by some other vendor-specific method
that you may fill in.
- The methods sendSynchronizeMessage and receiveSynchronizeMessages, which
are used to ensure that a publisher does not publish messages until its
subscriber or subscribers are ready for message delivery. These methods
use a queue named "controlQueue".
- The class DoneLatch, which allows a program to synchronize between an
asynchronous consumer and another thread in the receiving class.
- An exit method that all the examples call.
Most of the JMS examples execute the same basic setup steps:
1. They read a topic or queue name from the command line. The topic or
queue should have been created previously using an administrative tool.
2. They look up a connection factory and the topic or queue using the
jndiLookup method in the class SampleUtilities.
3. They use the connection factory to create a connection.
4. They use the connection to create a session.
5. They use the session to create message producers and/or consumers for
the topic or queue.
The publish/subscribe examples begin by calling the sendSynchronizeMessage and
receiveSynchronizeMessages methods to ensure that the subscriber gets all the
messages the publisher sends. The subscriber calls sendSynchronizeMessage when
it is ready to receive messages. The publisher waits for the synchronize
message; when the message arrives, the publisher starts sending its messages.
Most of the message-producing examples send an empty message at the end of the
program to indicate that they have finished sending messages. The
message-consuming examples use this message as a signal to stop reading
messages. The asynchronous message consumers use the DoneLatch class to pass
this signal from the message listener to the consuming class.
Each example contains comments that provide details on what it does and how it
works.
Basic Examples
==============
The most basic JMS examples do the following:
- SenderToQueue.java and SynchQueueReceiver.java can be used to send and
synchronously receive a single text message using a queue.
If you run these programs in two different windows, the order in which you
start them does not matter. If you run them in the same window, run
SenderToQueue first. Each program takes a queue name as a command-line
argument.
The output of SenderToQueue looks like this (the queue name is SQ):
% java SenderToQueue SQ
Queue name is SQ
Sending message: Here is a message 1
The output of SynchQueueReceiver looks like this:
% java SynchQueueReceiver SQ
Queue name is SQ
Reading message: Here is a message 1
- SynchTopicExample.java uses a publisher class and a subscriber class to
publish and synchronously receive a single text message using a topic. The
program takes a topic name as a command-line argument.
The output of SynchTopicExample looks like this (the topic name is ST):
% java SynchTopicExample ST
Topic name is ST
PUBLISHER THREAD: Publishing message: Here is a message 1
SUBSCRIBER THREAD: Reading message: Here is a message 1
These examples contain more detailed explanatory comments than the others.
Intermediate Examples
=====================
The intermediate JMS examples do the following:
- SenderToQueue.java and AsynchQueueReceiver.java send a specified number of
text messages to a queue and asynchronously receive them using a message
listener (TextListener), which is in the file TextListener.java.
To use SenderToQueue to send more than one message, specify a number after
the queue name when you run the program. For example:
% java SenderToQueue SQ 3
If you run these programs in two different windows, the order in which you
start them does not matter. If you run them in the same window, run
SenderToQueue first.
- AsynchTopicExample.java uses a publisher class and a subscriber class to
publish 20 text messages to a topic and asynchronously get them using a
message listener (TextListener).
- MessageFormats.java writes and reads messages in the five supported message
formats. The messages are not sent, so you do not need to specify a queue
or topic argument when you run the program.
- MessageConversion.java shows that for some message formats, you can write
a message using one data type and read it using another. The StreamMessage
format allows conversion between String objects and other data types. The
BytesMessage format allows more limited conversions. You do not need to
specify a queue or topic argument.
- ObjectMessages.java shows that objects are copied into messages, not passed
by reference: once you create a message from a given object, you can change
the original object, but the contents of the message do not change. You do
not need to specify a queue or topic argument.
- BytesMessages.java shows how to write, then read, a BytesMessage of
indeterminate length. It reads the message content from a text file, but
the same basic technique can be used with any kind of file, including a
binary one. Specify a text file on the command line when you run the
program:
% java BytesMessages <filename>
Advanced Examples
=================
The advanced examples do the following:
- MessageHeadersTopic.java illustrates the use of the JMS message header
fields. It displays the values of the header fields both before and after
a message is sent, and shows how the publish method sets the fields.
- TopicSelectors.java shows how to use message header fields as message
selectors. The program consists of one publisher and several subscribers.
Each subscriber uses a message selector to receive a subset of the messages
sent by the publisher.
- DurableSubscriberExample.java shows how you can create a durable subscriber
that retains messages published to a topic while the subscriber is inactive.
- AckEquivExample.java shows that to ensure that a message will not be
acknowledged until processing is complete, you can use either of the
following methods:
* An asynchronous receiver (message listener) in an AUTO_ACKNOWLEDGE session
* A synchronous receiver in a CLIENT_ACKNOWLEDGE session
This example takes both a queue name and a topic name as arguments.
- TransactedExample.java demonstrates the use of transactions in a simulated
e-commerce application. The classes within the example commit a transaction
only after they have received messages they were expecting and have sent
appropriate messages as a result. This example takes an integer argument
(the number of items being ordered). It uses five queues named A, B, C,
D, and E, which you must create in order to run the program.
- RequestReplyQueue.java uses the JMS request/reply facility, which supports
situations in which every message sent requires a response. The sending
application creates a QueueRequestor, which encapsulates the creation and
use of a destination where a reply is sent.
1.1 spyderMQ/examples/AsynchTopicExample.java
Index: AsynchTopicExample.java
===================================================================
/*
* @(#)AsynchTopicExample.java 1.6 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The AsynchTopicExample class demonstrates the use of a message listener in
* the publish/subscribe model. The publisher publishes several messages, and
* the subscriber reads them asynchronously.
* <p>
* The program contains a MultiplePublisher class, an AsynchSubscriber class
* with a listener class, a main method, and a method that runs the subscriber
* and publisher threads.
* <p>
* Specify a topic name on the command line when you run the program. The
* program also uses a queue named "controlQueue", which should be created
* before you run the program.
*
* @author Kim Haase
* @version 1.6, 08/18/00
*/
public class AsynchTopicExample {
    /** Name of the queue used for the "subscriber is ready" handshake. */
    final String CONTROL_QUEUE = "controlQueue";
    /** Name of the topic used by both threads; set by main from the command line. */
    String topicName = null;
    /** Status passed to SampleUtilities.exit; set to 1 when a thread reports a failure. */
    int exitResult = 0;

    /**
     * Closes a connection on a best-effort basis. Used only on paths where
     * the program is already aborting, so a failure to close is ignored.
     *
     * @param connection the connection to close; may be null
     */
    private static void closeQuietly(TopicConnection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // Nothing useful can be done: the caller is about to exit.
            }
        }
    }

    /**
     * Reports an unrecoverable setup problem, releases the connection and
     * terminates the JVM with status 1.
     *
     * @param description text printed in front of the exception
     * @param cause       the exception that made setup fail
     * @param connection  the connection to release; may be null
     */
    private static void abort(String description, Exception cause,
                              TopicConnection connection) {
        System.out.println(description + cause.toString());
        closeQuietly(connection);
        System.exit(1);
    }

    /**
     * The AsynchSubscriber class fetches several messages from a topic
     * asynchronously, using a message listener, TextListener.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    public class AsynchSubscriber extends Thread {

        /**
         * The TextListener class implements the MessageListener interface by
         * defining an onMessage method for the AsynchSubscriber class.
         *
         * @author Kim Haase
         * @version 1.6, 08/18/00
         */
        private class TextListener implements MessageListener {
            /** Latch the subscriber thread waits on until the stream ends. */
            final SampleUtilities.DoneLatch monitor =
                new SampleUtilities.DoneLatch();

            /**
             * Casts the message to a TextMessage and displays its text.
             * A non-text message is interpreted as the end of the message
             * stream, and the message listener sets its monitor state to all
             * done processing messages.
             *
             * @param message the incoming message
             */
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("SUBSCRIBER THREAD: Reading message: "
                                           + msg.getText());
                    } catch (JMSException e) {
                        System.out.println("Exception in onMessage(): "
                                           + e.toString());
                    }
                } else {
                    monitor.allDone();
                }
            }
        }

        /**
         * Runs the thread: connects, subscribes to the topic with a
         * TextListener, tells the publisher it is ready, then blocks until
         * the listener has seen the end-of-stream message.
         */
        public void run() {
            TopicConnectionFactory topicConnectionFactory = null;
            TopicConnection topicConnection = null;
            TopicSession topicSession = null;
            Topic topic = null;
            TopicSubscriber topicSubscriber = null;
            TextListener topicListener = null;

            try {
                topicConnectionFactory =
                    SampleUtilities.getTopicConnectionFactory();
                topicConnection =
                    topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                topic = SampleUtilities.getTopic(topicName, topicSession);
            } catch (Exception e) {
                abort("Connection problem: ", e, topicConnection);
            }

            /*
             * Create subscriber.
             * Register message listener (TextListener).
             * Start message delivery.
             * Send synchronize message to publisher, then wait till all
             * messages have arrived.
             * Listener displays the messages obtained.
             */
            try {
                topicSubscriber = topicSession.createSubscriber(topic);
                topicListener = new TextListener();
                topicSubscriber.setMessageListener(topicListener);
                topicConnection.start();

                // Let publisher know that subscriber is ready.
                try {
                    SampleUtilities.sendSynchronizeMessage("SUBSCRIBER THREAD: ",
                                                           CONTROL_QUEUE);
                } catch (Exception e) {
                    abort("Queue probably missing: ", e, topicConnection);
                }

                /*
                 * Asynchronously process messages.
                 * Block until publisher issues a control message indicating
                 * end of publish stream.
                 */
                topicListener.monitor.waitTillDone();
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                if (topicConnection != null) {
                    try {
                        topicConnection.close();
                    } catch (JMSException e) {
                        exitResult = 1;
                    }
                }
            }
        }
    }

    /**
     * The MultiplePublisher class publishes several messages to a topic.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    public class MultiplePublisher extends Thread {

        /**
         * Runs the thread: connects, waits for the subscriber's synchronize
         * message, publishes the text messages and finishes with an empty
         * end-of-stream message.
         */
        public void run() {
            final int NUMMSGS = 20;
            final String MSG_TEXT = "Here is a message";
            TopicConnectionFactory topicConnectionFactory = null;
            TopicConnection topicConnection = null;
            TopicSession topicSession = null;
            Topic topic = null;
            TopicPublisher topicPublisher = null;
            TextMessage message = null;

            try {
                topicConnectionFactory =
                    SampleUtilities.getTopicConnectionFactory();
                topicConnection =
                    topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                topic = SampleUtilities.getTopic(topicName, topicSession);
            } catch (Exception e) {
                abort("Connection problem: ", e, topicConnection);
            }

            /*
             * After synchronizing with subscriber, create publisher.
             * Create text message.
             * Send messages, varying text slightly.
             * Send end-of-messages message.
             * Finally, close connection.
             */
            try {
                /*
                 * Synchronize with subscriber. Wait for message indicating
                 * that subscriber is ready to receive messages.
                 */
                try {
                    SampleUtilities.receiveSynchronizeMessages("PUBLISHER THREAD: ",
                                                               CONTROL_QUEUE, 1);
                } catch (Exception e) {
                    abort("Queue probably missing: ", e, topicConnection);
                }

                topicPublisher = topicSession.createPublisher(topic);
                message = topicSession.createTextMessage();
                for (int i = 0; i < NUMMSGS; i++) {
                    message.setText(MSG_TEXT + " " + (i + 1));
                    System.out.println("PUBLISHER THREAD: Publishing message: "
                                       + message.getText());
                    topicPublisher.publish(message);
                }

                // Send a non-text control message indicating end of messages.
                topicPublisher.publish(topicSession.createMessage());
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                if (topicConnection != null) {
                    try {
                        topicConnection.close();
                    } catch (JMSException e) {
                        exitResult = 1;
                    }
                }
            }
        }
    }

    /**
     * Instantiates the subscriber and publisher classes and starts their
     * threads.
     * Calls the join method to wait for the threads to die.
     * <p>
     * The subscriber is started before the publisher: in the
     * publish/subscribe model, a subscriber can ordinarily receive only
     * messages published while it is active. The publisher additionally
     * waits for the subscriber's synchronize message on the control queue
     * before it publishes anything.
     * <p>
     * If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the exit result is set to 1.
     */
    public void run_threads() {
        AsynchSubscriber asynchSubscriber = new AsynchSubscriber();
        MultiplePublisher multiplePublisher = new MultiplePublisher();

        asynchSubscriber.start();
        multiplePublisher.start();
        try {
            asynchSubscriber.join();
            multiplePublisher.join();
        } catch (InterruptedException e) {
            // Do not lose the interrupt; the threads may still be running,
            // so the run cannot be reported as successful.
            Thread.currentThread().interrupt();
            exitResult = 1;
        }
    }

    /**
     * Reads the topic name from the command line, then calls the
     * run_threads method to execute the program threads.
     *
     * @param args the topic used by the example
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("Usage: java AsynchTopicExample <topic_name>");
            System.exit(1);
        }

        AsynchTopicExample ate = new AsynchTopicExample();
        ate.topicName = args[0];
        System.out.println("Topic name is " + ate.topicName);
        ate.run_threads();
        SampleUtilities.exit(ate.exitResult);
    }
}
1.1 spyderMQ/examples/MessageConversion.java
Index: MessageConversion.java
===================================================================
/*
* @(#)MessageConversion.java 1.4 00/08/09
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The MessageConversion class consists only of a main method, which creates
* and then reads a StreamMessage and a BytesMessage. It does not send the
* messages.
* <p>
* The program demonstrates type conversions in StreamMessages: you can write
* data as a String and read it as an Int, and vice versa. The program also
* calls clearBody() to clear the message so that it can be rewritten.
* <p>
* The program also shows how to write and read a BytesMessage using data types
* other than a byte array. Conversion between String and other types is
* not supported.
* <p>
* Before it can read a BytesMessage or StreamMessage that has not been sent,
* the program must call reset() to put the message body in read-only mode
* and reposition the stream.
*
* @author Kim Haase
* @version 1.4, 08/09/00
*/
public class MessageConversion {

    /**
     * Entry point. Ignores its arguments. Opens a queue connection only to
     * obtain a session that can create messages, runs the StreamMessage and
     * BytesMessage demonstrations, then closes the connection and exits.
     */
    public static void main(String[] args) {
        QueueConnection connection = null;
        QueueSession session = null;

        try {
            QueueConnectionFactory factory =
                SampleUtilities.getQueueConnectionFactory();
            connection = factory.createQueueConnection();
            session = connection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ee) {}
            }
            System.exit(1);
        }

        int status = 0;
        try {
            demonstrateStreamMessage(session);
            demonstrateBytesMessage(session);
        } catch (JMSException e) {
            System.out.println("JMS Exception occurred: " + e.toString());
            status = 1;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    status = 1;
                }
            }
        }
        SampleUtilities.exit(status);
    }

    /**
     * Writes typed values to a StreamMessage and reads each back as a
     * String, then clears the body, writes Strings and reads them back as
     * boolean, double and int. The message is never sent.
     *
     * @param session session used to create the message
     * @throws JMSException if any write, reset or read fails
     */
    private static void demonstrateStreamMessage(QueueSession session)
            throws JMSException {
        StreamMessage stream = session.createStreamMessage();

        // Typed values in ...
        stream.writeBoolean(false);
        stream.writeDouble(123.456789e222);
        stream.writeInt(223344);
        stream.writeChar('q');

        // ... Strings out. reset() makes the unsent body readable.
        stream.reset();
        System.out.println("Reading StreamMessage items of various data"
                           + " types as String:");
        System.out.println(" Boolean: " + stream.readString());
        System.out.println(" Double: " + stream.readString());
        System.out.println(" Int: " + stream.readString());
        System.out.println(" Char: " + stream.readString());

        // Strings in (no char: String to char conversion isn't valid) ...
        stream.clearBody();
        stream.writeString("true");
        stream.writeString("123.456789e111");
        stream.writeString("556677");

        // ... typed values out.
        stream.reset();
        System.out.println("Reading StreamMessage String items as other"
                           + " data types:");
        System.out.println(" Boolean: " + stream.readBoolean());
        System.out.println(" Double: " + stream.readDouble());
        System.out.println(" Int: " + stream.readInt());
    }

    /**
     * Writes values of several types to a BytesMessage and reads them back.
     * The second int written is read back as a float. The message is never
     * sent.
     *
     * @param session session used to create the message
     * @throws JMSException if any write, reset or read fails
     */
    private static void demonstrateBytesMessage(QueueSession session)
            throws JMSException {
        BytesMessage bytes = session.createBytesMessage();
        bytes.writeBoolean(false);
        bytes.writeDouble(123.456789e22);
        bytes.writeInt(778899);
        bytes.writeInt(0x7f800000);
        bytes.writeChar('z');

        // reset() makes the unsent body readable from the start.
        bytes.reset();
        System.out.println("Reading BytesMessages of various types:");
        System.out.println(" Boolean: " + bytes.readBoolean());
        System.out.println(" Double: " + bytes.readDouble());
        System.out.println(" Int: " + bytes.readInt());
        System.out.println(" Float: " + bytes.readFloat());
        System.out.println(" Char: " + bytes.readChar());
    }
}
1.1 spyderMQ/examples/AsynchQueueReceiver.java
Index: AsynchQueueReceiver.java
===================================================================
/*
* @(#)AsynchQueueReceiver.java 1.6 00/08/14
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The AsynchQueueReceiver class consists only of a main method, which fetches
* one or more messages from a queue using asynchronous message delivery.
* <p>
* Compile TextListener.java before you run this program.
* <p>
* Run this program in conjunction with SenderToQueue. Specify a queue name
* on the command line when you run the program.
*
* @author Kim Haase
* @version 1.6, 08/14/00
*/
public class AsynchQueueReceiver {

    /**
     * Main method. Connects to the named queue, registers a TextListener
     * on a receiver, starts delivery and blocks on the listener's monitor
     * until it reports that processing is done. The connection is always
     * closed before the program exits.
     *
     * @param args the queue used by the example (exactly one argument)
     */
    public static void main(String[] args) {
        QueueConnectionFactory queueConnectionFactory = null;
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue queue = null;
        QueueReceiver queueReceiver = null;
        TextListener textListener = null;
        int exitResult = 0;

        if (args.length != 1) {
            System.out.println("Usage: java AsynchQueueReceiver <queue_name>");
            System.exit(1);
        }
        String queueName = args[0];
        System.out.println("Queue name is " + queueName);

        try {
            queueConnectionFactory =
                SampleUtilities.getQueueConnectionFactory();
            queueConnection =
                queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
            queue = SampleUtilities.getQueue(queueName, queueSession);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException ignored) {
                    // Best effort: the program is exiting anyway.
                }
            }
            System.exit(1);
        }

        /*
         * Create receiver.
         * Register message listener (TextListener).
         * Start message delivery; listener displays the messages obtained.
         * Block until the listener signals, through its monitor, that the
         * sender's end-of-stream control message has arrived.
         */
        try {
            queueReceiver = queueSession.createReceiver(queue);
            textListener = new TextListener();
            queueReceiver.setMessageListener(textListener);
            queueConnection.start();
            textListener.monitor.waitTillDone();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        } finally {
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException e) {
                    exitResult = 1;
                }
            }
        }
        SampleUtilities.exit(exitResult);
    }
}
1.1 spyderMQ/examples/TransactedExample.java
Index: TransactedExample.java
===================================================================
/*
* @(#)TransactedExample.java 1.3 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import java.util.*;
import javax.jms.*;
/**
* The TransactedExample class demonstrates the use of transactions in a JMS
* application. It represents a highly simplified eCommerce application, in
* which the following things happen:
*
* <pre>
* Legend
* R - Retailer
* V - Vendor
* S - Supplier
* O - Order Queue
* C - Confirmation Queue
* ()- Thread
* []- Queue
*
* 2(b) 3
* 1 2(a) /+------->[S1 O]<-----------(S1)
* /+-->[V O]<----+ / |
* / \ / 3 |
* / \ / 5 v------------------+
* (R) ( V )-------->[V C] 4
* \ / \ ^------------------+
* \ / \ |
* \ 7 6 / \ |
* +---->[R C]<--+ \ 2(c) 4 |
* +------->[SN O]<-----------(SN)
* </pre>
*
* <ol>
* <li>A retailer sends a message to the vendor order queue ordering a quantity
* of computers. It waits for the vendor's reply.
* <li>The vendor receives the retailer's order message and places an order
* message into each of its suppliers' order queues, all in one transaction.
* This JMS transaction combines one synchronous receive with multiple sends.
* <li>One supplier receives the order from its order queue, checks its
* inventory, and sends the items ordered to the order message's replyTo
* field. If it does not have enough in stock, it sends what it has.
* The synchronous receive and the send take place in one JMS transaction.
* <li>The other supplier receives the order from its order queue, checks its
* inventory, and sends the items ordered to the order message's replyTo
* field. If it does not have enough in stock, it sends what it has.
* The synchronous receive and the send take place in one JMS transaction.
* <li>The vendor receives the replies from the suppliers from its confirmation
* queue and updates the state of the order. Messages are processed by an
* asynchronous message listener; this step illustrates using JMS transactions
* with a message listener.
* <li>When all outstanding replies are processed for a given order, the vendor
* sends a message notifying the retailer whether or not it can fulfill the
* order.
* <li>The retailer receives the message from the vendor.
* </ol>
* <p>
* The program contains five classes: Retailer, Vendor, GenericSupplier,
* VendorMessageListener, and Order. It also contains a main method and a
* method that runs the threads of the Retailer, Vendor, and two supplier classes.
* <p>
* All the messages use the MapMessage message type. Synchronous receives are
* used for all message reception except for the case of the vendor processing
* the replies of the suppliers. These replies are processed asynchronously
* and demonstrate how to use transactions within a message listener.
* <p>
* All classes except Retailer use transacted sessions.
* <p>
* The program uses five queues. Before you run the program, create the
* queues and name them A, B, C, D and E.
* <p>
* When you run the program, specify on the command line the number of
* computers to be ordered.
*
* @author Kim Haase
* @author Joseph Fialli
* @version 1.3, 08/18/00
*/
public class TransactedExample {
/** Name of the vendor order queue; the Retailer looks it up and sends its orders there. */
public static String vendorOrderQueueName = null;
/** Name of the retailer confirmation queue; the Retailer uses it as replyTo and receives on it. */
public static String retailerConfirmationQueueName = null;
/** Presumably the name of the monitor supplier's order queue; not used in the code visible here, so confirm. */
public static String monitorOrderQueueName = null;
/** Presumably the name of the storage supplier's order queue; not used in the code visible here, so confirm. */
public static String storageOrderQueueName = null;
/** Presumably the name of the queue on which the vendor receives supplier replies; confirm against Vendor. */
public static String vendorConfirmationQueueName = null;
/** Shared exit status; the Retailer sets it to 1 when an exception occurs or closing its connection fails. */
public static int exitResult = 0;
/**
* The Retailer class orders a number of computers by sending a message
* to a vendor. It then waits for the order to be confirmed.
* <p>
* In this example, the Retailer places two orders, one for the quantity
* specified on the command line and one for twice that number.
* <p>
* This class does not use transactions.
*
* @author Kim Haase
* @author Joseph Fialli
* @version 1.3, 08/18/00
*/
public static class Retailer extends Thread {
    /** Number of computers requested in the first order. */
    int quantity = 0;

    /**
     * Creates a retailer that will order the given number of computers.
     *
     * @param q the quantity specified in the program arguments
     */
    public Retailer(int q) {
        quantity = q;
    }

    /**
     * Blocks for the next message on the confirmation queue and prints
     * whether the vendor accepted the order.
     *
     * @param confirmations receiver on the retailer confirmation queue
     * @throws JMSException if receiving or reading the reply fails
     */
    private void reportConfirmation(QueueReceiver confirmations)
            throws JMSException {
        MapMessage reply = (MapMessage) confirmations.receive();
        if (reply.getBoolean("OrderAccepted")) {
            System.out.println("Retailer: Order filled");
        } else {
            System.out.println("Retailer: Order not filled");
        }
    }

    /**
     * Runs the thread: places an order for the configured quantity, waits
     * for the reply, places a second order for twice that quantity, waits
     * again, and finally sends an empty message to mark the end of the
     * order stream.
     */
    public void run() {
        QueueConnection connection = null;
        QueueSession session = null;
        Queue orderQueue = null;
        Queue confirmationQueue = null;

        // Non-transacted session plus the two queues this class talks to.
        try {
            QueueConnectionFactory factory =
                SampleUtilities.getQueueConnectionFactory();
            connection = factory.createQueueConnection();
            session = connection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
            orderQueue =
                SampleUtilities.getQueue(vendorOrderQueueName, session);
            confirmationQueue =
                SampleUtilities.getQueue(retailerConfirmationQueueName,
                                         session);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            System.out.println("Program assumes five queues named A B C D E");
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ee) {}
            }
            System.exit(1);
        }

        try {
            // First order: item, quantity and where the reply should go.
            QueueSender sender = session.createSender(orderQueue);
            MapMessage orderMessage = session.createMapMessage();
            orderMessage.setString("Item", "Computer(s)");
            orderMessage.setInt("Quantity", quantity);
            orderMessage.setJMSReplyTo(confirmationQueue);
            sender.send(orderMessage);
            System.out.println("Retailer: ordered " + quantity
                               + " computer(s)");

            // Start delivery only once the confirmation receiver exists.
            QueueReceiver confirmations =
                session.createReceiver(confirmationQueue);
            connection.start();
            reportConfirmation(confirmations);

            // Second order: same message, doubled quantity.
            System.out.println("Retailer: placing another order");
            orderMessage.setInt("Quantity", quantity * 2);
            sender.send(orderMessage);
            System.out.println("Retailer: ordered "
                               + orderMessage.getInt("Quantity")
                               + " computer(s)");
            reportConfirmation(confirmations);

            // Send a non-text control message indicating end of messages.
            sender.send(session.createMessage());
        } catch (Exception e) {
            System.out.println("Retailer: Exception occurred: "
                               + e.toString());
            e.printStackTrace();
            exitResult = 1;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    exitResult = 1;
                }
            }
        }
    }
}
/**
* The Vendor class uses one transaction to receive the computer order from
* the retailer and order the needed number of monitors and disk drives
* from its suppliers. At random intervals, it throws an exception to
* simulate a database problem and cause a rollback.
* <p>
* The class uses an asynchronous message listener to process replies from
* suppliers. When all outstanding supplier inquiries complete, it sends a
* message to the Retailer accepting or refusing the order.
*
* @author Kim Haase
* @author Joseph Fialli
* @version 1.3, 08/18/00
*/
public static class Vendor extends Thread {
Random rgen = new Random();
int throwException = 1;
/**
* Runs the thread.
*/
public void run() {
QueueConnectionFactory queueConnectionFactory = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
QueueSession asyncQueueSession = null;
Queue vendorOrderQueue = null;
Queue monitorOrderQueue = null;
Queue storageOrderQueue = null;
Queue vendorConfirmationQueue = null;
QueueReceiver vendorOrderQueueReceiver = null;
QueueSender monitorOrderQueueSender = null;
QueueSender storageOrderQueueSender = null;
MapMessage orderMessage = null;
QueueReceiver vendorConfirmationQueueReceiver = null;
VendorMessageListener listener = null;
Message inMessage = null;
MapMessage vendorOrderMessage = null;
Message endOfMessageStream = null;
Order order = null;
int quantity = 0;
try {
queueConnectionFactory =
SampleUtilities.getQueueConnectionFactory();
queueConnection =
queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(true, 0);
asyncQueueSession = queueConnection.createQueueSession(true, 0);
vendorOrderQueue =
SampleUtilities.getQueue(vendorOrderQueueName, queueSession);
monitorOrderQueue =
SampleUtilities.getQueue(monitorOrderQueueName, queueSession);
storageOrderQueue =
SampleUtilities.getQueue(storageOrderQueueName, queueSession);
vendorConfirmationQueue =
SampleUtilities.getQueue(vendorConfirmationQueueName,
queueSession);
} catch (Exception e) {
System.out.println("Connection problem: " + e.toString());
System.out.println("Program assumes six queues named A B C D E F");
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
try {
/*
* Create receiver for vendor order queue, sender for
* supplier order queues, and message to send to suppliers.
*/
vendorOrderQueueReceiver =
queueSession.createReceiver(vendorOrderQueue);
monitorOrderQueueSender =
queueSession.createSender(monitorOrderQueue);
storageOrderQueueSender =
queueSession.createSender(storageOrderQueue);
orderMessage = queueSession.createMapMessage();
/*
* Configure an asynchronous message listener to process
* supplier replies to inquiries for parts to fill order.
* Start delivery.
*/
vendorConfirmationQueueReceiver =
asyncQueueSession.createReceiver(vendorConfirmationQueue);
listener = new VendorMessageListener(asyncQueueSession, 2);
vendorConfirmationQueueReceiver.setMessageListener(listener);
queueConnection.start();
/*
* Process orders in vendor order queue.
* Use one transaction to receive order from order queue
* and send messages to suppliers' order queues to order
* components to fulfill the order placed with the vendor.
*/
while (true) {
try {
// Receive an order from a retailer.
inMessage = vendorOrderQueueReceiver.receive();
if (inMessage instanceof MapMessage) {
vendorOrderMessage = (MapMessage) inMessage;
} else {
/*
* Message is an end-of-message-stream message from
* retailer. Send similar messages to suppliers,
* then break out of processing loop.
*/
endOfMessageStream = queueSession.createMessage();
endOfMessageStream.setJMSReplyTo(vendorConfirmationQueue);
monitorOrderQueueSender.send(endOfMessageStream);
storageOrderQueueSender.send(endOfMessageStream);
queueSession.commit();
break;
}
/*
* A real application would check an inventory database
* and order only the quantities needed. Throw an
* exception every few times to simulate a database
* concurrent-access exception and cause a rollback.
*/
if (rgen.nextInt(3) == throwException) {
throw new JMSException("Simulated database concurrent
access exception");
}
// Record retailer order as a pending order.
order = new Order(vendorOrderMessage);
/*
* Set order number and reply queue for outgoing
* message.
*/
orderMessage.setInt("VendorOrderNumber",
order.orderNumber);
orderMessage.setJMSReplyTo(vendorConfirmationQueue);
quantity = vendorOrderMessage.getInt("Quantity");
System.out.println("Vendor: Retailer ordered " +
quantity + " " +
vendorOrderMessage.getString("Item"));
// Send message to monitor supplier.
orderMessage.setString("Item", "Monitor");
orderMessage.setInt("Quantity", quantity);
monitorOrderQueueSender.send(orderMessage);
System.out.println("Vendor: ordered " + quantity + " "
+ orderMessage.getString("Item")
+ "(s)");
/*
* Reuse message to send to storage supplier, changing
* only item name.
*/
orderMessage.setString("Item", "Hard Drive");
storageOrderQueueSender.send(orderMessage);
System.out.println("Vendor: ordered " + quantity + " "
+ orderMessage.getString("Item")
+ "(s)");
// Commit session.
queueSession.commit();
System.out.println(" Vendor: committed transaction 1");
} catch(JMSException e) {
System.out.println("Vendor: JMSException occurred: "
+ e.toString());
e.printStackTrace();
queueSession.rollback();
System.out.println(" Vendor: rolled back transaction 1");
exitResult = 1;
}
}
// Wait till suppliers get back with answers.
listener.monitor.waitTillDone();
} catch (JMSException e) {
System.out.println("Vendor: Exception occurred: " +
e.toString());
e.printStackTrace();
exitResult = 1;
} finally {
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
}
}
/**
 * The Order class represents a Retailer order placed with a Vendor.
 * It maintains a table of pending orders.
 *
 * @author Joseph Fialli
 * @version 1.3, 08/18/00
 */
public static class Order {
    /** Orders awaiting supplier replies, keyed by Integer order number. */
    private static Hashtable pendingOrders = new Hashtable();
    private static int nextOrderNumber = 1;

    private static final int PENDING_STATUS = 1;
    private static final int CANCELLED_STATUS = 2;
    private static final int FULFILLED_STATUS = 3;

    int status;
    public final int orderNumber;
    public int quantity;
    public final MapMessage order;    // original order from retailer
    public MapMessage monitor = null; // reply from supplier
    public MapMessage storage = null; // reply from supplier

    /**
     * Returns the next order number and increments the static variable
     * that holds this value.
     *
     * @return the next order number
     */
    private static int getNextOrderNumber() {
        int result = nextOrderNumber;
        nextOrderNumber++;
        return result;
    }

    /**
     * Constructor. Sets order number; sets order and quantity from
     * incoming message. Sets status to pending, and adds order to hash
     * table of pending orders. If the quantity cannot be read from the
     * message, an error is printed and the quantity is set to 0.
     *
     * @param order the message containing the order
     */
    public Order(MapMessage order) {
        this.orderNumber = getNextOrderNumber();
        this.order = order;
        try {
            this.quantity = order.getInt("Quantity");
        } catch (JMSException je) {
            System.err.println("Unexpected error. Message missing Quantity");
            this.quantity = 0;
        }
        status = PENDING_STATUS;
        pendingOrders.put(new Integer(orderNumber), this);
    }

    /**
     * Returns the number of orders in the hash table.
     *
     * @return the number of pending orders
     */
    public static int outstandingOrders() {
        return pendingOrders.size();
    }

    /**
     * Returns the order corresponding to a given order number.
     *
     * @param orderNumber the number of the requested order
     * @return the requested order, or null if no pending order has
     *         that number
     */
    public static Order getOrder(int orderNumber) {
        return (Order) pendingOrders.get(new Integer(orderNumber));
    }

    /**
     * Called by the onMessage method of the VendorMessageListener class
     * to process a reply from a supplier to the Vendor.
     * <p>
     * A reply whose item name cannot be read, or is neither "Monitor"
     * nor "Hard Drive", is not recorded; the order keeps its current
     * status.
     *
     * @param component the message from the supplier
     * @return the order with updated status information
     */
    public Order processSubOrder(MapMessage component) {
        String itemName = null;

        // Determine which subcomponent this is.
        try {
            itemName = component.getString("Item");
        } catch (JMSException je) {
            System.err.println("Unexpected exception. Message missing Item");
        }

        /*
         * Compare with the literal as receiver so that a null item name
         * (read failure above, or field absent) does not throw
         * NullPointerException.
         */
        if ("Monitor".equals(itemName)) {
            monitor = component;
        } else if ("Hard Drive".equals(itemName)) {
            storage = component;
        }

        /*
         * If notification for all subcomponents has been received,
         * verify the quantities to compute if able to fulfill order.
         */
        if ((monitor != null) && (storage != null)) {
            try {
                if (quantity > monitor.getInt("Quantity")) {
                    status = CANCELLED_STATUS;
                } else if (quantity > storage.getInt("Quantity")) {
                    status = CANCELLED_STATUS;
                } else {
                    status = FULFILLED_STATUS;
                }
            } catch (JMSException je) {
                System.err.println("Unexpected exception " + je);
                status = CANCELLED_STATUS;
            }

            /*
             * Processing of order is complete, so remove it from
             * pending-order list.
             */
            pendingOrders.remove(new Integer(orderNumber));
        }
        return this;
    }

    /**
     * Determines if order status is pending.
     *
     * @return true if order is pending, false if not
     */
    public boolean isPending() {
        return status == PENDING_STATUS;
    }

    /**
     * Determines if order status is cancelled.
     *
     * @return true if order is cancelled, false if not
     */
    public boolean isCancelled() {
        return status == CANCELLED_STATUS;
    }

    /**
     * Determines if order status is fulfilled.
     *
     * @return true if order is fulfilled, false if not
     */
    public boolean isFulfilled() {
        return status == FULFILLED_STATUS;
    }
}
/**
 * The VendorMessageListener class processes an order confirmation message
 * from a supplier to the vendor.
 * <p>
 * It demonstrates the use of transactions within message listeners.
 *
 * @author Joseph Fialli
 * @version 1.3, 08/18/00
 */
public static class VendorMessageListener implements MessageListener {
    /** Latch released once every supplier has signalled end of stream. */
    final SampleUtilities.DoneLatch monitor =
        new SampleUtilities.DoneLatch();
    private final QueueSession session;
    /** Number of suppliers whose end-of-stream message is still awaited. */
    int numSuppliers;

    /**
     * Constructor. Instantiates the message listener with the session
     * of the consuming class (the vendor).
     *
     * @param qs the session of the consumer
     * @param numSuppliers the number of suppliers
     */
    public VendorMessageListener(QueueSession qs, int numSuppliers) {
        this.session = qs;
        this.numSuppliers = numSuppliers;
    }

    /**
     * Casts the message to a MapMessage and processes the order.
     * A message that is not a MapMessage is interpreted as the end of the
     * message stream from one supplier; when the last supplier's
     * end-of-stream message arrives, the message listener sets its
     * monitor state to all done processing messages.
     * <p>
     * Each MapMessage received represents a fulfillment message from
     * a supplier.
     *
     * @param message the incoming message
     */
    public void onMessage(Message message) {
        /*
         * End-of-message-stream message: count it, and if it is the
         * last such message, set monitor status to all done. Commit the
         * transaction either way so the marker is consumed.
         *
         * Every marker is counted unconditionally. Counting only when
         * no orders are outstanding would drop a marker that arrives
         * while another supplier's reply is still pending, and the
         * count would then never reach zero.
         */
        if (!(message instanceof MapMessage)) {
            numSuppliers--;
            if (numSuppliers == 0) {
                monitor.allDone();
            }
            try {
                session.commit();
            } catch (JMSException je) {
                // onMessage cannot propagate; report instead of hiding it.
                je.printStackTrace();
            }
            return;
        }

        /*
         * Message is an order confirmation message from a supplier.
         */
        int orderNumber = -1;
        try {
            MapMessage component = (MapMessage) message;

            /*
             * Process the order confirmation message and commit the
             * transaction.
             */
            orderNumber = component.getInt("VendorOrderNumber");
            Order order =
                Order.getOrder(orderNumber).processSubOrder(component);
            session.commit();

            /*
             * If this message is the last supplier message, send message
             * to Retailer and commit transaction.
             */
            if (!order.isPending()) {
                System.out.println("Vendor: Completed processing for order "
                    + order.orderNumber);
                Queue replyQueue = (Queue) order.order.getJMSReplyTo();
                QueueSender qs = session.createSender(replyQueue);
                MapMessage retailerConfirmationMessage =
                    session.createMapMessage();
                if (order.isFulfilled()) {
                    retailerConfirmationMessage.setBoolean("OrderAccepted",
                        true);
                    System.out.println("Vendor: sent " + order.quantity
                        + " computer(s)");
                } else if (order.isCancelled()) {
                    retailerConfirmationMessage.setBoolean("OrderAccepted",
                        false);
                    System.out.println("Vendor: unable to send " +
                        order.quantity + " computer(s)");
                }
                qs.send(retailerConfirmationMessage);
                session.commit();
                System.out.println(" Vendor: committed transaction 2");
            }
        } catch (Exception e) {
            /*
             * Covers JMS failures as well as runtime errors such as a
             * reply that names an order number with no pending order.
             */
            e.printStackTrace();
            rollbackSession();
        }
    }

    /**
     * Rolls back the listener's session, reporting (rather than
     * propagating) any failure of the rollback itself.
     */
    private void rollbackSession() {
        try {
            session.rollback();
        } catch (JMSException je) {
            je.printStackTrace();
        }
    }
}
/**
 * The GenericSupplier class receives an item order from the
 * vendor and sends a message accepting or refusing it.
 *
 * @author Kim Haase
 * @author Joseph Fialli
 * @version 1.3, 08/18/00
 */
public static class GenericSupplier extends Thread {
    final String PRODUCT_NAME;
    final String IN_ORDER_QUEUE;
    /** Quantity requested by the order currently being processed. */
    int quantity = 0;
    /** Random source used to simulate the inventory level. */
    private final Random inventoryRandom = new Random();

    /**
     * Constructor. Instantiates the supplier as the supplier for the
     * kind of item being ordered.
     *
     * @param itemName the name of the item being ordered
     * @param inQueue the queue from which the order is obtained
     */
    public GenericSupplier(String itemName, String inQueue) {
        PRODUCT_NAME = itemName;
        IN_ORDER_QUEUE = inQueue;
    }

    /**
     * Checks to see if there are enough items in inventory.
     * Rather than go to a database, it generates a random number related
     * to the order quantity, so that some of the time there won't be
     * enough in stock.
     *
     * @return the number of items in inventory: a random value from 0
     *         (inclusive) to five times the current order quantity
     *         (exclusive), or 0 when the quantity is not positive
     */
    public int checkInventory() {
        // Random.nextInt(int) rejects a non-positive bound.
        if (quantity <= 0) {
            return 0;
        }
        // Widen before multiplying so a large quantity cannot overflow.
        long bound = (long) quantity * 5L;
        if (bound > Integer.MAX_VALUE) {
            bound = Integer.MAX_VALUE;
        }
        return inventoryRandom.nextInt((int) bound);
    }

    /**
     * Runs the thread: connects, then answers orders from the supplier
     * order queue until an end-of-message-stream message is received.
     */
    public void run() {
        QueueConnectionFactory queueConnectionFactory = null;
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue orderQueue = null;
        QueueReceiver queueReceiver = null;
        Message inMessage = null;
        MapMessage orderMessage = null;
        MapMessage outMessage = null;

        try {
            queueConnectionFactory =
                SampleUtilities.getQueueConnectionFactory();
            queueConnection =
                queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(true, 0);
            orderQueue =
                SampleUtilities.getQueue(IN_ORDER_QUEUE, queueSession);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            // The program's main method configures five queues, A through E.
            System.out.println("Program assumes five queues named A B C D E");
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException ee) {
                    // Best effort only: the process exits immediately below.
                }
            }
            System.exit(1);
        }

        // Create receiver for order queue and start message delivery.
        try {
            queueReceiver = queueSession.createReceiver(orderQueue);
            queueConnection.start();
        } catch (JMSException je) {
            /*
             * Without a started receiver the loop below could only
             * fail on every pass, so report the problem and stop.
             */
            System.out.println(PRODUCT_NAME
                + " Supplier: Exception occurred: " + je.toString());
            exitResult = 1;
            closeConnection(queueConnection);
            return;
        }

        /*
         * Keep checking supplier order queue for order request until
         * end-of-message-stream message is received.
         * Receive order and send an order confirmation as one transaction.
         */
        while (true) {
            try {
                inMessage = queueReceiver.receive();
                if (inMessage == null) {
                    /*
                     * receive() yielded no message, so there is nothing
                     * to reply to; stop instead of retrying forever.
                     */
                    exitResult = 1;
                    break;
                }
                if (inMessage instanceof MapMessage) {
                    orderMessage = (MapMessage) inMessage;
                } else {
                    /*
                     * Message is an end-of-message-stream message.
                     * Send a similar message to reply queue, commit
                     * transaction, then stop processing orders by breaking
                     * out of loop.
                     */
                    QueueSender queueSender =
                        queueSession.createSender((Queue)
                            inMessage.getJMSReplyTo());
                    queueSender.send(queueSession.createMessage());
                    queueSession.commit();
                    break;
                }

                // Extract quantity ordered from order message.
                quantity = orderMessage.getInt("Quantity");
                System.out.println(PRODUCT_NAME
                    + " Supplier: Vendor ordered " + quantity + " "
                    + orderMessage.getString("Item") + "(s)");

                /*
                 * Create sender and message for reply queue.
                 * Set order number and item; check inventory and set
                 * quantity available.
                 * Send message to vendor and commit transaction.
                 */
                QueueSender queueSender =
                    queueSession.createSender((Queue)
                        orderMessage.getJMSReplyTo());
                outMessage = queueSession.createMapMessage();
                outMessage.setInt("VendorOrderNumber",
                    orderMessage.getInt("VendorOrderNumber"));
                outMessage.setString("Item", PRODUCT_NAME);
                int numAvailable = checkInventory();
                if (numAvailable >= quantity) {
                    outMessage.setInt("Quantity", quantity);
                } else {
                    outMessage.setInt("Quantity", numAvailable);
                }
                queueSender.send(outMessage);
                System.out.println(PRODUCT_NAME + " Supplier: sent "
                    + outMessage.getInt("Quantity") + " "
                    + outMessage.getString("Item") + "(s)");
                queueSession.commit();
                System.out.println(" " + PRODUCT_NAME
                    + " Supplier: committed transaction");
            } catch (Exception e) {
                System.out.println(PRODUCT_NAME
                    + " Supplier: Exception occurred: " + e.toString());
                e.printStackTrace();
                exitResult = 1;
            }
        }

        closeConnection(queueConnection);
    }

    /**
     * Closes the connection if there is one, recording a failure in
     * the program's exit result.
     *
     * @param connection the connection to close; may be null
     */
    private void closeConnection(QueueConnection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                exitResult = 1;
            }
        }
    }
}
/**
 * Creates the Retailer and Vendor classes and the two supplier classes,
 * then starts the threads and waits for all four to finish.
 * <p>
 * If the calling thread is interrupted while waiting, the wait is
 * abandoned and the thread's interrupt status is restored so the caller
 * can observe it.
 *
 * @param quantity the quantity specified on the command line
 */
public static void run_threads(int quantity) {
    Retailer r = new Retailer(quantity);
    Vendor v = new Vendor();
    GenericSupplier ms = new GenericSupplier("Monitor",
        monitorOrderQueueName);
    GenericSupplier ss = new GenericSupplier("Hard Drive",
        storageOrderQueueName);

    r.start();
    v.start();
    ms.start();
    ss.start();
    try {
        r.join();
        v.join();
        ms.join();
        ss.join();
    } catch (InterruptedException e) {
        // Do not lose the interrupt: re-assert it for the caller.
        Thread.currentThread().interrupt();
    }
}
/**
 * Reads the order quantity from the command line, then
 * calls the run_threads method to execute the program threads.
 * <p>
 * Prints a usage message and exits with status 1 when the argument
 * count is wrong or the argument is not an integer.
 *
 * @param args the quantity of computers being ordered
 */
public static void main(String[] args) {
    TransactedExample te = new TransactedExample();
    int quantity = 0;

    if (args.length != 1) {
        System.out.println("Usage: java TransactedExample <integer>");
        System.out.println("Program assumes five queues named A B C D E");
        System.exit(1);
    }
    te.vendorOrderQueueName = "A";
    te.retailerConfirmationQueueName = "B";
    te.monitorOrderQueueName = "C";
    te.storageOrderQueueName = "D";
    te.vendorConfirmationQueueName = "E";

    // Reject a non-numeric argument with a usage message, not a stack trace.
    try {
        quantity = Integer.parseInt(args[0]);
    } catch (NumberFormatException nfe) {
        System.out.println("Usage: java TransactedExample <integer>");
        System.out.println("Quantity must be an integer: " + args[0]);
        System.exit(1);
    }
    System.out.println("Quantity to be ordered is " + quantity);
    if (quantity > 0) {
        te.run_threads(quantity);
    } else {
        System.out.println("Quantity must be positive and nonzero");
        te.exitResult = 1;
    }
    SampleUtilities.exit(te.exitResult);
}
}
1.1 spyderMQ/examples/RequestReplyQueue.java
Index: RequestReplyQueue.java
===================================================================
/*
* @(#)RequestReplyQueue.java 1.5 00/08/14
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The RequestReplyQueue class illustrates a simple implementation of a
* request/reply message exchange. It uses the QueueRequestor class provided
* by JMS. Providers and clients can create more sophisticated versions of
* this facility.
* <p>
* The program contains a Request class, a Reply class, a main method, and
* a method that runs the sender and receiver threads.
*
* @author Kim Haase
* @version 1.5, 08/14/00
*/
public class RequestReplyQueue {
String queueName = null;
int exitResult = 0;
/**
* The Request class represents the request half of the message exchange.
*
* @author Kim Haase
* @version 1.5, 08/14/00
*/
public class Request extends Thread {
/**
* Runs the thread.
*/
public void run() {
QueueConnectionFactory queueConnectionFactory = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
Queue queue = null;
QueueRequestor queueRequestor = null;
TextMessage message = null;
final String MSG_TEXT = new String("Here is a request");
TextMessage reply = null;
String replyID = null;
try {
queueConnectionFactory =
SampleUtilities.getQueueConnectionFactory();
queueConnection =
queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queue = SampleUtilities.getQueue(queueName, queueSession);
} catch (Exception e) {
System.out.println("Connection problem: " + e.toString());
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
/*
* Create a QueueRequestor.
* Create a text message and set its text.
* Start delivery of incoming messages.
* Send the text message as the argument to the request method,
* which returns the reply message. The request method also
* creates a temporary queue and places it in the JMSReplyTo
* message header field.
* Extract and display the reply message.
* Read the JMSCorrelationID of the reply message and confirm that
* it matches the JMSMessageID of the message that was sent.
* Finally, close the connection.
*/
try {
queueRequestor = new QueueRequestor(queueSession, queue);
message = queueSession.createTextMessage();
message.setText(MSG_TEXT);
System.out.println("REQUEST: Sending message: "
+ message.getText());
queueConnection.start();
reply = (TextMessage) queueRequestor.request(message);
System.out.println("REQUEST: Reply received: "
+ reply.getText());
replyID = new String(reply.getJMSCorrelationID());
if (replyID.equals(message.getJMSMessageID())) {
System.out.println("REQUEST: OK: Reply matches sent message");
} else {
System.out.println("REQUEST: ERROR: Reply does not match sent
message");
}
} catch (JMSException e) {
System.out.println("Exception occurred: " + e.toString());
exitResult = 1;
} catch (Exception ee) {
System.out.println("Unexpected exception: " + ee.toString());
ee.printStackTrace();
exitResult = 1;
} finally {
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
}
}
/**
* The Reply class represents the reply half of the message exchange.
*
* @author Kim Haase
* @version 1.5, 08/14/00
*/
public class Reply extends Thread {
/**
* Runs the thread.
*/
public void run() {
QueueConnectionFactory queueConnectionFactory = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
Queue queue = null;
QueueReceiver queueReceiver = null;
TextMessage message = null;
Queue tempQueue = null;
QueueSender replySender = null;
TextMessage reply = null;
final String REPLY_TEXT = new String("Here is a reply");
try {
queueConnectionFactory =
SampleUtilities.getQueueConnectionFactory();
queueConnection =
queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queue = SampleUtilities.getQueue(queueName, queueSession);
} catch (Exception e) {
System.out.println("Connection problem: " + e.toString());
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
/*
* Create a QueueReceiver.
* Start delivery of incoming messages.
* Call receive, which blocks until it obtains a message.
* Display the message obtained.
* Extract the temporary reply queue from the JMSReplyTo field of
* the message header.
* Use the temporary queue to create a sender for the reply message.
* Create the reply message, setting the JMSCorrelationID to the
* value of the incoming message's JMSMessageID.
* Send the reply message.
* Finally, close the connection.
*/
try {
queueReceiver = queueSession.createReceiver(queue);
queueConnection.start();
message = (TextMessage) queueReceiver.receive();
System.out.println("REPLY: Message received: "
+ message.getText());
tempQueue = (Queue) message.getJMSReplyTo();
replySender = queueSession.createSender(tempQueue);
reply = queueSession.createTextMessage();
reply.setText(REPLY_TEXT);
reply.setJMSCorrelationID(message.getJMSMessageID());
System.out.println("REPLY: Sending reply: " + reply.getText());
replySender.send(reply);
} catch (JMSException e) {
System.out.println("Exception occurred: " + e.toString());
exitResult = 1;
} catch (Exception ee) {
System.out.println("Unexpected exception: " + ee.toString());
ee.printStackTrace();
exitResult = 1;
} finally {
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
}
}
/**
* Instantiates the Request and Reply classes and starts their
* threads.
* Calls the join method to wait for the threads to die.
*/
public void run_threads() {
Request request = new Request();
Reply reply = new Reply();
request.start();
reply.start();
try {
request.join();
reply.join();
} catch (InterruptedException e) {}
}
/**
* Reads the queue name from the command line, then calls the
* run_threads method to execute the program threads.
*
* @param args the queue used by the example
*/
public static void main(String[] args) {
RequestReplyQueue rrq = new RequestReplyQueue();
if (args.length != 1) {
System.out.println("Usage: java RequestReplyQueue <queue_name>");
System.exit(1);
}
rrq.queueName = new String(args[0]);
System.out.println("Queue name is " + rrq.queueName);
rrq.run_threads();
SampleUtilities.exit(rrq.exitResult);
}
}
1.1 spyderMQ/examples/ObjectMessages.java
Index: ObjectMessages.java
===================================================================
/*
* @(#)ObjectMessages.java 1.4 00/08/09
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
 * The ObjectMessages class demonstrates that mutable objects are copied,
 * not passed by reference, when you use them to create message objects.
 * Its entry point is the main method.
 * <p>
 * The example uses only an ObjectMessage and a BytesMessage, but the same is
 * true for all message formats.
 *
 * @author Kim Haase
 * @version 1.4, 08/09/00
 */
public class ObjectMessages {

    /**
     * Prints the first <code>count</code> elements of an array on the
     * current output line, each preceded by a space, then ends the line.
     *
     * @param data the array to print
     * @param count the number of leading elements to print
     */
    private static void printBytes(byte[] data, int count) {
        int idx = 0;
        while (idx < count) {
            System.out.print(" " + data[idx]);
            idx++;
        }
        System.out.println();
    }

    /**
     * Main method. Takes no arguments.
     */
    public static void main(String[] args) {
        final int ARRLEN = 5;
        QueueConnectionFactory factory = null;
        QueueConnection connection = null;
        QueueSession session = null;
        String text = "A String is an object.";
        byte[] sourceBytes = {3, 5, 7, 9, 11};
        byte[] readBack = new byte[ARRLEN];
        int status = 0;

        // Connect and create a non-transacted, auto-acknowledging session.
        try {
            factory = SampleUtilities.getQueueConnectionFactory();
            connection = factory.createQueueConnection();
            session = connection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ee) {}
            }
            System.exit(1);
        }

        try {
            /*
             * Put a String into an ObjectMessage, point the local
             * variable at a different String, then read the message
             * back to show its content is unchanged.
             */
            ObjectMessage objMsg = session.createObjectMessage();
            System.out.println("Writing ObjectMessage with string: " + text);
            objMsg.setObject(text);
            text = "I'm a different String now.";
            System.out.println("Changed string; object is now: " + text);
            String stored = (String) objMsg.getObject();
            System.out.println("ObjectMessage contains: " + stored);

            /*
             * Write an array into a BytesMessage, alter one element of
             * the original array, then reset and read the message to
             * show its content is unchanged.
             */
            BytesMessage byteMsg = session.createBytesMessage();
            System.out.print("Writing BytesMessage with array: ");
            printBytes(sourceBytes, ARRLEN);
            byteMsg.writeBytes(sourceBytes);
            sourceBytes[1] = 13;
            System.out.print("Changed array element; array is now: ");
            printBytes(sourceBytes, ARRLEN);
            byteMsg.reset();
            int bytesRead = byteMsg.readBytes(readBack);
            System.out.print("BytesMessage contains: ");
            printBytes(readBack, bytesRead);
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            status = 1;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    status = 1;
                }
            }
        }
        SampleUtilities.exit(status);
    }
}
1.1 spyderMQ/examples/SynchTopicExample.java
Index: SynchTopicExample.java
===================================================================
/*
* @(#)SynchTopicExample.java 1.7 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The SynchTopicExample class demonstrates the simplest form of the
* publish/subscribe model: the publisher publishes a message, and the
* subscriber reads it using a synchronous receive.
* <p>
* The program contains a SimplePublisher class, a SynchSubscriber class, a
* main method, and a method that runs the subscriber and publisher
* threads.
* <p>
* Specify a topic name on the command line when you run the program.
* <p>
* The program calls methods in the SampleUtilities class.
*
* @author Kim Haase
* @version 1.7, 08/18/00
*/
public class SynchTopicExample {
    /** Name of the topic used by both threads; set from the command line. */
    String topicName = null;

    /**
     * Exit status passed to SampleUtilities.exit(); 0 indicates no errors.
     * Declared volatile because it is written by the publisher and
     * subscriber threads and read by the main thread.
     */
    volatile int exitResult = 0;

    /**
     * The SynchSubscriber class fetches a single message from a topic using
     * synchronous message delivery, then publishes a "Done" message so that
     * the publisher can stop.
     *
     * @author Kim Haase
     * @version 1.7, 08/18/00
     */
    public class SynchSubscriber extends Thread {

        /**
         * Runs the thread.
         */
        public void run() {
            TopicConnectionFactory topicConnectionFactory = null;
            TopicConnection topicConnection = null;
            TopicSession topicSession = null;
            Topic topic = null;
            TopicSubscriber topicSubscriber = null;
            final boolean NOLOCAL = true;
            Message inMessage = null;
            TextMessage outMessage = null;
            TopicPublisher topicPublisher = null;

            /*
             * Obtain connection factory.
             * Create connection.
             * Create session from connection; false means session is not
             * transacted.
             * Obtain topic.
             */
            try {
                topicConnectionFactory =
                    SampleUtilities.getTopicConnectionFactory();
                topicConnection =
                    topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                topic = SampleUtilities.getTopic(topicName, topicSession);
            } catch (Exception e) {
                System.out.println("Connection problem: " + e.toString());
                closeConnection(topicConnection);
                System.exit(1);
            }

            /*
             * Create subscriber, then start message delivery. Subscriber is
             * non-local so that it won't receive the message we publish.
             * Wait for a message to arrive, then display its contents.
             * Close connection and exit.
             */
            try {
                topicSubscriber =
                    topicSession.createSubscriber(topic, null, NOLOCAL);
                topicConnection.start();

                /*
                 * receive() can hand back null or a non-text message, so
                 * check the type instead of casting blindly; a failed cast
                 * would escape the JMSException handler below.
                 */
                inMessage = topicSubscriber.receive();
                if (inMessage instanceof TextMessage) {
                    System.out.println("SUBSCRIBER THREAD: Reading message: "
                        + ((TextMessage) inMessage).getText());
                } else {
                    System.out.println("SUBSCRIBER THREAD: Expected a text "
                        + "message but received: " + inMessage);
                    exitResult = 1;
                }

                /*
                 * Notify publisher that we received a message and it
                 * can stop broadcasting.
                 */
                topicPublisher = topicSession.createPublisher(topic);
                outMessage = topicSession.createTextMessage();
                outMessage.setText("Done");
                topicPublisher.publish(outMessage);
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                closeConnection(topicConnection);
            }
        }
    }

    /**
     * The SimplePublisher class publishes a message to a topic once per
     * second until it receives a message back on the same topic.
     *
     * @author Kim Haase
     * @version 1.7, 08/18/00
     */
    public class SimplePublisher extends Thread {

        /**
         * Runs the thread.
         */
        public void run() {
            TopicConnectionFactory topicConnectionFactory = null;
            TopicConnection topicConnection = null;
            TopicSession topicSession = null;
            Topic topic = null;
            TopicSubscriber publisherControlSubscriber = null;
            final boolean NOLOCAL = true;
            TopicPublisher topicPublisher = null;
            TextMessage sentMessage = null;
            final String MSG_TEXT = "Here is a message ";
            Message receivedMessage = null;

            /*
             * Obtain connection factory.
             * Create connection.
             * Create session from connection; false means session is not
             * transacted.
             * Obtain topic.
             */
            try {
                topicConnectionFactory =
                    SampleUtilities.getTopicConnectionFactory();
                topicConnection =
                    topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                topic = SampleUtilities.getTopic(topicName, topicSession);
            } catch (Exception e) {
                System.out.println("Connection problem: " + e.toString());
                closeConnection(topicConnection);
                System.exit(1);
            }

            /*
             * Create non-local subscriber to receive "Done" message from
             * another connection; start delivery.
             * Create publisher and text message.
             * Set message text, display it, and publish message.
             * Close connection and exit.
             */
            try {
                publisherControlSubscriber =
                    topicSession.createSubscriber(topic, null, NOLOCAL);
                topicConnection.start();

                /*
                 * Publish a message once per second until subscriber
                 * reports that it has finished receiving messages.
                 */
                topicPublisher = topicSession.createPublisher(topic);
                sentMessage = topicSession.createTextMessage();
                for (int i = 1; receivedMessage == null; i++) {
                    sentMessage.setText(MSG_TEXT + i);
                    System.out.println("PUBLISHER THREAD: Publishing message: "
                        + sentMessage.getText());
                    topicPublisher.publish(sentMessage);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        // Keep the interrupt visible to callers and stop
                        // publishing instead of silently looping on.
                        Thread.currentThread().interrupt();
                        break;
                    }
                    receivedMessage =
                        publisherControlSubscriber.receiveNoWait();
                }
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                closeConnection(topicConnection);
            }
        }
    }

    /**
     * Closes the given connection if it is not null. A failure to close is
     * recorded in exitResult rather than propagated.
     *
     * @param connection the connection to close; may be null
     */
    private void closeConnection(TopicConnection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                exitResult = 1;
            }
        }
    }

    /**
     * Instantiates the subscriber and publisher classes and starts their
     * threads.
     * Calls the join method to wait for the threads to die.
     * <p>
     * It is essential to start the subscriber before starting the publisher.
     * In the publish/subscribe model, a subscriber can ordinarily receive only
     * messages published while it is active.
     */
    public void run_threads() {
        SynchSubscriber synchSubscriber = new SynchSubscriber();
        SimplePublisher simplePublisher = new SimplePublisher();

        synchSubscriber.start();
        simplePublisher.start();
        try {
            synchSubscriber.join();
            simplePublisher.join();
        } catch (InterruptedException e) {
            // Stop waiting, but leave the interrupt status set for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads the topic name from the command line and displays it. The
     * topic must have been created by the jmsadmin tool.
     * Calls the run_threads method to execute the program threads.
     * Exits program.
     *
     * @param args the topic used by the example
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("Usage: java SynchTopicExample <topic_name>");
            System.exit(1);
        }
        SynchTopicExample ste = new SynchTopicExample();
        ste.topicName = args[0];
        System.out.println("Topic name is " + ste.topicName);
        ste.run_threads();
        SampleUtilities.exit(ste.exitResult);
    }
}
1.1 spyderMQ/examples/SampleUtilities.java
Index: SampleUtilities.java
===================================================================
/*
* @(#)SampleUtilities.java 1.7 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.naming.*;
import javax.jms.*;
/**
* Utility class for JMS sample programs.
* <p>
* Set the <code>USE_JNDI</code> variable to true or false depending on whether
* your provider uses JNDI.
* <p>
* Contains the following methods:
* <ul>
* <li> getQueueConnectionFactory
* <li> getTopicConnectionFactory
* <li> getQueue
* <li> getTopic
* <li> jndiLookup
* <li> exit
* <li> receiveSynchronizeMessages
* <li> sendSynchronizeMessage
* </ul>
*
* Also contains the class DoneLatch, which contains the following methods:
* <ul>
* <li> waitTillDone
* <li> allDone
* </ul>
*
* @author Kim Haase
* @author Joseph Fialli
* @version 1.7, 08/18/00
*/
public class SampleUtilities {
    /** Set to true if the provider uses JNDI, false otherwise. */
    public static final boolean USE_JNDI = true;
    /** JNDI name under which the queue connection factory is looked up. */
    public static final String QUEUECONFAC = "QueueConnectionFactory";
    /** JNDI name under which the topic connection factory is looked up. */
    public static final String TOPICCONFAC = "TopicConnectionFactory";
    /** Lazily created JNDI context; guarded by the class lock in jndiLookup. */
    private static Context jndiContext = null;

    /**
     * Returns a QueueConnectionFactory object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method.
     * If provider does not use JNDI, substitute provider-specific code here.
     *
     * @return a QueueConnectionFactory object
     * @throws javax.naming.NamingException (or other exception)
     * if name cannot be found
     */
    public static javax.jms.QueueConnectionFactory getQueueConnectionFactory()
            throws Exception {
        if (USE_JNDI) {
            return (javax.jms.QueueConnectionFactory) jndiLookup(QUEUECONFAC);
        } else {
            // return new provider-specific QueueConnectionFactory
            return null;
        }
    }

    /**
     * Returns a TopicConnectionFactory object.
     * If provider uses JNDI, serves as a wrapper around jndiLookup method.
     * If provider does not use JNDI, substitute provider-specific code here.
     *
     * @return a TopicConnectionFactory object
     * @throws javax.naming.NamingException (or other exception)
     * if name cannot be found
     */
    public static javax.jms.TopicConnectionFactory getTopicConnectionFactory()
            throws Exception {
        if (USE_JNDI) {
            return (javax.jms.TopicConnectionFactory) jndiLookup(TOPICCONFAC);
        } else {
            // return new provider-specific TopicConnectionFactory
            return null;
        }
    }

    /**
     * Returns a Queue object.
     * If provider uses JNDI, looks up "queue/" + name via jndiLookup.
     * Otherwise asks the session to create the queue.
     *
     * @param name String specifying queue name
     * @param session a QueueSession object
     *
     * @return a Queue object
     * @throws javax.naming.NamingException (or other exception)
     * if name cannot be found
     */
    public static javax.jms.Queue getQueue(String name,
                                           javax.jms.QueueSession session)
            throws Exception {
        if (USE_JNDI) {
            return (javax.jms.Queue) jndiLookup("queue/" + name);
        } else {
            return session.createQueue(name);
        }
    }

    /**
     * Returns a Topic object.
     * If provider uses JNDI, looks up "topic/" + name via jndiLookup.
     * Otherwise asks the session to create the topic.
     *
     * @param name String specifying topic name
     * @param session a TopicSession object
     *
     * @return a Topic object
     * @throws javax.naming.NamingException (or other exception)
     * if name cannot be found
     */
    public static javax.jms.Topic getTopic(String name,
                                           javax.jms.TopicSession session)
            throws Exception {
        if (USE_JNDI) {
            return (javax.jms.Topic) jndiLookup("topic/" + name);
        } else {
            return session.createTopic(name);
        }
    }

    /**
     * Creates a JNDI InitialContext object if none exists yet. Then looks up
     * the string argument and returns the associated object.
     * <p>
     * The method is synchronized because the sample programs call it from
     * several threads, and the shared context is created lazily here.
     *
     * @param name the name of the object to be looked up
     *
     * @return the object bound to <code>name</code>
     * @throws javax.naming.NamingException if name cannot be found
     */
    public static synchronized Object jndiLookup(String name)
            throws NamingException {
        if (jndiContext == null) {
            try {
                jndiContext = new InitialContext();
            } catch (NamingException e) {
                System.out.println("Could not create JNDI context: " +
                    e.toString());
                throw e;
            }
        }
        try {
            return jndiContext.lookup(name);
        } catch (NamingException e) {
            System.out.println("JNDI lookup failed: " + e.toString());
            throw e;
        }
    }

    /**
     * Calls System.exit().
     *
     * @param result The exit result; 0 indicates no errors
     */
    public static void exit(int result) {
        System.exit(result);
    }

    /**
     * Closes the given connection if it is not null, ignoring any
     * JMSException raised by the close.
     *
     * @param connection the connection to close; may be null
     */
    private static void closeQuietly(QueueConnection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // Best effort: the caller is already finishing or failing.
            }
        }
    }

    /**
     * Wait for 'count' messages on controlQueue before continuing. Called by
     * a publisher to make sure that subscribers have started before it begins
     * publishing messages.
     * <p>
     * If controlQueue doesn't exist, the method throws an exception.
     *
     * @param prefix prefix (publisher or subscriber) to be displayed
     * @param controlQueueName name of control queue
     * @param count number of messages to receive
     * @throws Exception if the connection cannot be set up or a receive fails
     */
    public static void receiveSynchronizeMessages(String prefix,
                                                  String controlQueueName,
                                                  int count)
            throws Exception {
        QueueConnectionFactory queueConnectionFactory = null;
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue controlQueue = null;
        QueueReceiver queueReceiver = null;

        try {
            queueConnectionFactory =
                SampleUtilities.getQueueConnectionFactory();
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
            controlQueue = getQueue(controlQueueName, queueSession);
            queueConnection.start();
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            closeQuietly(queueConnection);
            throw e;
        }

        try {
            System.out.println(prefix + "Receiving synchronize messages from "
                + controlQueueName + "; count = " + count);
            queueReceiver = queueSession.createReceiver(controlQueue);
            int remaining = count;
            while (remaining > 0) {
                queueReceiver.receive();
                remaining--;
                System.out.println(prefix
                    + "Received synchronize message; expect "
                    + remaining + " more");
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            throw e;
        } finally {
            closeQuietly(queueConnection);
        }
    }

    /**
     * Send a message to controlQueue. Called by a subscriber to notify a
     * publisher that it is ready to receive messages.
     * <p>
     * If controlQueue doesn't exist, the method throws an exception.
     *
     * @param prefix prefix (publisher or subscriber) to be displayed
     * @param controlQueueName name of control queue
     * @throws Exception if the connection cannot be set up or the send fails
     */
    public static void sendSynchronizeMessage(String prefix,
                                              String controlQueueName)
            throws Exception {
        QueueConnectionFactory queueConnectionFactory = null;
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue controlQueue = null;
        QueueSender queueSender = null;
        TextMessage message = null;

        try {
            queueConnectionFactory =
                SampleUtilities.getQueueConnectionFactory();
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
            controlQueue = getQueue(controlQueueName, queueSession);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            closeQuietly(queueConnection);
            throw e;
        }

        try {
            queueSender = queueSession.createSender(controlQueue);
            message = queueSession.createTextMessage();
            message.setText("synchronize");
            System.out.println(prefix + "Sending synchronize message to "
                + controlQueueName);
            queueSender.send(message);
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            throw e;
        } finally {
            closeQuietly(queueConnection);
        }
    }

    /**
     * Monitor class for asynchronous examples. Producer signals end of
     * message stream; listener calls allDone() to notify consumer that the
     * signal has arrived, while consumer calls waitTillDone() to wait for this
     * notification.
     *
     * @author Joseph Fialli
     * @version 1.7, 08/18/00
     */
    static public class DoneLatch {
        /** True once allDone() has been called; guarded by this object's lock. */
        boolean done = false;

        /**
         * Waits until done is set to true. An interrupt does not end the
         * wait, but the thread's interrupt status is restored before the
         * method returns.
         */
        public void waitTillDone() {
            boolean interrupted = false;
            synchronized (this) {
                while (!done) {
                    try {
                        this.wait();
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Sets done to true and wakes every thread blocked in waitTillDone().
         */
        public void allDone() {
            synchronized (this) {
                done = true;
                // notifyAll, not notify: more than one thread may be waiting.
                this.notifyAll();
            }
        }
    }
}
1.1 spyderMQ/examples/SynchQueueReceiver.java
Index: SynchQueueReceiver.java
===================================================================
/*
* @(#)SynchQueueReceiver.java 1.7 00/08/14
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The SynchQueueReceiver class consists only of a main method, which fetches
* one or more messages from a queue using synchronous message delivery. Run
* this program in conjunction with SenderToQueue. Specify a queue name on the
* command line when you run the program.
* <p>
* The program calls methods in the SampleUtilities class.
*
* @author Kim Haase
* @version 1.7, 08/14/00
*/
public class SynchQueueReceiver {

    /**
     * Reads the queue name from the command line, connects to the queue,
     * and prints each text message it receives. The first message that is
     * not a text message ends the run.
     *
     * @param args the queue used by the example
     */
    public static void main(String[] args) {
        int status = 0;
        QueueConnection connection = null;
        QueueSession session = null;
        Queue sourceQueue = null;

        // Exactly one argument, the queue name, is required.
        if (args.length != 1) {
            System.out.println("Usage: java SynchQueueReceiver <queue_name>");
            System.exit(1);
        }
        String queueName = args[0];
        System.out.println("Queue name is " + queueName);

        // Set up factory, connection, non-transacted session, and queue.
        try {
            QueueConnectionFactory factory =
                SampleUtilities.getQueueConnectionFactory();
            connection = factory.createQueueConnection();
            session = connection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
            sourceQueue = SampleUtilities.getQueue(queueName, session);
        } catch (Exception setupFailure) {
            System.out.println("Connection problem: "
                + setupFailure.toString());
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ignored) {}
            }
            System.exit(1);
        }

        // Start delivery and drain text messages; anything that is not a
        // text message is the control message that ends the stream.
        try {
            QueueReceiver receiver = session.createReceiver(sourceQueue);
            connection.start();
            Message next = receiver.receive();
            while (next instanceof TextMessage) {
                System.out.println("Reading message: "
                    + ((TextMessage) next).getText());
                next = receiver.receive();
            }
        } catch (JMSException failure) {
            System.out.println("Exception occurred: " + failure.toString());
            status = 1;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    status = 1;
                }
            }
        }
        SampleUtilities.exit(status);
    }
}
1.1 spyderMQ/examples/MessageHeadersTopic.java
Index: MessageHeadersTopic.java
===================================================================
/*
* @(#)MessageHeadersTopic.java 1.8 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import java.sql.*;
import java.util.*;
import javax.jms.*;
/**
* The MessageHeadersTopic class demonstrates the use of message header fields.
* <p>
* The program contains a HeaderPublisher class, a HeaderSubscriber class, a
* displayHeaders() method that is called by both classes, a main method, and
* a method that runs the subscriber and publisher threads.
* <p>
* The publishing class sends three messages, and the subscribing class
* receives them. The program displays the message headers just before the
* publish call and just after the receive so that you can see which ones are
* set by the publish method.
* <p>
* Specify a topic name on the command line when you run the program. The
* program also uses a queue named "controlQueue", which should be created
* before you run the program.
*
* @author Kim Haase
* @version 1.8, 08/18/00
*/
public class MessageHeadersTopic {
final String CONTROL_QUEUE = "controlQueue";
String topicName = null;
int exitResult = 0;
/**
* The HeaderPublisher class publishes three messages, setting the JMSType
* message header field, one of three header fields that are not set by
* the publish method. (The others, JMSCorrelationID and JMSReplyTo, are
* demonstrated in the RequestReplyQueue example.) It also sets a
* client property, "messageNumber".
*
* The displayHeaders method is called just before the publish method.
*
* @author Kim Haase
* @version 1.8, 08/18/00
*/
public class HeaderPublisher extends Thread {
/**
* Runs the thread.
*/
public void run() {
TopicConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicPublisher topicPublisher = null;
TextMessage message = null;
final String MSG_TEXT = new String("Read My Headers");
try {
topicConnectionFactory =
SampleUtilities.getTopicConnectionFactory();
topicConnection =
topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topic = SampleUtilities.getTopic(topicName, topicSession);
} catch (Exception e) {
System.out.println("Connection problem: " + e.toString());
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
try {
/*
* Synchronize with subscriber. Wait for message indicating
* that subscriber is ready to receive messages.
*/
try {
SampleUtilities.receiveSynchronizeMessages("PUBLISHER THREAD: ",
CONTROL_QUEUE, 1);
} catch (Exception e) {
System.out.println("Queue probably missing: " + e.toString());
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
// Create publisher.
topicPublisher = topicSession.createPublisher(topic);
// First message: no-argument form of publish method
message = topicSession.createTextMessage();
message.setJMSType("Simple");
System.out.println("PUBLISHER THREAD: Setting JMSType to "
+ message.getJMSType());
message.setIntProperty("messageNumber", 1);
System.out.println("PUBLISHER THREAD: Setting client property
messageNumber to "
+ message.getIntProperty("messageNumber"));
message.setText(MSG_TEXT);
System.out.println("PUBLISHER THREAD: Setting message text to: "
+ message.getText());
System.out.println("PUBLISHER THREAD: Headers before message is
sent:");
displayHeaders(message, "PUBLISHER THREAD: ");
topicPublisher.publish(message);
/*
* Second message: 3-argument form of publish method;
* explicit setting of delivery mode, priority, and
* expiration
*/
message = topicSession.createTextMessage();
message.setJMSType("Less Simple");
System.out.println("\nPUBLISHER THREAD: Setting JMSType to "
+ message.getJMSType());
message.setIntProperty("messageNumber", 2);
System.out.println("PUBLISHER THREAD: Setting client property
messageNumber to "
+ message.getIntProperty("messageNumber"));
message.setText(MSG_TEXT + " Again");
System.out.println("PUBLISHER THREAD: Setting message text to: "
+ message.getText());
displayHeaders(message, "PUBLISHER THREAD: ");
topicPublisher.publish(message, DeliveryMode.NON_PERSISTENT,
3, 10000);
/*
* Third message: no-argument form of publish method,
* MessageID and Timestamp disabled
*/
message = topicSession.createTextMessage();
message.setJMSType("Disable Test");
System.out.println("\nPUBLISHER THREAD: Setting JMSType to "
+ message.getJMSType());
message.setIntProperty("messageNumber", 3);
System.out.println("PUBLISHER THREAD: Setting client property
messageNumber to "
+ message.getIntProperty("messageNumber"));
message.setText(MSG_TEXT
+ " with MessageID and Timestamp disabled");
System.out.println("PUBLISHER THREAD: Setting message text to: "
+ message.getText());
topicPublisher.setDisableMessageID(true);
topicPublisher.setDisableMessageTimestamp(true);
System.out.println("PUBLISHER THREAD: Disabling Message ID and
Timestamp");
displayHeaders(message, "PUBLISHER THREAD: ");
topicPublisher.publish(message);
} catch (JMSException e) {
System.out.println("Exception occurred: " + e.toString());
exitResult = 1;
} finally {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
}
}
/**
* The HeaderSubscriber class receives the three messages and calls the
* displayHeaders method to show how the publish method changed the
* header values.
* <p>
* The first message, in which no fields were set explicitly by the publish
* method, shows the default values of these fields.
* <p>
* The second message shows the values set explicitly by the publish method.
* <p>
* The third message shows whether disabling the MessageID and Timestamp
* has any effect in the current JMS implementation.
*
* @author Kim Haase
* @version 1.8, 08/18/00
*/
public class HeaderSubscriber extends Thread {
/**
* Runs the thread.
*/
public void run() {
TopicConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicSubscriber topicSubscriber = null;
final boolean NOLOCAL = true;
TextMessage message = null;
try {
topicConnectionFactory =
SampleUtilities.getTopicConnectionFactory();
topicConnection =
topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topic = SampleUtilities.getTopic(topicName, topicSession);
} catch (Exception e) {
System.out.println("Connection problem: " + e.toString());
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
/*
* Create subscriber and start message delivery.
* Send synchronize message to publisher.
* Receive the three messages.
* Call the displayHeaders method to display the message headers.
*/
try {
topicSubscriber =
topicSession.createSubscriber(topic, null, NOLOCAL);
topicConnection.start();
// Let publisher know that subscriber is ready.
try {
SampleUtilities.sendSynchronizeMessage("SUBSCRIBER THREAD: ",
CONTROL_QUEUE);
} catch (Exception e) {
System.out.println("Queue probably missing: " + e.toString());
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
for (int i = 0; i < 3; i++) {
message = (TextMessage) topicSubscriber.receive();
System.out.println("\nSUBSCRIBER THREAD: Message received: "
+ message.getText());
System.out.println("SUBSCRIBER THREAD: Headers after message is
received:");
displayHeaders(message, "SUBSCRIBER THREAD: ");
}
} catch (JMSException e) {
System.out.println("Exception occurred: " + e.toString());
exitResult = 1;
} finally {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
}
}
/**
* Displays all message headers. Each display is in a try/catch block in
* case the header is not set before the message is published.
*
* @param message the message whose headers are to be displayed
* @param prefix the prefix (publisher or subscriber) to be displayed
*/
public void displayHeaders (Message message, String prefix) {
Destination dest = null;
int delMode = 0;
long expiration = 0;
Time expTime = null;
int priority = 0;
String msgID = null;
long timestamp = 0;
Time timestampTime = null;
String correlID = null;
Destination replyTo = null;
boolean redelivered = false;
String type = null;
String propertyName = null;
try {
System.out.println(prefix + "Headers set by publish/send method: ");
// Display the destination (topic, in this case).
try {
dest = message.getJMSDestination();
System.out.println(prefix + " JMSDestination: " + dest);
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
// Display the delivery mode.
try {
delMode = message.getJMSDeliveryMode();
System.out.print(prefix);
if (delMode == DeliveryMode.NON_PERSISTENT) {
System.out.println(" JMSDeliveryMode: non-persistent");
} else if (delMode == DeliveryMode.PERSISTENT) {
System.out.println(" JMSDeliveryMode: persistent");
} else {
System.out.println(" JMSDeliveryMode: neither persistent nor
non-persistent; error");
}
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
/*
* Display the expiration time. If value is 0 (the default),
* the message never expires. Otherwise, cast the value
* to a Time object for display.
*/
try {
expiration = message.getJMSExpiration();
System.out.print(prefix);
if (expiration != 0) {
expTime = new Time(expiration);
System.out.println(" JMSExpiration: " + expTime);
} else {
System.out.println(" JMSExpiration: " + expiration);
}
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
// Display the priority.
try {
priority = message.getJMSPriority();
System.out.println(prefix + " JMSPriority: " + priority);
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
// Display the message ID.
try {
msgID = message.getJMSMessageID();
System.out.println(prefix + " JMSMessageID: " + msgID);
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
/*
* Display the timestamp.
* If value is not 0, cast it to a Time object for display.
*/
try {
timestamp = message.getJMSTimestamp();
System.out.print(prefix);
if (timestamp != 0) {
timestampTime = new Time(timestamp);
System.out.println(" JMSTimestamp: " + timestampTime);
} else {
System.out.println(" JMSTimestamp: " + timestamp);
}
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
// Display the correlation ID.
try {
correlID = message.getJMSCorrelationID();
System.out.println(prefix + " JMSCorrelationID: " + correlID);
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
// Display the ReplyTo destination.
try {
replyTo = message.getJMSReplyTo();
System.out.println(prefix + " JMSReplyTo: " + replyTo);
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
// Display the Redelivered value (usually false).
System.out.println(prefix + "Header set by JMS provider:");
try {
redelivered = message.getJMSRedelivered();
System.out.println(prefix + " JMSRedelivered: " + redelivered);
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
// Display the JMSType.
System.out.println(prefix + "Headers set by client program:");
try {
type = message.getJMSType();
System.out.println(prefix + " JMSType: " + type);
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
// Display any client properties.
try {
for (Enumeration e = message.getPropertyNames(); e.hasMoreElements()
;) {
propertyName = new String((String) e.nextElement());
System.out.println(prefix + " Client property "
+ propertyName + ": "
+ message.getObjectProperty(propertyName));
}
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
} catch (Exception e) {
System.out.println(prefix + "Exception occurred: "
+ e.toString());
exitResult = 1;
}
}
/**
* Instantiates the subscriber and publisher classes and starts their
* threads.
* Calls the join method to wait for the threads to die.
* <p>
* It is essential to start the subscriber before starting the publisher.
* In the publish/subscribe model, a subscriber can ordinarily receive only
* messages published while it is active.
*/
public void run_threads() {
    HeaderSubscriber headerSubscriber = new HeaderSubscriber();
    HeaderPublisher headerPublisher = new HeaderPublisher();

    // The subscriber thread is started before the publisher thread.
    headerSubscriber.start();
    headerPublisher.start();
    try {
        headerSubscriber.join();
        headerPublisher.join();
    } catch (InterruptedException e) {
        // Stop waiting, but keep the interrupt visible to the caller
        // instead of silently discarding it.
        Thread.currentThread().interrupt();
    }
}
/**
* Reads the topic name from the command line, then calls the
* run_threads method to execute the program threads.
*
* @param args the topic used by the example
*/
public static void main(String[] args) {
    MessageHeadersTopic example = new MessageHeadersTopic();

    // Exactly one argument, the topic name, is required.
    boolean haveTopicName = (args.length == 1);
    if (!haveTopicName) {
        System.out.println("Usage: java MessageHeadersTopic <topic_name>");
        System.exit(1);
    }

    String requestedTopic = new String(args[0]);
    example.topicName = requestedTopic;
    System.out.println("Topic name is " + example.topicName);

    // Run the subscriber and publisher threads, then exit with the status
    // they recorded.
    example.run_threads();
    SampleUtilities.exit(example.exitResult);
}
}
1.1 spyderMQ/examples/run.bat
Index: run.bat
===================================================================
@rem Runs an example class (%1) with up to eight arguments on the client classpath.
java -cp ".;..\conf\default;..\client\jnp-client.jar;..\client\spydermq-client.jar;..\lib\ext\jms.jar" %1 %2 %3 %4 %5 %6 %7 %8 %9
1.1 spyderMQ/examples/BytesMessages.java
Index: BytesMessages.java
===================================================================
/*
* @(#)BytesMessages.java 1.5 00/08/09
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import java.io.*;
import javax.jms.*;
/**
* The BytesMessages class consists only of a main method, which reads a
* textfile, creates a BytesMessage from it, then reads the message. It does
* not send the message.
* <p>
* Specify an existing text file name on the command line when you run
* the program.
* <p>
* This is not a realistic example of the use of the BytesMessage message type,
* which is intended for client encoding of existing message formats. (If
* possible, one of the other message types, such as StreamMessage or
* MapMessage, should be used instead.) However, it shows how to use a buffer
* to write or read a BytesMessage when you do not know its length.
*
* @author Kim Haase
* @version 1.5, 08/09/00
*/
public class BytesMessages {

    /**
     * Main method.  Reads the named file into a BytesMessage in
     * fixed-size chunks, resets the message, then reads it back in chunks
     * and prints the content.  The input stream and the JMS connection are
     * both closed before the program exits.
     *
     * @param args  the name of the text file used by the example
     */
    public static void main(String[] args) {
        final int          BUFLEN = 64;
        FileInputStream    inStream = null;
        QueueConnection    queueConnection = null;
        QueueSession       queueSession = null;
        int                exitResult = 0;

        /*
         * Read text file name from command line and create input stream.
         */
        if (args.length != 1) {
            System.out.println("Usage: java BytesMessages <filename>");
            System.exit(1);
        }
        try {
            inStream = new FileInputStream(args[0]);
        } catch (IOException e) {
            System.out.println("Problem getting file: " + e.toString());
            System.exit(1);
        }

        try {
            QueueConnectionFactory queueConnectionFactory =
                SampleUtilities.getQueueConnectionFactory();
            queueConnection =
                queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException ee) {}
            }
            // Do not leave the file open on the failure path either.
            closeQuietly(inStream);
            System.exit(1);
        }

        try {
            /*
             * Create a BytesMessage.
             * Read a byte stream from the input stream into a buffer and
             * construct a BytesMessage, using the three-argument form
             * of the writeBytes method to ensure that the message contains
             * only the bytes read from the file, not any leftover characters
             * in the buffer.
             */
            BytesMessage bytesMessage = queueSession.createBytesMessage();
            byte[] buf1 = new byte[BUFLEN];
            int bytes_read;
            while ((bytes_read = inStream.read(buf1)) != -1) {
                bytesMessage.writeBytes(buf1, 0, bytes_read);
                System.out.println("Writing " + bytes_read
                    + " bytes into message");
            }

            /*
             * Reset the message to the beginning, then use readBytes to
             * extract its contents into another buffer, converting the
             * byte array elements to char so that they will display
             * intelligibly.  A read shorter than the buffer (or -1) means
             * the end of the message has been reached.
             */
            bytesMessage.reset();
            byte[] buf2 = new byte[BUFLEN];
            int length;
            do {
                length = bytesMessage.readBytes(buf2);
                if (length != -1) {
                    System.out.println("Reading " + length
                        + " bytes from message: ");
                    for (int i = 0; i < length; i++) {
                        // Mask to 0..255 first: a plain (char) cast would
                        // sign-extend bytes >= 0x80 into U+FFxx characters.
                        System.out.print((char) (buf2[i] & 0xFF));
                    }
                }
                System.out.println();
            } while (length >= BUFLEN);
        } catch (JMSException e) {
            System.out.println("JMS exception occurred: " + e.toString());
            exitResult = 1;
        } catch (IOException e) {
            System.out.println("I/O exception occurred: " + e.toString());
            exitResult = 1;
        } finally {
            closeQuietly(inStream);
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException e) {
                    exitResult = 1;
                }
            }
        }
        SampleUtilities.exit(exitResult);
    }

    /**
     * Closes the given stream if it is not null, ignoring any I/O error:
     * the stream is only read from, so a failed close loses no data.
     *
     * @param stream  the stream to close; may be null
     */
    private static void closeQuietly(InputStream stream) {
        if (stream != null) {
            try {
                stream.close();
            } catch (IOException ignored) {
                // Nothing useful can be done about a failed close here.
            }
        }
    }
}
1.1 spyderMQ/examples/TopicSelectors.java
Index: TopicSelectors.java
===================================================================
/*
* @(#)TopicSelectors.java 1.6 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import java.util.*;
import javax.jms.*;
/**
* The TopicSelectors class demonstrates the use of multiple
* subscribers and message selectors.
* <p>
* The program contains a Publisher class, a Subscriber class with a listener
* class, a main method, and a method that runs the subscriber and publisher
* threads.
* <p>
* The Publisher class publishes 30 messages of 6 different types, randomly
* selected, then publishes a "Finished" message. The program creates three
* instances of the Subscriber class, one for each of three of the types.
* Each subscriber instance uses a different message selector to fetch
* messages of its own type plus the "Finished" message. The publisher
* displays the messages it sends, and the listener displays the messages that
* the subscribers receive. Because all the objects run in threads, the
* displays are interspersed when the program runs.
* <p>
* Specify a topic name on the command line when you run the program. The
* program also uses a queue named "controlQueue", which should be created
* before you run the program.
*
* @author Kim Haase
* @version 1.6, 08/18/00
*/
public class TopicSelectors {
    /** Name of the queue used to synchronize the publisher with the subscribers. */
    final String        CONTROL_QUEUE = "controlQueue";
    /** Name of the topic; set by main from the command line. */
    String              topicName = null;
    /** The news types, followed by the end-of-stream marker type. */
    static final String MESSAGE_TYPES[] =
                               {"Nation/World", "Metro/Region", "Business",
                                "Sports", "Living/Arts", "Opinion",
                                 // always last type
                                "Finished"
                               };
    static final String END_OF_MESSAGE_STREAM_TYPE =
                               MESSAGE_TYPES[MESSAGE_TYPES.length-1];
    /** Exit status; set to 1 by any thread that hits a JMS error. */
    int                 exitResult = 0;

    /**
     * Closes the connection if it is not null, ignoring any JMSException.
     * Used on paths that are about to terminate the program anyway.
     *
     * @param connection  the connection to close; may be null
     */
    private static void closeQuietly(TopicConnection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // The program is exiting with an error already.
            }
        }
    }

    /**
     * The Publisher class publishes a number of messages.  For each, it
     * randomly chooses a message type.  It creates a message and sets its
     * text to a message that indicates the message type.
     * It also sets the client property NewsType, which the Subscriber
     * objects use as the message selector.
     * After the news messages, the publisher sends a final message with a
     * NewsType of "Finished", which signals the end of the messages.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    public class Publisher extends Thread {
        final int  NUM_SUBSCRIBERS;
        // Number of real news types: every entry except the trailing
        // end-of-stream marker.
        final int  ARRSIZE = MESSAGE_TYPES.length - 1;
        // One generator for the life of the publisher.  Creating a new
        // time-seeded Random for every call can return the same value for
        // all calls made within one clock tick.
        private final Random rgen = new Random();

        /**
         * Constructor.
         *
         * @param numSubscribers  the number of subscribers whose
         *                        synchronize messages must be received
         *                        before publishing starts
         */
        public Publisher(int numSubscribers) {
            NUM_SUBSCRIBERS = numSubscribers;
        }

        /**
         * Chooses a message type by using the random number generator
         * found in java.util.  The end-of-stream type is never chosen.
         *
         * @return   the String representing the message type
         */
        public String chooseType() {
            return MESSAGE_TYPES[rgen.nextInt(ARRSIZE)];
        }

        /**
         * Runs the thread: connects, waits for the subscribers, publishes
         * the news messages followed by the end-of-stream message, and
         * closes the connection.
         */
        public void run() {
            TopicConnection         topicConnection = null;
            TopicSession            topicSession = null;
            Topic                   topic = null;
            int                     numMsgs = ARRSIZE * 5;

            try {
                TopicConnectionFactory topicConnectionFactory =
                    SampleUtilities.getTopicConnectionFactory();
                topicConnection =
                    topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                topic = SampleUtilities.getTopic(topicName, topicSession);
            } catch (Exception e) {
                System.out.println("Connection problem: " + e.toString());
                closeQuietly(topicConnection);
                System.exit(1);
            }

            /*
             * After synchronizing with subscriber, create publisher.
             * Create and send news messages.
             * Send end-of-messages message.
             */
            try {
                /*
                 * Synchronize with subscribers.  Wait for messages indicating
                 * that all subscribers are ready to receive messages.
                 */
                try {
                    SampleUtilities.receiveSynchronizeMessages("PUBLISHER THREAD: ",
                                                               CONTROL_QUEUE,
                                                               NUM_SUBSCRIBERS);
                } catch (Exception e) {
                    System.out.println("Queue probably missing: " + e.toString());
                    closeQuietly(topicConnection);
                    System.exit(1);
                }

                TopicPublisher topicPublisher =
                    topicSession.createPublisher(topic);
                // A single message object is reused for every publish call.
                TextMessage message = topicSession.createTextMessage();
                for (int i = 0; i < numMsgs; i++) {
                    String messageType = chooseType();
                    message.setStringProperty("NewsType", messageType);
                    message.setText("Item " + i + ": " + messageType);
                    System.out.println("PUBLISHER THREAD: Setting message text to: "
                        + message.getText());
                    topicPublisher.publish(message);
                }

                message.setStringProperty("NewsType", END_OF_MESSAGE_STREAM_TYPE);
                message.setText("That's all the news for today.");
                System.out.println("PUBLISHER THREAD: Setting message text to: "
                    + message.getText());
                topicPublisher.publish(message);
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                if (topicConnection != null) {
                    try {
                        topicConnection.close();
                    } catch (JMSException e) {
                        exitResult = 1;
                    }
                }
            }
        }
    }

    /**
     * Each instance of the Subscriber class creates a subscriber that uses
     * a message selector that is based on the string passed to its
     * constructor.
     * It registers its message listener, then starts listening
     * for messages.  It does not exit until the message listener signals
     * its monitor, which happens when the listener gets the last
     * message.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    public class Subscriber extends Thread {
        String whatKind;
        int subscriberNumber;

        /**
         * The MultipleListener class implements the MessageListener interface
         * by defining an onMessage method for the Subscriber class.
         *
         * @author Kim Haase
         * @version 1.6, 08/18/00
         */
        private class MultipleListener implements MessageListener {
            final SampleUtilities.DoneLatch monitor =
                new SampleUtilities.DoneLatch();

            /**
             * Displays the message text.
             * If the value of the NewsType property is "Finished", the message
             * listener sets its monitor state to all done processing messages.
             * A message that is not a TextMessage is reported rather than
             * cast, so that the listener never throws a ClassCastException.
             *
             * @param inMessage  the incoming message
             */
            public void onMessage(Message inMessage) {
                try {
                    if (inMessage instanceof TextMessage) {
                        System.out.println("SUBSCRIBER " + subscriberNumber
                            + " THREAD: Message received: "
                            + ((TextMessage) inMessage).getText());
                    } else {
                        System.out.println("SUBSCRIBER " + subscriberNumber
                            + " THREAD: Message of unexpected type received: "
                            + inMessage.getClass().getName());
                    }

                    // Constant-first comparison: a message without the
                    // NewsType property yields null, which must not cause
                    // a NullPointerException.
                    String newsType = inMessage.getStringProperty("NewsType");
                    if (TopicSelectors.END_OF_MESSAGE_STREAM_TYPE.equals(newsType)) {
                        System.out.println("SUBSCRIBER " + subscriberNumber
                            + " THREAD: Received finished-publishing message");
                        monitor.allDone();
                    }
                } catch(JMSException e) {
                    System.out.println("Exception in onMessage(): "
                        + e.toString());
                }
            }
        }

        /**
         * Constructor.  Sets whatKind to indicate the type of
         * message this Subscriber object will listen for; sets
         * subscriberNumber based on Subscriber array index.
         *
         * @param str  a String from the MESSAGE_TYPES array
         * @param num  the index of the Subscriber array
         */
        public Subscriber(String str, int num) {
            whatKind = str;
            subscriberNumber = num + 1;
        }

        /**
         * Runs the thread: connects, subscribes with a selector, tells the
         * publisher it is ready, and blocks until the listener has seen the
         * end-of-stream message.
         */
        public void run() {
            TopicConnection         topicConnection = null;
            TopicSession            topicSession = null;
            Topic                   topic = null;
            MultipleListener        multipleListener = new MultipleListener();

            try {
                TopicConnectionFactory topicConnectionFactory =
                    SampleUtilities.getTopicConnectionFactory();
                topicConnection =
                    topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                topic = SampleUtilities.getTopic(topicName, topicSession);
            } catch (Exception e) {
                System.out.println("Connection problem: " + e.toString());
                closeQuietly(topicConnection);
                System.exit(1);
            }

            /*
             * Create subscriber with message selector.
             * Start message delivery.
             * Send synchronize message to publisher, then wait till all
             * messages have arrived.
             * Listener displays the messages obtained.
             */
            try {
                // Select this subscriber's own type plus the end-of-stream
                // marker, so that every subscriber sees the final message.
                String selector = "NewsType = '" + whatKind + "'" +
                                  " OR NewsType = '" +
                                  END_OF_MESSAGE_STREAM_TYPE + "'";
                System.out.println("SUBSCRIBER " + subscriberNumber
                    + " THREAD: selector is \"" + selector + "\"");
                TopicSubscriber topicSubscriber =
                    topicSession.createSubscriber(topic, selector, false);
                topicSubscriber.setMessageListener(multipleListener);
                topicConnection.start();

                // Let publisher know that subscriber is ready.
                try {
                    SampleUtilities.sendSynchronizeMessage("SUBSCRIBER "
                                                            + subscriberNumber
                                                            + " THREAD: ",
                                                            CONTROL_QUEUE);
                } catch (Exception e) {
                    System.out.println("Queue probably missing: " + e.toString());
                    closeQuietly(topicConnection);
                    System.exit(1);
                }

                /*
                 * Asynchronously process appropriate news messages.
                 * Block until publisher issues a finished message.
                 */
                multipleListener.monitor.waitTillDone();
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                if (topicConnection != null) {
                    try {
                        topicConnection.close();
                    } catch (JMSException e) {
                        exitResult = 1;
                    }
                }
            }
        }
    }

    /**
     * Creates an array of three Subscriber objects, one each for the
     * message types at indexes 2, 3 and 4 of MESSAGE_TYPES, and starts
     * their threads.  Creates a Publisher object and starts its thread.
     * Calls the join method to wait for the threads to die.
     */
    public void run_threads() {
        final int   NUM_SUBSCRIBERS = 3;
        // Index in MESSAGE_TYPES of the type given to the first subscriber.
        final int   FIRST_TYPE_INDEX = 2;
        Subscriber  subscriberArray[] = new Subscriber[NUM_SUBSCRIBERS];
        Publisher   publisher = new Publisher(NUM_SUBSCRIBERS);
        boolean     interrupted = false;

        for (int i = 0; i < subscriberArray.length; i++) {
            subscriberArray[i] =
                new Subscriber(MESSAGE_TYPES[FIRST_TYPE_INDEX + i], i);
            subscriberArray[i].start();
        }
        publisher.start();

        // Keep waiting for the remaining threads even if one join is
        // interrupted; the interrupt is remembered and restored below.
        for (int i = 0; i < subscriberArray.length; i++) {
            try {
                subscriberArray[i].join();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        try {
            publisher.join();
        } catch (InterruptedException e) {
            interrupted = true;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads the topic name from the command line, then calls the
     * run_threads method to execute the program threads.
     *
     * @param args  the topic used by the example
     */
    public static void main(String[] args) {
        TopicSelectors  ts = new TopicSelectors();

        if (args.length != 1) {
            System.out.println("Usage: java TopicSelectors <topic_name>");
            System.exit(1);
        }
        ts.topicName = args[0];
        System.out.println("Topic name is " + ts.topicName);

        ts.run_threads();
        SampleUtilities.exit(ts.exitResult);
    }
}
1.43 +21 -6 spyderMQ/examples/TestClient.java
1.1 spyderMQ/examples/SenderToQueue.java
Index: SenderToQueue.java
===================================================================
/*
* @(#)SenderToQueue.java 1.2 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The SenderToQueue class consists only of a main method, which sends
* several messages to a queue.
* <p>
* Run this program in conjunction with either SynchQueueReceiver or
* AsynchQueueReceiver. Specify a queue name on the command line when you run
* the program. By default, the program sends one message. Specify a number
* after the queue name to send that number of messages.
*
* @author Kim Haase
* @version 1.2, 08/18/00
*/
public class SenderToQueue {
/**
* Main method.
*
* @param args the queue used by the example and, optionally, the
* number of messages to send
*/
public static void main(String[] args) {
String queueName = null;
QueueConnectionFactory queueConnectionFactory = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
Queue queue = null;
QueueSender queueSender = null;
TextMessage message = null;
final int NUM_MSGS;
final String MSG_TEXT = new String("Here is a message");
int exitResult = 0;
if ( (args.length < 1) || (args.length > 2) ) {
System.out.println("Usage: java SenderToQueue <queue_name>
[<number_of_messages>]");
System.exit(1);
}
queueName = new String(args[0]);
System.out.println("Queue name is " + queueName);
if (args.length == 2){
NUM_MSGS = (new Integer(args[1])).intValue();
} else {
NUM_MSGS = 1;
}
try {
queueConnectionFactory =
SampleUtilities.getQueueConnectionFactory();
queueConnection =
queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queue = SampleUtilities.getQueue(queueName, queueSession);
} catch (Exception e) {
System.out.println("Connection problem: " + e.toString());
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
/*
* Create sender.
* Create text message.
* Send five messages, varying text slightly.
* Send end-of-messages message.
* Finally, close connection.
*/
try {
queueSender = queueSession.createSender(queue);
message = queueSession.createTextMessage();
for (int i = 0; i < NUM_MSGS; i++) {
message.setText(MSG_TEXT + " " + (i + 1));
System.out.println("Sending message: " + message.getText());
queueSender.send(message);
}
// Send a non-text control message indicating end of messages.
queueSender.send(queueSession.createMessage());
} catch (JMSException e) {
System.out.println("Exception occurred: " + e.toString());
exitResult = 1;
} finally {
if (queueConnection != null) {
try {
queueConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
SampleUtilities.exit(exitResult);
}
}
1.1 spyderMQ/examples/run.sh
Index: run.sh
===================================================================
#!/bin/sh
# Runs an example class on the client classpath.  The class name and all of
# its arguments are forwarded unchanged via "$@" (the previous positional
# list passed $4 twice and split arguments containing spaces).
java -cp ".:../conf:../conf/default:../client/spydermq-client.jar:../client/jnp-client.jar:../lib/ext/jms.jar" "$@"
1.1 spyderMQ/examples/MessageFormats.java
Index: MessageFormats.java
===================================================================
/*
* @(#)MessageFormats.java 1.4 00/08/09
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The MessageFormats class consists only of a main method, which creates and
* then reads messages in all supported JMS message formats: BytesMessage,
* TextMessage, MapMessage, StreamMessage, and ObjectMessage. It does not send
* the messages.
* <p>
* Before it can read a BytesMessage or StreamMessage that has not been sent,
* the program must call reset() to put the message body in read-only mode
* and reposition the stream.
*
* @author Kim Haase
* @version 1.4, 08/09/00
*/
public class MessageFormats {

    /**
     * Main method.  Takes no arguments.  Opens a queue session, runs one
     * demonstration per message format in a fixed order (bytes, text, map,
     * stream, object), then closes the connection and exits.
     */
    public static void main(String[] args) {
        QueueConnection connection = null;
        QueueSession    session = null;
        int             status = 0;

        try {
            QueueConnectionFactory factory =
                SampleUtilities.getQueueConnectionFactory();
            connection = factory.createQueueConnection();
            session = connection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
        } catch (Exception problem) {
            System.out.println("Connection problem: " + problem);
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ignored) {}
            }
            System.exit(1);
        }

        try {
            showBytesMessage(session);
            showTextMessage(session);
            showMapMessage(session);
            showStreamMessage(session);
            showObjectMessage(session);
        } catch (JMSException problem) {
            System.out.println("Exception occurred: " + problem);
            status = 1;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException problem) {
                    status = 1;
                }
            }
        }
        SampleUtilities.exit(status);
    }

    /**
     * Creates a BytesMessage and writes it from an array of bytes
     * (signed 8-bit integers), resets the message for reading, then reads
     * the bytes into a second array and prints them.
     * A BytesMessage is an undifferentiated stream of bytes that can
     * be read in various formats.
     */
    private static void showBytesMessage(QueueSession session)
            throws JMSException {
        byte[] outgoing = {-128, 127, -1, 0, 1, -64, 64};
        byte[] incoming = new byte[7];

        BytesMessage msg = session.createBytesMessage();
        msg.writeBytes(outgoing);
        msg.reset();
        int count = msg.readBytes(incoming);

        System.out.println("Reading BytesMessage " + count
            + " bytes long:");
        for (int pos = 0; pos < count; pos++) {
            System.out.print(" " + incoming[pos]);
        }
        System.out.println();
    }

    /**
     * Creates, writes, and displays the contents of a TextMessage.
     * A TextMessage contains a String of any length.
     */
    private static void showTextMessage(QueueSession session)
            throws JMSException {
        TextMessage msg = session.createTextMessage();
        msg.setText("This is a text message.");
        System.out.println("Reading TextMessage:");
        System.out.println(" " + msg.getText());
    }

    /**
     * Creates and writes a MapMessage, then displays its contents in
     * a different order.
     * A MapMessage contains a series of name/value pairs; the values can
     * be read by name in any order.
     */
    private static void showMapMessage(QueueSession session)
            throws JMSException {
        MapMessage msg = session.createMapMessage();
        msg.setString("Message type", "Map");
        msg.setInt("An Integer", 3456);
        msg.setDouble("A Double", 1.23456789);

        System.out.println(
            "Reading MapMessage in a different order from the way it was generated:");
        String typeValue = msg.getString("Message type");
        System.out.println(" Type: " + typeValue);
        double doubleValue = msg.getDouble("A Double");
        System.out.println(" Double: " + doubleValue);
        int intValue = msg.getInt("An Integer");
        System.out.println(" Integer: " + intValue);
    }

    /**
     * Creates and writes a StreamMessage, resets it for reading, and
     * displays the values in the same order in which they were written.
     */
    private static void showStreamMessage(QueueSession session)
            throws JMSException {
        StreamMessage msg = session.createStreamMessage();
        msg.writeString("Stream message");
        msg.writeDouble(123.456789e222);
        msg.writeInt(223344);
        msg.reset();

        System.out.println(
            "Reading StreamMessage in the order in which it was generated:");
        String stringValue = msg.readString();
        System.out.println(" String: " + stringValue);
        double doubleValue = msg.readDouble();
        System.out.println(" Double: " + doubleValue);
        int intValue = msg.readInt();
        System.out.println(" Integer: " + intValue);
    }

    /**
     * Creates an ObjectMessage from a String object, then displays its
     * contents.  The object read back is cast to the type that was stored.
     */
    private static void showObjectMessage(QueueSession session)
            throws JMSException {
        String payload = "A String is an object.";
        ObjectMessage msg = session.createObjectMessage();
        msg.setObject(payload);
        System.out.println("Reading ObjectMessage:");
        System.out.println(" " + (String) msg.getObject());
    }
}