[ 
https://issues.apache.org/jira/browse/AMQ-7229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17319895#comment-17319895
 ] 

JJ commented on AMQ-7229:
-------------------------

Hi, I've been talking to another person who's a customer of the same system and 
confirms the issue that I see and we have some thoughts as to what happens 
(though it a little complex)

Upstream the ultimate producer creates 1 message per event and pushes it out as 
XML. The system we then connect too (and maybe based on an old AMQ 5.8.0 
install though they won't confirm this to us directly) takes each received 
message and batches them into blocks of 32 messages or 5 seconds whichever is 
sooner and pushes the message out as JSON to the final consumers.

What we think appears to happen is that somewhere the ultimate producer is 
sending a blank or corrupt message that's being batched by the middle man, that 
batch of messages then gets sent and triggers that error.

A test case is well beyond me but I have reached out for some assistance on 
that front but there's no guarantees. Hope that gives a bit more insight. 
(sorry didn't see the alert for your update)

> JmsConnector handling loss of connection [ActiveMQConnection 
> {id=ID:xs-38677-1560088234347-2:100,clientId=ID:xs.test.int-38677-1560088234347-1:100,started=true}]
>  | org.apache.activemq.network.jms.JmsConnector | ActiveMQ Session Task-8514
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-7229
>                 URL: https://issues.apache.org/jira/browse/AMQ-7229
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Camel, Connector, JMS client
>    Affects Versions: 5.15.8, 5.15.9, 5.15.10, 5.15.12
>         Environment: CentOS 6 & 7
> Libs loaded:
> activemq-broker-5.15.12.jar
>  activemq-client-5.15.12.jar
>  activemq-console-5.15.12.jar
>  activemq-jaas-5.15.12.jar
>  activemq-kahadb-store-5.15.12.jar
>  activemq-openwire-legacy-5.15.12.jar
>  activemq-protobuf-1.1.jar
>  activemq-spring-5.15.12.ja
>  activemq-web-5.15.12.jar
>  geronimo-j2ee-management_1.1_spec-1.0.1.jar
>  geronimo-jms_1.1_spec-1.1.1.jar
>  geronimo-jta_1.1_spec-1.1.1.jar
>  hawtbuf-1.11.jar
>  jcl-over-slf4j-1.7.25.jar
>  slf4j-api-1.7.25.jar
> asm-1.0.2.jar
> json-array-splitter-1.0.1.jar
> json-path-2.4.0.jar
> json-simple-1.1.1.jar
> json-smart-2.3.jar
>  
> Camel Libs:
> activemq-camel-5.15.12.jar
>  camel-core-2.25.0.jar
>  camel-jms-2.25.0.jar
>  camel-jsonpath-2.25.0.jar
>  camel-spring-2.25.0.jar
>  camel-stream-2.25.0.jar
>            Reporter: JJ
>            Assignee: Jean-Baptiste Onofré
>            Priority: Blocker
>              Labels: beginner, data-loss, error, newbie
>             Fix For: 5.16.2, 5.15.16
>
>
> Periodically I am getting the following logged to my production and dev AMQ 
> instances;
> {code:java}
> JmsConnector handling loss of connection [ActiveMQConnection
> {id=ID:server-38677-1560088234347-2:100,clientId=ID:xs.test.int-38677-1560088234347-1:100,started=true}]|
> org.apache.activemq.network.jms.JmsConnector| ActiveMQ Session Task-8514
> {code}
> This come at random times from two separate servers on separate networks 
> (connecting to a common upstream provider) These errors ALWAYS come in groups 
> of 11, with the time spanning 11 sec - both servers see the same error at the 
> same time (Indicating something at the far end maybe?)
> When I check the connection count in a JMX console it has increased by 11, 
> with none of the old connections seeming to close or get cleaned up.
> System config:
> JMS-JMS Bridge with embedded broker bridging topics from a remote server to 
> local server. Local clients connect with nio+stomp and all have persistent 
> subscriptions to topics.
> The remote connection is openwire.
> When this error occurs it leads to a corrupted message which then causes the 
> DB store to grow until an OOM error occurs and AMQ stops processing. The only 
> way to resolve the issue is to stop all the connected clients and clear there 
> subscriptions and then reconnect.
> I have done wireshark on a dev broker but can't see anything *obvious* though 
> wireshark doesn't like to dissect openwire so I can't see what's actually 
> going on.
> This error sometimes occurs days apart, or like today multiple times a day.
> The only reference I see in the [source code is line 
> 496|https://github.com/apache/activemq/blob/master/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java]
>  
> The error logged in debug is:
> {code:java}
> 2019-04-02 19:29:20,360 | TRACE | ID:xs-39837-1553964750839-4:140:2 sending 
> message: ActiveMQTextMessage {commandId = 3319310, responseRequired = false, 
> mes
>  sageId = ID:xs-39837-1553964750839-4:140:2:1:1, originalDestination = null, 
> originalTransactionId = null, producerId = 
> ID:xs-39837-1553964750839-4:140:2:1,
>  destination = topic://TD_ALL, transactionId = null, expiration = 0, 
> timestamp = 1554229760360, arrival = 0, brokerInTime = 1554229755115, 
> brokerOutTime = 15
>  54229755118, correlationId = null, replyTo = null, persistent = true, type = 
> null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = 
> null,
>   compressed = true, userID = null, content = 
> org.apache.activemq.util.ByteSequence@2a02b7e9, marshalledProperties = 
> org.apache.activemq.util.ByteSequence@7d
>  db0ca1, dataStructure = null, redeliveryCounter = 0, size = 0, properties = 
> null, readOnlyProperties = true, readOnlyBody = true, droppable = false, 
> jmsXGroupFirstForConsumer = false, text = 
> \\\{"SF_MSG":{"time":"1554229754000","area_id":...r":"2W58"}}} | 
> org.apache.activemq.ActiveMQSession | ActiveMQ Session Task-4447
>  2019-04-02 19:29:20,360 | TRACE | Running task iteration 0 - 
> vm://xs.test.int#279 | org.apache.activemq.thread.PooledTaskRunner | ActiveMQ 
> VMTransport: vm://xs.test.int#279-1
>  2019-04-02 19:29:20,400 | DEBUG | Error occured while processing sync 
> command: ActiveMQTextMessage {commandId = 7, responseRequired = true, 
> messageId = ID:xs-39837-1553964750839-4:140:2:1:1, originalDestination = 
> null, originalTransactionId = null, producerId = 
> ID:xs-39837-1553964750839-4:140:2:1, destination = topic://TD_ALL, 
> transactionId = null, expiration = 0, timestamp = 1554229760360, arrival = 0, 
> brokerInTime = 1554229760361, brokerOutTime = 1554229755118, correlationId = 
> null, replyTo = null, persistent = true, type = null, priority = 4, groupID = 
> null, groupSequence = 0, targetConsumerId = null, compressed = true, userID = 
> null, content = org.apache.activemq.util.ByteSequence@2a02b7e9, 
> marshalledProperties = org.apache.activemq.util.ByteSequence@7ddb0ca1, 
> dataStructure = null, redeliveryCounter = 0, size = 1595, properties = null, 
> readOnlyProperties = true, readOnlyBody = true, droppable = false, 
> jmsXGroupFirstForConsumer = false, text = 
> \\{"SF_MSG":{"time":"1554229754000","area_id":...r":"2W58"}}}, exception: 
> java.io.EOFException | org.apache.activemq.broker.TransportConnection.Service 
> | ActiveMQ VMTransport: vm://xs.test.int#279-1
>  java.io.EOFException
>          at 
> java.io.DataInputStream.readUnsignedShort(DataInputStream.java:340)[:1.8.0_201]
>          at 
> java.io.DataInputStream.readUTF(DataInputStream.java:589)[:1.8.0_201]
>          at 
> java.io.DataInputStream.readUTF(DataInputStream.java:564)[:1.8.0_201]
>          at 
> org.apache.activemq.util.MarshallingSupport.unmarshalPrimitiveMap(MarshallingSupport.java:97)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.util.MarshallingSupport.unmarshalPrimitiveMap(MarshallingSupport.java:78)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.util.MarshallingSupport.unmarshalPrimitiveMap(MarshallingSupport.java:70)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.command.Message.unmarsallProperties(Message.java:252)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.command.Message.getProperty(Message.java:202)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy.add(RetainedMessageSubscriptionRecoveryPolicy.java:53)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.region.Topic.dispatch(Topic.java:756)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:556)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.region.Topic.send(Topic.java:484)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.region.DestinationFilter.send(DestinationFilter.java:138)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:508)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:459)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:293)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:293)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:226)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:578)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:275)[activemq-broker-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)[activemq-client-5.15.9.jar:5.15.9]
>          at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[:1.8.0_201]
>          at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[:1.8.0_201]
>          at java.lang.Thread.run(Thread.java:748)[:1.8.0_201]{code}
>  
> If I front the connection to the upstream server by using camel - I get the 
> error below which matches up with the error above.. I am currently running 
> camel in full debug to hopefully catch another event. I'm unsure if using the 
> camel configuration will stop the eventual OOM conditrion as seen above but I 
> will try and configure a test client later to watch.
> {code:java}
> 2019-06-15 19:28:23,855 | WARN  | Execution of JMS message listener failed. 
> Caused by: [org.apache.camel.RuntimeCamelException - javax.jms.JMSException: 
> java.io.EOFException] | 
> org.apache.camel.component.jms.EndpointMessageListener | Camel (Piper_Bravo) 
> thread #1 - JmsConsumer[TD_ALL_SIG_AREA]
> org.apache.camel.RuntimeCamelException: javax.jms.JMSException: 
> java.io.EOFException
>         at 
> org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:209)[camel-jms-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.component.jms.JmsMessage.populateInitialHeaders(JmsMessage.java:235)[camel-jms-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.impl.DefaultMessage.createHeaders(DefaultMessage.java:258)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.component.jms.JmsMessage.ensureInitialHeaders(JmsMessage.java:220)[camel-jms-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.component.jms.JmsMessage.getHeader(JmsMessage.java:170)[camel-jms-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.impl.DefaultMessage.getHeader(DefaultMessage.java:94)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:115)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:75)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:34)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:695)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:663)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:634)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:149)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)[camel-core-2.23.0.jar:2.23.0]
>         at 
> org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113)[camel-jms-2.23.0.jar:2.23.0]
>         at 
> org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
>         at 
> org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
>         at 
> org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
>         at 
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
>         at 
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
>         at 
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
>         at 
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
>         at 
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[:1.8.0_212]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[:1.8.0_212]
>         at java.lang.Thread.run(Thread.java:748)[:1.8.0_212]{code}
>  
> DEBUG Capture of good message:
> {code:java}
> 2019-06-26 18:50:43,461 | DEBUG |
> nrod://topic:TD_ALL_SIG_AREA?clientId=test.broker1%40test&durableSubscriptionName=td-test-test
> consumer received JMS message: ActiveMQTextMessage {commandId = 2113029,
> responseRequired = false, messageId =
> ID:opendata-backend.rockshore.net-45497-1560093463062-11:26340:1:1:90442,
> originalDestination = null, originalTransactionId = null, producerId =
> ID:opendata-backend.rockshore.net-45497-1560093463062-11:26340:1:1,
> destination = topic://TD_ALL_SIG_AREA, transactionId = null, expiration =
> 1561571743449, timestamp = 1561571443449, arrival = 0, brokerInTime =
> 1561571443450, brokerOutTime = 1561571443453, correlationId = null, replyTo
> = null, persistent = true, type = null, priority = 4, groupID = null,
> groupSequence = 0, targetConsumerId = null, compressed = true, userID =
> null, content = org.apache.activemq.util.ByteSequence@5b0352b8,
> marshalledProperties = org.apache.activemq.util.ByteSequence@68ec1f70,
> dataStructure = null, redeliveryCounter = 0, size = 0, properties =
> {transformation=jms-xml}, readOnlyProperties = true, readOnlyBody = true,
> droppable = false, jmsXGroupFirstForConsumer = false, text =
> [{"CA_MSG":{"to":"0335","time":"1561571443000...ata":"CA"}}]} |
> org.apache.camel.component.jms.EndpointMessageListener | Camel (Piper_Alpha)
> thread #1 - JmsConsumer[TD_ALL_SIG_AREA]{code}
>  
> DEBUG Capture of bad message:
> {code:java}
> 2019-06-26 18:50:43,202 | DEBUG |
> nrod://topic:TD_ALL_SIG_AREA?clientId=test.broker1%40test&durableSubscriptionName=td-test-test
> consumer received JMS message: ActiveMQTextMessage {commandId = 2113027,
> responseRequired = false, messageId =
> ID:opendata-backend.rockshore.net-45497-1560093463062-11:26340:4:7:90688,
> originalDestination = null, originalTransactionId = null, producerId =
> ID:opendata-backend.rockshore.net-45497-1560093463062-11:26340:4:7,
> destination = topic://TD_ALL_SIG_AREA, transactionId = null, expiration =
> 1561571743189, timestamp = 1561571443189, arrival = 0, brokerInTime =
> 1561571443190, brokerOutTime = 1561571443194, correlationId = null, replyTo
> = null, persistent = true, type = null, priority = 4, groupID = null,
> groupSequence = 0, targetConsumerId = null, compressed = true, userID =
> null, content = org.apache.activemq.util.ByteSequence@7eea0857,
> marshalledProperties = org.apache.activemq.util.ByteSequence@dc68e3d,
> dataStructure = null, redeliveryCounter = 0, size = 0, properties = null,
> readOnlyProperties = true, readOnlyBody = true, droppable = false,
> jmsXGroupFirstForConsumer = false, text =
> [{"SF_MSG":{"time":"1561571442000","area_id":...ata":"40"}}]} |
> org.apache.camel.component.jms.EndpointMessageListener | Camel (Piper_Alpha)
> thread #1 - JmsConsumer[TD_ALL_SIG_AREA]{code}
> In the JMS header a good message has;
> {code:java}
> properties = {transformation=jms-xml}{code}
> But in a 'bad' message it only has
> {code:java}
> properties = null {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to