[ https://issues.apache.org/jira/browse/AMQ-5347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14192307#comment-14192307 ]
Jesse Fugitt commented on AMQ-5347:
-----------------------------------

Great, thanks. Killing the connection was the important thing, and the runtime exception looks like it accomplishes that. Originally, I looked for a way to sneak the exception through the interface, since changing it would impact so many files/plugins, but ended up just submitting the patch so that the intent was clear. Glad you were able to get this in.

> persistJMSRedelivered flag doesn't work correctly when exceptions occur
> -----------------------------------------------------------------------
>
>                 Key: AMQ-5347
>                 URL: https://issues.apache.org/jira/browse/AMQ-5347
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.10.0
>            Reporter: Jesse Fugitt
>            Assignee: Gary Tully
>             Fix For: 5.11.0
>
>         Attachments: AMQ5347.patch, RedeliveryRestartWithExceptionTest.java
>
>
> The new flag in 5.10 that ensures the JMSRedelivered flag persists across
> broker restarts does not work correctly when an exception occurs while
> attempting to write the "message update" to disk before the restart. In that
> case, messages can be assigned to receivers, the broker can be restarted, and
> then the messages are re-assigned to receivers and do not include the
> JMSRedelivered flag as expected. I will attach a unit test and proposed fix
> to illustrate the problem.
>
> Also, here is additional information I had sent to the mailing list:
>
> When using the new option persistJMSRedelivered (to ensure the redelivered
> flag is set correctly on potentially duplicate messages that are
> re-dispatched by the broker even after a restart):
>
> <policyEntry queue=">" persistJMSRedelivered="true"></policyEntry>
>
> there is still a case where a message can be re-sent and will not be marked
> as redelivered. I can open a JIRA and probably create a unit test, but it is
> pretty clear from the pasted code below where the exception is getting
> swallowed. Would the preferred fix be to update the Broker interface and
> make preProcessDispatch throw an IOException, or would it be better to add a
> new field to the MessageDispatch class to indicate that an exception occurred
> and leave the interface alone?
>
> The specific case when this can happen is when a MessageStore throws an
> exception during the updateMessage call, which then gets swallowed (and an
> ERROR logged) and still allows the message to be dispatched to the consumer.
> The exception seems like it should actually propagate out of the
> preProcessDispatch function in RegionBroker as shown below, but this would
> require changing the Broker interface and making the void preProcessDispatch
> function throw an IOException.
>
> //RegionBroker.java
> @Override
> public void preProcessDispatch(MessageDispatch messageDispatch) {
>     Message message = messageDispatch.getMessage();
>     if (message != null) {
>         long endTime = System.currentTimeMillis();
>         message.setBrokerOutTime(endTime);
>         if (getBrokerService().isEnableStatistics()) {
>             long totalTime = endTime - message.getBrokerInTime();
>             ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
>         }
>         if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered()
>                 && !message.isRedelivered() && message.isPersistent()) {
>             final int originalValue = message.getRedeliveryCounter();
>             message.incrementRedeliveryCounter();
>             try {
>                 ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
>             } catch (IOException error) {
>                 LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
>             } finally {
>                 message.setRedeliveryCounter(originalValue);
>             }
>         }
>     }
> }
>
> //TransportConnection.java
> protected void processDispatch(Command command) throws IOException {
>     MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
>     try {
>         if (!stopping.get()) {
>             if (messageDispatch != null) {
>                 broker.preProcessDispatch(messageDispatch);
>             }
>             dispatch(command); //This code will dispatch the message whether or not the updateMessage function actually worked
>         }
> ...
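
For readers following the thread, here is a minimal, self-contained sketch of the difference between swallowing the store exception and rethrowing it as an unchecked exception, so that the void method signature does not have to change. It is not the committed patch. `RedeliverySketch`, `Store`, the two `preProcess...` helpers and the event strings are hypothetical stand-ins invented for illustration, not ActiveMQ classes. Using `java.io.UncheckedIOException` is also an assumption, since the comment above only says that a runtime exception was used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; none of these types are real ActiveMQ classes.
class RedeliverySketch {

    // Stand-in for a message store whose update can fail with an IOException.
    interface Store {
        void updateMessage(String messageId) throws IOException;
    }

    // Variant 1: mirrors the reported behaviour. The IOException is recorded and swallowed.
    static void preProcessSwallowing(Store store, String messageId, List<String> events) {
        try {
            store.updateMessage(messageId);
        } catch (IOException error) {
            events.add("error logged");
        }
    }

    // Variant 2: rethrows as an unchecked exception, so the void signature stays unchanged.
    static void preProcessPropagating(Store store, String messageId) {
        try {
            store.updateMessage(messageId);
        } catch (IOException error) {
            throw new UncheckedIOException("Failed to persist redelivery flag for " + messageId, error);
        }
    }

    // Simulates one dispatch attempt against a store that always fails
    // and returns the events that were recorded.
    static List<String> run(boolean propagate) {
        List<String> events = new ArrayList<>();
        Store failingStore = id -> { throw new IOException("disk write failed"); };
        try {
            if (propagate) {
                preProcessPropagating(failingStore, "msg-1");
            } else {
                preProcessSwallowing(failingStore, "msg-1", events);
            }
            // Reached only when pre-processing returned normally.
            events.add("dispatched");
        } catch (RuntimeException e) {
            // Stand-in for the caller tearing down the connection instead of dispatching.
            events.add("connection closed");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [error logged, dispatched]
        System.out.println(run(true));  // prints [connection closed]
    }
}
```

In the swallowing variant the message is still dispatched even though the update never reached the store, which is the case described in the report. In the propagating variant the dispatch step is never reached.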