Greetings! I have recently started enumerating messages in a queue from time to time, because we’re trying to figure out if a unit of work is still pending. But I’m getting this error occasionally:
2023-07-27T21:38:30.007 [CanvasState-Abandoned] RpdmQueueUtils.browseStartMessages:159 [] ERROR - Error enumerating messages for pod {"message_type":"pod_specifier","pod_type":"project_execution","memory_mb":2048,"jvm_memory_mb":250} java.lang.RuntimeException: AMQ219023: The large message lost connection with its session, either because of a rollback or a closed session at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.getBodyBuffer(ClientLargeMessageImpl.java:91) at org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage.readBytes(ActiveMQBytesMessage.java:217) at net.redpoint.rpdm.services.RpdmQueueUtils.browseStartMessages(RpdmQueueUtils.java:155) at net.redpoint.rpdm.services.RpdmQueueUtils.getAllQueuedCanvasIds(RpdmQueueUtils.java:76) at net.redpoint.rpdm.canvas_state_query.CanvasStateQueryServerImpl.doCanvasAbandoned(CanvasStateQueryServerImpl.java:282) at net.redpoint.rpdm.canvas_state_query.CanvasStateQueryServerImpl$Abandoned.action(CanvasStateQueryServerImpl.java:265) at net.redpoint.rpdm.services.LazyStartPeriodicThread.run(LazyStartPeriodicThread.java:91) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: ActiveMQIllegalStateException[errorType=ILLEGAL_STATE message=AMQ219023: The large message lost connection with its session, either because of a rollback or a closed session] at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:265) at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.checkBuffer(ClientLargeMessageImpl.java:157) at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.getBodyBuffer(ClientLargeMessageImpl.java:89) This happens in two parts using JMS. The first enumerates the queue using browseMessages() to collect a List<Message>, and the second processes the Messages, reading their bodies and examining them. The problem occurs on this line in browseStartMessages when we read the body bytes bm.readBytes(requestBytes); Despite the error text, the session isn’t closed, or at least this error doesn’t always happen despite my not doing anything to re-create the session. I suspect that, given the ClientLargeMessageImpl.getBodyBuffer on the stack, something is going wrong that is specific to “large” messages. IIRC the large messages use a coat-check pattern to store the message on disk and bypass the in-memory queue. Can you suggest a solution? I’ve though of two potential changes: - Raise minLargeMessageSize to something big enough that our messages are never “large” - Read the body bytes immediately, while enumerating the queue, instead of later when looping over the List<Message> I was also trying to see if there is a way to attach “metadata” to a message and only read that? These messages are large, and presumably it is expensive to scan them, and I’m really only looking for a UUID embedded in all of that. JDK: Temurin 17 AMQ broker version: 2.28.0 AMQ JMS client dependency: <dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-jms-client-all</artifactId> <version>2.28.0</version> </dependency> Thanks John public List<Message> browseMessages(String queueName) { // This helps avoid a known issue with queue message enumeration (in ActiveMQ "classic"), where after getting the final message // it hangs for a significant time, up to 20 seconds, before the Enumeration returns false. int count = getQueueEntryCount(queueName); List<Message> result = new ArrayList<>(); synchronized (lock) { try { Queue queue = getSession().createQueue(queueName); try (QueueBrowser browser = getSession().createBrowser(queue)) { Enumeration e = browser.getEnumeration(); while (count-- > 0 && e.hasMoreElements()) { try { result.add((Message) e.nextElement()); } catch (Exception ex) { LOG.warn("Error browsing queue '" + queueName + "'", ex); } } } } catch (Exception ex) { LOG.warn("Error browsing queue '" + queueName + "'", ex); } } return result; } The second processes the Messages: public List<RpcRequestPacket> browseStartMessages(PodSpecifier podSpecifier) { List<RpcRequestPacket> result = new ArrayList<>(); for (var message : browseMessages(getStartQueueName(podSpecifier))) { var bm = (BytesMessage)message; try { var requestBytes = new byte[(int) bm.getBodyLength()]; bm.readBytes(requestBytes); result.add(new RpcRequestPacket(requestBytes, new JmsRpcMessageMetadata(message.getJMSMessageID(), ""))); } catch (Exception ex) { LOG.error("Error enumerating messages for pod " + podSpecifier, ex); } } return result; } [rg] <https://www.redpointglobal.com/> John Lilley Data Management Chief Architect, Redpoint Global Inc. 888 Worcester Street, Suite 200 Wellesley, MA 02482 M: +1 7209385761<tel:+1%207209385761> | john.lil...@redpointglobal.com<mailto:john.lil...@redpointglobal.com> PLEASE NOTE: This e-mail from Redpoint Global Inc. (“Redpoint”) is confidential and is intended solely for the use of the individual(s) to whom it is addressed. If you believe you received this e-mail in error, please notify the sender immediately, delete the e-mail from your computer and do not copy, print or disclose it to anyone else. If you properly received this e-mail as a customer, partner or vendor of Redpoint, you should maintain its contents in confidence subject to the terms and conditions of your agreement(s) with Redpoint.