Author: chirino
Date: Mon Jun 11 09:53:38 2007
New Revision: 546196
URL: http://svn.apache.org/viewvc?view=rev&rev=546196
Log:
Added a cluster field to each message so that each message can be persisted to
a unique set of brokers working together as a master/slave cluster.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessageMarshaller.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessageTestSupport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?view=diff&rev=546196&r1=546195&r2=546196
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon Jun 11 09:53:38 2007
@@ -82,6 +82,7 @@
private BrokerId [] brokerPath;
protected boolean droppable = false;
+ private BrokerId [] cluster;
abstract public Message copy();
@@ -610,5 +611,19 @@
}
public void setDroppable(boolean droppable) {
this.droppable = droppable;
+ }
+
+ /**
+ * If a message is stored in multiple nodes on a cluster,
+ * all the cluster members will be listed here.
+ * Otherwise, it will be null.
+ *
+ * @openwire:property version=3 cache=true
+ */
+ public BrokerId[] getCluster() {
+ return cluster;
+ }
+ public void setCluster(BrokerId[] cluster) {
+ this.cluster = cluster;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessageMarshaller.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessageMarshaller.java?view=diff&rev=546196&r1=546195&r2=546196
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessageMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessageMarshaller.java Mon Jun 11 09:53:38 2007
@@ -92,6 +92,18 @@
info.setRecievedByDFBridge(bs.readBoolean());
info.setDroppable(bs.readBoolean());
+ if (bs.readBoolean()) {
+ short size = dataIn.readShort();
+ org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
+ for( int i=0; i < size; i++ ) {
+ value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs);
+ }
+ info.setCluster(value);
+ }
+ else {
+ info.setCluster(null);
+ }
+
info.afterUnmarshall(wireFormat);
}
@@ -130,6 +142,7 @@
rc += tightMarshalString1(info.getUserID(), bs);
bs.writeBoolean(info.isRecievedByDFBridge());
bs.writeBoolean(info.isDroppable());
+ rc += tightMarshalObjectArray1(wireFormat, info.getCluster(), bs);
return rc + 9;
}
@@ -171,6 +184,7 @@
tightMarshalString2(info.getUserID(), dataOut, bs);
bs.readBoolean();
bs.readBoolean();
+ tightMarshalObjectArray2(wireFormat, info.getCluster(), dataOut, bs);
info.afterMarshall(wireFormat);
@@ -228,6 +242,18 @@
info.setRecievedByDFBridge(dataIn.readBoolean());
info.setDroppable(dataIn.readBoolean());
+ if (dataIn.readBoolean()) {
+ short size = dataIn.readShort();
+ org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
+ for( int i=0; i < size; i++ ) {
+ value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn);
+ }
+ info.setCluster(value);
+ }
+ else {
+ info.setCluster(null);
+ }
+
info.afterUnmarshall(wireFormat);
}
@@ -269,6 +295,7 @@
looseMarshalString(info.getUserID(), dataOut);
dataOut.writeBoolean(info.isRecievedByDFBridge());
dataOut.writeBoolean(info.isDroppable());
+ looseMarshalObjectArray(wireFormat, info.getCluster(), dataOut);
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessageTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessageTestSupport.java?view=diff&rev=546196&r1=546195&r2=546196
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessageTestSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessageTestSupport.java
Mon Jun 11 09:53:38 2007
@@ -81,5 +81,12 @@
info.setUserID("UserID:16");
info.setRecievedByDFBridge(true);
info.setDroppable(false);
+ {
+ BrokerId value[] = new BrokerId[2];
+ for( int i=0; i < 2; i++ ) {
+ value[i] = createBrokerId("Cluster:17");
+ }
+ info.setCluster(value);
+ }
}
}