Author: ritchiem
Date: Thu Oct  8 12:40:23 2009
New Revision: 823149

URL: http://svn.apache.org/viewvc?rev=823149&view=rev
Log:
QPID-1440 : Code review changes from QPID-1289. All actioned except adding new 
createConsumer() method.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=823149&r1=823148&r2=823149&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Thu Oct  8 12:40:23 2009
@@ -313,7 +313,7 @@
     protected AMQConnectionDelegate _delegate;
 
     // this connection maximum number of prefetched messages
-    protected int _maxPrefetch;
+    private int _maxPrefetch;
 
     //Indicates whether persistent messages are synchronized
     private boolean _syncPersistence;

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=823149&r1=823148&r2=823149&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
 Thu Oct  8 12:40:23 2009
@@ -39,6 +39,15 @@
     Session createSession(final boolean transacted, final int acknowledgeMode,
      final int prefetchHigh, final int prefetchLow) throws JMSException;
 
+    /**
+     * Create an XASession with default prefetch values of:
+     * High = MaxPrefetch
+     * Low  = MaxPrefetch / 2
+     * @return XASession
+     * @throws JMSException thrown if there is a problem creating the session.
+     */
+    XASession createXASession() throws JMSException;
+
     XASession createXASession(int prefetchHigh, int prefetchLow) throws 
JMSException;
 
     void failoverPrep();

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=823149&r1=823148&r2=823149&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 Thu Oct  8 12:40:23 2009
@@ -100,6 +100,18 @@
     }
 
     /**
+     * Create an XASession with default prefetch values of:
+     * High = MaxPrefetch
+     * Low  = MaxPrefetch / 2
+     * @return XASession
+     * @throws JMSException
+     */
+    public XASession createXASession() throws JMSException
+    {
+        return createXASession((int) _conn.getMaxPrefetch(), (int) 
_conn.getMaxPrefetch() / 2);
+    }
+
+    /**
      * create an XA Session and start it if required.
      */
     public XASession createXASession(int prefetchHigh, int prefetchLow) throws 
JMSException

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=823149&r1=823148&r2=823149&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 Thu Oct  8 12:40:23 2009
@@ -191,6 +191,18 @@
                 }, _conn).execute();
     }
 
+    /**
+     * Create an XASession with default prefetch values of:
+     * High = MaxPrefetch
+     * Low  = MaxPrefetch / 2
+     * @return XASession
+     * @throws JMSException thrown if there is a problem creating the session.
+     */        
+    public XASession createXASession() throws JMSException
+    {
+        return createXASession((int) _conn.getMaxPrefetch(), (int) 
_conn.getMaxPrefetch() / 2);
+    }
+
     private void createChannelOverWire(int channelId, int prefetchHigh, int 
prefetchLow, boolean transacted)
             throws AMQException, FailoverException
     {

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=823149&r1=823148&r2=823149&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Oct  8 12:40:23 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.client;
 
 import java.io.Serializable;
-import java.io.IOException;
 import java.net.URISyntaxException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -92,7 +91,6 @@
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.url.AMQBindingURL;
-import org.apache.mina.common.IoSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,8 +113,6 @@
 public abstract class AMQSession<C extends BasicMessageConsumer, P extends 
BasicMessageProducer> extends Closeable implements Session, QueueSession, 
TopicSession
 {
 
-
-
     public static final class IdToConsumerMap<C extends BasicMessageConsumer>
     {
         private final BasicMessageConsumer[] _fastAccessConsumers = new 
BasicMessageConsumer[16];
@@ -263,10 +259,10 @@
     private int _ticket;
 
     /** Holds the high mark for prefetched message, at which the session is 
suspended. */
-    private int _defaultPrefetchHighMark;
+    private int _prefetchHighMark;
 
     /** Holds the low mark for prefetched messages, below which the session is 
resumed. */
-    private int _defaultPrefetchLowMark;
+    private int _prefetchLowMark;
 
     /** Holds the message listener, if any, which is attached to this session. 
*/
     private MessageListener _messageListener = null;
@@ -447,13 +443,13 @@
 
         _channelId = channelId;
         _messageFactoryRegistry = messageFactoryRegistry;
-        _defaultPrefetchHighMark = defaultPrefetchHighMark;
-        _defaultPrefetchLowMark = defaultPrefetchLowMark;
+        _prefetchHighMark = defaultPrefetchHighMark;
+        _prefetchLowMark = defaultPrefetchLowMark;
 
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
             _queue =
-                    new FlowControllingBlockingQueue(_defaultPrefetchHighMark, 
_defaultPrefetchLowMark,
+                    new FlowControllingBlockingQueue(_prefetchHighMark, 
_prefetchLowMark,
                                                      new 
FlowControllingBlockingQueue.ThresholdListener()
                                                      {
                                                          private final 
AtomicBoolean _suspendState = new AtomicBoolean();
@@ -461,7 +457,7 @@
                                                          public void 
aboveThreshold(int currentValue)
                                                          {
                                                              _logger.debug(
-                                                                     "Above 
threshold(" + _defaultPrefetchHighMark
+                                                                     "Above 
threshold(" + _prefetchHighMark
                                                                      + ") so 
suspending channel. Current value is " + currentValue);
                                                              
_suspendState.set(true);
                                                              new Thread(new 
SuspenderRunner(_suspendState)).start();
@@ -471,7 +467,7 @@
                                                          public void 
underThreshold(int currentValue)
                                                          {
                                                              _logger.debug(
-                                                                     "Below 
threshold(" + _defaultPrefetchLowMark
+                                                                     "Below 
threshold(" + _prefetchLowMark
                                                                      + ") so 
unsuspending channel. Current value is " + currentValue);
                                                              
_suspendState.set(false);
                                                              new Thread(new 
SuspenderRunner(_suspendState)).start();
@@ -481,7 +477,7 @@
         }
         else
         {
-            _queue = new 
FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+            _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
         }
     }
 
@@ -897,7 +893,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, noLocal, false,
+        return createConsumerImpl(destination, _prefetchHighMark, 
_prefetchLowMark, noLocal, false,
                                   messageSelector, null, true, true);
     }
 
@@ -905,7 +901,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
+        return createConsumerImpl(destination, _prefetchHighMark, 
_prefetchLowMark, false, (destination instanceof Topic), null, null,
                                   false, false);
     }
 
@@ -913,7 +909,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, false, true, null, null,
+        return createConsumerImpl(destination, _prefetchHighMark, 
_prefetchLowMark, false, true, null, null,
                                   false, false);
     }
 
@@ -921,7 +917,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, false, (destination instanceof Topic),
+        return createConsumerImpl(destination, _prefetchHighMark, 
_prefetchLowMark, false, (destination instanceof Topic),
                                   messageSelector, null, false, false);
     }
 
@@ -930,7 +926,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
+        return createConsumerImpl(destination, _prefetchHighMark, 
_prefetchLowMark, noLocal, (destination instanceof Topic),
                                   messageSelector, null, false, false);
     }
 
@@ -939,7 +935,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, noLocal, true,
+        return createConsumerImpl(destination, _prefetchHighMark, 
_prefetchLowMark, noLocal, true,
                                   messageSelector, null, false, false);
     }
 
@@ -1363,17 +1359,17 @@
 
     public int getDefaultPrefetch()
     {
-        return _defaultPrefetchHighMark;
+        return _prefetchHighMark;
     }
 
     public int getDefaultPrefetchHigh()
     {
-        return _defaultPrefetchHighMark;
+        return _prefetchHighMark;
     }
 
     public int getDefaultPrefetchLow()
     {
-        return _defaultPrefetchLowMark;
+        return _prefetchLowMark;
     }
 
     public AMQShortString getDefaultQueueExchangeName()

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=823149&r1=823148&r2=823149&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
 Thu Oct  8 12:40:23 2009
@@ -47,7 +47,7 @@
     public synchronized XASession createXASession() throws JMSException
     {
         checkNotClosed();
-        return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2);
+        return _delegate.createXASession();
     }
 
     //-- Interface  XAQueueConnection

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=823149&r1=823148&r2=823149&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
 Thu Oct  8 12:40:23 2009
@@ -39,7 +39,7 @@
      * type: long
      */
     public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch";
-    public static final String MAX_PREFETCH_DEFAULT = "5000";
+    public static final String MAX_PREFETCH_DEFAULT = "500";
 
     /**
      * When true a sync command is sent after every persistent messages.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to