Author: gtully
Date: Fri Jul 17 23:18:29 2009
New Revision: 795270

URL: http://svn.apache.org/viewvc?rev=795270&view=rev
Log:
use non compencating schedualler and ensure DLQ copies message early - ensures 
accurate processing of expired messages - 
https://issues.apache.org/activemq/browse/AMQ-1112

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Fri Jul 17 23:18:29 2009
@@ -187,7 +187,7 @@
         }
         
         if (getExpireMessagesPeriod() > 0) {
-            scheduler.executePeriodically(expireMessagesTask, 
getExpireMessagesPeriod());
+            scheduler.schedualPeriodically(expireMessagesTask, 
getExpireMessagesPeriod());
         }
         
         super.initialize();

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
 Fri Jul 17 23:18:29 2009
@@ -45,15 +45,18 @@
      * @throws IOException
      */
     protected void acknowledge(final ConnectionContext context, final 
MessageAck ack, final MessageReference n) throws IOException {
-        if (n.isExpired()) {
-            if (!broker.isExpired(n)) {
-                LOG.info("ignoring ack " + ack + ", for already expired 
message: " + n);
-                return;
-            }
-        }
         final Destination q = n.getRegionDestination();
         final QueueMessageReference node = (QueueMessageReference)n;
         final Queue queue = (Queue)q;
+        
+        if (n.isExpired()) {
+            if (broker.isExpired(n)) {
+                queue.messageExpired(context, this, node);
+            } else {
+                LOG.debug("ignoring ack " + ack + ", for already expired 
message: " + n);
+            }
+            return;
+        }
         queue.removeMessage(context, this, node, ack);
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 Fri Jul 17 23:18:29 2009
@@ -25,6 +25,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
@@ -682,14 +683,14 @@
     private boolean stampAsExpired(Message message) throws IOException {
         boolean stamped=false;
         if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
-            long expiration=message.getExpiration();
-            message.setExpiration(0);
+            long expiration=message.getExpiration();     
             message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
             stamped = true;
         }
         return stamped;
     }
 
+    
     public void messageExpired(ConnectionContext context, MessageReference 
node) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Message expired " + node);
@@ -708,11 +709,10 @@
                                                
.getRegionDestination().getDeadLetterStrategy();
                                        if(deadLetterStrategy!=null){
                                                
if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
-                                                   if 
(node.getRegionDestination().getActiveMQDestination().isTopic()) {
-                                                       // message may be 
inflight to other subscriptions so do not modify
-                                                       message = 
message.copy();
-                                                   }
-                                                       
if(!message.isPersistent()){
+                                                   // message may be inflight 
to other subscriptions so do not modify
+                                                   message = message.copy();
+                                                   message.setExpiration(0);
+                                                   if(!message.isPersistent()){
                                                            
message.setPersistent(true);
                                                            
message.setProperty("originalDeliveryMode",
                                                                        
"NON_PERSISTENT");
@@ -727,7 +727,7 @@
                                                        if 
(context.getBroker()==null) {
                                                                
context.setBroker(getRoot());
                                                        }
-                                                       
BrokerSupport.resend(context,message,
+                                                       
BrokerSupport.resendNoCopy(context,message,
                                                                
deadLetterDestination);
                                                }
                                        } else {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 Fri Jul 17 23:18:29 2009
@@ -307,8 +307,8 @@
                                     // While waiting for space to free up... 
the
                                     // message may have expired.
                                     if (message.isExpired()) {
-                                        
getDestinationStatistics().getExpired().increment();
                                         broker.messageExpired(context, 
message);
+                                        
getDestinationStatistics().getExpired().increment();
                                     } else {
                                         doMessageSend(producerExchange, 
message);
                                     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
 Fri Jul 17 23:18:29 2009
@@ -88,7 +88,6 @@
     private transient ActiveMQConnection connection;
     private transient org.apache.activemq.broker.region.Destination 
regionDestination;
     private transient MemoryUsage memoryUsage;
-    private transient boolean expired;
 
     private BrokerId[] brokerPath;
     private BrokerId[] cluster;
@@ -339,9 +338,6 @@
 
     public void setExpiration(long expiration) {
         this.expiration = expiration;
-        if (this.expiration > 0) {
-            expired = false;
-        }
     }
 
     /**
@@ -439,13 +435,8 @@
     }
 
     public boolean isExpired() {
-        if (!expired) {
-            long expireTime = getExpiration();
-            if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
-                expired = true;
-            }
-        }
-        return expired;
+        long expireTime = getExpiration();
+        return expireTime > 0 && System.currentTimeMillis() > expireTime;
     }
 
     public boolean isAdvisory() {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
 Fri Jul 17 23:18:29 2009
@@ -47,6 +47,16 @@
         TIMER_TASKS.put(task, timerTask);
     }
 
+    /*
+     * execute on rough schedual based on termination of last execution. There 
is no
+     * compensation (two runs in quick succession) for delays
+     */
+    public synchronized void schedualPeriodically(final Runnable task, long 
period) {
+        TimerTask timerTask = new SchedulerTimerTask(task);
+        CLOCK_DAEMON.schedule(timerTask, period, period);
+        TIMER_TASKS.put(task, timerTask);
+    }
+    
     public synchronized void cancel(Runnable task) {
        TimerTask ticket = TIMER_TASKS.remove(task);
         if (ticket != null) {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
 Fri Jul 17 23:18:29 2009
@@ -32,6 +32,10 @@
     private BrokerSupport() {        
     }
     
+    public static void resendNoCopy(final ConnectionContext context, Message 
originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
+        doResend(context, originalMessage, deadLetterDestination, false);
+    }
+    
     /**
      * @param context
      * @param originalMessage 
@@ -39,7 +43,11 @@
      * @throws Exception
      */
     public static void resend(final ConnectionContext context, Message 
originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
-        Message message = originalMessage.copy();
+        doResend(context, originalMessage, deadLetterDestination, true);
+    }
+    
+    public static void doResend(final ConnectionContext context, Message 
originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) 
throws Exception {       
+        Message message = copy ? originalMessage.copy() : originalMessage;
         message.setOriginalDestination(message.getDestination());
         message.setOriginalTransactionId(message.getTransactionId());
         message.setDestination(deadLetterDestination);

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
 Fri Jul 17 23:18:29 2009
@@ -49,12 +49,13 @@
 
     private static final Log LOG = 
LogFactory.getLog(ExpiredMessagesTest.class);
     
-       BrokerService broker;
-       Connection connection;
-       Session session;
-       MessageProducer producer;
-       MessageConsumer consumer;
-       public ActiveMQDestination destination = new ActiveMQQueue("test");
+    BrokerService broker;
+    Connection connection;
+    Session session;
+    MessageProducer producer;
+    MessageConsumer consumer;
+    public ActiveMQDestination destination = new ActiveMQQueue("test");
+    public ActiveMQDestination dlqDestination = new 
ActiveMQQueue("ActiveMQ.DLQ");
     public boolean useTextMessage = true;
     public boolean useVMCursor = true;
     
@@ -103,12 +104,12 @@
                
         consumerThread.start();
                
-               
+               final int numMessagesToSend = 10000;
                Thread producingThread = new Thread("Producing Thread") {
             public void run() {
                 try {
                        int i = 0;
-                       while (i++ < 10000) {
+                       while (i++ < numMessagesToSend) {
                                
producer.send(session.createTextMessage("test"));
                        }
                        producer.close();
@@ -159,10 +160,36 @@
                 return view.getQueueSize() == 0;
             }
         }));
+        
+        final long expiredBeforeEnqueue = numMessagesToSend - 
view.getEnqueueCount();
+        final long totalExpiredCount = view.getExpiredCount() + 
expiredBeforeEnqueue;
+        
+        final DestinationViewMBean dlqView = createView(dlqDestination);
+        LOG.info("DLQ stats: size= " + dlqView.getQueueSize() + ", enqueues: " 
+ dlqView.getDequeueCount() + ", dequeues: " + dlqView.getDequeueCount()
+                + ", dispatched: " + dlqView.getDispatchCount() + ", inflight: 
" + dlqView.getInFlightCount() + ", expiries: " + dlqView.getExpiredCount());
+        
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return totalExpiredCount == dlqView.getQueueSize();
+            }
+        });
+        assertEquals("dlq contains all expired", totalExpiredCount, 
dlqView.getQueueSize());
+        
+        // verify DQL
+        MessageConsumer dlqConsumer = createDlqConsumer(connection);
+        int count = 0;
+        while (dlqConsumer.receive(4000) != null) {
+            count++;
+        }
+        assertEquals("dlq returned all expired", count, totalExpiredCount);
        }
 
        
-       public void initCombosForTestRecoverExpiredMessages() {
+       private MessageConsumer createDlqConsumer(Connection connection) throws 
Exception {
+           return connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
+    }
+
+    public void initCombosForTestRecoverExpiredMessages() {
            addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, 
Boolean.FALSE});
        }
        
@@ -266,9 +293,9 @@
                 String domain = "org.apache.activemq";
                 ObjectName name;
                if (destination.isQueue()) {
-                       name = new ObjectName(domain + 
":BrokerName=localhost,Type=Queue,Destination=test");
+                       name = new ObjectName(domain + 
":BrokerName=localhost,Type=Queue,Destination=" + 
destination.getPhysicalName());
                } else {
-                       name = new ObjectName(domain + 
":BrokerName=localhost,Type=Topic,Destination=test");
+                       name = new ObjectName(domain + 
":BrokerName=localhost,Type=Topic,Destination=" + 
destination.getPhysicalName());
                }
                return 
(DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
 name, DestinationViewMBean.class, true);
        }


Reply via email to