RE: problem when MessageStore updateMessage function throws an exception

2014-09-12 Thread Fugitt, Jesse
Yes, the fix in the JIRA is working for us now with a re-built broker using 
that fix (other fixes might be possible that don't involve changing the broker 
interface but the attached fix seemed the most straightforward).  The problem 
for us occurs typically during a broker shutdown when the message store's 
updateMessage throws the exception.  We are expecting the broker to never 
assign a message out to a consumer if it couldn't successfully complete 
updateMessage (otherwise you are open to duplicates being sent upon restart of 
the broker that are not marked redelivered).  The attached unit test 
illustrates the expected behavior.
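As a rough illustration of that expectation (separate from the attached unit test), here is a minimal sketch. The `Store` and `Consumer` types and both static methods are hypothetical stand-ins written for this example and are not the ActiveMQ classes; the only point is that a failed store update means the consumer never sees the message.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration only; these are NOT the ActiveMQ classes.
public class DispatchSketch {

    /** Hypothetical minimal store: only the one call that matters here. */
    interface Store {
        void updateMessage(String messageId) throws IOException;
    }

    /** Hypothetical consumer that records what was dispatched to it. */
    static final class Consumer {
        final List<String> received = new ArrayList<>();
    }

    // Variant of preProcessDispatch that lets the store failure propagate.
    static void preProcessDispatch(Store store, String messageId) throws IOException {
        store.updateMessage(messageId);
    }

    // Dispatch only happens if the pre-dispatch store update succeeded.
    static boolean processDispatch(Store store, Consumer consumer, String messageId) {
        try {
            preProcessDispatch(store, messageId);
        } catch (IOException e) {
            return false; // the message is not handed to the consumer
        }
        consumer.received.add(messageId);
        return true;
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        Store healthy = id -> { };
        Store failing = id -> { throw new IOException("store is shutting down"); };

        System.out.println(processDispatch(healthy, consumer, "msg-1"));
        System.out.println(processDispatch(failing, consumer, "msg-2"));
        System.out.println(consumer.received);
    }
}
```

What should happen to the undelivered message afterwards (requeue, stop the broker, and so on) is deliberately left out of the sketch.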

Thanks,
Jesse

-----Original Message-----
From: Gary Tully [mailto:gary.tu...@gmail.com] 
Sent: Thursday, September 11, 2014 4:51 PM
To: dev@activemq.apache.org
Subject: Re: problem when MessageStore updateMessage function throws an 
exception

ah, I see your jira and patch. will try and have a peek tomorrow.

On 11 September 2014 22:47, Gary Tully gary.tu...@gmail.com wrote:
 A store failure is typically fatal. There is an IOException handler in 
 the loop for message sends, and the default behaviour is to stop the 
 broker.
 One option is to pull in the IOException handler in this case also, 
 allowing the broker stop behaviour.
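A minimal sketch of that option, under stated assumptions: `Store`, `FailureHandler` and `Broker` below are hypothetical stand-ins invented for this example, not the ActiveMQ types, and the "stop the broker" default is modelled as nothing more than a flag.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in types for illustration only; these are NOT the ActiveMQ classes.
public class StoreFailureHandlerSketch {

    interface Store {
        void updateMessage(String messageId) throws IOException;
    }

    /** Hypothetical hook that decides what a store failure means for the broker. */
    interface FailureHandler {
        void handle(IOException error);
    }

    static final class Broker {
        final AtomicBoolean stopped = new AtomicBoolean(false);
        // Assumed default for this sketch: any store failure stops the broker.
        final FailureHandler handler = error -> stopped.set(true);
    }

    // Returns true only when the store update succeeded and dispatch may continue.
    static boolean preDispatch(Broker broker, Store store, String messageId) {
        if (broker.stopped.get()) {
            return false; // a stopped broker dispatches nothing
        }
        try {
            store.updateMessage(messageId);
            return true;
        } catch (IOException error) {
            broker.handler.handle(error); // same handling as the send path would use
            return false;
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        Store failing = id -> { throw new IOException("store unavailable"); };
        System.out.println(preDispatch(broker, failing, "msg-1"));
        System.out.println(broker.stopped.get());
    }
}
```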

 At this point, pre-dispatch, the message is pending an ack and won't 
 get dispatched again until the consumer/subscription closes, so not 
 dispatching will leave it dangling.

 What is the ideal behaviour for a message in this case?


 On 4 September 2014 21:26, Fugitt, Jesse jfug...@informatica.com wrote:
 When using the new option persistJMSRedelivered (to ensure the 
 redelivered flag is set correctly on potentially duplicate messages 
 that are re-dispatched by the broker even after a restart): 
 <policyEntry queue=">" persistJMSRedelivered="true"></policyEntry>

 there is still a case where a message can be re-sent and will not be marked 
 as redelivered.  I can open a JIRA and probably create a unit test but it is 
 pretty clear from the pasted code below where the exception is getting 
 swallowed.  Would the preferred fix be to update the broker interface and 
 make preProcessDispatch throw an IOException or would it be better to add a 
 new field to the MessageDispatch class to indicate an exception occurred and 
 leave the interface alone?

 The specific case when this can happen is when a MessageStore throws an 
 exception during the updateMessage call, which then gets swallowed (and an 
 ERROR logged) while the message is still dispatched to the consumer. 
 The exception seems like it should actually propagate out of the 
 preProcessDispatch function in RegionBroker as shown below, but this would 
 require changing the Broker interface and making the void preProcessDispatch 
 function throw an IOException.

 //RegionBroker.java
 @Override
 public void preProcessDispatch(MessageDispatch messageDispatch) {
     Message message = messageDispatch.getMessage();
     if (message != null) {
         long endTime = System.currentTimeMillis();
         message.setBrokerOutTime(endTime);
         if (getBrokerService().isEnableStatistics()) {
             long totalTime = endTime - message.getBrokerInTime();
             ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
         }
         if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered()
                 && !message.isRedelivered() && message.isPersistent()) {
             final int originalValue = message.getRedeliveryCounter();
             message.incrementRedeliveryCounter();
             try {
                 ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
             } catch (IOException error) {
                 LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
             } finally {
                 message.setRedeliveryCounter(originalValue);
             }
         }
     }
 }

 //TransportConnection.java
 protected void processDispatch(Command command) throws IOException {
     MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
     try {
         if (!stopping.get()) {
             if (messageDispatch != null) {
                 broker.preProcessDispatch(messageDispatch);
             }
             // This code will dispatch the message whether or not the updateMessage function actually worked
             dispatch(command);
         }
 ...
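For comparison, the second option raised above (a new field on the dispatch object, with the interface left alone) might look roughly like the following. This is only a sketch under assumptions: `Store`, `Dispatch` and the `preDispatchFailure` field are hypothetical names invented here, not existing ActiveMQ members.

```java
import java.io.IOException;

// Stand-in types for illustration only; these are NOT the ActiveMQ classes.
public class FlaggedDispatchSketch {

    interface Store {
        void updateMessage(String messageId) throws IOException;
    }

    /** Hypothetical dispatch object carrying an added failure field. */
    static final class Dispatch {
        final String messageId;
        IOException preDispatchFailure; // hypothetical new field, null when the update succeeded

        Dispatch(String messageId) {
            this.messageId = messageId;
        }
    }

    // Signature stays void with no checked exception, so the interface is unchanged.
    static void preProcessDispatch(Store store, Dispatch dispatch) {
        try {
            store.updateMessage(dispatch.messageId);
        } catch (IOException error) {
            dispatch.preDispatchFailure = error; // record the failure instead of only logging it
        }
    }

    // The caller inspects the field before handing the message to the consumer.
    static boolean shouldDispatch(Store store, Dispatch dispatch) {
        preProcessDispatch(store, dispatch);
        return dispatch.preDispatchFailure == null;
    }

    public static void main(String[] args) {
        Store failing = id -> { throw new IOException("store is shutting down"); };
        System.out.println(shouldDispatch(id -> { }, new Dispatch("msg-1")));
        System.out.println(shouldDispatch(failing, new Dispatch("msg-2")));
    }
}
```

The trade-off in this variant is that every caller has to remember to check the field, whereas a checked exception on the interface forces the issue at compile time.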



 I wanted to get input on this issue before proceeding further with it.

 Thanks,
 Jesse



 --
 http://redhat.com
 http://blog.garytully.com



--
http://redhat.com
http://blog.garytully.com


Re: problem when MessageStore updateMessage function throws an exception

2014-09-11 Thread Gary Tully
A store failure is typically fatal. There is an IOException handler in
the loop for message sends, and the default behaviour is to stop the
broker.
One option is to pull in the IOException handler in this case also,
allowing the broker stop behaviour.

At this point, pre-dispatch, the message is pending an ack and won't
get dispatched again until the consumer/subscription closes, so not
dispatching will leave it dangling.

What is the ideal behaviour for a message in this case?





-- 
http://redhat.com
http://blog.garytully.com


Re: problem when MessageStore updateMessage function throws an exception

2014-09-11 Thread Gary Tully
ah, I see your jira and patch. will try and have a peek tomorrow.




-- 
http://redhat.com
http://blog.garytully.com