Author: rgodfrey
Date: Wed Dec 14 15:18:33 2016
New Revision: 1774263

URL: http://svn.apache.org/viewvc?rev=1774263&view=rev
Log:
QPID-7589 : Allow the creation of temporary reply addresses in the $management 
synthetic virtual host

Added:
    
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
   (with props)
Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
    
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
    qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1774263&r1=1774262&r2=1774263&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 Wed Dec 14 15:18:33 2016
@@ -2695,6 +2695,10 @@ public abstract class AbstractVirtualHos
         {
             return (T) createChild((Class<? extends Queue>)clazz, attributes);
         }
+        else if(clazz.isAssignableFrom(Queue.class))
+        {
+            return (T) createChild(Queue.class, attributes);
+        }
         else
         {
             throw new IllegalArgumentException("Cannot create message source 
children of class " + clazz.getSimpleName());
@@ -2713,6 +2717,10 @@ public abstract class AbstractVirtualHos
         {
             return (T) createChild((Class<? extends Queue>)clazz, attributes);
         }
+        else if(clazz.isAssignableFrom(Queue.class))
+        {
+            return (T) createChild(Queue.class, attributes);
+        }
         else
         {
             throw new IllegalArgumentException("Cannot create message 
destination children of class " + clazz.getSimpleName());

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1774263&r1=1774262&r2=1774263&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 Wed Dec 14 15:18:33 2016
@@ -799,7 +799,7 @@ public class Session_1_0 implements AMQS
                 {
                     if (Boolean.TRUE.equals(source.getDynamic()))
                     {
-                        Queue<?> tempQueue = 
createTemporaryQueue(source.getDynamicNodeProperties());
+                        MessageSource tempQueue = 
createDynamicSource(source.getDynamicNodeProperties());
                         source.setAddress(tempQueue.getName());
                     }
                     String addr = source.getAddress();
@@ -976,7 +976,7 @@ public class Session_1_0 implements AMQS
                         if (Boolean.TRUE.equals(target.getDynamic()))
                         {
 
-                            Queue<?> tempQueue = 
createTemporaryQueue(target.getDynamicNodeProperties());
+                            MessageDestination tempQueue = 
createDynamicDestination(target.getDynamicNodeProperties());
                             target.setAddress(tempQueue.getName());
                         }
 
@@ -1114,50 +1114,47 @@ public class Session_1_0 implements AMQS
     }
 
 
-    private Queue<?> createTemporaryQueue(Map properties)
+    private MessageSource createDynamicSource(Map properties)
     {
         final String queueName = _primaryDomain + "TempQueue" + 
UUID.randomUUID().toString();
-        Queue<?> queue = null;
+        MessageSource queue = null;
         try
         {
-            LifetimePolicy lifetimePolicy = properties == null
-                                            ? null
-                                            : (LifetimePolicy) 
properties.get(LIFETIME_POLICY);
-            Map<String,Object> attributes = new HashMap<String,Object>();
-            attributes.put(org.apache.qpid.server.model.Queue.ID, 
UUID.randomUUID());
-            attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
-            attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false);
+            Map<String, Object> attributes = 
convertDynamicNodePropertiesToAttributes(properties, queueName);
 
-            if(lifetimePolicy instanceof DeleteOnNoLinks)
-            {
-                
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
-                               
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS);
-            }
-            else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
-            {
-                
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
-                               
org.apache.qpid.server.model.LifetimePolicy.IN_USE);
-            }
-            else if(lifetimePolicy instanceof DeleteOnClose)
-            {
-                
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
-                               
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
-            }
-            else if(lifetimePolicy instanceof DeleteOnNoMessages)
-            {
-                
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
-                               
org.apache.qpid.server.model.LifetimePolicy.IN_USE);
-            }
-            else
-            {
-                
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
-                               
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
-            }
 
 
-            // TODO convert AMQP 1-0 node properties to queue attributes
+            queue = getAddressSpace().createMessageSource(MessageSource.class, 
attributes);
+        }
+        catch (AccessControlException e)
+        {
+            Error error = new Error();
+            error.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+            error.setDescription(e.getMessage());
 
-            queue = getAddressSpace().createMessageSource(Queue.class, 
attributes);
+            _connection.close(error);
+        }
+        catch (AbstractConfiguredObject.DuplicateNameException e)
+        {
+            _logger.error("A temporary queue was created with a name which 
collided with an existing queue name");
+            throw new ConnectionScopedRuntimeException(e);
+        }
+
+        return queue;
+    }
+
+
+    private MessageDestination createDynamicDestination(Map properties)
+    {
+        final String queueName = _primaryDomain + "TempQueue" + 
UUID.randomUUID().toString();
+        MessageDestination queue = null;
+        try
+        {
+            Map<String, Object> attributes = 
convertDynamicNodePropertiesToAttributes(properties, queueName);
+
+
+
+            queue = 
getAddressSpace().createMessageDestination(MessageDestination.class, 
attributes);
         }
         catch (AccessControlException e)
         {
@@ -1176,6 +1173,45 @@ public class Session_1_0 implements AMQS
         return queue;
     }
 
+    private Map<String, Object> convertDynamicNodePropertiesToAttributes(final 
Map properties, final String queueName)
+    {
+        // TODO convert AMQP 1-0 node properties to queue attributes
+        LifetimePolicy lifetimePolicy = properties == null
+                                        ? null
+                                        : (LifetimePolicy) 
properties.get(LIFETIME_POLICY);
+        Map<String,Object> attributes = new HashMap<String,Object>();
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, queueName);
+        attributes.put(Queue.DURABLE, false);
+
+        if(lifetimePolicy instanceof DeleteOnNoLinks)
+        {
+            attributes.put(Queue.LIFETIME_POLICY,
+                           
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS);
+        }
+        else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
+        {
+            attributes.put(Queue.LIFETIME_POLICY,
+                           org.apache.qpid.server.model.LifetimePolicy.IN_USE);
+        }
+        else if(lifetimePolicy instanceof DeleteOnClose)
+        {
+            attributes.put(Queue.LIFETIME_POLICY,
+                           
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
+        }
+        else if(lifetimePolicy instanceof DeleteOnNoMessages)
+        {
+            attributes.put(Queue.LIFETIME_POLICY,
+                           org.apache.qpid.server.model.LifetimePolicy.IN_USE);
+        }
+        else
+        {
+            attributes.put(Queue.LIFETIME_POLICY,
+                           
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
+        }
+        return attributes;
+    }
+
     ServerTransaction getTransaction(Binary transactionId)
     {
 

Modified: 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1774263&r1=1774262&r2=1774263&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
 Wed Dec 14 15:18:33 2016
@@ -22,15 +22,25 @@ package org.apache.qpid.server.managemen
 
 import java.nio.charset.StandardCharsets;
 import java.security.AccessControlException;
+import java.security.AccessController;
 import java.security.Principal;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import javax.security.auth.Subject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
@@ -41,6 +51,7 @@ import org.apache.qpid.server.model.Conn
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.SystemAddressSpaceCreator;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.security.SecurityToken;
@@ -62,6 +73,8 @@ public class ManagementAddressSpace impl
     public static final String MANAGEMENT_ADDRESS_SPACE_NAME = "$management";
     private static final String MANAGEMENT_NODE_NAME = "$management";
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ManagementAddressSpace.class);
+
     private final String _name;
     private final SystemAddressSpaceCreator.AddressSpaceRegistry 
_addressSpaceRegistry;
     private final ManagementNode _managementNode;
@@ -73,6 +86,7 @@ public class ManagementAddressSpace impl
     private final Broker<?> _broker;
     private final Principal _principal;
     private final UUID _id;
+    private final ConcurrentMap<Object, ConcurrentMap<String, 
ProxyMessageSource>> _connectionSpecificDestinations = new 
ConcurrentHashMap<>();
 
     public ManagementAddressSpace(final 
SystemAddressSpaceCreator.AddressSpaceRegistry addressSpaceRegistry)
     {
@@ -110,7 +124,10 @@ public class ManagementAddressSpace impl
         {
             return _propertiesNode;
         }
-        return null;
+        else
+        {
+            return getProxyNode(name);
+        }
     }
 
     @Override
@@ -120,9 +137,40 @@ public class ManagementAddressSpace impl
         {
             return _managementNode;
         }
+        else
+        {
+            MessageDestination connectionSpecificDestinations = 
getProxyNode(name);
+            if (connectionSpecificDestinations != null) return 
connectionSpecificDestinations;
+        }
+
+        return null;
+    }
+
+    ProxyMessageSource getProxyNode(final String name)
+    {
+        LOGGER.debug("RG: looking for proxy source {}", name);
+        Subject currentSubject = 
Subject.getSubject(AccessController.getContext());
+        Set<SessionPrincipal> sessionPrincipals = 
currentSubject.getPrincipals(SessionPrincipal.class);
+        if (!sessionPrincipals.isEmpty())
+        {
+            Object connectionReference = 
sessionPrincipals.iterator().next().getSession().getConnectionReference();
+            Map<String, ProxyMessageSource>
+                    connectionSpecificDestinations = 
_connectionSpecificDestinations.get(connectionReference);
+            if(connectionSpecificDestinations != null)
+            {
+                LOGGER.debug("RG: ", connectionSpecificDestinations);
+
+                return connectionSpecificDestinations.get(name);
+            }
+        }
         return null;
     }
 
+    ManagementNode getManagementNode()
+    {
+        return _managementNode;
+    }
+
     @Override
     public void registerConnection(final AMQPConnection<?> connection)
     {
@@ -187,14 +235,78 @@ public class ManagementAddressSpace impl
     @Override
     public <T extends MessageSource> T createMessageSource(final Class<T> 
clazz, final Map<String, Object> attributes)
     {
-        return null;
+        if(clazz == MessageSource.class)
+        {
+            return (T) createProxyNode(attributes);
+        }
+        else
+        {
+            return null;
+        }
     }
 
+    private ProxyMessageSource createProxyNode(final Map<String, Object> 
attributes)
+    {
+        LOGGER.debug("RG: in create proxy node");
+        Subject currentSubject = 
Subject.getSubject(AccessController.getContext());
+        Set<SessionPrincipal> sessionPrincipals = 
currentSubject.getPrincipals(SessionPrincipal.class);
+        if (!sessionPrincipals.isEmpty())
+        {
+            LOGGER.debug("RG: session principal present");
+            final ProxyMessageSource proxyMessageSource = new 
ProxyMessageSource(this, attributes);
+            final AMQSessionModel session = 
sessionPrincipals.iterator().next().getSession();
+            final Object connectionReference = 
session.getConnectionReference();
+            ConcurrentMap<String, ProxyMessageSource> 
connectionSpecificDestinations =
+                    _connectionSpecificDestinations.get(connectionReference);
+            if (connectionSpecificDestinations == null)
+            {
+                connectionSpecificDestinations = new ConcurrentHashMap<>();
+                
if(_connectionSpecificDestinations.putIfAbsent(connectionReference, 
connectionSpecificDestinations) == null)
+                {
+                    session.getAMQPConnection().addDeleteTask(new Action()
+                    {
+                        @Override
+                        public void performAction(final Object object)
+                        {
+                            
_connectionSpecificDestinations.remove(connectionReference);
+                        }
+                    });
+                }
+            }
+            connectionSpecificDestinations.put(proxyMessageSource.getName(), 
proxyMessageSource);
+            return proxyMessageSource;
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    void removeProxyMessageSource(final Object connectionReference, final 
String name)
+    {
+        ConcurrentMap<String, ProxyMessageSource> 
connectionSpecificDestinations =
+                _connectionSpecificDestinations.get(connectionReference);
+        if(connectionSpecificDestinations != null)
+        {
+            connectionSpecificDestinations.remove(name);
+        }
+    }
+
+
     @Override
     public <T extends MessageDestination> T createMessageDestination(final 
Class<T> clazz,
                                                                      final 
Map<String, Object> attributes)
     {
-        return null;
+
+        LOGGER.debug("RG : requesting destination creation");
+        if(clazz == MessageDestination.class)
+        {
+            return (T) createProxyNode(attributes);
+        }
+        else
+        {
+            return null;
+        }
     }
 
     @Override

Modified: 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1774263&r1=1774262&r2=1774263&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
 Wed Dec 14 15:18:33 2016
@@ -48,6 +48,9 @@ import java.util.regex.Pattern;
 
 import javax.security.auth.Subject;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerOption;
@@ -89,6 +92,7 @@ import org.apache.qpid.server.util.State
 
 class ManagementNode implements MessageSource, MessageDestination
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ManagementNode.class);
 
     public static final String IDENTITY_ATTRIBUTE = "identity";
     public static final String INDEX_ATTRIBUTE = "index";
@@ -978,14 +982,13 @@ class ManagementNode implements MessageS
             AMQSessionModel publishingSession = 
sessionPrincipals.iterator().next().getSession();
             for (ManagementNodeConsumer candidate : _consumers)
             {
-                if (candidate.getTarget().getTargetAddress().equals(replyTo) 
&& candidate.getSessionModel() == publishingSession)
+                if (candidate.getTarget().getTargetAddress().equals(replyTo) 
&& candidate.getSessionModel().getConnectionReference() == 
publishingSession.getConnectionReference())
                 {
                     consumer = candidate;
                     break;
                 }
             }
         }
-
         return consumer == null ? _addressSpace.getDefaultDestination() : 
consumer;
     }
 

Added: 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java?rev=1774263&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
 Wed Dec 14 15:18:33 2016
@@ -0,0 +1,384 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import java.security.AccessControlException;
+import java.security.AccessController;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.Subject;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerOption;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageContainer;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+public class ProxyMessageSource implements MessageSource, MessageDestination
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxyMessageSource.class);
+    private final String _name;
+    private final UUID _id = UUID.randomUUID();
+    private final ManagementAddressSpace _managementAddressSpace;
+    private volatile MessageInstanceConsumer<?> _consumer;
+    private final AtomicBoolean _consumerSet = new AtomicBoolean(false);
+    private Object _connectionReference;
+
+    public ProxyMessageSource(final ManagementAddressSpace 
managementAddressSpace, final Map<String, Object> attributes)
+    {
+        _name = String.valueOf(attributes.get(ConfiguredObject.NAME));
+        _managementAddressSpace = managementAddressSpace;
+    }
+
+    @Override
+    public String getName()
+    {
+        return _name;
+    }
+
+    @Override
+    public NamedAddressSpace getAddressSpace()
+    {
+        return _managementAddressSpace;
+    }
+
+    @Override
+    public void authorisePublish(final SecurityToken token, final Map<String, 
Object> arguments)
+            throws AccessControlException
+    {
+        throw new AccessControlException("Sending messages to temporary 
addresses in a management addres spaceis not supported");
+    }
+
+    @Override
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> int 
send(final M message,
+                                                                               
  final String routingAddress,
+                                                                               
  final InstanceProperties instanceProperties,
+                                                                               
  final ServerTransaction txn,
+                                                                               
  final Action<? super MessageInstance> postEnqueueAction)
+    {
+        return 0;
+    }
+
+    @Override
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    @Override
+    public MessageDurability getMessageDurability()
+    {
+        return MessageDurability.NEVER;
+    }
+
+    @Override
+    public <T extends ConsumerTarget<T>> MessageInstanceConsumer<T> 
addConsumer(final T target,
+                                                                               
 final FilterManager filters,
+                                                                               
 final Class<? extends ServerMessage> messageClass,
+                                                                               
 final String consumerName,
+                                                                               
 final EnumSet<ConsumerOption> options,
+                                                                               
 final Integer priority)
+            throws ExistingExclusiveConsumer, 
ExistingConsumerPreventsExclusive,
+                   ConsumerAccessRefused, QueueDeleted
+    {
+        if(_consumerSet.compareAndSet(false,true))
+        {
+            Subject currentSubject = 
Subject.getSubject(AccessController.getContext());
+            Set<SessionPrincipal> sessionPrincipals = 
currentSubject.getPrincipals(SessionPrincipal.class);
+            if (!sessionPrincipals.isEmpty())
+            {
+                _connectionReference = 
sessionPrincipals.iterator().next().getSession().getConnectionReference();
+
+                WrappingTarget<T> wrapper = new WrappingTarget<>(target, 
_name);
+                
_managementAddressSpace.getManagementNode().addConsumer(wrapper, filters, 
messageClass, _name, options, priority);
+                final MessageInstanceConsumer<T> consumer = 
wrapper.getConsumer();
+                _consumer = consumer;
+                return consumer;
+            }
+            else
+            {
+                return null;
+            }
+        }
+        else
+        {
+            throw new ExistingExclusiveConsumer();
+        }
+    }
+
+    @Override
+    public Collection<? extends MessageInstanceConsumer> getConsumers()
+    {
+        return _consumer == null ? 
Collections.<MessageInstanceConsumer>emptySet() : 
Collections.singleton(_consumer);
+    }
+
+    @Override
+    public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+    {
+        return session.getConnectionReference() == _connectionReference;
+    }
+
+    private class WrappingTarget<T extends ConsumerTarget<T>> implements 
ConsumerTarget<WrappingTarget<T>>
+    {
+        private final T _underlying;
+        private final String _address;
+        private MessageInstanceConsumer<T> _consumer;
+
+        public WrappingTarget(final T target, String address)
+        {
+            _underlying = target;
+            _address = address;
+        }
+
+        public T getUnderlying()
+        {
+            return _underlying;
+        }
+
+        public MessageInstanceConsumer<T> getConsumer()
+        {
+            return _consumer;
+        }
+
+        @Override
+        public void acquisitionRemoved(final MessageInstance node)
+        {
+            _underlying.acquisitionRemoved(node);
+        }
+
+        @Override
+        public boolean processPending()
+        {
+            return _underlying.processPending();
+        }
+
+        @Override
+        public String getTargetAddress()
+        {
+            return _address;
+        }
+
+        @Override
+        public boolean isMultiQueue()
+        {
+            return false;
+        }
+
+        @Override
+        public void notifyWork()
+        {
+            LOGGER.debug("RG: notifyWork called");
+            _underlying.notifyWork();
+        }
+
+        @Override
+        public void updateNotifyWorkDesired()
+        {
+            _underlying.updateNotifyWorkDesired();
+        }
+
+        @Override
+        public boolean isNotifyWorkDesired()
+        {
+            return _underlying.isNotifyWorkDesired();
+        }
+
+        @Override
+        public State getState()
+        {
+            return _underlying.getState();
+        }
+
+        @Override
+        public void consumerAdded(final 
MessageInstanceConsumer<WrappingTarget<T>> sub)
+        {
+            _consumer = new UnwrappingWrappingConsumer(sub, this);
+            _underlying.consumerAdded(_consumer);
+        }
+
+        @Override
+        public ListenableFuture<Void> consumerRemoved(final 
MessageInstanceConsumer<WrappingTarget<T>> sub)
+        {
+            return _underlying.consumerRemoved(_consumer);
+        }
+
+        @Override
+        public long getUnacknowledgedBytes()
+        {
+            return _underlying.getUnacknowledgedBytes();
+        }
+
+        @Override
+        public long getUnacknowledgedMessages()
+        {
+            return _underlying.getUnacknowledgedMessages();
+        }
+
+        @Override
+        public AMQSessionModel getSessionModel()
+        {
+            return _underlying.getSessionModel();
+        }
+
+        @Override
+        public long send(final MessageInstanceConsumer consumer,
+                         final MessageInstance entry,
+                         final boolean batch)
+        {
+            return _underlying.send(_consumer, entry, batch);
+        }
+
+        @Override
+        public boolean sendNextMessage()
+        {
+            return _underlying.sendNextMessage();
+        }
+
+        @Override
+        public void flushBatched()
+        {
+            _underlying.flushBatched();
+        }
+
+        @Override
+        public void queueEmpty()
+        {
+            _underlying.queueEmpty();
+        }
+
+        @Override
+        public boolean allocateCredit(final ServerMessage msg)
+        {
+            return _underlying.allocateCredit(msg);
+        }
+
+        @Override
+        public void restoreCredit(final ServerMessage queueEntry)
+        {
+            _underlying.restoreCredit(queueEntry);
+        }
+
+        @Override
+        public boolean isSuspended()
+        {
+            return _underlying.isSuspended();
+        }
+
+        @Override
+        public boolean close()
+        {
+            
_managementAddressSpace.removeProxyMessageSource(_connectionReference, _name);
+            ProxyMessageSource.this._consumer = null;
+            return _underlying.close();
+        }
+
+
+    }
+    private static class UnwrappingWrappingConsumer<T extends 
ConsumerTarget<T>> implements MessageInstanceConsumer<T>
+    {
+        private final MessageInstanceConsumer<WrappingTarget<T>> _underlying;
+        private final WrappingTarget<T> _target;
+
+        public UnwrappingWrappingConsumer(final 
MessageInstanceConsumer<WrappingTarget<T>> sub, WrappingTarget<T> wrappedTarget)
+        {
+            _underlying = sub;
+            _target = wrappedTarget;
+        }
+
+        @Override
+        public boolean isClosed()
+        {
+            return _underlying.isClosed();
+        }
+
+        @Override
+        public boolean acquires()
+        {
+            return _underlying.acquires();
+        }
+
+        @Override
+        public String getName()
+        {
+            return _underlying.getName();
+        }
+
+        @Override
+        public void close()
+        {
+            _underlying.close();
+        }
+
+        @Override
+        public void externalStateChange()
+        {
+            _underlying.externalStateChange();
+        }
+
+        @Override
+        public Object getIdentifier()
+        {
+            return _underlying.getIdentifier();
+        }
+
+        @Override
+        public MessageContainer pullMessage()
+        {
+            return _underlying.pullMessage();
+        }
+
+        @Override
+        public T getTarget()
+        {
+            return _target.getUnderlying();
+        }
+
+        @Override
+        public void setNotifyWorkDesired(final boolean desired)
+        {
+            _underlying.setNotifyWorkDesired(desired);
+        }
+    }
+}

Propchange: 
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java?rev=1774263&r1=1774262&r2=1774263&view=diff
==============================================================================
--- 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
 (original)
+++ 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
 Wed Dec 14 15:18:33 2016
@@ -168,7 +168,7 @@ public class AmqpManagementTest extends
         message.setStringProperty("identity", "self");
         message.setStringProperty("type", "org.amqp.management");
         message.setStringProperty("operation", "QUERY");
-        message.setObject("attributeNames", new ArrayList<>());
+        message.setObject("attributeNames", "[]");
         message.setJMSReplyTo(_replyAddress);
 
         _producer.send(message);
@@ -217,7 +217,7 @@ public class AmqpManagementTest extends
         assertTrue("The attribute names are not a list", attributeNames 
instanceof Collection);
         attributeNamesCollection = (Collection)attributeNames;
         assertEquals("The attributeNames are no as expected", 
Arrays.asList("name", "identity", "type"), attributeNamesCollection);
-        Object resultsObject = ((MapMessage) 
responseMessage).getObject("results");
+        Object resultsObject = getValueFromMapResponse(responseMessage, 
"results");
         assertTrue("results is not a collection", resultsObject instanceof 
Collection);
         Collection results = (Collection)resultsObject;
 
@@ -295,11 +295,11 @@ public class AmqpManagementTest extends
         assertTrue("The response message does not have a status code",
                    
Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
         assertEquals("The response code did not indicate success", 201, 
responseMessage.getIntProperty("statusCode"));
-        assertTrue("The response was not a MapMessage", responseMessage 
instanceof MapMessage);
-        assertEquals("The created queue was not a standard queue", 
"org.apache.qpid.StandardQueue", 
((MapMessage)responseMessage).getString("type"));
-        assertEquals("The created queue was not a standard queue", "standard", 
((MapMessage)responseMessage).getString("qpid-type"));
-        assertEquals("the created queue did not have the correct alerting 
threshold", 100L, 
((MapMessage)responseMessage).getLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
-        Object identity = ((MapMessage) responseMessage).getObject("identity");
+        checkResponseIsMapType(responseMessage);
+        assertEquals("The created queue was not a standard queue", 
"org.apache.qpid.StandardQueue", getValueFromMapResponse(responseMessage, 
"type"));
+        assertEquals("The created queue was not a standard queue", "standard", 
getValueFromMapResponse(responseMessage, "qpid-type"));
+        assertEquals("the created queue did not have the correct alerting 
threshold", 100L, 
getValueFromMapResponse(responseMessage,ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
+        Object identity = getValueFromMapResponse(responseMessage,"identity");
 
         message = _session.createMapMessage();
 
@@ -316,8 +316,8 @@ public class AmqpManagementTest extends
         assertTrue("The response message does not have a status code",
                    
Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
         assertEquals("The response code did not indicate success", 200, 
responseMessage.getIntProperty("statusCode"));
-        assertTrue("The response was not a MapMessage", responseMessage 
instanceof MapMessage);
-        assertEquals("the created queue did not have the correct alerting 
threshold", 250L, 
((MapMessage)responseMessage).getLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
+        checkResponseIsMapType(responseMessage);
+        assertEquals("the created queue did not have the correct alerting 
threshold", 250L, getValueFromMapResponse(responseMessage, 
ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
 
         message = _session.createMapMessage();
 
@@ -530,9 +530,9 @@ public class AmqpManagementTest extends
         assertTrue("The response message does not have a status code",
                    
Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
         assertEquals("Incorrect response code", 200, 
responseMessage.getIntProperty("statusCode"));
-        assertTrue("The response was not a MapMessage", responseMessage 
instanceof MapMessage);
-        assertEquals("The name of the virtual host is not as expected", 
virtualHostName, ((MapMessage)responseMessage).getString("name"));
-        assertEquals("The type of the virtual host is not as expected", 
"Memory", ((MapMessage)responseMessage).getString("qpid-type"));
+        checkResponseIsMapType(responseMessage);
+        assertEquals("The name of the virtual host is not as expected", 
virtualHostName, getValueFromMapResponse(responseMessage, "name"));
+        assertEquals("The type of the virtual host is not as expected", 
"Memory", getValueFromMapResponse(responseMessage, "qpid-type"));
 
 
     }

Modified: qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes?rev=1774263&r1=1774262&r2=1774263&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes Wed Dec 14 15:18:33 2016
@@ -43,16 +43,6 @@ org.apache.qpid.systest.rest.PublishMess
 // this test fails - likely a client bug with the modification racing the send
 org.apache.qpid.test.unit.basic.BytesMessageTest#testModificationAfterSend
 
-// AmqpManagement at the broker level needs the client to support the request/response pattern using local target names
-// (since the synthetic virtual host cannot create a temporary queue).  This will require the client to support this
-org.apache.qpid.systest.management.amqp.AmqpManagementTest#testCreateQueueOnBrokerManagement
-org.apache.qpid.systest.management.amqp.AmqpManagementTest#testCreateBindingOnBrokerManagement
-org.apache.qpid.systest.management.amqp.AmqpManagementTest#testCreateConnectionOnBrokerManagement
-org.apache.qpid.systest.management.amqp.AmqpManagementTest#testGetTypesOnBrokerManagement
-org.apache.qpid.systest.management.amqp.AmqpManagementTest#testCreateVirtualHost
-org.apache.qpid.systest.management.amqp.AmqpManagementTest#testQueryBrokerManagement
-
-
 // This test fails with error The underlying correlation-id is not binary and so can't be returned
 // however the correlation id in the incoming message was set as byte[] so the issue is within the conversion to
 // the InternalMessage and back



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to