Author: ritchiem Date: Thu Oct 8 12:40:23 2009 New Revision: 823149 URL: http://svn.apache.org/viewvc?rev=823149&view=rev Log: QPID-1440 : Code review changes from QPID-1289. All actioned except adding new createConsumer() method.
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=823149&r1=823148&r2=823149&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Oct 8 12:40:23 2009 @@ -313,7 +313,7 @@ protected AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages - protected int _maxPrefetch; + private int _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=823149&r1=823148&r2=823149&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Thu Oct 8 12:40:23 2009 @@ -39,6 +39,15 @@ Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException; + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + XASession createXASession() throws JMSException; + XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; void failoverPrep(); Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=823149&r1=823148&r2=823149&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Oct 8 12:40:23 2009 @@ -100,6 +100,18 @@ } /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + + /** * create an XA Session and start it if required. */ public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=823149&r1=823148&r2=823149&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Oct 8 12:40:23 2009 @@ -191,6 +191,18 @@ }, _conn).execute(); } + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException, FailoverException { Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=823149&r1=823148&r2=823149&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct 8 12:40:23 2009 @@ -21,7 +21,6 @@ package org.apache.qpid.client; import java.io.Serializable; -import java.io.IOException; import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; @@ -92,7 +91,6 @@ import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; -import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,8 +113,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { - - public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -263,10 +259,10 @@ private int _ticket; /** Holds the high mark for prefetched message, at which the session is suspended. */ - private int _defaultPrefetchHighMark; + private int _prefetchHighMark; /** Holds the low mark for prefetched messages, below which the session is resumed. */ - private int _defaultPrefetchLowMark; + private int _prefetchLowMark; /** Holds the message listener, if any, which is attached to this session. */ private MessageListener _messageListener = null; @@ -447,13 +443,13 @@ _channelId = channelId; _messageFactoryRegistry = messageFactoryRegistry; - _defaultPrefetchHighMark = defaultPrefetchHighMark; - _defaultPrefetchLowMark = defaultPrefetchLowMark; + _prefetchHighMark = defaultPrefetchHighMark; + _prefetchLowMark = defaultPrefetchLowMark; if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = - new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, + new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { private final AtomicBoolean _suspendState = new AtomicBoolean(); @@ -461,7 +457,7 @@ public void aboveThreshold(int currentValue) { _logger.debug( - "Above threshold(" + _defaultPrefetchHighMark + "Above threshold(" + _prefetchHighMark + ") so suspending channel. Current value is " + currentValue); _suspendState.set(true); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -471,7 +467,7 @@ public void underThreshold(int currentValue) { _logger.debug( - "Below threshold(" + _defaultPrefetchLowMark + "Below threshold(" + _prefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); _suspendState.set(false); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -481,7 +477,7 @@ } else { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null); + _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null); } } @@ -897,7 +893,7 @@ { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false, messageSelector, null, true, true); } @@ -905,7 +901,7 @@ { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null, false, false); } @@ -913,7 +909,7 @@ { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false); } @@ -921,7 +917,7 @@ { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), messageSelector, null, false, false); } @@ -930,7 +926,7 @@ { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic), messageSelector, null, false, false); } @@ -939,7 +935,7 @@ { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true, messageSelector, null, false, false); } @@ -1363,17 +1359,17 @@ public int getDefaultPrefetch() { - return _defaultPrefetchHighMark; + return _prefetchHighMark; } public int getDefaultPrefetchHigh() { - return _defaultPrefetchHighMark; + return _prefetchHighMark; } public int getDefaultPrefetchLow() { - return _defaultPrefetchLowMark; + return _prefetchLowMark; } public AMQShortString getDefaultQueueExchangeName() Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=823149&r1=823148&r2=823149&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Thu Oct 8 12:40:23 2009 @@ -47,7 +47,7 @@ public synchronized XASession createXASession() throws JMSException { checkNotClosed(); - return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2); + return _delegate.createXASession(); } //-- Interface XAQueueConnection Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=823149&r1=823148&r2=823149&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Thu Oct 8 12:40:23 2009 @@ -39,7 +39,7 @@ * type: long */ public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch"; - public static final String MAX_PREFETCH_DEFAULT = "5000"; + public static final String MAX_PREFETCH_DEFAULT = "500"; /** * When true a sync command is sent after every persistent messages. --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org