An expired message that is consumed and resent as a copied message does not
expire again.
-----------------------------------------------------------------------------------------
Key: AMQ-3153
URL: https://issues.apache.org/jira/browse/AMQ-3153
Project: ActiveMQ
Issue Type: Bug
Components: Broker
Affects Versions: 5.4.2
Reporter: Stirling Chow
Symptom
========
We have a use case where a producer sends a message to a queue with an
expiration. When the message expires to the DLQ, a consumer on the DLQ
receives the message, modifies it, and resends it to the original queue with an
updated expiration.
When the expired message is resent, it is given a new JMS message ID, so is,
for all intents and purposes, a new message. However, althought the resent
message has an updated expiration, it never expires to the DLQ.
Cause
=====
When a message expires, an "originalExpiration" property is added to the
message by RegionBroker.stampAsExpired:
private boolean stampAsExpired(Message message) throws IOException {
boolean stamped=false;
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
long expiration=message.getExpiration();
message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
stamped = true;
}
return stamped;
}
When the consumer receives and resends the expired message, ActiveMQSession
gives the message a new ID and updates its expiration:
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination
destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout) throws
JMSException {
..
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);
...
// Set the message id.
if (msg == message) {
msg.setMessageId(new
MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
} else {
msg.setMessageId(new
MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
message.setJMSMessageID(msg.getMessageId().toString());
}
...
At this point the resent message has a new ID and a new expiration, so it
should be allowed to reexpire. However, the resent message still carries the
originalExpiration property, which makes RegionBroker report the message has
not expired (even though it may have):
@Override
public boolean isExpired(MessageReference messageReference) {
boolean expired = false;
if (messageReference.isExpired()) {
try {
// prevent duplicate expiry processing
Message message = messageReference.getMessage();
synchronized (message) {
expired = stampAsExpired(message);
}
} catch (IOException e) {
LOG.warn("unexpected exception on message expiry determination
for: " + messageReference, e);
}
}
return expired;
}
Since the broker is not reporting the message as expired, the expired message
processing in Queue bypasses the message (from Queue.doBrowse()):
if (broker.isExpired(node)) {
LOG.debug("expiring from messages: " +
node);
messageExpired(connectionContext,
createMessageReference(node.getMessage()));
}
messages.remove();
Solution
=======
Whenever a message is sent to a broker from a message producer, it should have
its originalExpiration property cleared. I've provided a patch in
ActiveMQSession to do this, but I'm not familiar enough with the workflow to
know if this is the appropriate place --- I'm specifically unhappy with the
need to case the javax.jms.Message to an ActiveMQMessage in order to clear the
readonly properties.
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.