User: hiram
Date: 01/02/21 22:12:28
Modified: src/main/org/jbossmq/cluster Cluster.java
ClusterTesterReceiver.java ClusterTesterSender.java
InvalidConfigurationException.java
InvalidStateException.java SerializerUtil.java
TopicListener.java Transport.java
TransportListener.java
Added: src/main/org/jbossmq/cluster NodeId.java
Log:
Improved ease of use by removing the requirement of having to explicitly set the
nodeId of a cluster.
We now use normal UDP packets for resend requests (in other words, resend requests
are not broadcast).
Message transmission rates are slowed down when a node starts receiving resend
requests.
Revision Changes Path
1.2 +23 -33 jbossmq/src/main/org/jbossmq/cluster/Cluster.java
Index: Cluster.java
===================================================================
RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/Cluster.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Cluster.java 2001/02/15 03:33:04 1.1
+++ Cluster.java 2001/02/22 06:12:26 1.2
@@ -36,7 +36,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class Cluster {
@@ -48,10 +48,8 @@
// The transport layer that the cluster will be using
private Transport transport;
- // The node id of the local process. All process in a cluster MUST
- // have unique nodeIds
- public short nodeId;
+
// The managment topic is reserved for the transport layer's use.
public static final short MANAGMENT_TOPIC = 0;
@@ -60,7 +58,7 @@
// easy to use.
private MyTransportListener transportListener = new MyTransportListener();
private class MyTransportListener implements TransportListener {
- public void messageArrivedEvent(short topic, short senderId, byte
message[])
+ public void messageArrivedEvent(short topic, NodeId senderId, byte
message[])
throws InterruptedException {
Short id = new Short(topic);
@@ -116,19 +114,9 @@
transport.setCluster(this);
}
- /**
- * Every node in the cluster has to have a unique id.
- */
- public void setNodeId(short newNodeId) {
- nodeId = newNodeId;
- }
- /**
- * The id of this node of the cluster.
- */
- public short getNodeId() {
- return nodeId;
- }
+
+
/**
* The transport being used by the cluster.
@@ -249,23 +237,7 @@
}
- /**
- * Used internally to send a message to all the local listeners.
- */
- private void messageArrivedEvent(short topic, short sender, Object message)
- throws InterruptedException {
- LinkedList ll = (LinkedList) topicListeners.get(new Short(topic));
- if (ll == null) {
- // No listeners...
- return;
- }
- Iterator i = ll.iterator();
- while (i.hasNext()) {
- TopicListener c = (TopicListener) i.next();
- c.onMessage(topic, sender, message);
- }
- }
/**
* Send a message to the cluster on the given channel.
@@ -281,7 +253,7 @@
throws IOException, InterruptedException {
if (localBroadcast)
- messageArrivedEvent(topic, getNodeId(), message);
+ messageArrivedEvent(topic, null, message);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream os = new ObjectOutputStream(bos);
@@ -296,5 +268,23 @@
byte data[] = bos.toByteArray();
transport.send(topic, data, droppable, keepOrder);
+ }
+
+ /**
+ * Used internally to send a message to all the local listeners.
+ */
+ private void messageArrivedEvent(short topic, NodeId sender, Object message)
+ throws InterruptedException {
+ LinkedList ll = (LinkedList) topicListeners.get(new Short(topic));
+ if (ll == null) {
+ // No listeners...
+ return;
+ }
+
+ Iterator i = ll.iterator();
+ while (i.hasNext()) {
+ TopicListener c = (TopicListener) i.next();
+ c.onMessage(topic, sender, message);
+ }
}
}
1.2 +1 -2 jbossmq/src/main/org/jbossmq/cluster/ClusterTesterReceiver.java
Index: ClusterTesterReceiver.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/ClusterTesterReceiver.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ClusterTesterReceiver.java 2001/02/15 03:33:04 1.1
+++ ClusterTesterReceiver.java 2001/02/22 06:12:26 1.2
@@ -12,7 +12,7 @@
/**
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class ClusterTesterReceiver {
@@ -28,14 +28,13 @@
is.close();
transport.setProperties(p);
- cluster.setNodeId(Short.parseShort(p.getProperty("NodeId")));
cluster.setTransport(transport);
short echoChannelId = 13;
final Channel echoChannel = new BoundedLinkedQueue(10);
TopicListener myListener = new TopicListener() {
- public void onMessage(short topic, short sender, Object m) {
+ public void onMessage(short topic, NodeId sender, Object m) {
try {
System.out.println("Got :" + m);
//echoChannel.put( m );
1.2 +0 -1 jbossmq/src/main/org/jbossmq/cluster/ClusterTesterSender.java
Index: ClusterTesterSender.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/ClusterTesterSender.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ClusterTesterSender.java 2001/02/15 03:33:04 1.1
+++ ClusterTesterSender.java 2001/02/22 06:12:26 1.2
@@ -12,7 +12,7 @@
/**
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class ClusterTesterSender {
@@ -32,7 +32,6 @@
is.close();
transport.setProperties(p);
- cluster.setNodeId(Short.parseShort(p.getProperty("NodeId")));
cluster.setTransport(transport);
transport.start();
1.2 +0 -0
jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java
Index: InvalidConfigurationException.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- InvalidConfigurationException.java 2001/02/15 03:33:04 1.1
+++ InvalidConfigurationException.java 2001/02/22 06:12:26 1.2
@@ -11,7 +11,7 @@
/**
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class InvalidConfigurationException extends Exception {
1.2 +0 -0 jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java
Index: InvalidStateException.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- InvalidStateException.java 2001/02/15 03:33:05 1.1
+++ InvalidStateException.java 2001/02/22 06:12:26 1.2
@@ -11,7 +11,7 @@
/**
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class InvalidStateException extends Exception {
1.2 +12 -0 jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java
Index: SerializerUtil.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SerializerUtil.java 2001/02/15 03:33:05 1.1
+++ SerializerUtil.java 2001/02/22 06:12:26 1.2
@@ -14,7 +14,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SerializerUtil {
@@ -84,6 +84,18 @@
public static void writeShortTo(short v, byte[] data, int pos) {
data[pos] = (byte) ((v >>> 8) & 0xFF);
+ data[pos + 1] = (byte) ((v >>> 0) & 0xFF);
+ }
+
+ public static int readUShortFrom(byte[] data, int offset) {
+ return (
+ ((data[offset + 0] & 0xFF) << 8)
+ | ((data[offset + 1] & 0xFF) << 0)
+ );
+ }
+
+ static public void writeUShortTo(int v, byte[] data, int pos) {
+ data[pos + 0] = (byte) ((v >>> 8) & 0xFF);
data[pos + 1] = (byte) ((v >>> 0) & 0xFF);
}
}
1.2 +2 -1 jbossmq/src/main/org/jbossmq/cluster/TopicListener.java
Index: TopicListener.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/TopicListener.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TopicListener.java 2001/02/15 03:33:05 1.1
+++ TopicListener.java 2001/02/22 06:12:27 1.2
@@ -14,9 +14,10 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*
*/
public interface TopicListener {
- public void onMessage( short topic, short sender, Object message );
+
+ public void onMessage( short topic, NodeId sender, Object message );
}
1.2 +0 -0 jbossmq/src/main/org/jbossmq/cluster/Transport.java
Index: Transport.java
===================================================================
RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/Transport.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Transport.java 2001/02/15 03:33:05 1.1
+++ Transport.java 2001/02/22 06:12:27 1.2
@@ -13,7 +13,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*
*/
public interface Transport {
1.2 +2 -1 jbossmq/src/main/org/jbossmq/cluster/TransportListener.java
Index: TransportListener.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/TransportListener.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TransportListener.java 2001/02/15 03:33:05 1.1
+++ TransportListener.java 2001/02/22 06:12:27 1.2
@@ -16,8 +16,9 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface TransportListener {
- public void messageArrivedEvent( short channelId, short senderId, byte
message[] ) throws InterruptedException;
+
+ public void messageArrivedEvent( short channelId, NodeId senderId, byte
message[] ) throws InterruptedException;
}
1.1 jbossmq/src/main/org/jbossmq/cluster/NodeId.java
Index: NodeId.java
===================================================================
package org.jbossmq.cluster;
import java.lang.Object;
/**
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public interface NodeId {
}