User: hiram
Date: 01/03/01 23:04:51
Added: src/main/org/jbossmq/cluster/jms
ClusterConnectionMetaData.java
ClusterTemporaryTopic.java ClusterTopic.java
ClusterTopicConnection.java
ClusterTopicConnectionFactory.java
ClusterTopicPublisher.java ClusterTopicSession.java
ClusterTopicSubscriber.java
Log:
- Changed Source references of GPL to LGPL.
- The cluster work that I had been doing is now accessible via a Pub-Sub JMS API.
- Take a look at the Cluster* examples in the sample directory to test it out.
Revision Changes Path
1.1
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterConnectionMetaData.java
Index: ClusterConnectionMetaData.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.jms;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
import java.util.Enumeration;
import java.util.Vector;
/**
 * Fixed description of the JBossMQ cluster provider, exposed through
 * javax.jms.ConnectionMetaData. Every accessor returns a constant.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class ClusterConnectionMetaData implements ConnectionMetaData {

    /** Supported JMS API level, split into its major and minor parts. */
    private static final int JMS_MAJOR = 1;
    private static final int JMS_MINOR = 0;

    /** Version of this provider, split into its major and minor parts. */
    private static final int PROVIDER_MAJOR = 0;
    private static final int PROVIDER_MINOR = 1;

    /** @return the JMS version as text, always "1.0" */
    public String getJMSVersion() throws JMSException {
        return "1.0";
    }

    /** @return the major part of the JMS version, always 1 */
    public int getJMSMajorVersion() throws JMSException {
        return JMS_MAJOR;
    }

    /** @return the minor part of the JMS version, always 0 */
    public int getJMSMinorVersion() throws JMSException {
        return JMS_MINOR;
    }

    /** @return the provider name, always "JBossMQ-Cluster" */
    public String getJMSProviderName() throws JMSException {
        return "JBossMQ-Cluster";
    }

    /** @return the provider version as text, always "0.1" */
    public String getProviderVersion() throws JMSException {
        return "0.1";
    }

    /** @return the major part of the provider version, always 0 */
    public int getProviderMajorVersion() throws JMSException {
        return PROVIDER_MAJOR;
    }

    /** @return the minor part of the provider version, always 1 */
    public int getProviderMinorVersion() throws JMSException {
        return PROVIDER_MINOR;
    }

    /**
     * @return a fresh enumeration over the JMSX property names this
     *         provider reports: "JMSXGroupID" followed by "JMSXGroupSeq"
     */
    public Enumeration getJMSXPropertyNames() throws JMSException {
        final String[] names = { "JMSXGroupID", "JMSXGroupSeq" };
        Vector supported = new Vector(names.length);
        for (int i = 0; i < names.length; i++) {
            supported.addElement(names[i]);
        }
        return supported.elements();
    }
}
1.1
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTemporaryTopic.java
Index: ClusterTemporaryTopic.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.jms;
import javax.jms.JMSException;
import javax.jms.TemporaryTopic;
import java.io.Serializable;
import org.jbossmq.cluster.transport.NodeId;
/**
 * This class implements javax.jms.TemporaryTopic.
 *
 * A temporary topic is identified by the node that created it together
 * with a numeric id handed out by that node's connection.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class ClusterTemporaryTopic implements Serializable, TemporaryTopic {

    // Node that owns this temporary topic (read by ClusterTopicConnection.send)
    NodeId nodeId;
    // Id of the topic, as assigned by the creating connection
    int topicId;

    /**
     * Creates a temporary topic owned by the given node.
     *
     * @param n  the owning node; must not be null
     * @param id the id of the topic
     * @throws javax.jms.JMSException if n is null
     */
    public ClusterTemporaryTopic(NodeId n, int id) throws javax.jms.JMSException {
        if (n == null)
            throw new javax.jms.JMSException("NodeId was null");
        nodeId = n;
        topicId = id;
    }

    /**
     * Does nothing: this class holds no resources to release.
     */
    public void delete() throws javax.jms.JMSException {
    }

    /**
     * Two temporary topics are equal when they carry the same topic id and
     * equal node ids. Anything that is not a ClusterTemporaryTopic
     * (including null) is never equal.
     */
    public boolean equals(Object obj) {
        if (obj == this)
            return true;
        // Explicit type test instead of relying on a caught
        // ClassCastException / NullPointerException.
        if (!(obj instanceof ClusterTemporaryTopic))
            return false;
        ClusterTemporaryTopic other = (ClusterTemporaryTopic) obj;
        if (other.topicId != topicId)
            return false;
        // The constructor rejects a null nodeId; the guard only preserves the
        // old "never throws" behaviour for instances built some other way.
        return other.nodeId != null && other.nodeId.equals(nodeId);
    }

    /** @return the same text as {@link #toString()} */
    public String getTopicName() throws JMSException {
        return toString();
    }

    /**
     * Hashes on the topic id only, which is consistent with equals()
     * because equal topics always share the same topic id.
     */
    public int hashCode() {
        return topicId;
    }

    /** @return "TemporaryTopic:" followed by the node id, ":" and the topic id */
    public String toString() {
        return "TemporaryTopic:" + nodeId + ":" + topicId;
    }
}
1.1 jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopic.java
Index: ClusterTopic.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.jms;
import javax.jms.Topic;
import javax.jms.JMSException;
import java.io.Serializable;
/**
 * This class implements javax.jms.Topic.
 *
 * A cluster topic is identified solely by its numeric topic id.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class ClusterTopic implements Serializable, Topic {

    // Not read or written anywhere in this class; kept so the field layout
    // (and therefore the default serialized form) stays unchanged.
    String name;
    // Id of the topic (read by ClusterTopicConnection.send)
    short topicId;

    /**
     * @param id the id that identifies this topic
     */
    public ClusterTopic(short id) {
        topicId = id;
    }

    /** @return the same text as {@link #toString()} */
    public String getTopicName() throws JMSException {
        return toString();
    }

    /** @return "Topic:" followed by the topic id */
    public String toString() {
        return "Topic:" + topicId;
    }

    /**
     * Two topics are equal when they carry the same topic id. Anything that
     * is not a ClusterTopic (including null) is never equal.
     */
    public boolean equals(Object obj) {
        if (obj == this)
            return true;
        // Explicit type test instead of relying on a caught
        // ClassCastException / NullPointerException.
        if (!(obj instanceof ClusterTopic))
            return false;
        return ((ClusterTopic) obj).topicId == topicId;
    }

    /** Hashes on the topic id, matching equals(). */
    public int hashCode() {
        return topicId;
    }
}
1.1
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicConnection.java
Index: ClusterTopicConnection.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.jms;
import javax.jms.JMSException;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.Topic;
import javax.jms.TemporaryTopic;
import javax.jms.ConnectionConsumer;
import javax.jms.ServerSessionPool;
import javax.jms.ExceptionListener;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ByteArrayInputStream;
import java.io.ObjectOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Properties;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Enumeration;
import org.jbossmq.Log;
import org.jbossmq.SpyMessage;
import org.jbossmq.xml.XElement;
import org.jbossmq.cluster.transport.TransportListener;
import org.jbossmq.cluster.transport.Transport;
import org.jbossmq.cluster.transport.NodeId;
import org.jbossmq.cluster.transport.udp.UDPTransport;
/**
* This class implements javax.jms.TopicConnection
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ClusterTopicConnection implements Serializable, TopicConnection,
TransportListener {
final static short TEMPORARY_TOPIC_ID = 0;
// The transport protocal we will be using
Transport transport = new UDPTransport();
//Is the connection closed ?
boolean closed;
//LinkedList of all created sessions by this connection
HashSet createdSessions = new HashSet();
//the exceptionListener
private ExceptionListener exceptionListener;
//Last message ID returned
private int lastMessageId;
//Last temporary topic id
private int lastTemporaryTopicId = 0;
//Is the connection stopped ?
public boolean modeStop;
// the configuration of the server.
private XElement serverXElement;
//Maps a to to a LinkedList of Subscribers
public HashMap topicSubscribers = new HashMap();
public ClusterTopicConnection(XElement config) throws JMSException {
serverXElement = config;
try {
Properties connectionProperties = new Properties();
connectionProperties.setProperty("TransportMode", "Multicast");
connectionProperties.setProperty("Group", "225.1.1.1");
connectionProperties.setProperty("Port", "1000");
transport.setProperties(connectionProperties);
transport.setTransportListener(this);
transport.start();
} catch (Exception e) {
failureHandler(e, "Transport protocol could not be
initialized");
}
}
public TopicSession createTopicSession(boolean transacted, int
acknowledgeMode) throws JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
TopicSession session = new ClusterTopicSession(this);
//add the new session to the createdSessions list
synchronized (createdSessions) {
createdSessions.add(session);
}
return session;
}
public ConnectionConsumer createConnectionConsumer(
Topic topic,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages)
throws JMSException {
throw new javax.jms.JMSException("This feature is not implemented");
}
TemporaryTopic getTemporaryTopic() throws JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
try {
return new ClusterTemporaryTopic(transport.getLocalNodeId(),
lastTemporaryTopicId++);
} catch (Exception e) {
failureHandler(e, "Cannot create a temporary topic !");
return null;
}
}
Topic createTopic(String name) throws JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
try {
Enumeration topics = serverXElement.getElementsNamed("Topic");
while (topics.hasMoreElements()) {
XElement topic = (XElement) topics.nextElement();
if (topic.getField("Name").equals(name)) {
short id =
Short.parseShort(topic.getField("TopicId"));
return new ClusterTopic(id);
}
}
throw new JMSException("This destination does not exist !");
} catch (Exception e) {
throw new JMSException("This destination does not exist !");
}
}
public ConnectionConsumer createDurableConnectionConsumer(
Topic topic,
String subscriptionName,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages)
throws JMSException {
throw new javax.jms.JMSException("This feature is not implemented");
}
//A new Consumer has been created for the Destination dest
void addConsumer(ClusterTopicSubscriber consumer) throws JMSException {
Log.log("[" + this +"]: addConsumer(dest=" + consumer.topic + ")");
if (closed)
throw new IllegalStateException("The connection is closed");
// Write Lock
synchronized (topicSubscribers) {
// We do not modify the current topicSubscribers list
// so that iterators on the topicSubscribers do not fail.
LinkedList ll = (LinkedList)
topicSubscribers.get(consumer.topic);
if (ll == null) {
ll = new LinkedList();
} else {
ll = (LinkedList) ll.clone();
}
ll.add(consumer);
HashMap t = (HashMap) topicSubscribers.clone();
t.put(consumer.topic, ll);
topicSubscribers = t;
}
}
/**
* close method comment.
*/
public void close() throws javax.jms.JMSException {
if (closed)
return;
Log.log("Closing sessions, ClientID=" + getClientID());
//notify his sessions
synchronized (createdSessions) {
Object[] vect = createdSessions.toArray();
for (int i = 0; i < vect.length; i++) {
((ClusterTopicSession) vect[i]).close();
}
}
Log.log("Closed sessions");
Log.log("Disconnecting from the transport");
//Notify the JMSServer that I am closing
try {
transport.stop();
transport.close();
} catch (Exception e) {
failureHandler(e, "Cannot properly close the transport");
}
Log.log("Disconnected from the transport");
// Only set the closed flag after all the objects that depend
// on this connection have been closed.
closed = true;
}
public void failureHandler(Exception e, String reason) throws JMSException {
e.printStackTrace();
JMSException excep = new JMSException(reason);
excep.setLinkedException(e);
if (exceptionListener != null) {
synchronized (exceptionListener) {
exceptionListener.onException(excep);
}
} else {
Log.error(e);
}
throw excep;
}
/**
* getClientID method comment.
*/
public java.lang.String getClientID() {
return transport.getLocalNodeId().toString();
}
/**
* getExceptionListener method comment.
*/
public javax.jms.ExceptionListener getExceptionListener() throws
javax.jms.JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
return exceptionListener;
}
/**
* getMetaData method comment.
*/
public javax.jms.ConnectionMetaData getMetaData() throws
javax.jms.JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
return new ClusterConnectionMetaData();
}
public String getNextMessageId() {
return "ID:" + getClientID() + (++lastMessageId);
}
/**
* The isListeningOn method is used by the Transport
* to see if we are interested in messages of the given topic
*/
public boolean isListeningOn(short topic) {
if (modeStop)
return false;
if (topic == TEMPORARY_TOPIC_ID)
return true;
ClusterTopic t = new ClusterTopic(topic);
LinkedList ll = (LinkedList) topicSubscribers.get(t);
return (ll != null);
}
/**
* the messageArrivedEvent method is used by the Transport
* to deliver messages to us.
*/
public void messageArrivedEvent(short channelId, NodeId senderId, byte[]
message)
throws java.lang.InterruptedException {
if (modeStop)
return;
SpyMessage spyMessage = null;
try {
ByteArrayInputStream bais = new ByteArrayInputStream(message);
ObjectInputStream is = new ObjectInputStream(bais);
spyMessage = (SpyMessage) is.readObject();
} catch (Exception e) {
try {
failureHandler(e, "Invalid message received.");
} catch (JMSException ignore) {
}
}
onMessage(senderId, spyMessage);
}
/**
* Used internally to send a message to all the local listeners.
*/
private void onMessage(NodeId sender, SpyMessage message) throws
InterruptedException {
if (modeStop)
return;
LinkedList ll = (LinkedList)
topicSubscribers.get(message.getJMSDestination());
if (ll == null) {
// No listeners...
Log.log("No subscriptions for message: " + message);
return;
}
Log.log("Dispatching message: " + message);
Iterator i = ll.iterator();
while (i.hasNext()) {
ClusterTopicSubscriber c = (ClusterTopicSubscriber) i.next();
c.onMessage(sender, message);
}
}
/**
* Send a message to the node on the given channel.
*/
public void send(SpyMessage message) throws JMSException {
try {
NodeId dest = null;
;
short topic;
Object o = message.getJMSDestination();
if (o instanceof ClusterTemporaryTopic) {
topic = TEMPORARY_TOPIC_ID;
dest = ((ClusterTemporaryTopic) o).nodeId;
if (dest.equals(transport.getLocalNodeId())) {
onMessage(transport.getLocalNodeId(), message);
return;
}
} else {
topic = ((ClusterTopic) o).topicId;
onMessage(transport.getLocalNodeId(), message);
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream os = new ObjectOutputStream(bos);
os.writeObject(message);
os.close();
byte data[] = bos.toByteArray();
if (dest == null) {
transport.send(topic, data, false, true);
} else {
transport.send(dest, topic, data, false, true);
}
} catch (Exception e) {
failureHandler(e, "Could not send the message to the cluster");
}
}
//Called by a session when it is closing
void sessionClosing(ClusterTopicSession who) {
synchronized (createdSessions) {
createdSessions.remove(who);
}
}
/**
* setClientID method comment.
*/
public void setClientID(java.lang.String arg1) throws javax.jms.JMSException {
throw new javax.jms.JMSException("The client Id cannot be changed");
}
/**
* setExceptionListener method comment.
*/
public void setExceptionListener(javax.jms.ExceptionListener listener) throws
javax.jms.JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
exceptionListener = listener;
}
/**
* start method comment.
*/
public void start() throws javax.jms.JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
if (!modeStop)
return;
modeStop = false;
}
/**
* stop method comment.
*/
public void stop() throws javax.jms.JMSException {
if (closed)
throw new IllegalStateException("The connection is closed");
if (modeStop)
return;
modeStop = true;
}
}
1.1
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicConnectionFactory.java
Index: ClusterTopicConnectionFactory.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.jms;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.JMSException;
import java.io.Serializable;
import java.util.Properties;
import org.jbossmq.Log;
import org.jbossmq.xml.XElement;
/**
* This class implements javax.jms.TopicConnectionFactory
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ClusterTopicConnectionFactory implements java.io.Serializable,
javax.jms.TopicConnectionFactory {
private XElement serverXElement;
public ClusterTopicConnectionFactory() throws Exception {
//Load the property file
java.io.InputStream in =
getClass().getClassLoader().getResource("jbossmq-cluster.xml").openStream();
serverXElement = XElement.createFrom(in);
in.close();
// Make sure that we loaded the right type of xml file
if (!serverXElement.getName().equals("Server")) {
throw new JMSException("The jbossmq-cluster.xml file is
invalid.");
}
}
public TopicConnection createTopicConnection() throws JMSException {
try {
return new ClusterTopicConnection(serverXElement);
} catch (JMSException e) {
throw e;
} catch (Exception e) {
failureHandler(e, "ClusterTopicConnection creation has failed
!");
return null;
}
}
public TopicConnection createTopicConnection(String userName, String password)
throws JMSException {
return createTopicConnection();
}
private void failureHandler(Exception e, String reason) throws JMSException {
Log.error(e);
throw new JMSException(reason);
}
}
1.1
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicPublisher.java
Index: ClusterTopicPublisher.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.jms;
import javax.jms.TopicPublisher;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.InvalidDestinationException;
import javax.jms.DeliveryMode;
import org.jbossmq.SpyMessage;
/**
* This class implements javax.jms.TopicPublisher
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ClusterTopicPublisher implements TopicPublisher {
protected int defaultDeliveryMode = SpyMessage.DEFAULT_DELIVERY_MODE;
protected int defaultPriority = SpyMessage.DEFAULT_PRIORITY;
protected long defaultTTL = SpyMessage.DEFAULT_TIME_TO_LIVE;
private boolean disableMessageID = false;
private boolean disableTS = false;
//The session to which this publisher is linked
private ClusterTopicSession session;
//The topic of this publisher
private Topic myTopic = null;
ClusterTopicPublisher(ClusterTopicSession s, Topic t) {
session = s;
myTopic = t;
}
public Topic getTopic() throws JMSException {
return myTopic;
}
public void publish(Message message) throws JMSException {
if (myTopic == null)
throw new InvalidDestinationException("I do not have a default
Destination !");
publish(myTopic, message, defaultDeliveryMode, defaultPriority,
defaultTTL);
}
public void publish(Topic topic, Message message) throws JMSException {
publish(topic, message, defaultDeliveryMode, defaultPriority,
defaultTTL);
}
public void publish(Message message, int deliveryMode, int priority, long
timeToLive) throws JMSException {
if (myTopic == null)
throw new InvalidDestinationException("Destination is null !");
publish(myTopic, message, deliveryMode, priority, timeToLive);
}
public void publish(Topic topic, Message message, int deliveryMode, int
priority, long timeToLive)
throws JMSException {
//Set the header fields
message.setJMSDestination(topic);
message.setJMSDeliveryMode(deliveryMode);
long ts = System.currentTimeMillis();
message.setJMSTimestamp(ts);
if (timeToLive == 0) {
message.setJMSExpiration(0);
} else {
message.setJMSExpiration(timeToLive + ts);
}
message.setJMSPriority(priority);
message.setJMSMessageID(session.connection.getNextMessageId());
//Clone the message so we can make the outbound message read only
SpyMessage clone = ((SpyMessage) message).myClone();
clone.setReadOnlyMode();
//Send the message
session.connection.send(clone);
}
public void close() throws JMSException {
//Is there anything useful to do ?
//Let the GC do its work !
}
public int getDeliveryMode() throws JMSException {
return defaultDeliveryMode;
}
public boolean getDisableMessageID() throws JMSException {
return disableMessageID;
}
public boolean getDisableMessageTimestamp() throws JMSException {
return disableTS;
}
public int getPriority() throws JMSException {
return defaultPriority;
}
public long getTimeToLive() throws JMSException {
return defaultTTL;
}
public void setDeliveryMode(int deli) throws JMSException {
if (deli == Message.DEFAULT_DELIVERY_MODE)
defaultDeliveryMode = DeliveryMode.NON_PERSISTENT;
else if (deli != DeliveryMode.NON_PERSISTENT && deli !=
DeliveryMode.PERSISTENT)
throw new JMSException("Bad DeliveryMode value");
else
defaultDeliveryMode = deli;
}
public void setDisableMessageID(boolean value) throws JMSException {
disableMessageID = value;
}
public void setDisableMessageTimestamp(boolean value) throws JMSException {
disableTS = value;
}
public void setPriority(int pri) throws JMSException {
if (pri == Message.DEFAULT_PRIORITY)
defaultPriority = 4;
else if (pri < 0 || pri > 9)
throw new JMSException("Bad priority value");
else
defaultPriority = pri;
}
public void setTimeToLive(int timeToLive) throws JMSException {
if (timeToLive == Message.DEFAULT_TIME_TO_LIVE)
timeToLive = 0;
else if (timeToLive < 0)
throw new JMSException("Bad TimeToLive value");
else
defaultTTL = timeToLive;
}
public void setTimeToLive(long timeToLive) throws JMSException {
if (timeToLive == Message.DEFAULT_TIME_TO_LIVE)
timeToLive = 0;
else if (timeToLive < 0)
throw new JMSException("Bad TimeToLive value");
else
defaultTTL = timeToLive;
}
}
1.1
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicSession.java
Index: ClusterTopicSession.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.jms;
import javax.jms.TopicSession;
import javax.jms.Topic;
import javax.jms.Destination;
import javax.jms.TopicSubscriber;
import javax.jms.JMSException;
import javax.jms.TopicPublisher;
import javax.jms.TemporaryTopic;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.StreamMessage;
import javax.jms.BytesMessage;
import javax.jms.TextMessage;
import javax.jms.MapMessage;
import javax.jms.ObjectMessage;
import java.util.Collection;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
import java.io.Serializable;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.io.IOException;
import org.jbossmq.Log;
import org.jbossmq.selectors.Selector;
import org.jbossmq.SpyObjectMessage;
import org.jbossmq.SpyStreamMessage;
import org.jbossmq.SpyTextMessage;
import org.jbossmq.SpyMessage;
import org.jbossmq.SpyBytesMessage;
import org.jbossmq.SpyMapMessage;
/**
 * This class implements javax.jms.TopicSession.
 *
 * Supported: creating topics, publishers, non-durable subscribers and
 * messages. Transactions, durable subscriptions, recovery and session level
 * message listeners are not implemented and throw a JMSException.
 *
 * NOTE(review): no javax.jms.IllegalStateException import is visible for this
 * class, so the unqualified IllegalStateException thrown below appears to be
 * java.lang.IllegalStateException (unchecked) - confirm this is intended.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class ClusterTopicSession implements TopicSession
{
    //Is the session closed ?
    boolean closed;

    // The connection that created this session
    ClusterTopicConnection connection;

    // MessageConsumers created by this session. The set is never modified
    // in place: a modified copy replaces it, so close() can iterate safely.
    protected HashSet consumers = new HashSet();

    // Guards replacement of the consumers set. The set itself cannot be the
    // monitor because the field is reassigned on every write.
    private final Object consumersLock = new Object();

    ClusterTopicSession(ClusterTopicConnection myConnection) {
        connection = myConnection;
    }

    /** Fails when the session has already been closed. */
    private void checkOpen() {
        if (closed) throw new IllegalStateException("The session is closed");
    }

    /** Records the connection's client id as the producer of the message. */
    private void stampProducer(SpyMessage message) {
        message.producerClientId = connection.getClientID();
    }

    /**
     * Resolves a topic by name through the connection.
     */
    public Topic createTopic(String topicName) throws JMSException
    {
        checkOpen();
        return connection.createTopic(topicName);
    }

    /**
     * Creates a subscriber without selector that also receives local messages.
     */
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException
    {
        return createSubscriber(topic,null,false);
    }

    /**
     * Creates a subscriber and registers it with this session and with the
     * connection.
     */
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector,
            boolean noLocal) throws JMSException
    {
        checkOpen();

        ClusterTopicSubscriber sub =
                new ClusterTopicSubscriber(this, topic, noLocal, messageSelector);

        // A dedicated monitor is used: locking the set would let two
        // writers hold different locks once the field has been swapped,
        // and one registration could be lost.
        synchronized (consumersLock) {
            HashSet copy = (HashSet) consumers.clone();
            copy.add(sub);
            consumers = copy;
        }

        connection.addConsumer(sub);
        return sub;
    }

    /** Not implemented; always throws. */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name)
            throws JMSException
    {
        throw new javax.jms.JMSException("This feature is not implemented");
    }

    /** Not implemented; always throws. */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name,
            String messageSelector, boolean noLocal) throws JMSException
    {
        throw new javax.jms.JMSException("This feature is not implemented");
    }

    /**
     * @param topic the default topic of the publisher; may be null
     */
    public TopicPublisher createPublisher(Topic topic) throws JMSException
    {
        checkOpen();
        return new ClusterTopicPublisher(this,topic);
    }

    /** Creates a temporary topic through the connection. */
    public TemporaryTopic createTemporaryTopic() throws JMSException
    {
        checkOpen();
        return connection.getTemporaryTopic();
    }

    /** Not implemented; always throws. */
    public void unsubscribe(String name) throws JMSException
    {
        throw new javax.jms.JMSException("This feature is not implemented");
    }

    /**
     * Closes every subscriber created by this session and tells the
     * connection that the session is gone. Closing twice is a no-op.
     */
    synchronized public void close() throws javax.jms.JMSException {
        if (closed)
            return;
        closed = true;

        for (Iterator i = consumers.iterator(); i.hasNext();) {
            ((ClusterTopicSubscriber) i.next()).close();
        }

        connection.sessionClosing(this);
    }

    /** Not implemented; always throws. */
    public void commit() throws javax.jms.JMSException {
        throw new javax.jms.JMSException("This feature is not implemented");
    }

    /** @return a new, empty bytes message stamped with this client's id */
    public BytesMessage createBytesMessage() throws JMSException
    {
        checkOpen();
        SpyBytesMessage message = new SpyBytesMessage();
        stampProducer(message);
        return message;
    }

    /** @return a new, empty map message stamped with this client's id */
    public MapMessage createMapMessage() throws JMSException
    {
        checkOpen();
        SpyMapMessage message = new SpyMapMessage();
        stampProducer(message);
        return message;
    }

    /** @return a new message without body stamped with this client's id */
    public Message createMessage() throws JMSException
    {
        checkOpen();
        SpyMessage message = new SpyMessage();
        stampProducer(message);
        return message;
    }

    /** @return a new, empty object message stamped with this client's id */
    public ObjectMessage createObjectMessage() throws JMSException
    {
        checkOpen();
        SpyObjectMessage message = new SpyObjectMessage();
        stampProducer(message);
        return message;
    }

    /** @return a new object message carrying the given object */
    public ObjectMessage createObjectMessage(Serializable object) throws JMSException
    {
        checkOpen();
        SpyObjectMessage message = new SpyObjectMessage();
        message.setObject(object);
        stampProducer(message);
        return message;
    }

    /** @return a new, empty stream message stamped with this client's id */
    public StreamMessage createStreamMessage() throws JMSException
    {
        checkOpen();
        SpyStreamMessage message = new SpyStreamMessage();
        stampProducer(message);
        return message;
    }

    /** @return a new, empty text message stamped with this client's id */
    public TextMessage createTextMessage() throws JMSException
    {
        checkOpen();
        SpyTextMessage message = new SpyTextMessage();
        stampProducer(message);
        return message;
    }

    /** @return a new text message carrying the given text */
    public TextMessage createTextMessage(String string) throws JMSException
    {
        checkOpen();
        SpyTextMessage message = new SpyTextMessage();
        message.setText(string);
        stampProducer(message);
        return message;
    }

    /** Not implemented; always throws. */
    public javax.jms.MessageListener getMessageListener() throws javax.jms.JMSException {
        throw new javax.jms.JMSException("This feature is not implemented");
    }

    /** Not implemented; always throws. */
    public boolean getTransacted() throws javax.jms.JMSException {
        throw new javax.jms.JMSException("This feature is not implemented");
    }

    /** Not implemented; always throws. */
    public void recover() throws javax.jms.JMSException {
        throw new javax.jms.JMSException("This feature is not implemented");
    }

    /** Not implemented; always throws. */
    public void rollback() throws javax.jms.JMSException {
        throw new javax.jms.JMSException("This feature is not implemented");
    }

    /**
     * Does nothing.
     */
    public void run() {
        // This is an ASF requirement which we will not be implementing.
    }

    /** Not implemented; always throws. */
    public void setMessageListener(javax.jms.MessageListener arg1) throws javax.jms.JMSException {
        throw new javax.jms.JMSException("This feature is not implemented");
    }
}
1.1
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicSubscriber.java
Index: ClusterTopicSubscriber.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.jms;
import javax.jms.TopicSubscriber;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.jbossmq.selectors.Selector;
import org.jbossmq.cluster.transport.NodeId;
import org.jbossmq.SpyMessage;
import EDU.oswego.cs.dl.util.concurrent.BoundedPriorityQueue;
/**
* This class implements javax.jms.TopicSubscriber
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ClusterTopicSubscriber implements TopicSubscriber {
// The topic I registered
Topic topic;
//Am I closed ?
boolean closed;
//Do i have a thread in one of my receive methods?
boolean isReceiving;
//My message listener (null if none)
MessageListener messageListener;
BoundedPriorityQueue messageQueue = new BoundedPriorityQueue(50);
// Should I not accept local messages.
boolean noLocal;
// The selector to apply to the messages
String selector;
// The session this was created with
ClusterTopicSession session;
ClusterTopicSubscriber(ClusterTopicSession session, Topic topic, boolean
noLocal, String selector) {
this.session = session;
this.topic = topic;
this.selector = selector;
this.noLocal = noLocal;
}
synchronized public Topic getTopic() throws JMSException {
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
return topic;
}
synchronized public boolean getNoLocal() throws JMSException {
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
return noLocal;
}
/**
* close method comment.
*/
synchronized public void close() throws javax.jms.JMSException {
if (closed)
return;
if (messageListener == null && isReceiving) {
//A consumer could be waiting in receive()
try {
messageQueue.put(this);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
closed = true;
}
/**
* getMessageListener method comment.
*/
synchronized public javax.jms.MessageListener getMessageListener() throws
javax.jms.JMSException {
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
return messageListener;
}
/**
* getMessageSelector method comment.
*/
synchronized public java.lang.String getMessageSelector() throws
javax.jms.JMSException {
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
return selector;
}
/**
* Used internally to dispatch a message to the client
*/
synchronized public void onMessage(NodeId sender, SpyMessage message) throws
InterruptedException {
if (closed)
return;
if (session.connection.modeStop)
return;
if (noLocal &&
sender.equals(session.connection.transport.getLocalNodeId()))
return;
if (messageListener == null) {
// If your receive messageQueue is full... You loose it buddy!
messageQueue.offer(message, 0);
} else {
messageListener.onMessage(message);
}
}
/**
* receive method comment.
*/
public javax.jms.Message receive() throws javax.jms.JMSException {
// close could be updated by a different thread
synchronized (this) {
if (closed)
throw new IllegalStateException("The MessageConsumer
is closed");
if (messageListener != null)
throw new JMSException("A message listener is already
registered");
isReceiving = true;
}
try {
while (session.connection.modeStop) {
if (closed)
return null;
// Wait for the connection to be started.
Thread.sleep(500);
}
Object o = messageQueue.take();
if (o == this) {
// this is a signal that we have closed
return null;
}
return (Message) o;
} catch (InterruptedException e) {
session.connection.failureHandler(e, "Receive interrupted");
} finally {
synchronized (this) {
isReceiving = false;
}
}
return null;
}
/**
* receive method comment.
*/
public javax.jms.Message receive(long arg1) throws javax.jms.JMSException {
throw new javax.jms.JMSException("This feature is not implemented");
}
/**
* receiveNoWait method comment.
*/
public javax.jms.Message receiveNoWait() throws javax.jms.JMSException {
throw new javax.jms.JMSException("This feature is not implemented");
}
/**
* setMessageListener method comment.
*/
synchronized public void setMessageListener(javax.jms.MessageListener
listener) throws javax.jms.JMSException {
if (closed)
throw new IllegalStateException("The MessageConsumer is
closed");
if (isReceiving)
throw new JMSException("This MessageConsumer is waiting in
receive() !");
messageListener = listener;
}
}