[
https://issues.apache.org/jira/browse/AMQ-4802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13794176#comment-13794176
]
Marc Leineweber commented on AMQ-4802:
--
Hello Timothy,
my test case consists of two classes - one producer and one consumer. My real
implementation reads input from a file, but even when writing 0-bytes to the
OutputStream, reading from the InputStream fails. I write 64 MB (64*1024*1024 bytes).
See my source code below.
Thanks in advance,
Marc
{code:title=The Consumer|borderStyle=solid}
package test;
import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* Test class: test comsumer for ActiveMQInputStream.
*/
public class AMQStreamConsumer {
private static final Logger logger =
Logger.getLogger(AMQStreamConsumer.class.getName());
public static final String outFilepath = "MyTestFile";
/**
* @param args the command line arguments
*/
public static void main(String[] args) throws IOException {
try {
readInputStream("AMQStreamTest");
System.exit(0);
} catch (JMSException ex) {
Logger.getLogger(AMQStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
} catch (NamingException ex) {
Logger.getLogger(AMQStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
/**
* Reads the inputStream from the given queue.
*
* @param aStreamQueueName
* @throws JMSException
* @throws NamingException
*/
private static void readInputStream(String aStreamQueueName) throws
JMSException, NamingException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("guest", "guest", "tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE); // CLIENT_ACKNOWLEDGE makes no difference
String exclusiveQueueName = aStreamQueueName; // +
"?consumer.exclusive=true"; // use the queue exclusively makes no difference
byte[] buf = new byte[1024];
OutputStream fileOutputStream = null;
InputStream queueInputStream = null;
try {
fileOutputStream = new BufferedOutputStream(new
FileOutputStream(outFilepath));
Queue destination =
session.createQueue(exclusiveQueueName);
queueInputStream =
((ActiveMQConnection)connection).createInputStream(destination);
long sumSize = 0;
int counter = 0;
for (int readCount = queueInputStream.read(buf);
readCount != -1; readCount = queueInputStream.read(buf)) {
sumSize += readCount;
counter++;
logger.log(Level.INFO, "read [{0}] from input
stream (bytes so far {1})", new Object[] {counter, sumSize});
fileOutputStream.write(buf, 0, readCount);
}
} finally {
if (fileOutputStream != null) {
fileOutputStream.close();
}
if (queueInputStream != null) {
queueInputStream.close();
}
}
session.close();
connection.close();
}
}
{code}
{code:title=The Producer|borderStyle=solid}
package test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* Test class: test producer for ActiveMQOutputStream.
*/
public class AMQStreamProducer {
private static final Logger logger =
Logger.getLogger(AMQStreamProducer.class.getName());
/**
* @param args the command lin