User: hiram   
  Date: 01/02/27 20:35:00

  Modified:    src/main/org/jbossmq/cluster Cluster.java
                        InvalidConfigurationException.java
                        InvalidStateException.java NodeId.java
                        SerializerUtil.java TopicListener.java
                        Transport.java TransportListener.java
  Added:       src/main/org/jbossmq/cluster TestReceiver.java
                        TestSender.java
  Removed:     src/main/org/jbossmq/cluster ClusterTesterReceiver.java
                        ClusterTesterSender.java
  Log:
  We can now send point-to-point or broadcast messages in the cluster.  Refactored a lot 
of the UDP implementation.
  
  Revision  Changes    Path
  1.3       +42 -0     jbossmq/src/main/org/jbossmq/cluster/Cluster.java
  
  Index: Cluster.java
  ===================================================================
  RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/Cluster.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- Cluster.java      2001/02/22 06:12:26     1.2
  +++ Cluster.java      2001/02/28 04:34:56     1.3
  @@ -36,7 +36,7 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    */
   public class Cluster {
   
  @@ -286,5 +286,47 @@
                        TopicListener c = (TopicListener) i.next();
                        c.onMessage(topic, sender, message);
                }
  +     }
  +
  +     /**
  +      * Send a message to the cluster on the given channel.
  +      * The message will also be sent to the local node.
  +      * The message will be marked as not droppable.
  +      */
  +     public void send(NodeId dest, short topic, Object message)
  +             throws IOException, InterruptedException {
  +             // Send w/ local broacast && non droppable
  +             send(dest, topic, message, true, false, true);
  +     }
  +
  +     /**
  +      * Send a message to the node on the given channel.
  +      * The local node will also get the message if localBroadcast is true.
  +      * The droppable flag is hint to the transport layer.
  +      */
  +     public void send(NodeId dest,
  +             short topic,
  +             Object message,
  +             boolean localBroadcast,
  +             boolean droppable,
  +             boolean keepOrder)
  +             throws IOException, InterruptedException {
  +
  +             if (localBroadcast)
  +                     messageArrivedEvent(topic, null, message);
  +
  +             ByteArrayOutputStream bos = new ByteArrayOutputStream();
  +             ObjectOutputStream os = new ObjectOutputStream(bos);
  +
  +             if (message instanceof Externalizable) {
  +                     ((Externalizable) message).writeExternal(os);
  +             } else {
  +                     os.writeObject(message);
  +             }
  +             os.close();
  +
  +             byte data[] = bos.toByteArray();
  +             transport.send(dest, topic, data, droppable, keepOrder);
  +
        }
   }
  
  
  
  1.3       +0 -0      
jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java
  
  Index: InvalidConfigurationException.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- InvalidConfigurationException.java        2001/02/22 06:12:26     1.2
  +++ InvalidConfigurationException.java        2001/02/28 04:34:57     1.3
  @@ -11,7 +11,7 @@
   /**
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    */
   public class InvalidConfigurationException extends Exception {
        
  
  
  
  1.3       +0 -0      jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java
  
  Index: InvalidStateException.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- InvalidStateException.java        2001/02/22 06:12:26     1.2
  +++ InvalidStateException.java        2001/02/28 04:34:57     1.3
  @@ -11,7 +11,7 @@
   /**
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    */
   public class InvalidStateException extends Exception {
        
  
  
  
  1.2       +0 -0      jbossmq/src/main/org/jbossmq/cluster/NodeId.java
  
  Index: NodeId.java
  ===================================================================
  RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/NodeId.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- NodeId.java       2001/02/22 06:12:26     1.1
  +++ NodeId.java       2001/02/28 04:34:58     1.2
  @@ -6,7 +6,7 @@
    * 
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public interface NodeId {
   }
  
  
  
  1.3       +0 -0      jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java
  
  Index: SerializerUtil.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SerializerUtil.java       2001/02/22 06:12:26     1.2
  +++ SerializerUtil.java       2001/02/28 04:34:58     1.3
  @@ -14,7 +14,7 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    */
   public class SerializerUtil {
   
  
  
  
  1.3       +0 -0      jbossmq/src/main/org/jbossmq/cluster/TopicListener.java
  
  Index: TopicListener.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/TopicListener.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- TopicListener.java        2001/02/22 06:12:27     1.2
  +++ TopicListener.java        2001/02/28 04:34:58     1.3
  @@ -14,7 +14,7 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    *
    */
   public interface TopicListener {
  
  
  
  1.3       +1 -0      jbossmq/src/main/org/jbossmq/cluster/Transport.java
  
  Index: Transport.java
  ===================================================================
  RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/Transport.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- Transport.java    2001/02/22 06:12:27     1.2
  +++ Transport.java    2001/02/28 04:34:58     1.3
  @@ -13,7 +13,7 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    *
    */
   public interface Transport {
  @@ -22,4 +22,5 @@
        public void setTransportListener(TransportListener c);
        public void send(short channelId, byte data[], boolean droppable, boolean keepOrder) throws IOException, InterruptedException;
        
  +     public void send(NodeId dest, short channelId, byte data[], boolean droppable, boolean keepOrder) throws IOException, InterruptedException;
   }
  
  
  
  1.3       +0 -0      jbossmq/src/main/org/jbossmq/cluster/TransportListener.java
  
  Index: TransportListener.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/TransportListener.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- TransportListener.java    2001/02/22 06:12:27     1.2
  +++ TransportListener.java    2001/02/28 04:34:58     1.3
  @@ -16,7 +16,7 @@
    * 
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    */
   public interface TransportListener {
   
  
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/TestReceiver.java
  
  Index: TestReceiver.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import EDU.oswego.cs.dl.util.concurrent.*;
  import org.jbossmq.cluster.udp.UDPTransport;
  
  /**
   * Manual test driver for the receiving side of the cluster.
   *
   * Wires a {@link Cluster} to a {@link UDPTransport}, registers a
   * {@link TopicListener} on topic 13 and prints every message the listener
   * is handed.  Run it together with TestSender, which sends on the same
   * topic id.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class TestReceiver {
  
        /**
         * Starts the receiver and then blocks forever.
         *
         * @param args command-line arguments (not used)
         * @throws Exception if configuring or starting the transport fails,
         *         or if the wait on the echo channel is interrupted
         */
        public static void main(java.lang.String[] args) throws Exception {
  
                Cluster cluster = new Cluster();
                UDPTransport transport = new UDPTransport();
  
                // Transport settings; how UDPTransport interprets them is
                // defined in that class, not here.
                java.util.Properties p = new java.util.Properties();
                p.setProperty("TransportMode","Multicast");
                p.setProperty("Group","225.1.1.1");
                p.setProperty("Port", "1000");
                
                transport.setProperties(p);
                cluster.setTransport(transport);
  
                // Topic id the listener is registered on.
                short echoChannelId = 13;
  
                final Channel echoChannel = new BoundedLinkedQueue(10);
                TopicListener myListener = new TopicListener() {
                        public void onMessage(short topic, NodeId sender, Object m) {
                                try {
                                        System.out.println("Got :" + m);
                                        // Hand-off to the main thread is currently disabled,
                                        // so messages are printed here only.
                                        //echoChannel.put( m );
                                } catch (Exception e) {
                                        // Do not let a bad message kill the delivering
                                        // thread, but do not hide the failure either.
                                        System.err.println("Failed to handle message on topic " + topic);
                                        e.printStackTrace();
                                }
                        }
                };
  
                cluster.addTopicListener(echoChannelId, myListener);
  
                transport.start();
  
                System.out.println("Waiting for messages");
                // Nothing in this class puts into echoChannel while the put
                // above is commented out, so take() is not fed from here; the
                // loop then only serves to keep the main thread parked.
                while (true) {
                        String t = (String) echoChannel.take();
                        System.out.println("Got :" + t);
                }
        }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/TestSender.java
  
  Index: TestSender.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster;
  
  import EDU.oswego.cs.dl.util.concurrent.*;
  import org.jbossmq.cluster.udp.UDPTransport;
  
  /**
   * Manual test driver for the sending side of the cluster.
   *
   * Wires a {@link Cluster} to a {@link UDPTransport}, sends 1000 numbered
   * "Hello World" strings on topic 13 and then parks the main thread.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class TestSender {
        
        /**
         * Sends the test messages and then waits indefinitely.
         *
         * @param args command-line arguments (not used)
         * @throws Exception if the transport cannot be configured or started,
         *         if a send fails, or if the final wait is interrupted
         */
        public static void main(java.lang.String[] args) throws Exception {
  
                Cluster cluster = new Cluster();
                UDPTransport udp = new UDPTransport();
  
                // Transport settings handed to the UDP transport as-is.
                java.util.Properties settings = new java.util.Properties();
                settings.setProperty("TransportMode","Multicast");
                settings.setProperty("Group","225.1.1.1");
                settings.setProperty("Port", "1000");
                udp.setProperties(settings);
  
                cluster.setTransport(udp);
                udp.start();
  
                // Topic id the messages are published on.
                short echoChannelId = 13;
                String greeting = "Hello World";
  
                // Send the greeting 1000 times, tagging each copy with its index.
                int sent = 0;
                while (sent < 1000) {
                        String payload = greeting + " #" + sent;
                        cluster.send(echoChannelId, payload);
                        sent++;
                }
                System.out.println("Message, sent");
  
                // Nothing in this class notifies on the cluster object; the
                // wait just stops main() from returning.
                synchronized (cluster) {
                        cluster.wait();
                }
        }
  }
  
  

Reply via email to