Author: chirino
Date: Fri Mar 9 09:29:30 2007
New Revision: 516475
URL: http://svn.apache.org/viewvc?view=rev&rev=516475
Log:
Adding the bits need to do producer flow control with a window to the broker.
Just implemented on the Queue case for now.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
Fri Mar 9 09:29:30 2007
@@ -17,6 +17,9 @@
*/
package org.apache.activemq;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
@@ -36,9 +39,6 @@
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.IntrospectionSupport;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* A client uses a <CODE>MessageProducer</CODE> object to send messages to a
* destination. A <CODE>MessageProducer</CODE> object is created by passing a
@@ -496,11 +496,7 @@
}
}
- int size = this.session.send(this, dest, message, deliveryMode,
priority, timeToLive);
-
- if( producerWindow!=null ) {
- producerWindow.increaseUsage(size);
- }
+ this.session.send(this, dest, message, deliveryMode, priority,
timeToLive, producerWindow);
stats.onMessage();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Fri Mar 9 09:29:30 2007
@@ -17,32 +17,78 @@
*/
package org.apache.activemq;
-import org.apache.activemq.command.*;
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.blob.BlobUploader;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.activemq.blob.BlobUploader;
-import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.jms.Message;
-import java.io.Serializable;
-import java.io.File;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* <P>
* A <CODE>Session</CODE> object is a single-threaded context for producing
@@ -1546,11 +1592,13 @@
* message priority.
* @param timeToLive -
* message expiration.
+ * @param producerWindow
* @throws JMSException
*/
- protected int send(ActiveMQMessageProducer producer,
+ protected void send(ActiveMQMessageProducer producer,
ActiveMQDestination destination,Message message,int
deliveryMode,
- int priority,long timeToLive) throws JMSException{
+ int priority,long timeToLive, UsageManager producerWindow)
throws JMSException{
+
checkClosed();
if(destination.isTemporary()&&connection.isDeleted(destination)){
throw new JMSException("Cannot publish to a deleted
Destination: "
@@ -1598,15 +1646,18 @@
}
if(!connection.isAlwaysSyncSend()&&(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null)){
this.connection.asyncSendPacket(msg);
+ if( producerWindow!=null ) {
+ // Since we defer lots of the marshaling till
we hit the wire, this might not
+ // provide and accurate size. We may change
over to doing more aggressive marshaling,
+ // to get more accurate sizes.. this is more
important once users start using producer window
+ // flow control.
+ int size = msg.getSize();
+ producerWindow.increaseUsage(size);
+ }
}else{
this.connection.syncSendPacket(msg);
}
- // Since we defer lots of the marshaling till we hit
the wire, this might not
- // provide and accurate size. We may change over to
doing more aggressive marshaling,
- // to get more accurate sizes.. this is more important
once users start using producer window
- // flow control.
- return msg.getSize();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Fri Mar 9 09:29:30 2007
@@ -58,6 +58,7 @@
private boolean networkConnection;
private final AtomicBoolean stopping = new AtomicBoolean();
private final MessageEvaluationContext messageEvaluationContext = new
MessageEvaluationContext();
+ private boolean dontSendReponse;
public ConnectionContext() {
}
@@ -258,6 +259,14 @@
public AtomicBoolean getStopping() {
return stopping;
+ }
+
+ public void setDontSendReponse(boolean b) {
+ this.dontSendReponse=b;
+ }
+
+ public boolean isDontSendReponse() {
+ return dontSendReponse;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Mar 9 09:29:30 2007
@@ -124,6 +124,7 @@
private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges =
new HashMap<ConsumerId,ConsumerBrokerExchange>();
private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
+ private ConnectionContext context;
private boolean networkConnection;
private AtomicInteger protocolVersion=new
AtomicInteger(CommandTypes.PROTOCOL_VERSION);
@@ -284,6 +285,16 @@
}
response.setCorrelationId(commandId);
}
+
+ // The context may have been flagged so that the response is not sent.
+ if( context!=null ) {
+ if( context.isDontSendReponse() ) {
+ context.setDontSendReponse(false);
+ response=null;
+ }
+ context=null;
+ }
+
return response;
}
@@ -344,7 +355,7 @@
synchronized public Response processBeginTransaction(TransactionInfo info)
throws Exception{
ConnectionState
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- ConnectionContext context=null;
+ context=null;
if(cs!=null){
context=cs.getContext();
}
@@ -365,7 +376,7 @@
synchronized public Response processPrepareTransaction(TransactionInfo
info) throws Exception{
ConnectionState
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- ConnectionContext context=null;
+ context=null;
if(cs!=null){
context=cs.getContext();
}
@@ -388,7 +399,7 @@
synchronized public Response
processCommitTransactionOnePhase(TransactionInfo info) throws Exception{
ConnectionState
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- ConnectionContext context=null;
+ context=null;
if(cs!=null){
context=cs.getContext();
}
@@ -399,7 +410,7 @@
synchronized public Response
processCommitTransactionTwoPhase(TransactionInfo info) throws Exception{
ConnectionState
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- ConnectionContext context=null;
+ context=null;
if(cs!=null){
context=cs.getContext();
}
@@ -410,7 +421,7 @@
synchronized public Response processRollbackTransaction(TransactionInfo
info) throws Exception{
ConnectionState
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- ConnectionContext context=null;
+ context=null;
if(cs!=null){
context=cs.getContext();
}
@@ -421,7 +432,7 @@
synchronized public Response processForgetTransaction(TransactionInfo
info) throws Exception{
ConnectionState
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- ConnectionContext context=null;
+ context=null;
if(cs!=null){
context=cs.getContext();
}
@@ -431,7 +442,7 @@
synchronized public Response processRecoverTransactions(TransactionInfo
info) throws Exception{
ConnectionState
cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- ConnectionContext context=null;
+ context=null;
if(cs!=null){
context=cs.getContext();
}
@@ -626,7 +637,7 @@
log.debug("Setting up new connection: "+this);
// Setup the context.
String clientId=info.getClientId();
- ConnectionContext context=new ConnectionContext();
+ context=new ConnectionContext();
context.setConnection(this);
context.setBroker(broker);
context.setConnector(connector);
@@ -1096,7 +1107,7 @@
synchronized(producerExchanges){
result=new ProducerBrokerExchange();
ConnectionState state=lookupConnectionState(id);
- ConnectionContext context=state.getContext();
+ context=state.getContext();
result.setConnectionContext(context);
SessionState ss=state.getSessionState(id.getParentId());
if(ss!=null){
@@ -1125,7 +1136,7 @@
synchronized(consumerExchanges){
result=new ConsumerBrokerExchange();
ConnectionState state=lookupConnectionState(id);
- ConnectionContext context=state.getContext();
+ context=state.getContext();
result.setConnectionContext(context);
SessionState ss=state.getSessionState(id.getParentId());
if(ss!=null){
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Mar 9 09:29:30 2007
@@ -42,9 +42,12 @@
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.Store;
@@ -78,7 +81,6 @@
private final DestinationStatistics destinationStatistics = new
DestinationStatistics();
private PendingMessageCursor messages;
private final LinkedList pagedInMessages = new LinkedList();
-
private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners;
@@ -95,7 +97,7 @@
private final Object doDispatchMutex = new Object();
private TaskRunner taskRunner;
private boolean started = false;
-
+
public Queue(ActiveMQDestination destination, final UsageManager
memoryManager, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.destination = destination;
@@ -318,6 +320,23 @@
}
}
+
+ private final LinkedList<Runnable> messagesWaitingForSpace = new
LinkedList<Runnable>();
+ private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+ public void run() {
+
+ // We may need to do this in async thread since this is run for
within a synchronization
+ // that the UsageManager is holding.
+
+ synchronized( messagesWaitingForSpace ) {
+ while( !usageManager.isFull() &&
!messagesWaitingForSpace.isEmpty()) {
+ Runnable op =
messagesWaitingForSpace.removeFirst();
+ op.run();
+ }
+ }
+
+ };
+ };
public void send(final ProducerBrokerExchange producerExchange,final
Message message) throws Exception {
final ConnectionContext context =
producerExchange.getConnectionContext();
@@ -327,27 +346,88 @@
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
+ if( producerExchange.getProducerState().getInfo().getWindowSize()
> 0 || !message.isResponseRequired() ) {
+ ProducerAck ack = new
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+ context.getConnection().dispatchAsync(ack);
+ }
return;
}
- if (context.isProducerFlowControl() && !context.isNetworkConnection())
{
- if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
- throw new javax.jms.ResourceAllocationException("Usage Manager
memory limit reached");
- }else{
- while( !usageManager.waitForSpace(1000) ) {
- if( context.getStopping().get() )
- throw new IOException("Connection closed, send
aborted.");
- }
- // The usage manager could have delayed us by the time
- // we unblock the message could have expired..
- if(message.isExpired()){
- if (log.isDebugEnabled()) {
- log.debug("Expired message: " + message);
- }
- return;
- }
- }
+ if ( context.isProducerFlowControl() ) {
+ if( usageManager.isFull() ) {
+ if(usageManager.isSendFailIfNoSpace()){
+ throw new javax.jms.ResourceAllocationException("Usage
Manager memory limit reached");
+ }else{
+
+ // We can avoid blocking due to low usage if the
producer is sending a sync message or
+ // if it is using a producer window
+ if(
producerExchange.getProducerState().getInfo().getWindowSize() > 0 ||
message.isResponseRequired() ) {
+ synchronized( messagesWaitingForSpace ) {
+ messagesWaitingForSpace.add(new
Runnable() {
+ public void run() {
+ try {
+ doMessageSend(producerExchange,
message);
+ if(
message.isResponseRequired() ) {
+ Response
response = new Response();
+
response.setCorrelationId(message.getCommandId());
+
context.getConnection().dispatchAsync(response);
+ } else {
+ ProducerAck ack = new
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+
context.getConnection().dispatchAsync(ack);
+ }
+ } catch
(Exception e) {
+ if(
message.isResponseRequired() ) {
+
ExceptionResponse response = new ExceptionResponse(e);
+
response.setCorrelationId(message.getCommandId());
+
context.getConnection().dispatchAsync(response);
+ } else {
+ ProducerAck ack = new
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+
context.getConnection().dispatchAsync(ack);
+ }
+ }
+ }
+ });
+
+ // If the user manager is not full,
then the task will not get called..
+ if(
!usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) {
+ // so call it directly here.
+
sendMessagesWaitingForSpaceTask.run();
+ }
+
+ context.setDontSendReponse(true);
+ return;
+ }
+
+ } else {
+
+ // Producer flow control cannot be used, so we
have do the flow control at the broker
+ // by blocking this thread until there is space
available.
+ while( !usageManager.waitForSpace(1000) ) {
+ if( context.getStopping().get() )
+ throw new IOException("Connection
closed, send aborted.");
+ }
+
+ // The usage manager could have delayed us by
the time
+ // we unblock the message could have expired..
+ if(message.isExpired()){
+ if (log.isDebugEnabled()) {
+ log.debug("Expired message: " +
message);
+ }
+ if(
producerExchange.getProducerState().getInfo().getWindowSize() > 0 ||
!message.isResponseRequired() ) {
+ ProducerAck ack = new
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+
context.getConnection().dispatchAsync(ack);
+ }
+ return;
+ }
+ }
+ }
+ }
}
- message.setRegionDestination(this);
+ doMessageSend(producerExchange, message);
+ }
+
+ private void doMessageSend(final ProducerBrokerExchange
producerExchange, final Message message) throws IOException, Exception {
+ final ConnectionContext context =
producerExchange.getConnectionContext();
+ message.setRegionDestination(this);
if(store!=null&&message.isPersistent()){
store.addMessage(context,message);
}
@@ -361,12 +441,16 @@
messages.addMessageLast(message);
}
// It could take while before we receive the commit
- // operration.. by that time the message could have
expired..
+ // op, by that time the message could have expired..
if(message.isExpired()){
// TODO: remove message from store.
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
+ if(
producerExchange.getProducerState().getInfo().getWindowSize() > 0 ||
!message.isResponseRequired() ) {
+ ProducerAck ack = new
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+
context.getConnection().dispatchAsync(ack);
+ }
return;
}
sendMessage(context,message);
@@ -379,9 +463,7 @@
sendMessage(context,message);
}
- }
-
-
+ }
public void dispose(ConnectionContext context) throws IOException {
if (store != null) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java
Fri Mar 9 09:29:30 2007
@@ -38,6 +38,11 @@
public ProducerAck() {
}
+ public ProducerAck(ProducerId producerId, int size) {
+ this.producerId = producerId;
+ this.size = size;
+ }
+
public void copy(ProducerAck copy) {
super.copy(copy);
copy.producerId = producerId;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=516475&r1=516474&r2=516475
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Fri Mar 9 09:29:30 2007
@@ -18,14 +18,14 @@
package org.apache.activemq.memory;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.CopyOnWriteArrayList;
-
/**
* Used to keep track of how much of something is being used so that
@@ -60,6 +60,7 @@
private String name = "";
private float usagePortion = 1.0f;
private List<UsageManager> children = new
CopyOnWriteArrayList<UsageManager>();
+ private final LinkedList<Runnable> callbacks = new LinkedList<Runnable>();
public UsageManager() {
this(null,"default");
@@ -292,6 +293,11 @@
if(oldPercentUsage>=100&&newPercentUsage<100){
synchronized(usageMutex){
usageMutex.notifyAll();
+ for (Iterator iter = callbacks.iterator(); iter.hasNext();) {
+ Runnable callback = (Runnable)
iter.next();
+ callback.run();
+ }
+ callbacks.clear();
}
}
// Let the listeners know
@@ -331,4 +337,37 @@
private void removeChild(UsageManager child){
children.remove(child);
}
+
+ /**
+ * @param callback
+ * @return true if the UsageManager was full. The callback will only be
called if this method returns true.
+ */
+ public boolean notifyCallbackWhenNotFull( final Runnable callback ) {
+
+ if(parent!=null) {
+ Runnable r = new Runnable(){
+ public void run() {
+ synchronized (usageMutex) {
+ if( percentUsage >= 100 ) {
+ callbacks.add(callback);
+ } else {
+ callback.run();
+ }
+ }
+ }
+ };
+ if( parent.notifyCallbackWhenNotFull(r) ) {
+ return true;
+ }
+ }
+ synchronized (usageMutex) {
+ if( percentUsage >= 100 ) {
+ callbacks.add(callback);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
}