[ https://issues.apache.org/jira/browse/AMQ-4802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13794176#comment-13794176 ]
Marc Leineweber commented on AMQ-4802: -------------------------------------- Hello Timothy, my test case consists of two classes - one producer and one consumer. My real implementation reads input from a file, but even with writing 0-bytes to the OutputStream, the InputFile reading fails. I write 64 MB (64*1024*1024 bytes). See my source code below. Thanks in advance, Marc {code:title=The Consumer|borderStyle=solid} package test; import java.io.BufferedOutputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.Session; import javax.naming.NamingException; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * Test class: test consumer for ActiveMQInputStream. */ public class AMQStreamConsumer { private static final Logger logger = Logger.getLogger(AMQStreamConsumer.class.getName()); public static final String outFilepath = "MyTestFile"; /** * @param args the command line arguments */ public static void main(String[] args) throws IOException { try { readInputStream("AMQStreamTest"); System.exit(0); } catch (JMSException ex) { Logger.getLogger(AMQStreamConsumer.class.getName()).log(Level.SEVERE, null, ex); } catch (NamingException ex) { Logger.getLogger(AMQStreamConsumer.class.getName()).log(Level.SEVERE, null, ex); } } /** * Reads the inputStream from the given queue. 
* * @param aStreamQueueName * @throws JMSException * @throws NamingException */ private static void readInputStream(String aStreamQueueName) throws JMSException, NamingException, FileNotFoundException, IOException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("guest", "guest", "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // CLIENT_ACKNOWLEDGE makes no difference String exclusiveQueueName = aStreamQueueName; // + "?consumer.exclusive=true"; // use the queue exclusively makes no difference byte[] buf = new byte[1024]; OutputStream fileOutputStream = null; InputStream queueInputStream = null; try { fileOutputStream = new BufferedOutputStream(new FileOutputStream(outFilepath)); Queue destination = session.createQueue(exclusiveQueueName); queueInputStream = ((ActiveMQConnection)connection).createInputStream(destination); long sumSize = 0; int counter = 0; for (int readCount = queueInputStream.read(buf); readCount != -1; readCount = queueInputStream.read(buf)) { sumSize += readCount; counter++; logger.log(Level.INFO, "read [{0}] from input stream (bytes so far {1})", new Object[] {counter, sumSize}); fileOutputStream.write(buf, 0, readCount); } } finally { if (fileOutputStream != null) { fileOutputStream.close(); } if (queueInputStream != null) { queueInputStream.close(); } } session.close(); connection.close(); } } {code} {code:title=The Producer|borderStyle=solid} package test; import java.io.IOException; import java.io.OutputStream; import java.util.HashMap; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.naming.NamingException; import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; /** * Test class: test producer for ActiveMQOutputStream. */ public class AMQStreamProducer { private static final Logger logger = Logger.getLogger(AMQStreamProducer.class.getName()); /** * @param args the command line arguments */ public static void main(String[] args) throws IOException { try { sendOutputStream("AMQStreamTest"); logger.info("outputstream sent"); System.exit(0); } catch (JMSException ex) { Logger.getLogger(AMQStreamProducer.class.getName()).log(Level.SEVERE, null, ex); } catch (NamingException ex) { Logger.getLogger(AMQStreamProducer.class.getName()).log(Level.SEVERE, null, ex); } } /** * Sends an OutputStream with 0 bytes. */ protected static void sendOutputStream(String aQueueName) throws IOException, JMSException, NamingException { Connection connection; Session session = null; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616"); connection = connectionFactory.createConnection(); session = connection.createSession(false, session.AUTO_ACKNOWLEDGE); Destination streamDestination = session.createQueue(aQueueName); Map messageProperties = new HashMap<String, String>(); messageProperties.put("myProperty", "TEST"); OutputStream out = ((ActiveMQConnection)connection).createOutputStream(streamDestination, messageProperties, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); for(int i=0; i< 4*1024*1024; i++) { out.write(0); } out.close(); session.close(); connection.close(); } } {code} > ActiveMQInputStream does not work for big content > ------------------------------------------------- > > Key: AMQ-4802 > URL: https://issues.apache.org/jira/browse/AMQ-4802 > Project: ActiveMQ > Issue Type: Bug > Components: JMS client > Affects Versions: 5.8.0 > Environment: Mac OS 10.6.8 > Reporter: Marc Leineweber > > Sending a file with size 4.6 MB with ActiveMQOutputStream works and results > in 71 messages on the 
ActiveMQ broker. > Trying to receive this file / these messages using an ActiveMQInputStream > fails - after a while (720896 bytes received so far) inputStream.read(buffer) > never answers (it waits in ...). If inputStream is created with a timeout, I > get an ActiveMQInputStream$ReadTimeoutException. -- This message was sent by Atlassian JIRA (v6.1#6144)