This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new d7fc06b QPID-8573: [Broker-J] Logging enhancement of Sole Connection Enforcement Policy events d7fc06b is described below commit d7fc06b67ff8e417e228e81932fe9960a304cf3b Author: Marek Laca <mkl...@users.noreply.github.com> AuthorDate: Tue Feb 22 13:58:16 2022 +0100 QPID-8573: [Broker-J] Logging enhancement of Sole Connection Enforcement Policy events This closes #114 --- .../logging/messages/ResourceLimitMessages.java | 62 ++++++++++++++++++++++ .../messages/ResourceLimit_logmessages.properties | 1 + .../protocol/v1_0/AMQPConnection_1_0Impl.java | 11 ++++ .../SoleConnectionEnforcementPolicyException.java | 19 +++++-- .../StrongConnectionEstablishmentLimiter.java | 2 +- .../protocol/v1_0/ProtocolEngine_1_0_0Test.java | 2 +- .../StrongConnectionEstablishmentLimiterTest.java | 8 +-- 7 files changed, 96 insertions(+), 9 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimitMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimitMessages.java index 743db3e..5d50add 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimitMessages.java +++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimitMessages.java @@ -64,12 +64,14 @@ public class ResourceLimitMessages public static final String RESOURCELIMIT_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "resourcelimit"; public static final String ACCEPTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "resourcelimit.accepted"; + public static final String INFO_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "resourcelimit.info"; public static final String REJECTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "resourcelimit.rejected"; static { LoggerFactory.getLogger(RESOURCELIMIT_LOG_HIERARCHY); LoggerFactory.getLogger(ACCEPTED_LOG_HIERARCHY); + LoggerFactory.getLogger(INFO_LOG_HIERARCHY); LoggerFactory.getLogger(REJECTED_LOG_HIERARCHY); _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.ResourceLimit_logmessages", _currentLocale); @@ -137,6 +139,66 @@ public class ResourceLimitMessages /** * Log a ResourceLimit message of the Format: + * <pre>RL-1003 : Info : {0} : {1}</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage INFO(String param1, String param2) + { + String rawMessage = _messages.getString("INFO"); + + final Object[] messageArguments = {param1, param2}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + @Override + public String toString() + { + return message; + } + + @Override + public String getLogHierarchy() + { + return INFO_LOG_HIERARCHY; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final LogMessage that = (LogMessage) o; + + return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString()); + + } + + @Override + public int hashCode() + { + int result = toString().hashCode(); + result = 31 * result + getLogHierarchy().hashCode(); + return result; + } + }; + } + + /** + * Log a ResourceLimit message of the Format: * <pre>RL-1002 : Rejected : {0} {1} by {2} : {3}</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimit_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimit_logmessages.properties index a78feed..87b5f74 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimit_logmessages.properties +++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ResourceLimit_logmessages.properties @@ -20,3 +20,4 @@ # User Resource Limit logging message i18n strings. ACCEPTED = RL-1001 : Accepted : {0} {1} by {2} : {3} REJECTED = RL-1002 : Rejected : {0} {1} by {2} : {3} +INFO = RL-1003 : Info : {0} : {1} diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index f2d7262..ea5e38d 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -53,6 +53,9 @@ import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; + +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.messages.ResourceLimitMessages; import org.apache.qpid.server.security.limit.ConnectionLimitException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -973,11 +976,14 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } if (e.getPolicy() == SoleConnectionEnforcementPolicy.REFUSE_CONNECTION) { + LOGGER.debug("Closing newly open connection: {}", e.getMessage()); _properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true); final Error error = new Error(AmqpError.INVALID_FIELD, String.format("Connection closed due to sole-connection-enforcement-policy '%s'", e.getPolicy())); error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id"))); closeConnection(error); + getEventLogger().message(ResourceLimitMessages.REJECTED( + "Opening", "connection", String.format("container '%s'", e.getContainerID()), e.getMessage())); } else if (e.getPolicy() == SoleConnectionEnforcementPolicy.CLOSE_EXISTING) { @@ -985,12 +991,17 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio String.format("Connection closed due to sole-connection-enforcement-policy '%s'", e.getPolicy())); error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true)); + final EventLogger logger = getEventLogger(); final List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>(); for (final AMQPConnection_1_0<?> connection : e.getExistingConnections()) { if (!connection.isClosing()) { + LOGGER.debug("Closing existing connection '{}': {}", + connection.getName(), e.getMessage()); rescheduleFutures.add(connection.doOnIOThreadAsync(() -> connection.close(error))); + logger.message(ResourceLimitMessages.INFO( + String.format("Closing existing connection '%s'", connection.getName()), e.getMessage())); } } doAfter(allAsList(rescheduleFutures), () -> doOnIOThreadAsync(() -> receiveOpenInternal(addressSpace))); diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java index 8f49b7b..35da2f2 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0; @@ -28,16 +29,23 @@ import org.apache.qpid.server.security.limit.ConnectionLimitException; public class SoleConnectionEnforcementPolicyException extends ConnectionLimitException { + private static final String MESSAGE = + "Single connection with container ID '%s' is required due to sole connection enforcement policy '%s'"; + private final Set<AMQPConnection_1_0<?>> _existingConnections; private final SoleConnectionEnforcementPolicy _policy; + private final String _containerID; + public SoleConnectionEnforcementPolicyException(SoleConnectionEnforcementPolicy policy, - Collection<? extends AMQPConnection_1_0<?>> connections) + Collection<? extends AMQPConnection_1_0<?>> connections, + String containerID) { - super(String.format("Single connection is required due to sole-connection-enforcement-policy '%s'", policy)); - _policy = policy; + super(String.format(MESSAGE, containerID, policy)); + _policy = Objects.requireNonNull(policy); _existingConnections = new HashSet<>(connections); + _containerID = Objects.requireNonNull(containerID); } public SoleConnectionEnforcementPolicy getPolicy() @@ -49,4 +57,9 @@ public class SoleConnectionEnforcementPolicyException extends ConnectionLimitExc { return Collections.unmodifiableSet(_existingConnections); } + + public String getContainerID() + { + return _containerID; + } } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java index c0ff83b..cf56520 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java @@ -163,7 +163,7 @@ public class StrongConnectionEstablishmentLimiter implements ConnectionLimiterSe if (soleConnectionPolicy != null && !_connections.isEmpty()) { LOGGER.debug("Single connection is required, sole connection policy: {}", soleConnectionPolicy); - throw new SoleConnectionEnforcementPolicyException(soleConnectionPolicy, _connections); + throw new SoleConnectionEnforcementPolicyException(soleConnectionPolicy, _connections, _containerId); } final ConnectionSlot underlyingSlot = _underlyingLimiter.register(connection); diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java index 962fabb..8936168 100644 --- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java @@ -127,7 +127,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase public Object answer(final InvocationOnMock invocation) throws Throwable { _connection = connectionCaptor.getValue(); - throw new SoleConnectionEnforcementPolicyException(null, Collections.emptySet()); + throw new SoleConnectionEnforcementPolicyException(null, Collections.emptySet(), "abc1"); } }).when(_virtualHost).registerConnection(connectionCaptor.capture()); when(_virtualHost.getPrincipal()).thenReturn(mock(VirtualHostPrincipal.class)); diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java index fdbd0b4..53831bc 100644 --- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java @@ -104,7 +104,7 @@ public class StrongConnectionEstablishmentLimiterTest extends UnitTestBase catch (SoleConnectionEnforcementPolicyException e) { assertEquals( - "Single connection is required due to sole-connection-enforcement-policy 'refuse-connection'", + "Single connection with container ID 'C' is required due to sole connection enforcement policy 'refuse-connection'", e.getMessage()); assertEquals(2, e.getExistingConnections().size()); @@ -138,7 +138,7 @@ public class StrongConnectionEstablishmentLimiterTest extends UnitTestBase catch (SoleConnectionEnforcementPolicyException e) { assertEquals( - "Single connection is required due to sole-connection-enforcement-policy 'close-existing'", + "Single connection with container ID 'C' is required due to sole connection enforcement policy 'close-existing'", e.getMessage()); assertEquals(1, e.getExistingConnections().size()); @@ -216,7 +216,7 @@ public class StrongConnectionEstablishmentLimiterTest extends UnitTestBase catch (SoleConnectionEnforcementPolicyException e) { assertEquals( - "Single connection is required due to sole-connection-enforcement-policy 'close-existing'", + "Single connection with container ID 'C' is required due to sole connection enforcement policy 'close-existing'", e.getMessage()); assertEquals(1, e.getExistingConnections().size()); @@ -255,7 +255,7 @@ public class StrongConnectionEstablishmentLimiterTest extends UnitTestBase catch (SoleConnectionEnforcementPolicyException e) { assertEquals( - "Single connection is required due to sole-connection-enforcement-policy 'refuse-connection'", + "Single connection with container ID 'C' is required due to sole connection enforcement policy 'refuse-connection'", e.getMessage()); assertEquals(1, e.getExistingConnections().size()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org