Author: tabish
Date: Mon Oct 25 21:58:04 2010
New Revision: 1027282
URL: http://svn.apache.org/viewvc?rev=1027282&view=rev
Log:
apply patch for: https://issues.apache.org/activemq/browse/AMQ-2988
with modifications to make the properties map unmodifiable and use the generic
Collections.emptyMap method instead of creating a new empty map.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java?rev=1027282&r1=1027281&r2=1027282&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
Mon Oct 25 21:58:04 2010
@@ -18,6 +18,7 @@ package org.apache.activemq;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.jms.IllegalStateException;
@@ -52,6 +53,7 @@ public class ActiveMQInputStream extends
private boolean eosReached;
private byte buffer[];
private int pos;
+ private Map<String, Object> jmsProperties;
private ProducerId producerId;
private long nextSequenceId;
@@ -135,6 +137,19 @@ public class ActiveMQInputStream extends
}
}
+ /**
+ * Return the JMS Properties which where used to send the InputStream
+ *
+ * @return jmsProperties
+ * @throws IOException
+ */
+ public Map<String, Object> getJMSProperties() throws IOException {
+ if (jmsProperties == null) {
+ fillBuffer();
+ }
+ return jmsProperties;
+ }
+
public ActiveMQMessage receive() throws JMSException {
checkClosed();
MessageDispatch md;
@@ -227,13 +242,24 @@ public class ActiveMQInputStream extends
buffer = new byte[(int)bm.getBodyLength()];
bm.readBytes(buffer);
pos = 0;
+ if (jmsProperties == null) {
+ jmsProperties = Collections.unmodifiableMap(new
HashMap<String, Object>(bm.getProperties()));
+ }
} else {
eosReached = true;
+ if (jmsProperties == null) {
+ // no properties found
+ jmsProperties = Collections.emptyMap();
+ }
}
return;
}
} catch (JMSException e) {
eosReached = true;
+ if (jmsProperties == null) {
+ // no properties found
+ jmsProperties = Collections.emptyMap();
+ }
throw IOExceptionSupport.create(e);
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java?rev=1027282&r1=1027281&r2=1027282&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
Mon Oct 25 21:58:04 2010
@@ -18,12 +18,20 @@ package org.apache.activemq.streams;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQInputStream;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -38,7 +46,9 @@ public class JMSInputStreamTest extends
protected DataInputStream in;
private ActiveMQConnection connection2;
+ private ActiveMQInputStream amqIn;
+
public static Test suite() {
return suite(JMSInputStreamTest.class);
}
@@ -57,12 +67,27 @@ public class JMSInputStreamTest extends
protected void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
+ }
+
+ /**
+ * Setup connection and streams
+ *
+ * @param props
+ * @throws JMSException
+ */
+ private void setUpConnection(Map<String, Object> props) throws
JMSException {
connection2 = (ActiveMQConnection)factory.createConnection(userName,
password);
connections.add(connection2);
- out = new DataOutputStream(connection.createOutputStream(destination));
- in = new DataInputStream(connection2.createInputStream(destination));
+ OutputStream amqOut;
+ if (props != null) {
+ amqOut = connection.createOutputStream(destination, props,
Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY,
Message.DEFAULT_TIME_TO_LIVE);
+ } else {
+ amqOut = connection.createOutputStream(destination);
+ }
+ out = new DataOutputStream(amqOut);
+ amqIn = (ActiveMQInputStream)
connection2.createInputStream(destination);
+ in = new DataInputStream(amqIn);
}
-
/*
* @see TestCase#tearDown()
*/
@@ -71,6 +96,7 @@ public class JMSInputStreamTest extends
}
public void testStreams() throws Exception {
+ setUpConnection(null);
out.writeInt(4);
out.flush();
assertTrue(in.readInt() == 4);
@@ -85,12 +111,70 @@ public class JMSInputStreamTest extends
out.writeLong(i);
}
out.flush();
+
for (int i = 0; i < 100; i++) {
assertTrue(in.readLong() == i);
}
}
+ // Test for AMQ-2988
+ public void testStreamsWithProperties() throws Exception {
+ String name1 = "PROPERTY_1";
+ String name2 = "PROPERTY_2";
+ String value1 = "VALUE_1";
+ String value2 = "VALUE_2";
+ Map<String,Object> jmsProperties = new HashMap<String, Object>();
+ jmsProperties.put(name1, value1);
+ jmsProperties.put(name2, value2);
+ setUpConnection(jmsProperties);
+
+ out.writeInt(4);
+ out.flush();
+ assertTrue(in.readInt() == 4);
+ out.writeFloat(2.3f);
+ out.flush();
+ assertTrue(in.readFloat() == 2.3f);
+ String str = "this is a test string";
+ out.writeUTF(str);
+ out.flush();
+ assertTrue(in.readUTF().equals(str));
+ for (int i = 0; i < 100; i++) {
+ out.writeLong(i);
+ }
+ out.flush();
+
+ // check properties before we try to read the stream
+ checkProperties(jmsProperties);
+
+ for (int i = 0; i < 100; i++) {
+ assertTrue(in.readLong() == i);
+ }
+
+ // check again after read was done
+ checkProperties(jmsProperties);
+ }
+
+ // check if the received stream has the properties set
+ // Test for AMQ-2988
+ private void checkProperties(Map<String, Object> jmsProperties) throws
IOException {
+ Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
+
+ // we should at least have the same amount or more properties
+ assertTrue(jmsProperties.size() <= receivedJmsProps.size());
+
+
+ // check the properties to see if we have everything in there
+ Iterator<String> propsIt = jmsProperties.keySet().iterator();
+ while(propsIt.hasNext()) {
+ String key = propsIt.next();
+ assertTrue(receivedJmsProps.containsKey(key));
+ assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
+ }
+ }
+
public void testLarge() throws Exception {
+ setUpConnection(null);
+
final int testData = 23;
final int dataLength = 4096;
final int count = 1024;