Author: rgodfrey
Date: Thu Mar 17 23:59:37 2016
New Revision: 1735525
URL: http://svn.apache.org/viewvc?rev=1735525&view=rev
Log:
QPID-6962 : Use a single context variable for max message size
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
Thu Mar 17 23:59:37 2016
@@ -51,6 +51,10 @@ public interface Connection<X extends Co
@ManagedContextDefault(name = MAX_UNCOMMITTED_IN_MEMORY_SIZE)
long DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE = 10l * 1024l * 1024l;
+ String MAX_MESSAGE_SIZE = "qpid.max_message_size";
+ @ManagedContextDefault(name = MAX_MESSAGE_SIZE)
+ int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 500Mb
+
@DerivedAttribute
String getClientId();
@@ -58,6 +62,9 @@ public interface Connection<X extends Co
String getClientVersion();
@DerivedAttribute
+ String getClientProduct();
+
+ @DerivedAttribute
boolean isIncoming();
@DerivedAttribute
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
Thu Mar 17 23:59:37 2016
@@ -77,11 +77,6 @@ public interface VirtualHost<X extends V
@ManagedContextDefault( name = VIRTUALHOST_WORK_DIR_VAR)
public static final String VIRTUALHOST_WORK_DIR =
VIRTUALHOST_WORK_DIR_VAR_EXPRESSION;
- String MAX_MESSAGE_SIZE = "qpid.max_message_size";
-
- @ManagedContextDefault(name = MAX_MESSAGE_SIZE)
- int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 500Mb
-
@ManagedContextDefault( name = "queue.deadLetterQueueEnabled")
public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
Thu Mar 17 23:59:37 2016
@@ -82,11 +82,6 @@ public interface AmqpPort<X extends Amqp
@ManagedContextDefault(name = PORT_AMQP_ACCEPT_BACKLOG)
int DEFAULT_PORT_AMQP_ACCEPT_BACKLOG = 1024;
- String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
-
- @ManagedContextDefault(name = PORT_MAX_MESSAGE_SIZE)
- int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 500Mb
-
String OPEN_CONNECTIONS_WARN_PERCENT =
"qpid.port.open_connections_warn_percent";
@ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Thu Mar 17 23:59:37 2016
@@ -53,6 +53,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Session;
@@ -218,6 +219,7 @@ public abstract class AbstractAMQPConnec
return _aggregateTicker;
}
+ @Override
public final long getLastIoTime()
{
return Math.max(getLastReadTime(), getLastWriteTime());
@@ -245,16 +247,18 @@ public abstract class AbstractAMQPConnec
_lastWriteTime = System.currentTimeMillis();
}
+ @Override
public final long getConnectionId()
{
return _connectionId;
}
- public final StatisticsCounter getMessageDeliveryStatistics()
+ private StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
+ @Override
public String getRemoteAddressString()
{
return String.valueOf(_network.getRemoteAddress());
@@ -270,26 +274,31 @@ public abstract class AbstractAMQPConnec
return this;
}
+ @Override
public boolean isConnectionStopped()
{
return _stopped;
}
+ @Override
public final String getVirtualHostName()
{
return getVirtualHost() == null ? null : getVirtualHost().getName();
}
+ @Override
public String getClientVersion()
{
return _clientVersion;
}
+ @Override
public String getRemoteProcessPid()
{
return _remoteProcessPid;
}
+ @Override
public void setScheduler(final NetworkConnectionScheduler
networkConnectionScheduler)
{
if(_network instanceof NonBlockingConnection)
@@ -298,6 +307,7 @@ public abstract class AbstractAMQPConnec
}
}
+ @Override
public String getClientProduct()
{
return _clientProduct;
@@ -305,32 +315,24 @@ public abstract class AbstractAMQPConnec
protected void updateMaxMessageSize()
{
+ _maxMessageSize.set(Math.min(getMaxMessageSize(getPort()),
getMaxMessageSize(getVirtualHost())));
+ }
+
+ private long getMaxMessageSize(final ConfiguredObject<?> object)
+ {
long maxMessageSize;
try
{
- maxMessageSize = getPort().getContextValue(Integer.class,
AmqpPort.PORT_MAX_MESSAGE_SIZE);
+ maxMessageSize = object.getContextValue(Integer.class,
MAX_MESSAGE_SIZE);
}
catch (NullPointerException | IllegalArgumentException e)
{
_logger.warn("Context variable {} has invalid value and cannot be
used to restrict maximum message size",
- AmqpPort.PORT_MAX_MESSAGE_SIZE,
+ MAX_MESSAGE_SIZE,
e);
maxMessageSize = Long.MAX_VALUE;
}
- try
- {
- maxMessageSize = Math.min(maxMessageSize,
- (long)
getVirtualHost().getContextValue(Integer.class, VirtualHost.MAX_MESSAGE_SIZE));
- }
- catch (NullPointerException | IllegalArgumentException e)
- {
-
- _logger.warn("Context variable {} has invalid value and cannot be
used to restrict maximum message size",
- VirtualHost.MAX_MESSAGE_SIZE,
- e);
- }
-
- _maxMessageSize.set(maxMessageSize > 0 ? maxMessageSize :
Long.MAX_VALUE);
+ return maxMessageSize > 0 ? maxMessageSize : Long.MAX_VALUE;
}
public long getMaxMessageSize()
@@ -338,11 +340,13 @@ public abstract class AbstractAMQPConnec
return _maxMessageSize.get();
}
+ @Override
public void addDeleteTask(final Action<? super C> task)
{
_connectionCloseTaskList.add(task);
}
+ @Override
public void removeDeleteTask(final Action<? super C> task)
{
_connectionCloseTaskList.remove(task);
@@ -372,38 +376,42 @@ public abstract class AbstractAMQPConnec
}
}
+ @Override
public String getClientId()
{
return _clientId;
}
- public final StatisticsCounter getDataReceiptStatistics()
+ private StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
- public final StatisticsCounter getDataDeliveryStatistics()
+ private StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
+ @Override
public final SocketAddress getRemoteSocketAddress()
{
return _network.getRemoteAddress();
}
+ @Override
public void registerMessageDelivered(long messageSize)
{
_messagesDelivered.registerEvent(1L);
_dataDelivered.registerEvent(messageSize);
-
((VirtualHost<?>)getVirtualHost()).registerMessageDelivered(messageSize);
+ getVirtualHost().registerMessageDelivered(messageSize);
}
+ @Override
public void registerMessageReceived(long messageSize, long timestamp)
{
_messagesReceived.registerEvent(1L, timestamp);
_dataReceived.registerEvent(messageSize, timestamp);
-
((VirtualHost<?>)getVirtualHost()).registerMessageReceived(messageSize,
timestamp);
+ getVirtualHost().registerMessageReceived(messageSize, timestamp);
}
public final void resetStatistics()
@@ -414,7 +422,7 @@ public abstract class AbstractAMQPConnec
_dataReceived.reset();
}
- public final StatisticsCounter getMessageReceiptStatistics()
+ private StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
@@ -717,6 +725,7 @@ public abstract class AbstractAMQPConnec
protected abstract EventLogger getEventLogger();
+ @Override
public final boolean isAuthorizedMessagePrincipal(final String userId)
{
return !_messageAuthorizationRequired ||
getAuthorizedPrincipal().getName().equals(userId == null? "" : userId);
@@ -727,7 +736,7 @@ public abstract class AbstractAMQPConnec
private final long _allowedTime;
private volatile long _accumulatedSchedulingDelay;
- public SlowConnectionOpenTicker(long timeoutTime)
+ SlowConnectionOpenTicker(long timeoutTime)
{
_allowedTime = timeoutTime;
}
@@ -735,9 +744,7 @@ public abstract class AbstractAMQPConnec
@Override
public int getTimeToNextTick(final long currentTime)
{
- final int timeToNextTick = (int) (getCreatedTime() + _allowedTime
+ _accumulatedSchedulingDelay
- - currentTime);
- return timeToNextTick;
+ return (int) (getCreatedTime() + _allowedTime +
_accumulatedSchedulingDelay - currentTime);
}
@Override
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
Thu Mar 17 23:59:37 2016
@@ -100,7 +100,7 @@ public class AMQChannelTest extends Qpid
_port = mock(AmqpPort.class);
when(_port.getChildExecutor()).thenReturn(taskExecutor);
when(_port.getModel()).thenReturn(BrokerModel.getInstance());
- when(_port.getContextValue(Integer.class,
AmqpPort.PORT_MAX_MESSAGE_SIZE)).thenReturn(1);
+ when(_port.getContextValue(Integer.class,
Connection.MAX_MESSAGE_SIZE)).thenReturn(1);
AuthenticatedPrincipal authenticatedPrincipal = new
AuthenticatedPrincipal("user");
Set<Principal> authenticatedUser =
Collections.<Principal>singleton(authenticatedPrincipal);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java?rev=1735525&r1=1735524&r2=1735525&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Test.java
Thu Mar 17 23:59:37 2016
@@ -146,7 +146,7 @@ public class AMQPConnection_0_8Test exte
when(_port.getAuthenticationProvider()).thenReturn(authenticationProvider);
when(_port.getVirtualHost(VIRTUAL_HOST_NAME)).thenReturn(_virtualHost);
when(_port.getContextValue(Long.class,
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)).thenReturn(2500l);
- when(_port.getContextValue(Integer.class,
AmqpPort.PORT_MAX_MESSAGE_SIZE)).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
+ when(_port.getContextValue(Integer.class,
Connection.MAX_MESSAGE_SIZE)).thenReturn(Connection.DEFAULT_MAX_MESSAGE_SIZE);
_sender = mock(ByteBufferSender.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]