User: hiram   
  Date: 01/03/01 23:04:51

  Added:       src/main/org/jbossmq/cluster/jms
                        ClusterConnectionMetaData.java
                        ClusterTemporaryTopic.java ClusterTopic.java
                        ClusterTopicConnection.java
                        ClusterTopicConnectionFactory.java
                        ClusterTopicPublisher.java ClusterTopicSession.java
                        ClusterTopicSubscriber.java
  Log:
  - Changed Source references of GPL to LGPL.
  - The cluster work that I had been doing is now accessible via a Pub-Sub JMS API.
  - Take a look at the Cluster* examples in the sample directory to test it out.
  
  Revision  Changes    Path
  1.1                  
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterConnectionMetaData.java
  
  Index: ClusterConnectionMetaData.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.jms;
  
  import javax.jms.ConnectionMetaData;
  import javax.jms.JMSException;
  
  import java.util.Enumeration;
  import java.util.Vector;
  
  /**
   *    Describes the JBossMQ cluster provider through the
   *    javax.jms.ConnectionMetaData interface.
   *
   *    Every value reported here is a fixed constant: JMS version 1.0 and
   *    provider "JBossMQ-Cluster" version 0.1.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *
   *    @version $Revision: 1.1 $
   */
  public class ClusterConnectionMetaData implements ConnectionMetaData {

        // JMS level reported to clients.
        private static final int JMS_MAJOR = 1;
        private static final int JMS_MINOR = 0;

        // Version of this provider.
        private static final int PROVIDER_MAJOR = 0;
        private static final int PROVIDER_MINOR = 1;

        private static final String PROVIDER_NAME = "JBossMQ-Cluster";

        // JMSX property names advertised by getJMSXPropertyNames().
        private static final String[] JMSX_PROPERTIES = { "JMSXGroupID", "JMSXGroupSeq" };

        /** @return the JMS version as "major.minor", i.e. "1.0" */
        public String getJMSVersion() throws JMSException {
                return JMS_MAJOR + "." + JMS_MINOR;
        }

        /** @return the JMS major version (1) */
        public int getJMSMajorVersion() throws JMSException {
                return JMS_MAJOR;
        }

        /** @return the JMS minor version (0) */
        public int getJMSMinorVersion() throws JMSException {
                return JMS_MINOR;
        }

        /** @return the provider name, "JBossMQ-Cluster" */
        public String getJMSProviderName() throws JMSException {
                return PROVIDER_NAME;
        }

        /** @return the provider version as "major.minor", i.e. "0.1" */
        public String getProviderVersion() throws JMSException {
                return PROVIDER_MAJOR + "." + PROVIDER_MINOR;
        }

        /** @return the provider major version (0) */
        public int getProviderMajorVersion() throws JMSException {
                return PROVIDER_MAJOR;
        }

        /** @return the provider minor version (1) */
        public int getProviderMinorVersion() throws JMSException {
                return PROVIDER_MINOR;
        }

        /**
         * @return a fresh enumeration over the JMSX property names
         *         ("JMSXGroupID", "JMSXGroupSeq"), in that order
         */
        public Enumeration getJMSXPropertyNames() throws JMSException {
                Vector names = new Vector(JMSX_PROPERTIES.length);
                for (int i = 0; i < JMSX_PROPERTIES.length; i++) {
                        names.addElement(JMSX_PROPERTIES[i]);
                }
                return names.elements();
        }

  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTemporaryTopic.java
  
  Index: ClusterTemporaryTopic.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.jms;
  
  import javax.jms.JMSException;
  import javax.jms.TemporaryTopic;
  
  import java.io.Serializable;
  
  import org.jbossmq.cluster.transport.NodeId;
  
  /**
   *    This class implements javax.jms.TemporaryTopic.
   *
   *    A temporary topic is identified by the NodeId of the node it was
   *    created for together with an integer topic id; two instances are
   *    equal when both parts match.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *
   *    @version $Revision: 1.1 $
   */
  public class ClusterTemporaryTopic implements Serializable, TemporaryTopic {
        // Node this temporary topic belongs to; never null after construction.
        NodeId nodeId;
        // Identifier of the topic, as supplied to the constructor.
        int topicId;

        /**
         * Creates a temporary topic bound to the given node.
         *
         * @param n  the owning node; must not be null
         * @param id the topic identifier
         * @throws javax.jms.JMSException if n is null
         */
        public ClusterTemporaryTopic(NodeId n, int id) throws javax.jms.JMSException {
                if (n == null)
                        throw new javax.jms.JMSException("NodeId was null");
                nodeId = n;
                topicId = id;
        }

        /**
         * Does nothing: this implementation holds no resources to release.
         */
        public void delete() throws javax.jms.JMSException {
        }

        /**
         * Two temporary topics are equal when they have the same topic id
         * and equal node ids. Any other object, including null, is unequal.
         */
        public boolean equals(Object obj) {
                // Explicit type and null checks replace the former catch-all
                // around the cast; instanceof is also false for null.
                if (!(obj instanceof ClusterTemporaryTopic))
                        return false;

                ClusterTemporaryTopic other = (ClusterTemporaryTopic) obj;
                if (other.topicId != topicId)
                        return false;
                return other.nodeId != null && other.nodeId.equals(nodeId);
        }

        /**
         * @return the same text as toString()
         */
        public String getTopicName() throws JMSException {
                return toString();
        }

        /**
         * Hashes on the topic id only, which is consistent with equals():
         * equal topics always share a topic id.
         */
        public int hashCode() {
                return topicId;
        }

        /**
         * @return "TemporaryTopic:" followed by the node id, ":" and the topic id
         */
        public String toString() {
                return "TemporaryTopic:" + nodeId + ":" + topicId;
        }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopic.java
  
  Index: ClusterTopic.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.jms;
  
  import javax.jms.Topic;
  import javax.jms.JMSException;
  
  import java.io.Serializable;
  
  /**
   *    This class implements javax.jms.Topic.
   *
   *    A cluster topic is identified solely by its short topic id; equality,
   *    hashing and the topic name are all derived from that id.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *
   *    @version $Revision: 1.1 $
   */
  public class ClusterTopic implements Serializable, Topic {

        // Not assigned anywhere in this class; kept because it is package-visible.
        String name;
        // Identifier of the topic, as supplied to the constructor.
        short topicId;

        /**
         * @param id the topic identifier
         */
        public ClusterTopic(short id) {
                topicId = id;
        }

        /**
         * @return the same text as toString()
         */
        public String getTopicName() throws JMSException {
                return toString();
        }

        /**
         * @return "Topic:" followed by the topic id
         */
        public String toString() {
                return "Topic:" + topicId;
        }

        /**
         * Two cluster topics are equal when their topic ids match. Any other
         * object, including null, is unequal.
         */
        public boolean equals(Object obj) {
                // An explicit type check replaces the former catch-all around
                // the cast; instanceof is also false for null.
                if (!(obj instanceof ClusterTopic))
                        return false;
                return ((ClusterTopic) obj).topicId == topicId;
        }

        /**
         * @return the topic id, consistent with equals()
         */
        public int hashCode() {
                return topicId;
        }
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicConnection.java
  
  Index: ClusterTopicConnection.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.jms;
  
  import javax.jms.JMSException;
  import javax.jms.TopicConnection;
  import javax.jms.TopicSession;
  import javax.jms.Topic;
  import javax.jms.TemporaryTopic;
  import javax.jms.ConnectionConsumer;
  import javax.jms.ServerSessionPool;
  import javax.jms.ExceptionListener;
  
  import java.io.IOException;
  import java.io.InputStream;
  import java.io.ObjectInputStream;
  import java.io.ByteArrayInputStream;
  import java.io.ObjectOutputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.Externalizable;
  import java.io.Serializable;
  import java.util.HashSet;
  import java.util.Properties;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Enumeration;
  
  import org.jbossmq.Log;
  import org.jbossmq.SpyMessage;
  import org.jbossmq.xml.XElement;
  import org.jbossmq.cluster.transport.TransportListener;
  import org.jbossmq.cluster.transport.Transport;
  import org.jbossmq.cluster.transport.NodeId;
  import org.jbossmq.cluster.transport.udp.UDPTransport;
  
  /**
   *    This class implements javax.jms.TopicConnection
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ClusterTopicConnection implements Serializable, TopicConnection, 
TransportListener {
  
        final static short TEMPORARY_TOPIC_ID = 0;
  
        // The transport protocal we will be using
        Transport transport = new UDPTransport();
        //Is the connection closed ?
        boolean closed;
        //LinkedList of all created sessions by this connection 
        HashSet createdSessions = new HashSet();
        //the exceptionListener
        private ExceptionListener exceptionListener;
        //Last message ID returned
        private int lastMessageId;
        //Last temporary topic id
        private int lastTemporaryTopicId = 0;
        //Is the connection stopped ?
        public boolean modeStop;
        // the configuration of the server.
        private XElement serverXElement;
        //Maps a to to a LinkedList of Subscribers
        public HashMap topicSubscribers = new HashMap();
  
        public ClusterTopicConnection(XElement config) throws JMSException {
                serverXElement = config;
  
                try {
  
                        Properties connectionProperties = new Properties();
                        connectionProperties.setProperty("TransportMode", "Multicast");
                        connectionProperties.setProperty("Group", "225.1.1.1");
                        connectionProperties.setProperty("Port", "1000");
                        transport.setProperties(connectionProperties);
                        transport.setTransportListener(this);
                        transport.start();
  
                } catch (Exception e) {
                        failureHandler(e, "Transport protocol could not be 
initialized");
                }
  
        }
  
        public TopicSession createTopicSession(boolean transacted, int 
acknowledgeMode) throws JMSException {
                if (closed)
                        throw new IllegalStateException("The connection is closed");
  
                TopicSession session = new ClusterTopicSession(this);
  
                //add the new session to the createdSessions list 
                synchronized (createdSessions) {
                        createdSessions.add(session);
                }
  
                return session;
        }
  
        public ConnectionConsumer createConnectionConsumer(
                Topic topic,
                String messageSelector,
                ServerSessionPool sessionPool,
                int maxMessages)
                throws JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        TemporaryTopic getTemporaryTopic() throws JMSException {
                if (closed)
                        throw new IllegalStateException("The connection is closed");
  
                try {
                        return new ClusterTemporaryTopic(transport.getLocalNodeId(), 
lastTemporaryTopicId++);
                } catch (Exception e) {
                        failureHandler(e, "Cannot create a temporary topic !");
                        return null;
                }
        }
  
        Topic createTopic(String name) throws JMSException {
                if (closed)
                        throw new IllegalStateException("The connection is closed");
                try {
  
                        Enumeration topics = serverXElement.getElementsNamed("Topic");
                        while (topics.hasMoreElements()) {
                                XElement topic = (XElement) topics.nextElement();
                                if (topic.getField("Name").equals(name)) {
                                        short id = 
Short.parseShort(topic.getField("TopicId"));
                                        return new ClusterTopic(id);
                                }
                        }
                        throw new JMSException("This destination does not exist !");
  
                } catch (Exception e) {
                        throw new JMSException("This destination does not exist !");
                }
        }
  
        public ConnectionConsumer createDurableConnectionConsumer(
                Topic topic,
                String subscriptionName,
                String messageSelector,
                ServerSessionPool sessionPool,
                int maxMessages)
                throws JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        //A new Consumer has been created for the Destination dest
        void addConsumer(ClusterTopicSubscriber consumer) throws JMSException {
                Log.log("[" + this +"]: addConsumer(dest=" + consumer.topic + ")");
  
                if (closed)
                        throw new IllegalStateException("The connection is closed");
  
                // Write Lock
                synchronized (topicSubscribers) {
  
                        // We do not modify the current topicSubscribers list
                        // so that iterators on the topicSubscribers do not fail.
                        LinkedList ll = (LinkedList) 
topicSubscribers.get(consumer.topic);
                        if (ll == null) {
                                ll = new LinkedList();
                        } else {
                                ll = (LinkedList) ll.clone();
                        }
                        ll.add(consumer);
  
                        HashMap t = (HashMap) topicSubscribers.clone();
                        t.put(consumer.topic, ll);
                        topicSubscribers = t;
                }
        }
  
        /**
         * close method comment.
         */
        public void close() throws javax.jms.JMSException {
                if (closed)
                        return;
  
                Log.log("Closing sessions, ClientID=" + getClientID());
                //notify his sessions
                synchronized (createdSessions) {
                        Object[] vect = createdSessions.toArray();
                        for (int i = 0; i < vect.length; i++) {
                                ((ClusterTopicSession) vect[i]).close();
                        }
                }
                Log.log("Closed sessions");
  
                Log.log("Disconnecting from the transport");
                //Notify the JMSServer that I am closing
                try {
                        transport.stop();
                        transport.close();
                } catch (Exception e) {
                        failureHandler(e, "Cannot properly close the transport");
                }
  
                Log.log("Disconnected from the transport");
  
                // Only set the closed flag after all the objects that depend 
                // on this connection have been closed.
                closed = true;
        }
  
        public void failureHandler(Exception e, String reason) throws JMSException {
                e.printStackTrace();
  
                JMSException excep = new JMSException(reason);
                excep.setLinkedException(e);
  
                if (exceptionListener != null) {
                        synchronized (exceptionListener) {
                                exceptionListener.onException(excep);
                        }
                } else {
                        Log.error(e);
                }
  
                throw excep;
        }
  
        /**
         * getClientID method comment.
         */
        public java.lang.String getClientID() {
                return transport.getLocalNodeId().toString();
        }
  
        /**
         * getExceptionListener method comment.
         */
        public javax.jms.ExceptionListener getExceptionListener() throws 
javax.jms.JMSException {
                if (closed)
                        throw new IllegalStateException("The connection is closed");
  
                return exceptionListener;
        }
  
        /**
         * getMetaData method comment.
         */
        public javax.jms.ConnectionMetaData getMetaData() throws 
javax.jms.JMSException {
                if (closed)
                        throw new IllegalStateException("The connection is closed");
  
                return new ClusterConnectionMetaData();
        }
  
        public String getNextMessageId() {
                return "ID:" + getClientID() + (++lastMessageId);
        }
  
        /**
         * The isListeningOn method is used by the Transport
         * to see if we are interested in messages of the given topic
         */
        public boolean isListeningOn(short topic) {
                if (modeStop)
                        return false;
  
                if (topic == TEMPORARY_TOPIC_ID)
                        return true;
  
                ClusterTopic t = new ClusterTopic(topic);
                LinkedList ll = (LinkedList) topicSubscribers.get(t);
                return (ll != null);
  
        }
  
        /**
         * the messageArrivedEvent method is used by the Transport
         * to deliver messages to us.
         */
        public void messageArrivedEvent(short channelId, NodeId senderId, byte[] 
message)
                throws java.lang.InterruptedException {
  
                if (modeStop)
                        return;
  
                SpyMessage spyMessage = null;
                try {
  
                        ByteArrayInputStream bais = new ByteArrayInputStream(message);
                        ObjectInputStream is = new ObjectInputStream(bais);
                        spyMessage = (SpyMessage) is.readObject();
  
                } catch (Exception e) {
                        try {
                                failureHandler(e, "Invalid message received.");
                        } catch (JMSException ignore) {
                        }
                }
  
                onMessage(senderId, spyMessage);
        }
  
        /**
         * Used internally to send a message to all the local listeners.
         */
        private void onMessage(NodeId sender, SpyMessage message) throws 
InterruptedException {
  
                if (modeStop)
                        return;
  
                LinkedList ll = (LinkedList) 
topicSubscribers.get(message.getJMSDestination());
                if (ll == null) {
                        // No listeners...
                        Log.log("No subscriptions for message: " + message);
                        return;
                }
  
                Log.log("Dispatching message: " + message);
                Iterator i = ll.iterator();
                while (i.hasNext()) {
                        ClusterTopicSubscriber c = (ClusterTopicSubscriber) i.next();
                        c.onMessage(sender, message);
                }
  
        }
  
        /**
         * Send a message to the node on the given channel.
         */
        public void send(SpyMessage message) throws JMSException {
  
                try {
                        NodeId dest = null;
                        ;
                        short topic;
  
                        Object o = message.getJMSDestination();
                        if (o instanceof ClusterTemporaryTopic) {
  
                                topic = TEMPORARY_TOPIC_ID;
                                dest = ((ClusterTemporaryTopic) o).nodeId;
                                if (dest.equals(transport.getLocalNodeId())) {
                                        onMessage(transport.getLocalNodeId(), message);
                                        return;
                                }
  
                        } else {
  
                                topic = ((ClusterTopic) o).topicId;
                                onMessage(transport.getLocalNodeId(), message);
  
                        }
  
                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                        ObjectOutputStream os = new ObjectOutputStream(bos);
                        os.writeObject(message);
                        os.close();
                        byte data[] = bos.toByteArray();
  
                        if (dest == null) {
                                transport.send(topic, data, false, true);
                        } else {
                                transport.send(dest, topic, data, false, true);
                        }
  
                } catch (Exception e) {
                        failureHandler(e, "Could not send the message to the cluster");
                }
        }
  
        //Called by a session when it is closing
        void sessionClosing(ClusterTopicSession who) {
                synchronized (createdSessions) {
                        createdSessions.remove(who);
                }
        }
  
        /**
         * setClientID method comment.
         */
        public void setClientID(java.lang.String arg1) throws javax.jms.JMSException {
                throw new javax.jms.JMSException("The client Id cannot be changed");
        }
  
        /**
         * setExceptionListener method comment.
         */
        public void setExceptionListener(javax.jms.ExceptionListener listener) throws 
javax.jms.JMSException {
                if (closed)
                        throw new IllegalStateException("The connection is closed");
  
                exceptionListener = listener;
        }
  
        /**
         * start method comment.
         */
        public void start() throws javax.jms.JMSException {
                if (closed)
                        throw new IllegalStateException("The connection is closed");
  
                if (!modeStop)
                        return;
                modeStop = false;
  
        }
  
        /**
         * stop method comment.
         */
        public void stop() throws javax.jms.JMSException {
                if (closed)
                        throw new IllegalStateException("The connection is closed");
  
                if (modeStop)
                        return;
                modeStop = true;
        }
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicConnectionFactory.java
  
  Index: ClusterTopicConnectionFactory.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.jms;
  
  import javax.jms.TopicConnection;
  import javax.jms.TopicConnectionFactory;
  import javax.jms.JMSException;
  
  import java.io.Serializable;
  import java.util.Properties;
  
  import org.jbossmq.Log;
  import org.jbossmq.xml.XElement;
  
  /**
   *    This class implements javax.jms.TopicConnectionFactory
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ClusterTopicConnectionFactory implements java.io.Serializable, 
javax.jms.TopicConnectionFactory {
  
        private XElement serverXElement;
  
        public ClusterTopicConnectionFactory() throws Exception {
  
                //Load the property file
                java.io.InputStream in = 
getClass().getClassLoader().getResource("jbossmq-cluster.xml").openStream();
                serverXElement = XElement.createFrom(in);
                in.close();
  
                // Make sure that we loaded the right type of xml file
                if (!serverXElement.getName().equals("Server")) {
                        throw new JMSException("The jbossmq-cluster.xml file is 
invalid.");
                }
  
        }
  
        public TopicConnection createTopicConnection() throws JMSException {
                try {
                        return new ClusterTopicConnection(serverXElement);
                } catch (JMSException e) {
                        throw e;
                } catch (Exception e) {
                        failureHandler(e, "ClusterTopicConnection creation has failed 
!");
                        return null;
                }
        }
  
        public TopicConnection createTopicConnection(String userName, String password) 
throws JMSException {
                return createTopicConnection();
        }
  
        private void failureHandler(Exception e, String reason) throws JMSException {
                Log.error(e);
                throw new JMSException(reason);
        }
  
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicPublisher.java
  
  Index: ClusterTopicPublisher.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.jms;
  
  import javax.jms.TopicPublisher;
  import javax.jms.JMSException;
  import javax.jms.Message;
  import javax.jms.Topic;
  import javax.jms.InvalidDestinationException;
  import javax.jms.DeliveryMode;
  
  import org.jbossmq.SpyMessage;
  
  /**
   *    This class implements javax.jms.TopicPublisher
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ClusterTopicPublisher implements TopicPublisher {
  
        protected int defaultDeliveryMode = SpyMessage.DEFAULT_DELIVERY_MODE;
        protected int defaultPriority = SpyMessage.DEFAULT_PRIORITY;
        protected long defaultTTL = SpyMessage.DEFAULT_TIME_TO_LIVE;
        private boolean disableMessageID = false;
        private boolean disableTS = false;
        //The session to which this publisher is linked
        private ClusterTopicSession session;
        //The topic of this publisher
        private Topic myTopic = null;
  
        ClusterTopicPublisher(ClusterTopicSession s, Topic t) {
                session = s;
                myTopic = t;
        }
  
        public Topic getTopic() throws JMSException {
                return myTopic;
        }
  
        public void publish(Message message) throws JMSException {
                if (myTopic == null)
                        throw new InvalidDestinationException("I do not have a default 
Destination !");
                publish(myTopic, message, defaultDeliveryMode, defaultPriority, 
defaultTTL);
        }
  
        public void publish(Topic topic, Message message) throws JMSException {
                publish(topic, message, defaultDeliveryMode, defaultPriority, 
defaultTTL);
        }
  
        public void publish(Message message, int deliveryMode, int priority, long 
timeToLive) throws JMSException {
                if (myTopic == null)
                        throw new InvalidDestinationException("Destination is null !");
                publish(myTopic, message, deliveryMode, priority, timeToLive);
        }
  
        public void publish(Topic topic, Message message, int deliveryMode, int 
priority, long timeToLive)
                throws JMSException {
                //Set the header fields
                message.setJMSDestination(topic);
                message.setJMSDeliveryMode(deliveryMode);
                long ts = System.currentTimeMillis();
                message.setJMSTimestamp(ts);
                if (timeToLive == 0) {
                        message.setJMSExpiration(0);
                } else {
                        message.setJMSExpiration(timeToLive + ts);
                }
                message.setJMSPriority(priority);
                message.setJMSMessageID(session.connection.getNextMessageId());
  
                //Clone the message so we can make the outbound message read only
                SpyMessage clone = ((SpyMessage) message).myClone();
                clone.setReadOnlyMode();
  
                //Send the message
                session.connection.send(clone);
  
        }
  
        public void close() throws JMSException {
                //Is there anything useful to do ?
                //Let the GC do its work !
        }
  
        public int getDeliveryMode() throws JMSException {
                return defaultDeliveryMode;
        }
  
        public boolean getDisableMessageID() throws JMSException {
                return disableMessageID;
        }
  
        public boolean getDisableMessageTimestamp() throws JMSException {
                return disableTS;
        }
  
        public int getPriority() throws JMSException {
                return defaultPriority;
        }
  
        public long getTimeToLive() throws JMSException {
                return defaultTTL;
        }
  
        public void setDeliveryMode(int deli) throws JMSException {
                if (deli == Message.DEFAULT_DELIVERY_MODE)
                        defaultDeliveryMode = DeliveryMode.NON_PERSISTENT;
                else if (deli != DeliveryMode.NON_PERSISTENT && deli != 
DeliveryMode.PERSISTENT)
                        throw new JMSException("Bad DeliveryMode value");
                else
                        defaultDeliveryMode = deli;
        }
  
        public void setDisableMessageID(boolean value) throws JMSException {
                disableMessageID = value;
        }
  
        public void setDisableMessageTimestamp(boolean value) throws JMSException {
                disableTS = value;
        }
  
        public void setPriority(int pri) throws JMSException {
                if (pri == Message.DEFAULT_PRIORITY)
                        defaultPriority = 4;
                else if (pri < 0 || pri > 9)
                        throw new JMSException("Bad priority value");
                else
                        defaultPriority = pri;
        }
  
        public void setTimeToLive(int timeToLive) throws JMSException {
                if (timeToLive == Message.DEFAULT_TIME_TO_LIVE)
                        timeToLive = 0;
                else if (timeToLive < 0)
                        throw new JMSException("Bad TimeToLive value");
                else
                        defaultTTL = timeToLive;
        }
  
        public void setTimeToLive(long timeToLive) throws JMSException {
                if (timeToLive == Message.DEFAULT_TIME_TO_LIVE)
                        timeToLive = 0;
                else if (timeToLive < 0)
                        throw new JMSException("Bad TimeToLive value");
                else
                        defaultTTL = timeToLive;
        }
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicSession.java
  
  Index: ClusterTopicSession.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.jms;
  
  import javax.jms.TopicSession;
  import javax.jms.Topic;
  import javax.jms.Destination;
  import javax.jms.TopicSubscriber;
  import javax.jms.JMSException;
  import javax.jms.TopicPublisher;
  import javax.jms.TemporaryTopic;
  import javax.jms.MessageListener;
  import javax.jms.Message;
  import javax.jms.StreamMessage;
  import javax.jms.BytesMessage;
  import javax.jms.TextMessage;
  import javax.jms.MapMessage;
  import javax.jms.ObjectMessage;
  
  import java.util.Collection;
  import java.util.HashSet;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.io.Serializable;
  import java.io.FileOutputStream;
  import java.io.ObjectOutputStream;
  import java.io.IOException;
   
  import org.jbossmq.Log;
  import org.jbossmq.selectors.Selector;
  import org.jbossmq.SpyObjectMessage;
  import org.jbossmq.SpyStreamMessage;
  import org.jbossmq.SpyTextMessage;
  import org.jbossmq.SpyMessage;
  import org.jbossmq.SpyBytesMessage;
  import org.jbossmq.SpyMapMessage;
  
  
  /**
   *    This class implements javax.jms.TopicSession 
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ClusterTopicSession implements TopicSession
  {
  
        //Is the session closed ?
        boolean closed;
        ClusterTopicConnection connection;
        //MessageConsumers created by this session
        protected HashSet consumers = new HashSet();
  
        /**
         * Binds the new session to the connection that created it.
         */
        ClusterTopicSession(ClusterTopicConnection myConnection) {
                this.connection = myConnection;
        }
  
     
        /**
         * Resolves a topic by name through the owning connection.
         *
         * @throws IllegalStateException if the session is closed
         */
        public Topic createTopic(String topicName) throws JMSException {
                if (closed) {
                        throw new IllegalStateException("The session is closed");
                }
                Topic resolved = connection.createTopic(topicName);
                return resolved;
        }
  
        public TopicSubscriber createSubscriber(Topic topic) throws JMSException
        {
                return createSubscriber(topic,null,false);
        }
  
        public TopicSubscriber createSubscriber(Topic topic, String messageSelector, 
boolean noLocal) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                ClusterTopicSubscriber sub=new 
ClusterTopicSubscriber(this,topic,noLocal, messageSelector);
                                                                                
                synchronized (consumers) {
                        HashSet newMap=(HashSet)consumers.clone();      
                        newMap.add(sub);
                        consumers=newMap;
                }
                connection.addConsumer(sub);
                
                return sub;
        }
  
        public TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
        {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        public TopicSubscriber createDurableSubscriber(Topic topic, String name, 
String messageSelector, boolean noLocal) throws JMSException
        {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        public TopicPublisher createPublisher(Topic topic) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");
                return new ClusterTopicPublisher(this,topic);
        }
        
        public TemporaryTopic createTemporaryTopic() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                         
                return connection.getTemporaryTopic();
        }
  
        public void unsubscribe(String name) throws JMSException
        {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        /**
         * close method comment.
         */
        synchronized public void close() throws javax.jms.JMSException {
  
                if (closed)
                        return;
                closed = true;
  
            Iterator i =consumers.iterator();
            while (i.hasNext()) {
                ClusterTopicSubscriber s = (ClusterTopicSubscriber)i.next();
                s.close();
            }
  
            connection.sessionClosing(this);
  
        }
  
        /**
         * commit method comment.
         */
        public void commit() throws javax.jms.JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        public BytesMessage createBytesMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                
                SpyBytesMessage message = new SpyBytesMessage();
                message.producerClientId = connection.getClientID();
                return message;
        }
  
        public MapMessage createMapMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                SpyMapMessage message = new SpyMapMessage();
                message.producerClientId = connection.getClientID();
                return message;
        }
  
        public Message createMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                SpyMessage message = new SpyMessage();
                message.producerClientId = connection.getClientID();
                return message;
        }
  
        public ObjectMessage createObjectMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                SpyObjectMessage message = new SpyObjectMessage();
                message.producerClientId = connection.getClientID();
                return message;
        }
  
        public ObjectMessage createObjectMessage(Serializable object) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                
                SpyObjectMessage message=new SpyObjectMessage();
                message.setObject(object);
                message.producerClientId = connection.getClientID();
                return message;
        }
  
        public StreamMessage createStreamMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                SpyStreamMessage message = new SpyStreamMessage();
                message.producerClientId = connection.getClientID();
                return message;
        }
  
        public TextMessage createTextMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                SpyTextMessage message = new SpyTextMessage();
                message.producerClientId = connection.getClientID();
                return message;
        }
  
        public TextMessage createTextMessage(String string) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                SpyTextMessage message=new SpyTextMessage();
                message.setText(string);
                message.producerClientId = connection.getClientID();
                return message;
        }
  
        /**
         * getMessageListener method comment.
         */
        public javax.jms.MessageListener getMessageListener() throws 
javax.jms.JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        /**
         * getTransacted method comment.
         */
        public boolean getTransacted() throws javax.jms.JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        /**
         * recover method comment.
         */
        public void recover() throws javax.jms.JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        /**
         * rollback method comment.
         */
        public void rollback() throws javax.jms.JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        /**
         * run method comment.
         */
        public void run() {
                // This is an ASF requirment which we will not be implementing. 
        }
  
        /**
         * setMessageListener method comment.
         */
        public void setMessageListener(javax.jms.MessageListener arg1) throws 
javax.jms.JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/cluster/jms/ClusterTopicSubscriber.java
  
  Index: ClusterTopicSubscriber.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.jms;
  
  import javax.jms.TopicSubscriber;
  import javax.jms.JMSException;
  import javax.jms.Topic;
  import javax.jms.Message;
  import javax.jms.MessageListener;
  
  import org.jbossmq.selectors.Selector;
  import org.jbossmq.cluster.transport.NodeId;
  import org.jbossmq.SpyMessage;
  
  import EDU.oswego.cs.dl.util.concurrent.BoundedPriorityQueue;
  
  /**
   *    This class implements javax.jms.TopicSubscriber
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ClusterTopicSubscriber implements TopicSubscriber {
  
        // The topic I registered
        Topic topic;
        //Am I closed ?
        boolean closed;
        //Do i have a thread in one of my receive methods?
        boolean isReceiving;
        //My message listener (null if none)
        MessageListener messageListener;
        BoundedPriorityQueue messageQueue = new BoundedPriorityQueue(50);
        // Should I not accept local messages.
        boolean noLocal;
        // The selector to apply to the messages
        String selector;
        // The session this was created with
        ClusterTopicSession session;
  
        ClusterTopicSubscriber(ClusterTopicSession session, Topic topic, boolean 
noLocal, String selector) {
                this.session = session;
                this.topic = topic;
                this.selector = selector;
                this.noLocal = noLocal;
        }
  
        synchronized public Topic getTopic() throws JMSException {
                if (closed)
                        throw new IllegalStateException("The MessageConsumer is 
closed");
                return topic;
        }
  
        synchronized public boolean getNoLocal() throws JMSException {
                if (closed)
                        throw new IllegalStateException("The MessageConsumer is 
closed");
                return noLocal;
        }
  
        /**
         * close method comment.
         */
        synchronized public void close() throws javax.jms.JMSException {
  
                if (closed)
                        return;
  
                if (messageListener == null && isReceiving) {
                        //A consumer could be waiting in receive()
                        try {
                                messageQueue.put(this);
                        } catch (InterruptedException ignore) {
                                Thread.currentThread().interrupt();
                        }
                }
  
                closed = true;
        }
  
        /**
         * getMessageListener method comment.
         */
        synchronized public javax.jms.MessageListener getMessageListener() throws 
javax.jms.JMSException {
                if (closed)
                        throw new IllegalStateException("The MessageConsumer is 
closed");
                return messageListener;
        }
  
        /**
         * getMessageSelector method comment.
         */
        synchronized public java.lang.String getMessageSelector() throws 
javax.jms.JMSException {
                if (closed)
                        throw new IllegalStateException("The MessageConsumer is 
closed");
                return selector;
        }
  
        /**
         * Used internally to dispatch a message to the client
         */
        synchronized public void onMessage(NodeId sender, SpyMessage message) throws 
InterruptedException {
  
                if (closed)
                        return;
                if (session.connection.modeStop)
                        return;
                if (noLocal && 
sender.equals(session.connection.transport.getLocalNodeId()))
                        return;
  
                if (messageListener == null) {
                        // If your receive messageQueue is full... You loose it buddy!
                        messageQueue.offer(message, 0);
                } else {
                        messageListener.onMessage(message);
                }
  
        }
  
        /**
         * receive method comment.
         */
        public javax.jms.Message receive() throws javax.jms.JMSException {
  
                // close could be updated by a different thread
                synchronized (this) {
                        if (closed)
                                throw new IllegalStateException("The MessageConsumer 
is closed");
                        if (messageListener != null)
                                throw new JMSException("A message listener is already 
registered");
                        isReceiving = true;
                }
  
                try {
  
                        while (session.connection.modeStop) {
                                if (closed)
                                        return null;
                                // Wait for the connection to be started.
                                Thread.sleep(500);
                        }
  
                        Object o = messageQueue.take();
                        if (o == this) {
                                // this is a signal that we have closed
                                return null;
                        }
  
                        return (Message) o;
  
                } catch (InterruptedException e) {
                        session.connection.failureHandler(e, "Receive interrupted");
                } finally {
                        synchronized (this) {
                                isReceiving = false;
                        }
                }
  
                return null;
        }
  
        /**
         * receive method comment.
         */
        public javax.jms.Message receive(long arg1) throws javax.jms.JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        /**
         * receiveNoWait method comment.
         */
        public javax.jms.Message receiveNoWait() throws javax.jms.JMSException {
                throw new javax.jms.JMSException("This feature is not implemented");
        }
  
        /**
         * setMessageListener method comment.
         */
        synchronized public void setMessageListener(javax.jms.MessageListener 
listener) throws javax.jms.JMSException {
  
                if (closed)
                        throw new IllegalStateException("The MessageConsumer is 
closed");
                if (isReceiving)
                        throw new JMSException("This MessageConsumer is waiting in 
receive() !");
  
                messageListener = listener;
  
        }
  }
  
  

Reply via email to