[ 
https://issues.apache.org/jira/browse/AMQ-4802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13794176#comment-13794176
 ] 

Marc Leineweber commented on AMQ-4802:
--------------------------------------

Hello Timothy,

my test case consists of two classes - one producer and one consumer. My real 
implementation reads its input from a file, but even when writing only 
zero-valued bytes to the OutputStream, reading from the input stream fails. 
I write 64 MB (64*1024*1024 bytes). 
See my source code below.

Thanks in advance,
Marc

{code:title=The Consumer|borderStyle=solid}
package test;

import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Test class: test comsumer for ActiveMQInputStream.
 */
public class AMQStreamConsumer {

    private static final Logger logger = 
Logger.getLogger(AMQStreamConsumer.class.getName());
    public static final String outFilepath = "MyTestFile";
                
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws IOException {
        try {
                        readInputStream("AMQStreamTest");
            System.exit(0);
        } catch (JMSException ex) {
            
Logger.getLogger(AMQStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
        } catch (NamingException ex) {
            
Logger.getLogger(AMQStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

        
        /**
         * Reads the inputStream from the given queue.
         * 
         * @param aStreamQueueName 
         * @throws JMSException
         * @throws NamingException 
         */
    private static void readInputStream(String aStreamQueueName) throws 
JMSException, NamingException, FileNotFoundException, IOException {
        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("guest", "guest", "tcp://localhost:61616");
                Connection connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); // CLIENT_ACKNOWLEDGE makes no difference
                String exclusiveQueueName = aStreamQueueName; //  + 
"?consumer.exclusive=true"; // use the queue exclusively makes no difference
                byte[] buf = new byte[1024];
                OutputStream fileOutputStream = null;
                InputStream queueInputStream = null;
                try {
                        fileOutputStream = new BufferedOutputStream(new 
FileOutputStream(outFilepath));
                        Queue destination = 
session.createQueue(exclusiveQueueName);
                        queueInputStream = 
((ActiveMQConnection)connection).createInputStream(destination); 
                        long sumSize = 0;
                        int counter = 0;
                        for (int readCount = queueInputStream.read(buf); 
readCount != -1; readCount = queueInputStream.read(buf)) {
                                sumSize += readCount;
                                counter++;
                                logger.log(Level.INFO, "read [{0}] from input 
stream (bytes so far {1})", new Object[] {counter, sumSize});
                                fileOutputStream.write(buf, 0, readCount);
                        }
                } finally {
                        if (fileOutputStream != null) {
                                fileOutputStream.close();
                        }
                        if (queueInputStream != null) {
                                queueInputStream.close();
                        }
                }
                session.close();
                connection.close();
    }
}
{code}
{code:title=The Producer|borderStyle=solid}

package test;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Test class: test producer for ActiveMQOutputStream.
 *
 * Streams a fixed number of zero-valued bytes to the queue "AMQStreamTest" on
 * the broker at tcp://localhost:61616.
 */
public class AMQStreamProducer {

    private static final Logger logger = Logger.getLogger(AMQStreamProducer.class.getName());

    /** Number of zero-valued bytes written to the stream (4 MiB). */
    private static final int PAYLOAD_SIZE_BYTES = 4 * 1024 * 1024;

    /**
     * Sends the test stream to the queue "AMQStreamTest" and exits with
     * status 0 on success. JMS and naming failures are logged at SEVERE level.
     *
     * @param args the command line arguments (unused)
     * @throws IOException if writing to the stream fails
     */
    public static void main(String[] args) throws IOException {
        try {
            sendOutputStream("AMQStreamTest");
            logger.info("outputstream sent");
            System.exit(0);
        } catch (JMSException ex) {
            logger.log(Level.SEVERE, null, ex);
        } catch (NamingException ex) {
            logger.log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Sends an OutputStream consisting of {@link #PAYLOAD_SIZE_BYTES}
     * zero-valued bytes to the given queue.
     *
     * The stream, the session and the connection are released in
     * {@code finally} blocks, so a failure while writing does not leave broker
     * resources open.
     *
     * @param aQueueName name of the queue to write the stream to
     * @throws IOException if writing to or closing the stream fails
     * @throws JMSException if the connection, session or queue cannot be set up
     * @throws NamingException declared for callers; not thrown by this implementation
     */
    protected static void sendOutputStream(String aQueueName) throws
            IOException, JMSException, NamingException {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            // AUTO_ACKNOWLEDGE is a static constant; read it from the Session
            // type rather than through an instance reference.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                Destination streamDestination = session.createQueue(aQueueName);
                Map<String, Object> messageProperties = new HashMap<String, Object>();
                messageProperties.put("myProperty", "TEST");
                OutputStream out = ((ActiveMQConnection) connection).createOutputStream(
                        streamDestination, messageProperties, DeliveryMode.PERSISTENT,
                        Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
                try {
                    for (int i = 0; i < PAYLOAD_SIZE_BYTES; i++) {
                        out.write(0);
                    }
                } finally {
                    out.close();
                }
            } finally {
                session.close();
            }
        } finally {
            connection.close();
        }
    }
}
{code}

> ActiveMQInputStream does not work for big content
> -------------------------------------------------
>
>                 Key: AMQ-4802
>                 URL: https://issues.apache.org/jira/browse/AMQ-4802
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JMS client
>    Affects Versions: 5.8.0
>         Environment: Mac OS 10.6.8
>            Reporter: Marc Leineweber
>
> Sending a file with size 4.6 MB with ActiveMQOutputStream works and results 
> in 71 messages on the ActiveMQ broker.
> Trying to receive this file / these messages using an ActiveMQInputStream 
> fails - after a while (720896 bytes received so far) inputStream.read(buffer) 
> never returns (it waits in ...). If inputStream is created with a timeout, I 
> get an ActiveMQInputStream$ReadTimeoutException. 



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Reply via email to