[jira] [Updated] (AMQ-5347) persistJMSRedelivered flag doesn't work correctly when exceptions occur
[ https://issues.apache.org/jira/browse/AMQ-5347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jesse Fugitt updated AMQ-5347:
Patch Info: Patch Available

persistJMSRedelivered flag doesn't work correctly when exceptions occur
Key: AMQ-5347
URL: https://issues.apache.org/jira/browse/AMQ-5347
Project: ActiveMQ
Issue Type: Bug
Components: Broker
Affects Versions: 5.10.0
Reporter: Jesse Fugitt
Attachments: AMQ5347.patch, RedeliveryRestartWithExceptionTest.java

The new flag in 5.10 that ensures the JMSRedelivered flag persists across broker restarts does not work correctly when an exception occurs while attempting to write the message update to disk before the restart. In that case, messages can be assigned to receivers, the broker can be restarted, and the messages are then re-assigned to receivers without the JMSRedelivered flag set as expected. I will attach a unit test and a proposed fix to illustrate the problem.

Here is additional information I had sent to the mailing list:

When using the new option persistJMSRedelivered (to ensure the redelivered flag is set correctly on potentially duplicate messages that are re-dispatched by the broker, even after a restart):

<policyEntry queue=">" persistJMSRedelivered="true"></policyEntry>

there is still a case where a message can be re-sent and will not be marked as redelivered. I can open a JIRA and probably create a unit test, but it is pretty clear from the pasted code below where the exception is getting swallowed. Would the preferred fix be to update the broker interface and make preProcessDispatch throw an IOException, or would it be better to add a new field to the MessageDispatch class to indicate that an exception occurred and leave the interface alone?

The specific case is when a MessageStore throws an exception during the updateMessage call; the exception is swallowed (and an ERROR logged) and the message is still dispatched to the consumer.
The exception seems like it should actually propagate out of the preProcessDispatch function in RegionBroker, as shown below, but this would require changing the Broker interface so that the void preProcessDispatch function throws an IOException.

    //RegionBroker.java
    @Override
    public void preProcessDispatch(MessageDispatch messageDispatch) {
        Message message = messageDispatch.getMessage();
        if (message != null) {
            long endTime = System.currentTimeMillis();
            message.setBrokerOutTime(endTime);
            if (getBrokerService().isEnableStatistics()) {
                long totalTime = endTime - message.getBrokerInTime();
                ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
            }
            if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered()
                    && !message.isRedelivered() && message.isPersistent()) {
                final int originalValue = message.getRedeliveryCounter();
                message.incrementRedeliveryCounter();
                try {
                    ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
                } catch (IOException error) {
                    LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
                } finally {
                    message.setRedeliveryCounter(originalValue);
                }
            }
        }
    }

    //TransportConnection.java
    protected void processDispatch(Command command) throws IOException {
        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
        try {
            if (!stopping.get()) {
                if (messageDispatch != null) {
                    broker.preProcessDispatch(messageDispatch);
                }
                dispatch(command); //This code will dispatch the message whether or not the updateMessage function actually worked
            }
            ...

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
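The first option Jesse raises, letting preProcessDispatch throw IOException so that processDispatch never reaches dispatch(command), can be sketched in isolation. This is a minimal, self-contained model under stated assumptions: Msg, Store and RedeliveryFlagSketch are hypothetical stand-ins, not ActiveMQ classes, and this is not the code in the attached AMQ5347.patch.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RedeliveryFlagSketch {

    // Hypothetical stand-in for a persistent message (not ActiveMQ's Message).
    static final class Msg {
        final String id;
        int redeliveryCounter;
        Msg(String id) { this.id = id; }
    }

    // Hypothetical stand-in for the destination's MessageStore.
    interface Store {
        void updateMessage(Msg m) throws IOException;
    }

    private final Store store;
    final List<String> dispatched = new ArrayList<>();

    RedeliveryFlagSketch(Store store) { this.store = store; }

    // Option 1 from the thread: let the store failure escape instead of logging it.
    void preProcessDispatch(Msg m) throws IOException {
        final int original = m.redeliveryCounter;
        m.redeliveryCounter++;              // persist a "redelivered" view of the message
        try {
            store.updateMessage(m);         // may throw; no longer swallowed
        } finally {
            m.redeliveryCounter = original; // in-memory counter is restored either way
        }
    }

    // The message is dispatched only if the redelivery update reached the store.
    void processDispatch(Msg m) throws IOException {
        preProcessDispatch(m);
        dispatched.add(m.id);
    }

    public static void main(String[] args) {
        RedeliveryFlagSketch ok = new RedeliveryFlagSketch(m -> { });
        RedeliveryFlagSketch failing = new RedeliveryFlagSketch(m -> {
            throw new IOException("simulated store failure");
        });
        try {
            ok.processDispatch(new Msg("a"));
            failing.processDispatch(new Msg("b"));
        } catch (IOException e) {
            System.out.println("not dispatched: " + e.getMessage()); // not dispatched: simulated store failure
        }
        System.out.println("dispatched: " + ok.dispatched + " / " + failing.dispatched); // dispatched: [a] / []
    }
}
```

The trade-off, raised by Gary Tully on the mailing list, is that by pre-dispatch the message is already pending an ack, so skipping the dispatch can leave it dangling until the consumer or subscription closes.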
[jira] [Resolved] (AMQ-5274) Stuck messages and CPU churn when aborted transacted message expires
[ https://issues.apache.org/jira/browse/AMQ-5274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Tully resolved AMQ-5274.
Resolution: Fixed
Fix Version/s: 5.11.0

Fix in https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=26807cd4
In-flight messages are not expired by the broker-side background periodic check. In this way, acks do not compete with expiry.

Stuck messages and CPU churn when aborted transacted message expires
Key: AMQ-5274
URL: https://issues.apache.org/jira/browse/AMQ-5274
Project: ActiveMQ
Issue Type: Bug
Affects Versions: 5.8.0, 5.9.0, 5.9.1, 5.10.0
Environment: win64, RHEL11
Reporter: Yannick Malins
Assignee: Gary Tully
Priority: Critical
Fix For: 5.11.0
Attachments: AMQ-5274.zip, AMQ-5274v2.zip, AMQ5274Test.java, logs_extract.txt

The test case is a simple producer/consumer:
Producer: 20 messages are injected, with a timeout of 10s.
Consumer: the redelivery policy is set to 0 retries (the issue exists with other values). The consumer uses transactions and throws a runtime exception on each message received.

Queue stats show 20 enqueues, 19 dequeues, 1 pending.
DLQ stats show 20 enqueues: all 20 messages go to the DLQ, IDs ending in 1-10 for failing and 11-20 for expiry (approximately).
The pending item (ID ending in 10) is a ghost message and remains stuck indefinitely in queue.test. If you browse, the message is not shown.

A) If you restart the broker, after a short while the message is cleaned:
jvm 1| WARN | Duplicate message add attempt rejected.
Destination: QUEUE://ActiveMQ.DLQ, Message id: ID:REDACTED-52872-1405079629779-1:1:1:1:10
jvm 1| WARN | org.apache.activemq.broker.region.cursors.QueueStorePrefetch@5b427f3c:ActiveMQ.DLQ,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxBatchSize:20,hasSpace:true - cursor got duplicate: ID:REDACTED--52872-1405079629779-1:1:1:1:10, 4
jvm 1| WARN | duplicate message from store ID:REDACTED--52872-1405079629779-1:1:1:1:10, redirecting for dlq processing

B) If you purge, ActiveMQ logs a warning:
WARN | queue://queue.test after purge complete, message count stats report: 1
The queue is marked as being empty. However, if you restart the broker, the message reappears briefly before being cleaned as above.

SUPPLEMENTARY: with ActiveMQ 5.9.0 and above, if you run the injection several times, the CPU usage of ActiveMQ climbs drastically until the queue is purged.
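The AMQ-5274 fix summary (in-flight messages are skipped by the periodic expiry check, so acks and expiry never act on the same message) can be illustrated with a toy queue. This is a sketch under assumptions, not the code in commit 26807cd4: the queue is modelled as a hypothetical map from message id to expiry time, plus a set of ids that have been dispatched and are awaiting an ack.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ExpirySweepSketch {

    // Hypothetical queue model: messageId -> expiration time in millis.
    final Map<String, Long> messages = new LinkedHashMap<>();
    // Ids dispatched to a consumer and still awaiting an ack or rollback.
    final Set<String> inflight = new HashSet<>();

    void add(String id, long expiresAt) { messages.put(id, expiresAt); }

    void dispatch(String id) { inflight.add(id); }

    // An ack removes the message; in-flight messages are left to this path.
    void ack(String id) {
        inflight.remove(id);
        messages.remove(id);
    }

    // Background periodic check: expire only messages that are NOT in flight,
    // so the sweep never races an ack for the same message.
    List<String> expireSweep(long now) {
        List<String> expired = new ArrayList<>();
        messages.entrySet().removeIf(e -> {
            boolean expire = e.getValue() <= now && !inflight.contains(e.getKey());
            if (expire) {
                expired.add(e.getKey());
            }
            return expire;
        });
        return expired;
    }

    public static void main(String[] args) {
        ExpirySweepSketch q = new ExpirySweepSketch();
        q.add("m1", 1_000L);
        q.add("m2", 1_000L);
        q.dispatch("m1");                          // m1 is in flight
        System.out.println(q.expireSweep(5_000L)); // [m2]
        q.ack("m1");
        System.out.println(q.messages.isEmpty());  // true
    }
}
```

Both messages are past their expiry time, but only the one not in flight is removed by the sweep; the in-flight one leaves the queue through its ack.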
Re: problem when MessageStore updateMessage function throws an exception
A store failure is typically fatal. There is an IOException handler in the loop for message sends, and the default behaviour is to stop the broker. One option is to pull in the IOException handler in this case also, allowing the broker stop behaviour. At this point (pre-dispatch), the message is pending an ack and won't get dispatched again until the consumer/subscription closes, so not dispatching will leave it dangling. What is the ideal behaviour for a message in this case?

On 4 September 2014 21:26, Fugitt, Jesse jfug...@informatica.com wrote:
> When using the new option persistJMSRedelivered (to ensure the redelivered flag is set correctly on potentially duplicate messages that are re-dispatched by the broker, even after a restart):
>
> <policyEntry queue=">" persistJMSRedelivered="true"></policyEntry>
>
> there is still a case where a message can be re-sent and will not be marked as redelivered. I can open a JIRA and probably create a unit test, but it is pretty clear from the pasted code below where the exception is getting swallowed. Would the preferred fix be to update the broker interface and make preProcessDispatch throw an IOException, or would it be better to add a new field to the MessageDispatch class to indicate that an exception occurred and leave the interface alone?
>
> The specific case is when a MessageStore throws an exception during the updateMessage call; the exception is swallowed (and an ERROR logged) and the message is still dispatched to the consumer.
>
> The exception seems like it should actually propagate out of the preProcessDispatch function in RegionBroker, as shown below, but this would require changing the Broker interface so that the void preProcessDispatch function throws an IOException.
>
> //RegionBroker.java
> @Override
> public void preProcessDispatch(MessageDispatch messageDispatch) {
>     Message message = messageDispatch.getMessage();
>     if (message != null) {
>         long endTime = System.currentTimeMillis();
>         message.setBrokerOutTime(endTime);
>         if (getBrokerService().isEnableStatistics()) {
>             long totalTime = endTime - message.getBrokerInTime();
>             ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
>         }
>         if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered()
>                 && !message.isRedelivered() && message.isPersistent()) {
>             final int originalValue = message.getRedeliveryCounter();
>             message.incrementRedeliveryCounter();
>             try {
>                 ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
>             } catch (IOException error) {
>                 LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
>             } finally {
>                 message.setRedeliveryCounter(originalValue);
>             }
>         }
>     }
> }
>
> //TransportConnection.java
> protected void processDispatch(Command command) throws IOException {
>     MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
>     try {
>         if (!stopping.get()) {
>             if (messageDispatch != null) {
>                 broker.preProcessDispatch(messageDispatch);
>             }
>             dispatch(command); //This code will dispatch the message whether or not the updateMessage function actually worked
>         }
>         ...
>
> I wanted to get input on this issue before proceeding further with it.
>
> Thanks,
> Jesse

--
http://redhat.com
http://blog.garytully.com
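Gary's suggestion, routing the updateMessage failure through the same IOException handling used for sends so that the default policy stops the broker, might look roughly like the sketch below. StoreFailureHandler, Broker and Store are hypothetical simplified types for illustration only, not ActiveMQ's actual IOException handler API.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class IOExceptionHandlerSketch {

    // Hypothetical handler contract (assumption; not ActiveMQ's real interface).
    interface StoreFailureHandler {
        void handle(IOException e);
    }

    // Hypothetical broker whose stop flag stands in for a real shutdown.
    static final class Broker {
        final AtomicBoolean stopped = new AtomicBoolean(false);
        void stop() { stopped.set(true); }
    }

    // Default policy described in the thread: a store failure is fatal, so stop the broker.
    static StoreFailureHandler stopBroker(Broker broker) {
        return e -> broker.stop();
    }

    // Hypothetical store; only the failing call matters here.
    interface Store {
        void updateMessage(String messageId) throws IOException;
    }

    // Instead of only logging, hand the failure to the same handler used for sends.
    static void preProcessDispatch(String messageId, Store store, StoreFailureHandler handler) {
        try {
            store.updateMessage(messageId);
        } catch (IOException e) {
            handler.handle(e);
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        // Simulated store failure.
        preProcessDispatch("ID:1", id -> { throw new IOException("simulated failure"); }, stopBroker(broker));
        System.out.println("broker stopped: " + broker.stopped.get()); // broker stopped: true
    }
}
```

With this shape the swallowed-exception path disappears: the handler decides what a store failure means, and the default of stopping the broker sidesteps the dangling-message question Gary raises, at the cost of availability.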
Re: problem when MessageStore updateMessage function throws an exception
Ah, I see your JIRA and patch. Will try and have a peek tomorrow.

On 11 September 2014 22:47, Gary Tully gary.tu...@gmail.com wrote:
> A store failure is typically fatal. There is an IOException handler in the loop for message sends, and the default behaviour is to stop the broker. One option is to pull in the IOException handler in this case also, allowing the broker stop behaviour. At this point (pre-dispatch), the message is pending an ack and won't get dispatched again until the consumer/subscription closes, so not dispatching will leave it dangling. What is the ideal behaviour for a message in this case?

--
http://redhat.com
http://blog.garytully.com