This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/master by this push: new 1e3cd0e QPID-8482:[Broker-J] Introduce derived attribute for a session peer name in AMQP 0-10 sessions 1e3cd0e is described below commit 1e3cd0effea1ee79bcfffb730180bc44cc18a66c Author: Dedeepya T <dedeepy...@yahoo.co.in> AuthorDate: Tue Nov 24 20:30:13 2020 +0530 QPID-8482:[Broker-J] Introduce derived attribute for a session peer name in AMQP 0-10 sessions This closes #71 --- .../protocol/v0_10/ServerConnectionDelegate.java | 52 +++++- .../apache/qpid/server/protocol/v0_10/Session.java | 31 ++++ .../qpid/server/protocol/v0_10/Session_0_10.java | 14 +- .../v0_10/ServerConnectionDelegateTest.java | 181 +++++++++++++++++++++ .../server/protocol/v0_10/ServerSessionTest.java | 2 +- 5 files changed, 276 insertions(+), 4 deletions(-) diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 227c48d..ef5f8ce 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -22,14 +22,19 @@ package org.apache.qpid.server.protocol.v0_10; import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSE_RCVD; +import java.nio.charset.StandardCharsets; import java.security.AccessControlException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.security.Principal; import java.util.ArrayList; +import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +57,8 @@ import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> implements ProtocolDelegate<ServerConnection> { private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class); + static final String MESSAGE_DIGEST_SHA1 = "SHA-1"; + static final int BASE64_LIMIT = 64; private final AmqpPort<?> _port; private List<Object> _locales; @@ -410,7 +417,7 @@ public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> i final ServerSession serverSession = new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0); final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(), - serverSession); + serverSession, getPeerSessionName(atc.getName())); session.create(); serverSession.setModelObject(session); @@ -427,6 +434,49 @@ public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> i } } + private String getPeerSessionName(final byte[] attachName) + { + try + { + return UUID.fromString(new String(attachName, StandardCharsets.UTF_8)).toString(); + } + catch (RuntimeException e) + { + return createBase64OrSha1(attachName); + } + } + + private String createBase64OrSha1(final byte[] attachName) + { + if (attachName.length <= BASE64_LIMIT) + { + return Base64.getEncoder().encodeToString(attachName); + } + else + { + return createSha1(attachName); + } + } + + private String createSha1(final byte[] attachName) + { + try + { + final MessageDigest digest = MessageDigest.getInstance(MESSAGE_DIGEST_SHA1); + return Base64.getEncoder().encodeToString(digest.digest(attachName)); + } + catch (NoSuchAlgorithmException e) + { + return Base64.getEncoder().encodeToString(attachName); + } + } + + // for test purposes only + void setState(ConnectionState state) + { + _state = state; + } + private boolean isSessionNameUnique(final byte[] name, final ServerConnection conn) { final Principal authorizedPrincipal = conn.getAuthorizedPrincipal(); diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java new file mode 100644 index 0000000..4347f6b --- /dev/null +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.protocol.v0_10; + +import org.apache.qpid.server.model.DerivedAttribute; +import org.apache.qpid.server.model.ManagedObject; + +@ManagedObject(category = false, creatable = false, type="Session_0_10") +public interface Session<C extends Session<C>> extends org.apache.qpid.server.session.AMQPSession<C, ConsumerTarget_0_10> +{ + @DerivedAttribute + String getPeerSessionName(); +} diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java index dd6ca80..d8197be 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java @@ -33,14 +33,18 @@ import org.apache.qpid.server.session.AbstractAMQPSession; import org.apache.qpid.server.util.Action; public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarget_0_10> - implements LogSubject, org.apache.qpid.server.util.Deletable<Session_0_10> + implements LogSubject, org.apache.qpid.server.util.Deletable<Session_0_10> ,Session<Session_0_10> { private final AMQPConnection_0_10 _connection; private final ServerSession _serverSession; + private String _peerSessionName; - protected Session_0_10(final Connection<?> parent, final int sessionId, final ServerSession serverSession) + protected Session_0_10(final Connection<?> parent, + final int sessionId, + final ServerSession serverSession, final String peerSessionName) { super(parent, sessionId); + _peerSessionName = peerSessionName; _connection = (AMQPConnection_0_10) parent; _serverSession = serverSession; } @@ -159,4 +163,10 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg { return _serverSession; } + + @Override + public String getPeerSessionName() + { + return _peerSessionName; + } } diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegateTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegateTest.java new file mode 100644 index 0000000..fbcfe3c --- /dev/null +++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegateTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.protocol.v0_10; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import static org.apache.qpid.server.protocol.v0_10.ServerConnectionDelegate.BASE64_LIMIT; +import static org.apache.qpid.server.protocol.v0_10.ServerConnectionDelegate.MESSAGE_DIGEST_SHA1; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.MessageDigest; +import java.security.PrivilegedAction; +import java.util.Base64; +import java.util.Collections; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.security.auth.Subject; +import javax.security.auth.SubjectDomainCombiner; + +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; + +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.v0_10.transport.SessionAttach; +import org.apache.qpid.test.utils.UnitTestBase; + +public class ServerConnectionDelegateTest extends UnitTestBase +{ + + private ServerConnectionDelegate _delegate; + private ServerConnection _serverConnection; + private TaskExecutor _taskExecutor; + private AccessControlContext _accessControlContext; + + @Before + public void setUp() + { + _taskExecutor = CurrentThreadTaskExecutor.newStartedInstance(); + final Broker broker = mock(Broker.class); + when(broker.getNetworkBufferSize()).thenReturn(0xffff); + when(broker.getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT)).thenReturn(Long.MAX_VALUE); + when(broker.getTaskExecutor()).thenReturn(_taskExecutor); + when(broker.getModel()).thenReturn(BrokerModel.getInstance()); + final AuthenticationProvider authenticationProvider = mock(AuthenticationProvider.class); + when(authenticationProvider.getAvailableMechanisms(anyBoolean())).thenReturn(Collections.singletonList("PLAIN")); + final AmqpPort<?> port = mock(AmqpPort.class); + when(port.getAuthenticationProvider()).thenReturn(authenticationProvider); + when(port.getParent()).thenReturn(broker); + + _delegate = new ServerConnectionDelegate(port, true, "test"); + _delegate.setState(ServerConnectionDelegate.ConnectionState.OPEN); + final NamedAddressSpace addressSpace = mock(NamedAddressSpace.class); + when(addressSpace.getConnections()).thenReturn(Collections.emptyList()); + + final Subject subject = new Subject(); + subject.setReadOnly(); + _accessControlContext = AccessController.getContext(); + final AMQPConnection_0_10 amqpConnection = mock(AMQPConnection_0_10.class); + when(amqpConnection.getParent()).thenReturn(broker); + when(amqpConnection.getBroker()).thenReturn(broker); + when(amqpConnection.getChildExecutor()).thenReturn(_taskExecutor); + when(amqpConnection.getModel()).thenReturn(BrokerModel.getInstance()); + when(amqpConnection.getSubject()).thenReturn(subject); + when(amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Long.MAX_VALUE); + when(amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Integer.MAX_VALUE); + doAnswer((Answer<AccessControlContext>) invocationOnMock -> { + Subject subject1 = (Subject) invocationOnMock.getArgument(0); + return AccessController.doPrivileged( + (PrivilegedAction<AccessControlContext>) () -> + new AccessControlContext(_accessControlContext, new SubjectDomainCombiner(subject1))); + }).when(amqpConnection).getAccessControlContextFromSubject(any()); + when(amqpConnection.getEventLogger()).thenReturn(mock(EventLogger.class)); + + _serverConnection = mock(ServerConnection.class); + when(_serverConnection.getAddressSpace()).thenReturn(addressSpace); + when(_serverConnection.getBroker()).thenReturn(broker); + when(_serverConnection.getAmqpConnection()).thenReturn(amqpConnection); + } + + @After + public void tearDown() + { + _taskExecutor.stop(); + } + + @Test + public void sessionAttachWhenNameIsUUID() + { + final String name = UUID.randomUUID().toString(); + final SessionAttach attach = createSessionAttach(name); + + _delegate.sessionAttach(_serverConnection, attach); + + final ArgumentCaptor<ServerSession> sessionCaptor = ArgumentCaptor.forClass(ServerSession.class); + verify(_serverConnection).registerSession(sessionCaptor.capture()); + + final ServerSession serverSession = sessionCaptor.getValue(); + final Session session = serverSession.getModelObject(); + assertThat(session.getPeerSessionName(), CoreMatchers.is(equalTo(name))); + } + + @Test + public void sessionAttachWhenNameIsNotUUID() + { + final String name = "ABC"; + final SessionAttach attach = createSessionAttach(name); + + _delegate.sessionAttach(_serverConnection, attach); + + final ArgumentCaptor<ServerSession> sessionCaptor = ArgumentCaptor.forClass(ServerSession.class); + verify(_serverConnection).registerSession(sessionCaptor.capture()); + + final ServerSession serverSession = sessionCaptor.getValue(); + final Session session = serverSession.getModelObject(); + assertThat(session.getPeerSessionName(), CoreMatchers.is(equalTo(Base64.getEncoder().encodeToString(name.getBytes(UTF_8))))); + } + + @Test + public void sessionAttachWhenNameExceedsSizeLimit() throws Exception + { + final String name = Stream.generate(() -> String.valueOf('a')).limit(BASE64_LIMIT + 1).collect(Collectors.joining());; + final SessionAttach attach = createSessionAttach(name); + + _delegate.sessionAttach(_serverConnection, attach); + + final ArgumentCaptor<ServerSession> sessionCaptor = ArgumentCaptor.forClass(ServerSession.class); + verify(_serverConnection).registerSession(sessionCaptor.capture()); + + final ServerSession serverSession = sessionCaptor.getValue(); + final Session session = serverSession.getModelObject(); + final String digest = Base64.getEncoder().encodeToString(MessageDigest.getInstance(MESSAGE_DIGEST_SHA1).digest(name.getBytes(UTF_8))); + assertThat(session.getPeerSessionName(), CoreMatchers.is(equalTo(digest))); + } + + private SessionAttach createSessionAttach(final String name) + { + final SessionAttach attach = new SessionAttach(); + attach.setName(name.getBytes(UTF_8)); + return attach; + } +} \ No newline at end of file diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java index f7e0259..c32bf4e 100644 --- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java +++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java @@ -145,7 +145,7 @@ public class ServerSessionTest extends UnitTestBase invokedMethods.add(m); } }; - Session_0_10 modelSession = new Session_0_10(modelConnection, 1, session); + Session_0_10 modelSession = new Session_0_10(modelConnection, 1, session, getTestName()); session.setModelObject(modelSession); ServerSessionDelegate delegate = new ServerSessionDelegate(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org