Author: rgodfrey
Date: Thu Mar 17 23:59:37 2016
New Revision: 1735525

URL: http://svn.apache.org/viewvc?rev=1735525&view=rev
Log:
QPID-6962 : Use a single context variable for max message size

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Thu Mar 17 23:59:37 2016
@@ -51,6 +51,10 @@ public interface Connection<X extends Co
     @ManagedContextDefault(name = MAX_UNCOMMITTED_IN_MEMORY_SIZE)
     long DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE = 10l * 1024l * 1024l;
 
+    String MAX_MESSAGE_SIZE = "qpid.max_message_size";
+    @ManagedContextDefault(name = MAX_MESSAGE_SIZE)
+    int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 500Mb
+
     @DerivedAttribute
     String getClientId();
 
@@ -58,6 +62,9 @@ public interface Connection<X extends Co
     String getClientVersion();
 
     @DerivedAttribute
+    String getClientProduct();
+
+    @DerivedAttribute
     boolean isIncoming();
 
     @DerivedAttribute

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Thu Mar 17 23:59:37 2016
@@ -77,11 +77,6 @@ public interface VirtualHost<X extends V
     @ManagedContextDefault( name = VIRTUALHOST_WORK_DIR_VAR)
     public static final String VIRTUALHOST_WORK_DIR = VIRTUALHOST_WORK_DIR_VAR_EXPRESSION;
 
-    String MAX_MESSAGE_SIZE = "qpid.max_message_size";
-
-    @ManagedContextDefault(name = MAX_MESSAGE_SIZE)
-    int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 500Mb
-
     @ManagedContextDefault( name = "queue.deadLetterQueueEnabled")
     public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
     String DEFAULT_DLE_NAME_SUFFIX = "_DLE";

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Thu Mar 17 23:59:37 2016
@@ -82,11 +82,6 @@ public interface AmqpPort<X extends Amqp
     @ManagedContextDefault(name = PORT_AMQP_ACCEPT_BACKLOG)
     int DEFAULT_PORT_AMQP_ACCEPT_BACKLOG = 1024;
 
-    String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
-
-    @ManagedContextDefault(name = PORT_MAX_MESSAGE_SIZE)
-    int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 500Mb
-
     String OPEN_CONNECTIONS_WARN_PERCENT = "qpid.port.open_connections_warn_percent";
 
     @ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Thu Mar 17 23:59:37 2016
@@ -53,6 +53,7 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Session;
@@ -218,6 +219,7 @@ public abstract class AbstractAMQPConnec
         return _aggregateTicker;
     }
 
+    @Override
     public final long getLastIoTime()
     {
         return Math.max(getLastReadTime(), getLastWriteTime());
@@ -245,16 +247,18 @@ public abstract class AbstractAMQPConnec
         _lastWriteTime = System.currentTimeMillis();
     }
 
+    @Override
     public final long getConnectionId()
     {
         return _connectionId;
     }
 
-    public final StatisticsCounter getMessageDeliveryStatistics()
+    private StatisticsCounter getMessageDeliveryStatistics()
     {
         return _messagesDelivered;
     }
 
+    @Override
     public String getRemoteAddressString()
     {
         return String.valueOf(_network.getRemoteAddress());
@@ -270,26 +274,31 @@ public abstract class AbstractAMQPConnec
         return this;
     }
 
+    @Override
     public boolean isConnectionStopped()
     {
         return _stopped;
     }
 
+    @Override
     public final String getVirtualHostName()
     {
         return getVirtualHost() == null ? null : getVirtualHost().getName();
     }
 
+    @Override
     public String getClientVersion()
     {
         return _clientVersion;
     }
 
+    @Override
     public String getRemoteProcessPid()
     {
         return _remoteProcessPid;
     }
 
+    @Override
     public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
     {
         if(_network instanceof NonBlockingConnection)
@@ -298,6 +307,7 @@ public abstract class AbstractAMQPConnec
         }
     }
 
+    @Override
     public String getClientProduct()
     {
         return _clientProduct;
@@ -305,32 +315,24 @@ public abstract class AbstractAMQPConnec
 
     protected void updateMaxMessageSize()
     {
+        _maxMessageSize.set(Math.min(getMaxMessageSize(getPort()), getMaxMessageSize(getVirtualHost())));
+    }
+
+    private long getMaxMessageSize(final ConfiguredObject<?> object)
+    {
         long maxMessageSize;
         try
         {
-            maxMessageSize = getPort().getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
+            maxMessageSize = object.getContextValue(Integer.class, MAX_MESSAGE_SIZE);
         }
         catch (NullPointerException | IllegalArgumentException e)
         {
             _logger.warn("Context variable {} has invalid value and cannot be used to restrict maximum message size",
-                         AmqpPort.PORT_MAX_MESSAGE_SIZE,
+                         MAX_MESSAGE_SIZE,
                          e);
             maxMessageSize = Long.MAX_VALUE;
         }
-        try
-        {
-            maxMessageSize = Math.min(maxMessageSize,
-                                      (long) getVirtualHost().getContextValue(Integer.class, VirtualHost.MAX_MESSAGE_SIZE));
-        }
-        catch (NullPointerException | IllegalArgumentException e)
-        {
-
-            _logger.warn("Context variable {} has invalid value and cannot be used to restrict maximum message size",
-                         VirtualHost.MAX_MESSAGE_SIZE,
-                         e);
-        }
-
-        _maxMessageSize.set(maxMessageSize > 0 ? maxMessageSize : Long.MAX_VALUE);
+        return maxMessageSize > 0 ? maxMessageSize : Long.MAX_VALUE;
     }
 
     public long getMaxMessageSize()
@@ -338,11 +340,13 @@ public abstract class AbstractAMQPConnec
         return _maxMessageSize.get();
     }
 
+    @Override
     public void addDeleteTask(final Action<? super C> task)
     {
         _connectionCloseTaskList.add(task);
     }
 
+    @Override
     public void removeDeleteTask(final Action<? super C> task)
     {
         _connectionCloseTaskList.remove(task);
@@ -372,38 +376,42 @@ public abstract class AbstractAMQPConnec
         }
     }
 
+    @Override
     public String getClientId()
     {
         return _clientId;
     }
 
-    public final StatisticsCounter getDataReceiptStatistics()
+    private StatisticsCounter getDataReceiptStatistics()
     {
         return _dataReceived;
     }
 
-    public final StatisticsCounter getDataDeliveryStatistics()
+    private StatisticsCounter getDataDeliveryStatistics()
     {
         return _dataDelivered;
     }
 
+    @Override
     public final SocketAddress getRemoteSocketAddress()
     {
         return _network.getRemoteAddress();
     }
 
+    @Override
     public void registerMessageDelivered(long messageSize)
     {
         _messagesDelivered.registerEvent(1L);
         _dataDelivered.registerEvent(messageSize);
-        ((VirtualHost<?>)getVirtualHost()).registerMessageDelivered(messageSize);
+        getVirtualHost().registerMessageDelivered(messageSize);
     }
 
+    @Override
     public void registerMessageReceived(long messageSize, long timestamp)
     {
         _messagesReceived.registerEvent(1L, timestamp);
         _dataReceived.registerEvent(messageSize, timestamp);
-        ((VirtualHost<?>)getVirtualHost()).registerMessageReceived(messageSize, timestamp);
+        getVirtualHost().registerMessageReceived(messageSize, timestamp);
     }
 
     public final void resetStatistics()
@@ -414,7 +422,7 @@ public abstract class AbstractAMQPConnec
         _dataReceived.reset();
     }
 
-    public final StatisticsCounter getMessageReceiptStatistics()
+    private StatisticsCounter getMessageReceiptStatistics()
     {
         return _messagesReceived;
     }
@@ -717,6 +725,7 @@ public abstract class AbstractAMQPConnec
 
     protected abstract EventLogger getEventLogger();
 
+    @Override
     public final boolean isAuthorizedMessagePrincipal(final String userId)
     {
         return !_messageAuthorizationRequired || getAuthorizedPrincipal().getName().equals(userId == null? "" : userId);
@@ -727,7 +736,7 @@ public abstract class AbstractAMQPConnec
         private final long _allowedTime;
         private volatile long _accumulatedSchedulingDelay;
 
-        public SlowConnectionOpenTicker(long timeoutTime)
+        SlowConnectionOpenTicker(long timeoutTime)
         {
             _allowedTime = timeoutTime;
         }
@@ -735,9 +744,7 @@ public abstract class AbstractAMQPConnec
         @Override
         public int getTimeToNextTick(final long currentTime)
         {
-            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime + _accumulatedSchedulingDelay
-                                              - currentTime);
-            return timeToNextTick;
+            return (int) (getCreatedTime() + _allowedTime + _accumulatedSchedulingDelay - currentTime);
         }
 
         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Thu Mar 17 23:59:37 2016
@@ -100,7 +100,7 @@ public class AMQChannelTest extends Qpid
         _port = mock(AmqpPort.class);
         when(_port.getChildExecutor()).thenReturn(taskExecutor);
         when(_port.getModel()).thenReturn(BrokerModel.getInstance());
-        when(_port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE)).thenReturn(1);
+        when(_port.getContextValue(Integer.class, Connection.MAX_MESSAGE_SIZE)).thenReturn(1);
 
         AuthenticatedPrincipal authenticatedPrincipal = new AuthenticatedPrincipal("user");
         Set<Principal> authenticatedUser = Collections.<Principal>singleton(authenticatedPrincipal);

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java Thu Mar 17 23:59:37 2016
@@ -146,7 +146,7 @@ public class AMQPConnection_0_8Test exte
         when(_port.getAuthenticationProvider()).thenReturn(authenticationProvider);
         when(_port.getVirtualHost(VIRTUAL_HOST_NAME)).thenReturn(_virtualHost);
         when(_port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)).thenReturn(2500l);
-        when(_port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE)).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
+        when(_port.getContextValue(Integer.class, Connection.MAX_MESSAGE_SIZE)).thenReturn(Connection.DEFAULT_MAX_MESSAGE_SIZE);
 
         _sender = mock(ByteBufferSender.class);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to