Cool, thanks. I won't be able to recreate a test today. But I have a suspicion that the message length is being lost. Will follow up.
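
To show what a lost length could look like, here is a minimal sketch in plain Java. It is not the Artemis code: `LengthPrefixDemo`, `writePrefixed` and `readPrefixed` are hypothetical names, and the 4-byte length prefix is an assumption made only for this demonstration. A reader that expects a length prefix but is handed bytes written without one treats the first payload bytes as the length, which produces an implausibly large value much like the length(276496418) in the stack trace quoted below.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class LengthPrefixDemo {

    /** Writes a 4-byte length followed by the UTF-8 bytes (assumed framing, for illustration). */
    static ByteBuffer writePrefixed(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + utf8.length);
        buf.putInt(utf8.length).put(utf8);
        buf.flip();
        return buf;
    }

    /** Reads a 4-byte length, then that many UTF-8 bytes; rejects lengths that cannot fit. */
    static String readPrefixed(ByteBuffer buf) {
        int length = buf.getInt();
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalStateException(
                "declared length " + length + " exceeds remaining " + buf.remaining());
        }
        byte[] utf8 = new byte[length];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = "{\"cmd\":\"ShowTitlePanel\"}";

        // Writer and reader agree on the framing: the round trip works.
        System.out.println("round trip: " + readPrefixed(writePrefixed(json)));

        // Same reader, but the bytes were written without a length prefix:
        // the first four characters of the JSON are misread as the length.
        ByteBuffer raw = ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
        try {
            readPrefixed(raw);
        } catch (IllegalStateException e) {
            System.out.println("mismatched framing: " + e.getMessage());
        }
    }
}
```

If something like this is happening, the message body is intact and only the reader's expectation about its layout is wrong.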

And like Justin, I would expect this to work as well.

John

On Fri, Sep 2, 2016 at 12:48 PM Andy Redhead <andy.redh...@oneadvanced.com> wrote:

> Hmm, something is eating my attachments, both classes inline below:
>
> NativeProcessMsgReader
>
> package com...c2java;
>
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
>
> import javax.annotation.PostConstruct;
> import javax.annotation.PreDestroy;
>
> import org.apache.activemq.artemis.api.core.ActiveMQException;
> import org.apache.activemq.artemis.api.core.TransportConfiguration;
> import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
> import org.apache.activemq.artemis.api.core.client.ClientConsumer;
> import org.apache.activemq.artemis.api.core.client.ClientMessage;
> import org.apache.activemq.artemis.api.core.client.ClientSession;
> import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
> import org.apache.activemq.artemis.api.core.client.ServerLocator;
> import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com...NativeProcessCmd;
>
> /**
>  * Read messages from Artemis using server API.
>  *
>  * Uses a single thread (with a persistent ClientSession) to read from the shared queue used by all
>  * native processes to send outbound messages.
>  *
>  * Messages are handed on to a fixed sized executor for processing and sending results to browser.
>  *
>  * @author andyredhead
>  *
>  */
> public class NativeProcessMsgReader extends Thread {
>
>   final static Logger logger = LoggerFactory.getLogger(NativeProcessMsgReader.class);
>
>   int workerThreadPoolSize = 1;
>   int executorQueueLength = 500;
>
>   String incomingMsgFromNativeProcQueueName;
>
>   BlockingQueue<Runnable> nativeProcCmdQueue;
>   ThreadPoolExecutor nativeProcCmdExec;
>
>   ClientSessionFactory factory;
>   ClientSession session;
>   ClientConsumer consumer;
>
>   boolean shutdown = false;
>
>   public NativeProcessMsgReader(int workerThreadPoolSize, int executorQueueLength,
>       String incomingMsgFromNativeProcQueueName) {
>     this.workerThreadPoolSize = workerThreadPoolSize;
>     this.incomingMsgFromNativeProcQueueName = incomingMsgFromNativeProcQueueName;
>   }
>
>   @PostConstruct
>   public void startUp() throws Exception {
>     logger.info("startUp - start, thread pool size: {}, queue length: {}, queue name: {}",
>         workerThreadPoolSize, workerThreadPoolSize, incomingMsgFromNativeProcQueueName);
>
>     try {
>
>       // processing work taken from queue
>       nativeProcCmdQueue = new ArrayBlockingQueue<Runnable>(executorQueueLength);
>       nativeProcCmdExec = new ThreadPoolExecutor(workerThreadPoolSize, workerThreadPoolSize, 5,
>           TimeUnit.MINUTES, nativeProcCmdQueue);
>
>       this.start();
>
>     } catch (Exception asyncQueueSetupErr) {
>       logger.error("ctor - problem setting up queue consumer: " + asyncQueueSetupErr.getMessage(),
>           asyncQueueSetupErr);
>       throw asyncQueueSetupErr;
>     }
>
>     logger.info("startUp - complete.");
>   }
>
>   @PreDestroy
>   public void shutDown() {
>
>     logger.info("shutDown - commencing shut down");
>
>     // stop new work coming in
>     shutdown = true;
>
>     // close down the native prod cmd executor
>     logger.info("shutDown - requesting executor shuts down...");
>     nativeProcCmdExec.shutdownNow();
>     logger.info("shutDown - requested executor shuts down.");
>   }
>
>   @Override
>   public void run() {
>
>     // reading from queue
>     ServerLocator locator =
>         ActiveMQClient.createServerLocatorWithoutHA(
>             new TransportConfiguration(InVMConnectorFactory.class.getName()));
>
>     try {
>       factory = locator.createSessionFactory();
>       session = factory.createSession();
>       consumer = session.createConsumer(incomingMsgFromNativeProcQueueName);
>       session.start();
>     } catch (Exception e) {
>       logger.warn("run - problem setting up native proc to java queue consumer: {}", e.getMessage(),
>           e);
>     }
>
>     while (!shutdown) {
>       try {
>
>         logger.debug("run - in while loop, waiting for message");
>
>         /* break every half second to check shutdown status */
>         ClientMessage msgReceived = consumer.receive(500);
>
>         if (msgReceived != null) {
>
>           logger.debug("run - received non-null message");
>           logger.debug("run - non-null message, body length: {}", msgReceived.getBodySize());
>
>           String receivedMsg = msgReceived.getBodyBuffer().readString();
>
>           logger.debug("run - message text: {}", receivedMsg);
>
>           if ((receivedMsg != null) && (receivedMsg.length() > 0)) {
>
>             logger.debug("run - in while loop, got message");
>
>             NativeProcessCmd nativeProcCmd = new NativeProcessCmd(receivedMsg);
>             nativeProcCmdExec.submit(nativeProcCmd);
>             logger.debug("run - in while loop, cmd pushed to executor");
>
>           } else {
>             logger.debug("run - null or zero length message read from native process cmd queue");
>           }
>         } else {
>           logger.debug("run - no message during last read period");
>         }
>
>       } catch (ActiveMQException e) {
>         logger.warn(
>             "run - problem reading message from native process command queue: " + e.getMessage(),
>             e);
>       }
>
>     } // end not shutdown while loop
>
>     // been told to shut down so close Artemis client objects
>
>     try {
>
>       consumer.close();
>       session.stop();
>       factory.close();
>
>       logger.info("run - Artemis client objects shut down.");
>
>     } catch (Exception e) {
>       logger.warn("run - problem closing Artemis client objects during shutdown: " + e.getMessage(),
>           e);
>     }
>
>   }
>
> }
>
> NumberGuessMain
>
> package com...numberguess;
>
> import java.io.IOException;
> import java.util.HashMap;
>
> import org.apache.qpid.proton.amqp.messaging.AmqpValue;
> import org.apache.qpid.proton.message.Message;
> import org.apache.qpid.proton.messenger.Messenger;
>
> import com.google.gson.Gson;
>
> public class NumberGuessMain {
>
>   private int answer = -1;
>
>   public static final int EXIT_CODE_WRONG_CMD_LINE_ARGS = 1000;
>
>   public static final int EXIT_CODE_BROWSER_CONTROL_QUEUE_SETUP_ERR = 1001;
>
>   public static final int EXIT_CODE_BROWSER_RESPONSE_QUEUE_SETUP_ERR = 1002;
>
>   private String taskInstanceId;
>
>   private String browserControlQueueUrl;
>
>   private String browserResponseQueueUrl;
>
>   Messenger sendBrowserControlMessenger;
>
>   Messenger receiveBrowserResponseMessenger;
>
>   Gson gson = new Gson();
>
>   public static void main(String[] args) {
>     if ((args != null) && (args.length > 2)) {
>       NumberGuessMain me = new NumberGuessMain(args[0], args[1], args[2]);
>       me.run();
>       me.shutdown();
>     } else {
>
>       System.err.println("task instance id, browser control and response queues not specified"
>           + " (expects taskInstanceId browserControlQueueAmqpUrl browserResponseQueueAmqpUrl)");
>       System.exit(EXIT_CODE_WRONG_CMD_LINE_ARGS);
>     }
>
>   }
>
>   public NumberGuessMain(String taskInstanceId, String browserControlQueueUrl,
>       String browserResponseQueueUrl) {
>
>     this.taskInstanceId = taskInstanceId;
>     this.browserControlQueueUrl = browserControlQueueUrl;
>     this.browserResponseQueueUrl = browserResponseQueueUrl;
>
>     System.out.println("Task Instance ID: " + this.taskInstanceId);
>
>     // browser control queue
>
>     System.out.println("Browser Control Queue: " + this.browserControlQueueUrl);
>
>     sendBrowserControlMessenger = new Messenger.Factory().create();
>     try {
>       sendBrowserControlMessenger.start();
>     } catch (IOException e) {
>       System.err.println("Problem starting browser control AMQP messenger: " + e.getMessage());
>       e.printStackTrace();
>       System.exit(EXIT_CODE_BROWSER_CONTROL_QUEUE_SETUP_ERR);
>     }
>
>     // browser response queue
>
>     System.out.println("Browser Response Queue: " + this.browserResponseQueueUrl);
>
>     receiveBrowserResponseMessenger = new Messenger.Factory().create();
>     try {
>
>       receiveBrowserResponseMessenger.start();
>       receiveBrowserResponseMessenger.subscribe(browserResponseQueueUrl);
>
>     } catch (IOException e) {
>       System.err.println("Problem starting receive from browser AMQP messenger: " + e.getMessage());
>       e.printStackTrace();
>       System.exit(EXIT_CODE_BROWSER_RESPONSE_QUEUE_SETUP_ERR);
>     }
>
>   }
>
>   protected void run() {
>
>     System.out.println("Hello from numberguess");
>
>     OutboundNativeProcessMessage showFirstPanelMsg =
>         new OutboundNativeProcessMessage(taskInstanceId, "ShowTitlePanel", null);
>
>     send(showFirstPanelMsg);
>   }
>
>   protected void send(OutboundNativeProcessMessage outMsg) {
>     Message msg = new Message.Factory().create();
>     msg.setAddress(browserControlQueueUrl);
>     String msgJson = gson.toJson(outMsg);
>     msg.setBody(new AmqpValue(msgJson));
>     sendBrowserControlMessenger.put(msg);
>     sendBrowserControlMessenger.send();
>     System.out.println("Sent browser control msg: " + msgJson);
>   }
>
>   protected HashMap sendReceive(OutboundNativeProcessMessage outMsg) {
>     HashMap receivedData = null;
>
>     send(outMsg);
>
>     System.out.println("Waiting for response message...");
>     receiveBrowserResponseMessenger.recv();
>     Message msg = receiveBrowserResponseMessenger.get();
>     System.out.println("Got response message.");
>
>     // @ToDo process the message here...
>
>     return receivedData;
>   }
>
>   protected void shutdown() {
>     if (sendBrowserControlMessenger != null) {
>       sendBrowserControlMessenger.stop();
>     }
>     if (receiveBrowserResponseMessenger != null) {
>       try {
>         // receiveBrowserResponseMessenger.
>         receiveBrowserResponseMessenger.stop();
>       } catch (Exception e) {
>         System.err.println("Problem closing browser response queue messenger: " + e.getMessage());
>         e.printStackTrace();
>       }
>     }
>   }
>
> }
>
> -----Original Message-----
> From: Andy Redhead [mailto:andy.redh...@oneadvanced.com]
> Sent: 02 September 2016 17:45
> To: users@activemq.apache.org
> Subject: RE: Artemis Core consumer hangs when reading message sent by AMQP producer
>
> Hi,
>
> Sorry, I could have sworn I attached the code in my original email -
> trying again...
>
> Cheers, Andy
>
> -----Original Message-----
> From: John D. Ament [mailto:johndam...@apache.org]
> Sent: 02 September 2016 17:42
> To: users@activemq.apache.org
> Subject: Re: Artemis Core consumer hangs when reading message sent by AMQP producer
>
> Andy,
>
> Are you able to provide your client code? NativeProcessMsgReader isn't
> Artemis's so it's not clear what else you're doing to connect, subscribe,
> etc.
>
> John
>
> On Fri, Sep 2, 2016 at 12:38 PM Andy Redhead <andy.redh...@oneadvanced.com> wrote:
>
> > Hi,
> >
> > Adding to the information below, if I change the JUnit test case that
> > tries sending a message through Artemis into “NativeProcessMsgReader”
> > to use AMQP (rather than the Core API), I get the stack trace:
> >
> > Exception in thread "Thread-1" java.lang.IndexOutOfBoundsException:
> > readerIndex(25) + length(276496418) exceeds writerIndex(294):
> > DuplicatedByteBuf(ridx: 25, widx: 294, cap: 512, unwrapped:
> > UnpooledHeapByteBuf(ridx: 17, widx: 294, cap: 512))
> >     at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1165)
> >     at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:675)
> >     at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:683)
> >     at io.netty.buffer.WrappedByteBuf.readBytes(WrappedByteBuf.java:511)
> >     at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readSimpleStringInternal(ChannelBufferWrapper.java:90)
> >     at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:113)
> >     at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:96)
> >     at com.advanced365.transoft.renault.poc.decforms.asyncmsg.c2java.NativeProcessMsgReader.run(NativeProcessMsgReader.java:132)
> >
> > It is interesting to note that “NativeProcessMsgReader.java:132” links
> > to this line:
> >
> >     String receivedMsg = msgReceived.getBodyBuffer().readString();
> >
> > Which is the line I thought was causing trouble when the code ran in
> > Tomcat…
> >
> > Next, I modified “NativeProcessMsgReader” to use Qpid AMQP client to
> > read messages from Artemis (with the test case still sending messages
> > using AMQP) – this worked perfectly.
> >
> > So it seems that:
> >
> > · If the producer and consumer use the same protocol, messages flow
> > through ok.
> >
> > · If the producer uses the AMQP API and the consumer uses the Artemis
> > Core API then there is a problem.
> >
> > For now I’ll stick with using AMQP at both ends.
> >
> > I’m still curious to know if it’s reasonable to expect using an AMQP
> > producer and an Artemis Core consumer to work?
> >
> > Cheers, Andy
> >
> > From: Andy Redhead [mailto:andy.redh...@oneadvanced.com]
> > Sent: 01 September 2016 23:03
> > To: users@activemq.apache.org
> > Subject: Artemis Core consumer hangs when reading message sent by
> > AMQP producer
> >
> > Hi,
> >
> > I’m running Artemis 1.3.0 embedded inside a Spring app running in Tomcat.
> >
> > I have a remote message producer (NumberGuessMain.java) that uses the
> > Apache Qpid Proton library to push messages onto an Artemis queue with
> > the URL:
> >
> >     amqp://localhost:5672/native-2-java
> >
> > The messages created by the producer are JSON strings.
> >
> > I have a single threaded, singleton consumer that uses the native
> > Artemis API (NativeProcessMsgReader.java) running inside the same web
> > app as Artemis, reading from the queue:
> >
> >     native-2-java
> >
> > While there are no messages to read, the consumer happily loops
> > through the while loop in the “run” method.
> >
> > As soon as the consumer tries to read the body of the first message,
> > it hangs on the line:
> >
> >     String receivedMsg = msgReceived.getBodyBuffer().readString();
> >
> > The last lines in the log file are:
> >
> > 2016-09-01T22:27:26,270 15137 [Thread-6] DEBUG c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - no message during last read period
> > 2016-09-01T22:27:26,270 15137 [Thread-6] DEBUG c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - in while loop, waiting for message
> > 2016-09-01T22:27:26,772 15639 [Thread-6] DEBUG c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - no message during last read period
> > 2016-09-01T22:27:26,772 15639 [Thread-6] DEBUG c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - in while loop, waiting for message
> > 2016-09-01T22:27:26,912 15779 [Thread-6] DEBUG c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - received non-null message
> > 2016-09-01T22:27:26,912 15779 [Thread-6] DEBUG c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - non-null message, body length: 307
> >
> > To me this looks like the message has reached Artemis and is made
> > available to the consumer but something goes wrong when the consumer
> > tries to access the body of the message.
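
One possible reason the consumer looks hung rather than failed: the read loop in NativeProcessMsgReader only catches ActiveMQException, so an unchecked exception thrown while decoding the body would end the thread, and its stack trace would normally go to stderr rather than to the application log. That is a guess based on the code and the "Exception in thread" output from the JUnit run reported earlier in this thread, not something confirmed in Tomcat. The sketch below uses only the JDK. `SilentWorkerDemo`, `decode`, `handle` and `startWorker` are made-up names, and the "!" prefix is just a stand-in for a body the reader cannot decode.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SilentWorkerDemo {

    // volatile so the worker thread is guaranteed to see the update
    static volatile boolean shutdown = false;

    /** Stand-in for decoding a message body; throws an unchecked exception on bad input. */
    static String decode(String raw) {
        if (raw.startsWith("!")) {
            throw new IndexOutOfBoundsException("cannot decode: " + raw);
        }
        return raw.trim();
    }

    /** Decodes one message; records unchecked failures instead of letting them escape. */
    static void handle(String raw, List<String> out, List<String> errors) {
        try {
            out.add(decode(raw));
        } catch (RuntimeException e) {
            errors.add(e.toString());
        }
    }

    /** Polls the queue until shutdown is requested, similar in shape to the loop in run(). */
    static Thread startWorker(BlockingQueue<String> in, List<String> out, List<String> errors) {
        Thread worker = new Thread(() -> {
            while (!shutdown) {
                try {
                    String raw = in.poll(100, TimeUnit.MILLISECONDS);
                    if (raw != null) {
                        handle(raw, out, errors);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> in = new LinkedBlockingQueue<>();
        List<String> out = new CopyOnWriteArrayList<>();
        List<String> errors = new CopyOnWriteArrayList<>();

        Thread worker = startWorker(in, out, errors);
        in.add("!bad");   // would have killed a loop that only catches checked exceptions
        in.add(" good "); // still gets processed because the loop survived

        Thread.sleep(500);
        shutdown = true;
        worker.join();
        System.out.println("decoded=" + out + " errors=" + errors);
    }
}
```

With this shape a decode failure shows up in the error list (or the log, in a real app) and the loop carries on with the next message, instead of the thread going quiet.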
> >
> > I’m new to Artemis and AMQP so it’s quite possible I’m doing something
> > stupid…
> >
> > Is the basic assumption that it’s ok to send a message using AMQP and
> > receive it using Artemis core API valid?
> >
> > Is there something obvious in the code that is causing this problem?
> >
> > Any pointers gratefully received.
> >
> > Cheers, Andy
> >
> > Andy Redhead
> > Principal Consultant > Solutions > Advanced
> >
> > Advanced
> > 230 City Road, London, EC1V 2TT
> > www.oneadvanced.com
> >
> > ***** Email confidentiality *****
> >
> > This message is private and confidential. If you have received this
> > message in error, please notify us and remove it from your system. The
> > dissemination, copying or distribution of this message, or related
> > files, by anyone other than the intended recipient is strictly
> > prohibited.
> >
> > Any views or opinions expressed are solely those of the author and do
> > not necessarily represent those of Advanced 365 Limited.
> >
> > ***** Email monitoring *****
> >
> > Advanced 365 Limited may monitor email traffic data and also the
> > content of email for the purposes of security and staff training.
> >
> > ***** Email security *****
> >
> > In keeping with good computing practice, the recipient of this email
> > should ensure that it is virus-free. Advanced 365 Limited does not
> > accept responsibility for any virus that may be transferred by way of
> > this email.
> >
> > Email may be susceptible to data corruption, interception and/or
> > unauthorised amendment. Advanced 365 Limited does not accept liability
> > for any such corruption, interception or amendment or any consequences
> > thereof.
> >
> > This email has been scanned for viruses by the Symantec Email
> > Security.cloud service.
> >
> > Advanced 365 Limited, part of the Advanced Computer Software Group
> >
> > Registered office: Ditton Park, Riding Court Road, Datchet, Berkshire,
> > SL3 9LL, UK
> >
> > Registered in England under number 2124540