[jira] [Updated] (AMQ-5347) persistJMSRedelivered flag doesn't work correctly when exceptions occur

2014-09-11 Thread Jesse Fugitt (JIRA)

 [ 
https://issues.apache.org/jira/browse/AMQ-5347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jesse Fugitt updated AMQ-5347:
--
Patch Info: Patch Available

 persistJMSRedelivered flag doesn't work correctly when exceptions occur
 ---

 Key: AMQ-5347
 URL: https://issues.apache.org/jira/browse/AMQ-5347
 Project: ActiveMQ
  Issue Type: Bug
  Components: Broker
Affects Versions: 5.10.0
Reporter: Jesse Fugitt
 Attachments: AMQ5347.patch, RedeliveryRestartWithExceptionTest.java


 The new flag in 5.10 that ensures the JMSRedelivered flag persists across 
 broker restarts does not work correctly when an exception occurs while 
 writing the message update to disk before the restart.  In that case, 
 messages can be assigned to receivers, the broker can be restarted, and the 
 messages are then re-assigned to receivers without the JMSRedelivered flag 
 set as expected.  I will attach a unit test and proposed fix to illustrate 
 the problem.
 Also, here is additional information I had sent to the mailing list:
 When using the new option persistJMSRedelivered (to ensure the redelivered 
 flag is set correctly on potentially duplicate messages that are 
 re-dispatched by the broker even after a restart):
 <policyEntry queue=">" persistJMSRedelivered="true"></policyEntry>
 there is still a case where a message can be re-sent and will not be marked 
 as redelivered.  I can open a JIRA and probably create a unit test but it is 
 pretty clear from the pasted code below where the exception is getting 
 swallowed.  Would the preferred fix be to update the broker interface and 
 make preProcessDispatch throw an IOException or would it be better to add a 
 new field to the MessageDispatch class to indicate an exception occurred and 
 leave the interface alone?
 The specific case when this can happen is when a MessageStore returns an 
 exception during the updateMessage call, which then gets swallowed (and an 
 ERROR logged) and still allows the message to be dispatched to the consumer.  
 The exception seems like it should actually propagate out of the 
 preProcessDispatch function in RegionBroker as shown below, but this would 
 require changing the Broker interface and making the void preProcessDispatch 
 function throw an IOException.
 //RegionBroker.java
 @Override
 public void preProcessDispatch(MessageDispatch messageDispatch) {
     Message message = messageDispatch.getMessage();
     if (message != null) {
         long endTime = System.currentTimeMillis();
         message.setBrokerOutTime(endTime);
         if (getBrokerService().isEnableStatistics()) {
             long totalTime = endTime - message.getBrokerInTime();
             ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
         }
         if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered()
                 && !message.isRedelivered() && message.isPersistent()) {
             final int originalValue = message.getRedeliveryCounter();
             message.incrementRedeliveryCounter();
             try {
                 ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
             } catch (IOException error) {
                 LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
             } finally {
                 message.setRedeliveryCounter(originalValue);
             }
         }
     }
 }

 //TransportConnection.java
 protected void processDispatch(Command command) throws IOException {
     MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
     try {
         if (!stopping.get()) {
             if (messageDispatch != null) {
                 broker.preProcessDispatch(messageDispatch);
             }
             // This code will dispatch the message whether or not the updateMessage function actually worked
             dispatch(command);
         }
 ...
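
 To make the first option concrete, here is a minimal sketch of what propagating the failure could look like. It is not the attached patch and does not use the real broker classes: RedeliveryStore and DispatchSketch are hypothetical stand-ins, and messages are reduced to plain string ids. It only illustrates that once preProcessDispatch declares IOException, the dispatch step is skipped when the store update fails.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the message store; only the call that can fail is modelled.
interface RedeliveryStore {
    void updateMessage(String messageId) throws IOException;
}

// Hypothetical stand-in for the RegionBroker/TransportConnection pair.
final class DispatchSketch {
    // Ids of the messages that actually reached a consumer.
    final List<String> dispatched = new ArrayList<>();
    private final RedeliveryStore store;

    DispatchSketch(RedeliveryStore store) {
        this.store = store;
    }

    // The store failure is allowed to escape instead of being logged and dropped.
    void preProcessDispatch(String messageId) throws IOException {
        store.updateMessage(messageId);
    }

    // Dispatch is reached only if the redelivery update was persisted.
    void processDispatch(String messageId) throws IOException {
        preProcessDispatch(messageId);
        dispatched.add(messageId);
    }
}
```

 The sketch deliberately leaves open what should happen to a message whose dispatch was skipped; that is a separate design question.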



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (AMQ-5274) Stuck messages and CPU churn when aborted transacted message expires

2014-09-11 Thread Gary Tully (JIRA)

 [ 
https://issues.apache.org/jira/browse/AMQ-5274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Tully resolved AMQ-5274.
-
   Resolution: Fixed
Fix Version/s: 5.11.0

Fixed in 
https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=26807cd4

In-flight messages are no longer expired by the broker-side periodic 
background check, so acks do not compete with expiry.
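
The idea behind the fix can be sketched as a filter in the periodic expiry check. This is an illustration only, not the code from the commit: ExpirySketch, Msg, and the inflight field are hypothetical names, and an expiry value of 0 is assumed to mean "never expires".

```java
import java.util.ArrayList;
import java.util.List;

final class ExpirySketch {
    // Hypothetical message record: expiry timestamp plus whether the message
    // has been dispatched to a consumer and is still awaiting an ack.
    static final class Msg {
        final String id;
        final long expiresAt;
        final boolean inflight;

        Msg(String id, long expiresAt, boolean inflight) {
            this.id = id;
            this.expiresAt = expiresAt;
            this.inflight = inflight;
        }
    }

    // Returns the ids the periodic check may expire: past their expiry time
    // and not in flight, so expiry never races with an ack or rollback.
    static List<String> expirable(List<Msg> queue, long now) {
        List<String> result = new ArrayList<>();
        for (Msg m : queue) {
            boolean expired = m.expiresAt > 0 && m.expiresAt <= now;
            if (expired && !m.inflight) {
                result.add(m.id);
            }
        }
        return result;
    }
}
```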

 Stuck messages and CPU churn when aborted transacted message expires
 

 Key: AMQ-5274
 URL: https://issues.apache.org/jira/browse/AMQ-5274
 Project: ActiveMQ
  Issue Type: Bug
Affects Versions: 5.8.0, 5.9.0, 5.9.1, 5.10.0
 Environment: win64, RHEL11
Reporter: Yannick Malins
Assignee: Gary Tully
Priority: Critical
 Fix For: 5.11.0

 Attachments: AMQ-5274.zip, AMQ-5274v2.zip, AMQ5274Test.java, 
 logs_extract.txt


 The test case is a simple producer/consumer:
 Producer: 20 messages are injected, with a timeout of 10s.
 Consumer: The redelivery policy is set to 0 retries (the issue exists with 
 other values). The consumer uses transactions and throws a runtime exception 
 on each message received.
 Queue stats show 20 enqueue, 19 dequeue, 1 pending.
 DLQ stats show 20 enqueue: all 20 messages go to the DLQ, IDs ending in 1-10 
 for failing, 11-20 for expiry (approx).
 The pending item (ID ending in 10) is a ghost message and remains stuck 
 indefinitely in queue.test.
 If you browse the queue, the message is not shown.
 A) If you restart the broker, after a short while the message is cleaned:
 jvm 1|  WARN | Duplicate message add attempt rejected. Destination: QUEUE://ActiveMQ.DLQ, Message id: ID:REDACTED-52872-1405079629779-1:1:1:1:10
 jvm 1|  WARN | org.apache.activemq.broker.region.cursors.QueueStorePrefetch@5b427f3c:ActiveMQ.DLQ,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxBatchSize:20,hasSpace:true - cursor got duplicate: ID:REDACTED--52872-1405079629779-1:1:1:1:10, 4
 jvm 1|  WARN | duplicate message from store ID:REDACTED--52872-1405079629779-1:1:1:1:10, redirecting for dlq processing
 B) If you purge, ActiveMQ logs a warning:
 WARN | queue://queue.test after purge complete, message count stats report: 1
 The queue is marked as being empty. However, if you restart the broker, the 
 message re-appears shortly, before being cleaned as above.

 SUPPLEMENTARY: With ActiveMQ 5.9.0 and above, if you run the injection 
 several times, the CPU usage of ActiveMQ climbs drastically until the queue 
 is purged.





Re: problem when MessageStore updateMessage function throws an exception

2014-09-11 Thread Gary Tully
A store failure is typically fatal. There is an IOException handler in
the loop for message sends, and the default behaviour is to stop the
broker.
One option is to pull in the IOException handler in this case also,
allowing the broker stop behaviour.
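
As a rough sketch of that option (hypothetical names throughout, not the real broker API): StoreFailureHandler, StoreCall, and BrokerSketch below are invented for illustration, and "stopping the broker" is reduced to setting a flag. The point is only that the catch block delegates to the handler instead of logging and carrying on.

```java
import java.io.IOException;

// Hypothetical handler contract, modelled loosely on the idea of an IOException handler.
interface StoreFailureHandler {
    void handle(IOException cause);
}

final class BrokerSketch {
    // Hypothetical wrapper for the store update that may fail.
    interface StoreCall {
        void run() throws IOException;
    }

    private boolean stopped;

    // Assumed default behaviour: a store failure stops the broker.
    private final StoreFailureHandler handler = cause -> stopped = true;

    boolean isStopped() {
        return stopped;
    }

    // The failure is handed to the handler rather than swallowed.
    void preProcessDispatch(StoreCall updateMessage) {
        try {
            updateMessage.run();
        } catch (IOException e) {
            handler.handle(e);
        }
    }
}
```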

At this point, pre-dispatch, the message is pending an ack and won't
get dispatched again until the consumer/subscription closes, so not
dispatching will leave it dangling.

What is the ideal behaviour for a message in this case?


On 4 September 2014 21:26, Fugitt, Jesse jfug...@informatica.com wrote:
 I wanted to get input on this issue before proceeding further with it.

 Thanks,
 Jesse



-- 
http://redhat.com
http://blog.garytully.com


Re: problem when MessageStore updateMessage function throws an exception

2014-09-11 Thread Gary Tully
Ah, I see your JIRA and patch. I will try to have a look tomorrow.

-- 
http://redhat.com
http://blog.garytully.com