Author: chirino
Date: Fri Mar  9 09:29:30 2007
New Revision: 516475

URL: http://svn.apache.org/viewvc?view=rev&rev=516475
Log:
Adding the bits need to do producer flow control with a window to the broker. 
Just implemented on the Queue case for now.


Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
 Fri Mar  9 09:29:30 2007
@@ -17,6 +17,9 @@
  */
 package org.apache.activemq;
 
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -36,9 +39,6 @@
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.util.IntrospectionSupport;
 
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
  * destination. A <CODE>MessageProducer</CODE> object is created by passing a
@@ -496,11 +496,7 @@
                        }
         }
         
-        int size = this.session.send(this, dest, message, deliveryMode, 
priority, timeToLive);
-
-        if( producerWindow!=null ) {
-                       producerWindow.increaseUsage(size);
-        }
+        this.session.send(this, dest, message, deliveryMode, priority, 
timeToLive, producerWindow);
         
         stats.onMessage();            
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
 Fri Mar  9 09:29:30 2007
@@ -17,32 +17,78 @@
  */
 package org.apache.activemq;
 
-import org.apache.activemq.command.*;
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.blob.BlobUploader;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.management.JMSSessionStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.activemq.blob.BlobUploader;
-import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.jms.Message;
-import java.io.Serializable;
-import java.io.File;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * <P>
  * A <CODE>Session</CODE> object is a single-threaded context for producing
@@ -1546,11 +1592,13 @@
      *            message priority.
      * @param timeToLive -
      *            message expiration.
+     * @param producerWindow 
      * @throws JMSException
      */
-    protected int send(ActiveMQMessageProducer producer,
+    protected void send(ActiveMQMessageProducer producer,
                ActiveMQDestination destination,Message message,int 
deliveryMode,
-               int priority,long timeToLive) throws JMSException{
+               int priority,long timeToLive, UsageManager producerWindow) 
throws JMSException{
+       
                checkClosed();
                
if(destination.isTemporary()&&connection.isDeleted(destination)){
                        throw new JMSException("Cannot publish to a deleted 
Destination: "
@@ -1598,15 +1646,18 @@
                        }
                        
if(!connection.isAlwaysSyncSend()&&(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null)){
                 this.connection.asyncSendPacket(msg);
+                       if( producerWindow!=null ) {
+                               // Since we defer lots of the marshaling till 
we hit the wire, this might not 
+                               // provide and accurate size.  We may change 
over to doing more aggressive marshaling,
+                               // to get more accurate sizes.. this is more 
important once users start using producer window
+                               // flow control.                        
+                               int size = msg.getSize();
+                               producerWindow.increaseUsage(size);
+                       }
             }else{
                 this.connection.syncSendPacket(msg);
             }
 
-                       // Since we defer lots of the marshaling till we hit 
the wire, this might not 
-                       // provide and accurate size.  We may change over to 
doing more aggressive marshaling,
-                       // to get more accurate sizes.. this is more important 
once users start using producer window
-                       // flow control.
-                       return msg.getSize();
                }
        }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
 Fri Mar  9 09:29:30 2007
@@ -58,6 +58,7 @@
     private boolean networkConnection;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private final MessageEvaluationContext messageEvaluationContext = new 
MessageEvaluationContext();
+       private boolean dontSendReponse;
     
     public ConnectionContext() {
     }
@@ -258,6 +259,14 @@
        
        public AtomicBoolean getStopping() {
                return stopping;
+       }
+
+       public void setDontSendReponse(boolean b) {
+               this.dontSendReponse=b;         
+       }
+
+       public boolean isDontSendReponse() {
+               return dontSendReponse;
        }       
        
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 Fri Mar  9 09:29:30 2007
@@ -124,6 +124,7 @@
     private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = 
new HashMap<ConsumerId,ConsumerBrokerExchange>();
     private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
     protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
+    private ConnectionContext context;
     private boolean networkConnection;
     private AtomicInteger protocolVersion=new 
AtomicInteger(CommandTypes.PROTOCOL_VERSION);
     
@@ -284,6 +285,16 @@
             }
             response.setCorrelationId(commandId);
         }
+        
+        // The context may have been flagged so that the response is not sent.
+        if( context!=null ) {
+               if( context.isDontSendReponse() ) {
+                       context.setDontSendReponse(false);
+                       response=null;
+               }
+            context=null;
+        }
+        
         return response;
     }
 
@@ -344,7 +355,7 @@
 
     synchronized public Response processBeginTransaction(TransactionInfo info) 
throws Exception{
         ConnectionState 
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -365,7 +376,7 @@
 
     synchronized public Response processPrepareTransaction(TransactionInfo 
info) throws Exception{
         ConnectionState 
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -388,7 +399,7 @@
 
     synchronized public Response 
processCommitTransactionOnePhase(TransactionInfo info) throws Exception{
         ConnectionState 
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -399,7 +410,7 @@
 
     synchronized public Response 
processCommitTransactionTwoPhase(TransactionInfo info) throws Exception{
         ConnectionState 
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -410,7 +421,7 @@
 
     synchronized public Response processRollbackTransaction(TransactionInfo 
info) throws Exception{
         ConnectionState 
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -421,7 +432,7 @@
 
     synchronized public Response processForgetTransaction(TransactionInfo 
info) throws Exception{
         ConnectionState 
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -431,7 +442,7 @@
 
     synchronized public Response processRecoverTransactions(TransactionInfo 
info) throws Exception{
         ConnectionState 
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        ConnectionContext context=null;
+        context=null;
         if(cs!=null){
             context=cs.getContext();
         }
@@ -626,7 +637,7 @@
         log.debug("Setting up new connection: "+this);
         // Setup the context.
         String clientId=info.getClientId();
-        ConnectionContext context=new ConnectionContext();
+        context=new ConnectionContext();
         context.setConnection(this);
         context.setBroker(broker);
         context.setConnector(connector);
@@ -1096,7 +1107,7 @@
             synchronized(producerExchanges){
                 result=new ProducerBrokerExchange();
                 ConnectionState state=lookupConnectionState(id);
-                ConnectionContext context=state.getContext();
+                context=state.getContext();
                 result.setConnectionContext(context);
                 SessionState ss=state.getSessionState(id.getParentId());
                 if(ss!=null){
@@ -1125,7 +1136,7 @@
             synchronized(consumerExchanges){
                 result=new ConsumerBrokerExchange();
                 ConnectionState state=lookupConnectionState(id);
-                ConnectionContext context=state.getContext();
+                context=state.getContext();
                 result.setConnectionContext(context);
                 SessionState ss=state.getSessionState(id.getParentId());
                 if(ss!=null){

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Fri Mar  9 09:29:30 2007
@@ -42,9 +42,12 @@
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.kaha.Store;
@@ -78,7 +81,6 @@
     private final DestinationStatistics destinationStatistics = new 
DestinationStatistics();
     private  PendingMessageCursor messages;
     private final LinkedList pagedInMessages = new LinkedList();
-
     private LockOwner exclusiveOwner;
     private MessageGroupMap messageGroupOwners;
 
@@ -95,7 +97,7 @@
     private final Object doDispatchMutex = new Object();
     private TaskRunner taskRunner;
     private boolean started = false;
-
+    
     public Queue(ActiveMQDestination destination, final UsageManager 
memoryManager, MessageStore store, DestinationStatistics parentStats,
             TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
         this.destination = destination;
@@ -318,6 +320,23 @@
         }
 
     }
+    
+    private final LinkedList<Runnable> messagesWaitingForSpace = new 
LinkedList<Runnable>();
+    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+       public void run() {
+               
+               // We may need to do this in async thread since this is run for 
within a synchronization
+               // that the UsageManager is holding.
+               
+               synchronized( messagesWaitingForSpace ) {
+                       while( !usageManager.isFull() && 
!messagesWaitingForSpace.isEmpty()) {
+                               Runnable op = 
messagesWaitingForSpace.removeFirst();
+                               op.run();
+                       }
+               }
+               
+       };
+    };
 
     public void send(final ProducerBrokerExchange producerExchange,final 
Message message) throws Exception {
        final ConnectionContext context = 
producerExchange.getConnectionContext(); 
@@ -327,27 +346,88 @@
             if (log.isDebugEnabled()) {
                 log.debug("Expired message: " + message);
             }
+            if( producerExchange.getProducerState().getInfo().getWindowSize() 
> 0 || !message.isResponseRequired() ) {
+                       ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
+                               context.getConnection().dispatchAsync(ack);     
                                                
+            }
             return;
         }
-        if (context.isProducerFlowControl() && !context.isNetworkConnection()) 
{
-            if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
-                throw new javax.jms.ResourceAllocationException("Usage Manager 
memory limit reached");
-            }else{
-                while( !usageManager.waitForSpace(1000) ) {
-                    if( context.getStopping().get() )
-                        throw new IOException("Connection closed, send 
aborted.");
-                }
-                // The usage manager could have delayed us by the time
-                // we unblock the message could have expired..
-                if(message.isExpired()){
-                    if (log.isDebugEnabled()) {
-                        log.debug("Expired message: " + message);
-                    }
-                    return;
-                }
-            }
+        if ( context.isProducerFlowControl() ) {
+               if( usageManager.isFull() ) {
+                   if(usageManager.isSendFailIfNoSpace()){
+                       throw new javax.jms.ResourceAllocationException("Usage 
Manager memory limit reached");
+                   }else{
+                       
+                       // We can avoid blocking due to low usage if the 
producer is sending a sync message or
+                       // if it is using a producer window
+                       if( 
producerExchange.getProducerState().getInfo().getWindowSize() > 0 || 
message.isResponseRequired() ) {
+                               synchronized( messagesWaitingForSpace ) {
+                                       messagesWaitingForSpace.add(new 
Runnable() {
+                                               public void run() {
+                                       try {                                   
                
+                                               doMessageSend(producerExchange, 
message);
+                                               if( 
message.isResponseRequired() ) {
+                                                               Response 
response = new Response();
+                                                               
response.setCorrelationId(message.getCommandId());
+                                                                               
context.getConnection().dispatchAsync(response);
+                                               } else {
+                                                       ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
+                                                                               
context.getConnection().dispatchAsync(ack);                                     
                
+                                               }
+                                                               } catch 
(Exception e) {
+                                               if( 
message.isResponseRequired() ) {
+                                                               
ExceptionResponse response = new ExceptionResponse(e);
+                                                               
response.setCorrelationId(message.getCommandId());
+                                                                               
context.getConnection().dispatchAsync(response);                                
                                        
+                                               } else {
+                                                       ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
+                                                                               
context.getConnection().dispatchAsync(ack);                                     
                
+                                               }
+                                                               }
+                                               }
+                                       });
+                                       
+                                       // If the user manager is not full, 
then the task will not get called..
+                                       if( 
!usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) {
+                                               // so call it directly here.
+                                               
sendMessagesWaitingForSpaceTask.run();
+                                       }
+                                       
+                                       context.setDontSendReponse(true);
+                                       return;
+                               }
+                               
+                       } else {
+                               
+                               // Producer flow control cannot be used, so we 
have do the flow control at the broker 
+                               // by blocking this thread until there is space 
available.                              
+                               while( !usageManager.waitForSpace(1000) ) {
+                                   if( context.getStopping().get() )
+                                       throw new IOException("Connection 
closed, send aborted.");
+                               }
+                               
+                               // The usage manager could have delayed us by 
the time
+                               // we unblock the message could have expired..
+                               if(message.isExpired()){
+                                   if (log.isDebugEnabled()) {
+                                       log.debug("Expired message: " + 
message);
+                                   }
+                                   if( 
producerExchange.getProducerState().getInfo().getWindowSize() > 0 || 
!message.isResponseRequired() ) {
+                                               ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
+                                                       
context.getConnection().dispatchAsync(ack);                                     
                
+                                   }
+                                   return;
+                               }
+                       }
+                   }
+               }
         }
-        message.setRegionDestination(this);
+        doMessageSend(producerExchange, message);
+    }
+
+       private void doMessageSend(final ProducerBrokerExchange 
producerExchange, final Message message) throws IOException, Exception {
+               final ConnectionContext context = 
producerExchange.getConnectionContext();
+               message.setRegionDestination(this);
         if(store!=null&&message.isPersistent()){
             store.addMessage(context,message);
         }
@@ -361,12 +441,16 @@
                         messages.addMessageLast(message);
                     }
                     // It could take while before we receive the commit
-                    // operration.. by that time the message could have 
expired..
+                    // op, by that time the message could have expired..
                     if(message.isExpired()){
                         // TODO: remove message from store.
                         if (log.isDebugEnabled()) {
                             log.debug("Expired message: " + message);
                         }
+                        if( 
producerExchange.getProducerState().getInfo().getWindowSize() > 0 || 
!message.isResponseRequired() ) {
+                               ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
+                                       
context.getConnection().dispatchAsync(ack);                                     
                
+                        }
                         return;
                     }
                     sendMessage(context,message);
@@ -379,9 +463,7 @@
             sendMessage(context,message);
             
         }
-    }
-       
-    
+       }    
 
     public void dispose(ConnectionContext context) throws IOException {
         if (store != null) {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
 Fri Mar  9 09:29:30 2007
@@ -38,6 +38,11 @@
     public ProducerAck() {
     }
     
+    public ProducerAck(ProducerId producerId, int size) {
+       this.producerId = producerId;
+       this.size = size;
+    }
+    
     public void copy(ProducerAck copy) {
         super.copy(copy);
         copy.producerId = producerId;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
 Fri Mar  9 09:29:30 2007
@@ -18,14 +18,14 @@
 package org.apache.activemq.memory;
 
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.CopyOnWriteArrayList;
-
 
 /**
  * Used to keep track of how much of something is being used so that 
@@ -60,6 +60,7 @@
     private String name = "";
     private float usagePortion = 1.0f;
     private List<UsageManager> children = new 
CopyOnWriteArrayList<UsageManager>();
+    private final LinkedList<Runnable> callbacks = new LinkedList<Runnable>();
 
     public UsageManager() {
         this(null,"default");
@@ -292,6 +293,11 @@
         if(oldPercentUsage>=100&&newPercentUsage<100){
             synchronized(usageMutex){
                 usageMutex.notifyAll();
+                for (Iterator iter = callbacks.iterator(); iter.hasNext();) {
+                                       Runnable callback = (Runnable) 
iter.next();
+                                       callback.run();
+                               }
+                callbacks.clear();
             }
         }
         // Let the listeners know
@@ -331,4 +337,37 @@
     private void removeChild(UsageManager child){
         children.remove(child);
     }
+    
+    /**
+     * @param callback
+     * @return true if the UsageManager was full.  The callback will only be 
called if this method returns true.
+     */
+    public boolean notifyCallbackWhenNotFull( final Runnable callback ) {
+        
+       if(parent!=null) {
+               Runnable r = new Runnable(){
+                               public void run() {
+                               synchronized (usageMutex) {
+                                   if( percentUsage >= 100 ) {
+                                       callbacks.add(callback);
+                                   } else {
+                                       callback.run();
+                                   }
+                               }
+                               }
+            };
+               if( parent.notifyCallbackWhenNotFull(r) ) {
+                       return true;
+               }
+       }
+        synchronized (usageMutex) {
+            if( percentUsage >= 100 ) {
+               callbacks.add(callback);
+               return true;
+            } else {
+               return false;
+            }
+        }
+    }
+
 }


Reply via email to