Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Tue Mar 3 14:56:40 2015 @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.binding; -import java.security.AccessControlException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -45,10 +44,8 @@ import org.apache.qpid.server.model.Mana import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.StateChangeListener; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class BindingImpl extends AbstractConfiguredObject<BindingImpl> @@ -108,26 +105,6 @@ public class BindingImpl } } - @Override - protected void onCreate() - { - super.onCreate(); - try - { - _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); - } - catch(AccessControlException e) - { - deleted(); - throw e; - } - if (isDurable()) - { - _queue.getVirtualHost().getDurableConfigurationStore().create(asObjectRecord()); - } - - } - private static Map<String, Object> enhanceWithDurable(Map<String, Object> attributes, final AMQQueue queue, final ExchangeImpl exchange) @@ -263,12 +240,6 @@ public class BindingImpl { _arguments = arguments; BindingImpl.super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments); - if (isDurable()) - { - VirtualHostImpl<?, ?, ?> vhost = - (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class); - vhost.getDurableConfigurationStore().update(true, asObjectRecord()); - } } } ); @@ -278,6 +249,8 @@ public class BindingImpl @Override public void validateOnCreate() { + _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); + AMQQueue queue = getAMQQueue(); Map<String, Object> arguments = getArguments(); if (arguments!=null && !arguments.isEmpty() && FilterSupport.argumentsContainFilter(arguments))
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java Tue Mar 3 14:56:40 2015 @@ -48,6 +48,7 @@ public class BrokerProperties public static final String PROPERTY_QPID_HOME = "QPID_HOME"; public static final String PROPERTY_QPID_WORK = "QPID_WORK"; public static final String PROPERTY_LOG_RECORDS_BUFFER_SIZE = "qpid.broker_log_records_buffer_size"; + public static final String POSIX_FILE_PERMISSIONS = "qpid.default_posix_file_permissions"; private BrokerProperties() { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java Tue Mar 3 14:56:40 2015 @@ -25,7 +25,6 @@ import java.util.Collection; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; public class StoreConfigurationChangeListener implements ConfigurationChangeListener @@ -43,7 +42,10 @@ public class StoreConfigurationChangeLis { if (newState == State.DELETED) { - _store.remove(object.asObjectRecord()); + if(object.isDurable()) + { + _store.remove(object.asObjectRecord()); + } object.removeChangeListener(this); } } @@ -51,20 +53,23 @@ public class StoreConfigurationChangeLis @Override public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) { - // exclude VirtualHostNode children from storing in broker store - if (!(object instanceof VirtualHostNode)) + if (!object.managesChildStorage()) { - child.addChangeListener(this); - _store.update(true,child.asObjectRecord()); + if(object.isDurable() && child.isDurable()) + { + child.addChangeListener(this); + _store.update(true, child.asObjectRecord()); - Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass(); - Collection<Class<? extends ConfiguredObject>> childTypes = child.getModel().getChildTypes(categoryClass); + Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass(); + Collection<Class<? extends ConfiguredObject>> childTypes = + child.getModel().getChildTypes(categoryClass); - for(Class<? extends ConfiguredObject> childClass : childTypes) - { - for (ConfiguredObject<?> grandchild : child.getChildren(childClass)) + for (Class<? extends ConfiguredObject> childClass : childTypes) { - childAdded(child, grandchild); + for (ConfiguredObject<?> grandchild : child.getChildren(childClass)) + { + childAdded(child, grandchild); + } } } } @@ -74,14 +79,20 @@ public class StoreConfigurationChangeLis @Override public void childRemoved(ConfiguredObject object, ConfiguredObject child) { - _store.remove(child.asObjectRecord()); + if(child.isDurable()) + { + _store.remove(child.asObjectRecord()); + } child.removeChangeListener(this); } @Override public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue) { - _store.update(false, object.asObjectRecord()); + if(object.isDurable()) + { + _store.update(false, object.asObjectRecord()); + } } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Mar 3 14:56:40 2015 @@ -177,17 +177,6 @@ public abstract class AbstractExchange<T } @Override - protected void onCreate() - { - super.onCreate(); - if(isDurable()) - { - getVirtualHost().getDurableConfigurationStore().create(asObjectRecord()); - } - - } - - @Override public EventLogger getEventLogger() { return _virtualHost.getEventLogger(); @@ -213,12 +202,6 @@ public abstract class AbstractExchange<T throw new RequiredExchangeException(getName()); } - if (isDurable() && !isAutoDelete()) - { - getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord()); - - } - if(_closed.compareAndSet(false,true)) { List<BindingImpl> bindings = new ArrayList<BindingImpl>(_bindings); @@ -241,11 +224,6 @@ public abstract class AbstractExchange<T } _closeTaskList.clear(); - if (isDurable() && !isAutoDelete()) - { - getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord()); - - } } deleted(); } @@ -665,10 +643,6 @@ public abstract class AbstractExchange<T doRemoveBinding(b); queue.removeBinding(b); - if (b.isDurable()) - { - _virtualHost.getDurableConfigurationStore().remove(b.asObjectRecord()); - } b.delete(); } @@ -905,10 +879,6 @@ public abstract class AbstractExchange<T protected void changeAttributes(final Map<String, Object> attributes) { super.changeAttributes(attributes); - if (isDurable() && getState() != State.DELETED) - { - this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord()); - } } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java Tue Mar 3 14:56:40 2015 @@ -62,7 +62,8 @@ public class DefaultDestination implemen final AMQQueue q = _virtualHost.getQueue(routingAddress); if(q == null) { - if(routingAddress != null && routingAddress.contains("/") && !routingAddress.startsWith("/")) + routingAddress = _virtualHost.getLocalAddress(routingAddress); + if(routingAddress.contains("/") && !routingAddress.startsWith("/")) { String[] parts = routingAddress.split("/",2); ExchangeImpl exchange = _virtualHost.getExchange(parts[0]); @@ -71,7 +72,7 @@ public class DefaultDestination implemen return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction); } } - else if(routingAddress == null || !routingAddress.contains("/")) + else if(!routingAddress.contains("/")) { ExchangeImpl exchange = _virtualHost.getExchange(routingAddress); if(exchange != null) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Tue Mar 3 14:56:40 2015 @@ -87,6 +87,12 @@ class HeadersBinding +"' with arguments: " + _binding.getArguments()); _filter = new MessageFilter() { + @Override + public String getName() + { + return ""; + } + @Override public boolean matches(Filterable message) { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java Tue Mar 3 14:56:40 2015 @@ -14,26 +14,62 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. * + * */ package org.apache.qpid.server.filter; -// -// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> -// import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -public interface FilterManager +public class FilterManager { - void add(MessageFilter filter); - void remove(MessageFilter filter); + private final Map<String, MessageFilter> _filters = new ConcurrentHashMap<>(); - boolean allAllow(Filterable msg); + public FilterManager() + { + } + + public void add(String name, MessageFilter filter) + { + _filters.put(name, filter); + } + + public boolean allAllow(Filterable msg) + { + for (MessageFilter filter : _filters.values()) + { + if (!filter.matches(msg)) + { + return false; + } + } + return true; + } + + public Iterator<MessageFilter> filters() + { + return _filters.values().iterator(); + } + + public boolean hasFilters() + { + return !_filters.isEmpty(); + } + + public boolean hasFilter(final String name) + { + return _filters.containsKey(name); + } + + @Override + public String toString() + { + return _filters.toString(); + } - Iterator<MessageFilter> filters(); - boolean hasFilters(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Tue Mar 3 14:56:40 2015 @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.filter; +import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.common.AMQPFilterTypes; @@ -27,8 +29,6 @@ import org.apache.qpid.filter.SelectorPa import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; -import java.util.Map; - public class FilterManagerFactory { @@ -54,20 +54,13 @@ public class FilterManagerFactory if (selector instanceof String && !selector.equals("")) { - manager = new SimpleFilterManager(); + manager = new FilterManager(); try { - manager.add(new JMSSelectorFilter((String)selector)); - } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e); + MessageFilter filter = new JMSSelectorFilter((String)selector); + manager.add(filter.getName(), filter); } - catch (TokenMgrError e) + catch (ParseException | SelectorParsingException | TokenMgrError e) { throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Tue Mar 3 14:56:40 2015 @@ -26,12 +26,14 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.WeakHashMap; + import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.queue.AMQQueue; public class FilterSupport @@ -57,15 +59,7 @@ public class FilterSupport { selector = new JMSSelectorFilter(selectorString); } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (TokenMgrError e) + catch (ParseException | SelectorParsingException | TokenMgrError e) { throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); } @@ -119,6 +113,7 @@ public class FilterSupport } } + @PluggableService public static final class NoLocalFilter implements MessageFilter { private final MessageSource _queue; @@ -128,6 +123,12 @@ public class FilterSupport _queue = queue; } + @Override + public String getName() + { + return AMQPFilterTypes.NO_LOCAL.toString(); + } + public boolean matches(Filterable message) { @@ -165,6 +166,8 @@ public class FilterSupport { return _queue != null ? _queue.hashCode() : 0; } + + } static final class CompoundFilter implements MessageFilter @@ -178,6 +181,12 @@ public class FilterSupport _jmsSelectorFilter = jmsSelectorFilter; } + @Override + public String getName() + { + return ""; + } + public boolean matches(Filterable message) { return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); @@ -216,5 +225,7 @@ public class FilterSupport result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); return result; } + + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java Tue Mar 3 14:56:40 2015 @@ -34,6 +34,10 @@ public interface Filterable Object getConnectionReference(); + long getMessageNumber(); + + long getArrivalTime(); + public class Factory { @@ -41,6 +45,7 @@ public interface Filterable { return new Filterable() { + @Override public AMQMessageHeader getMessageHeader() { @@ -64,6 +69,18 @@ public interface Filterable { return message.getConnectionReference(); } + + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public long getArrivalTime() + { + return message.getArrivalTime(); + } }; } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Tue Mar 3 14:56:40 2015 @@ -25,14 +25,18 @@ import org.apache.commons.lang.builder.H import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.log4j.Logger; + +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.BooleanExpression; import org.apache.qpid.filter.FilterableMessage; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.SelectorParser; import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.plugin.PluggableService; +@PluggableService public class JMSSelectorFilter implements MessageFilter { private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class); @@ -46,6 +50,12 @@ public class JMSSelectorFilter implement _matcher = new SelectorParser().parse(selector); } + @Override + public String getName() + { + return AMQPFilterTypes.JMS_SELECTOR.toString(); + } + public boolean matches(Filterable message) { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Tue Mar 3 14:56:40 2015 @@ -22,5 +22,6 @@ package org.apache.qpid.server.filter; public interface MessageFilter { + String getName(); boolean matches(Filterable message); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Mar 3 14:56:40 2015 @@ -122,8 +122,11 @@ public abstract class AbstractConfigured private final TaskExecutor _taskExecutor; private final Class<? extends ConfiguredObject> _category; + private final Class<? extends ConfiguredObject> _typeClass; private final Class<? extends ConfiguredObject> _bestFitInterface; private final Model _model; + private final boolean _managesChildStorage; + @ManagedAttributeField private long _createdTime; @@ -206,6 +209,8 @@ public abstract class AbstractConfigured _model = model; _category = ConfiguredObjectTypeRegistry.getCategory(getClass()); + Class<? extends ConfiguredObject> typeClass = model.getTypeRegistry().getTypeClass(getClass()); + _typeClass = typeClass == null ? _category : typeClass; _attributeTypes = model.getTypeRegistry().getAttributeTypes(getClass()); _automatedFields = model.getTypeRegistry().getAutomatedFields(getClass()); @@ -242,6 +247,7 @@ public abstract class AbstractConfigured } _type = ConfiguredObjectTypeRegistry.getType(getClass()); + _managesChildStorage = managesChildren(_category) || managesChildren(_typeClass); _bestFitInterface = calculateBestFitInterface(); if(attributes.get(TYPE) != null && !_type.equals(attributes.get(TYPE))) @@ -315,6 +321,11 @@ public abstract class AbstractConfigured } } + private boolean managesChildren(final Class<? extends ConfiguredObject> clazz) + { + return clazz.getAnnotation(ManagedObject.class).managesChildren(); + } + private Class<? extends ConfiguredObject> calculateBestFitInterface() { Set<Class<? extends ConfiguredObject>> candidates = new HashSet<Class<? extends ConfiguredObject>>(); @@ -1056,11 +1067,24 @@ public abstract class AbstractConfigured return _model; } + @Override public Class<? extends ConfiguredObject> getCategoryClass() { return _category; } + @Override + public Class<? extends ConfiguredObject> getTypeClass() + { + return _typeClass; + } + + @Override + public boolean managesChildStorage() + { + return _managesChildStorage; + } + public Map<String,String> getContext() { return _context == null ? Collections.<String,String>emptyMap() : Collections.unmodifiableMap(_context); @@ -1219,8 +1243,7 @@ public abstract class AbstractConfigured if(attr != null && (attr.isAutomated() || attr.isDerived())) { Object value = attr.getValue((X)this); - if(value != null && attr.isSecure() && - !SecurityManager.isSystemProcess()) + if(value != null && !SecurityManager.isSystemProcess() && attr.isSecureValue(value)) { return SECURE_VALUES.get(value.getClass()); } @@ -1620,8 +1643,9 @@ public abstract class AbstractConfigured { Object desired = attributes.get(name); Object expected = getAttribute(name); - if(((_attributes.get(name) != null && !_attributes.get(name).equals(attributes.get(name))) - || attributes.get(name) != null) + Object currentValue = _attributes.get(name); + if(((currentValue != null && !currentValue.equals(desired)) + || (currentValue == null && desired != null)) && changeAttribute(name, expected, desired)) { attributeSet(name, expected, desired); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java Tue Mar 3 14:56:40 2015 @@ -50,6 +50,25 @@ abstract class AttributeValueConverter<T } }; + static final AttributeValueConverter<Object> OBJECT_CONVERTER = new AttributeValueConverter<Object>() + { + @Override + public Object convert(final Object value, final ConfiguredObject object) + { + if(value instanceof String) + { + return AbstractConfiguredObject.interpolate(object, (String) value); + } + else if(value == null) + { + return null; + } + else + { + return value; + } + } + }; static final AttributeValueConverter<UUID> UUID_CONVERTER = new AttributeValueConverter<UUID>() { @Override @@ -398,7 +417,17 @@ abstract class AttributeValueConverter<T } else if(Map.class.isAssignableFrom(type)) { - return (AttributeValueConverter<X>) MAP_CONVERTER; + if(returnType instanceof ParameterizedType) + { + Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0]; + Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1]; + + return (AttributeValueConverter<X>) new GenericMapConverter(keyType,valueType); + } + else + { + return (AttributeValueConverter<X>) MAP_CONVERTER; + } } else if(Collection.class.isAssignableFrom(type)) { @@ -416,6 +445,10 @@ abstract class AttributeValueConverter<T { return (AttributeValueConverter<X>) new ConfiguredObjectConverter(type); } + else if(Object.class == type) + { + return (AttributeValueConverter<X>) OBJECT_CONVERTER; + } throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName()); } @@ -575,6 +608,62 @@ abstract class AttributeValueConverter<T } } + public static class GenericMapConverter extends AttributeValueConverter<Map> + { + + private final AttributeValueConverter<?> _keyConverter; + private final AttributeValueConverter<?> _valueConverter; + + + public GenericMapConverter(final Type keyType, final Type valueType) + { + _keyConverter = getConverter(getRawType(keyType), keyType); + + _valueConverter = getConverter(getRawType(valueType), valueType); + } + + + @Override + public Map convert(final Object value, final ConfiguredObject object) + { + if(value instanceof Map) + { + Map<?,?> original = (Map<?,?>)value; + Map converted = new LinkedHashMap(original.size()); + for(Map.Entry<?,?> entry : original.entrySet()) + { + converted.put(_keyConverter.convert(entry.getKey(),object), + _valueConverter.convert(entry.getValue(), object)); + } + return Collections.unmodifiableMap(converted); + } + else if(value == null) + { + return null; + } + else + { + if(value instanceof String) + { + String interpolated = AbstractConfiguredObject.interpolate(object, (String) value); + ObjectMapper objectMapper = new ObjectMapper(); + try + { + return convert(objectMapper.readValue(interpolated, Map.class), object); + } + catch (IOException e) + { + // fall through to the non-JSON single object case + } + } + + throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map"); + } + + } + } + + static final class EnumConverter<X extends Enum<X>> extends AttributeValueConverter<X> { private final Class<X> _klazz; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java Tue Mar 3 14:56:40 2015 @@ -28,6 +28,7 @@ import java.lang.reflect.Type; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.regex.Pattern; import org.apache.log4j.Logger; @@ -37,6 +38,7 @@ public class ConfiguredAutomatedAttribut private final ManagedAttribute _annotation; private final Method _validValuesMethod; + private final Pattern _secureValuePattern; ConfiguredAutomatedAttribute(final Class<C> clazz, final Method getter, @@ -53,6 +55,16 @@ public class ConfiguredAutomatedAttribut validValuesMethod = getValidValuesMethod(validValue, clazz); } _validValuesMethod = validValuesMethod; + + String secureValueFilter = _annotation.secureValueFilter(); + if (secureValueFilter == null || "".equals(secureValueFilter)) + { + _secureValuePattern = null; + } + else + { + _secureValuePattern = Pattern.compile(secureValueFilter); + } } private Method getValidValuesMethod(final String validValue, final Class<C> clazz) @@ -140,6 +152,11 @@ public class ConfiguredAutomatedAttribut return _annotation.description(); } + public Pattern getSecureValueFilter() + { + return _secureValuePattern; + } + public Collection<String> validValues() { if(_validValuesMethod != null) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java Tue Mar 3 14:56:40 2015 @@ -21,10 +21,12 @@ package org.apache.qpid.server.model; import java.lang.reflect.Method; +import java.util.regex.Pattern; public class ConfiguredDerivedAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttribute<C,T> { private final DerivedAttribute _annotation; + private final Pattern _secureValuePattern; ConfiguredDerivedAttribute(final Class<C> clazz, final Method getter, @@ -32,6 +34,16 @@ public class ConfiguredDerivedAttribute< { super(clazz, getter); _annotation = annotation; + + String secureValueFilter = _annotation.secureValueFilter(); + if (secureValueFilter == null || "".equals(secureValueFilter)) + { + _secureValuePattern = null; + } + else + { + _secureValuePattern = Pattern.compile(secureValueFilter); + } } public boolean isAutomated() @@ -72,4 +84,10 @@ public class ConfiguredDerivedAttribute< return _annotation.description(); } + @Override + public Pattern getSecureValueFilter() + { + return _secureValuePattern; + } + } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Tue Mar 3 14:56:40 2015 @@ -239,6 +239,9 @@ public interface ConfiguredObject<X exte void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; Class<? extends ConfiguredObject> getCategoryClass(); + Class<? extends ConfiguredObject> getTypeClass(); + + boolean managesChildStorage(); <C extends ConfiguredObject<C>> C findConfiguredObject(Class<C> clazz, String name); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java Tue Mar 3 14:56:40 2015 @@ -22,6 +22,7 @@ package org.apache.qpid.server.model; import java.lang.reflect.Method; import java.lang.reflect.Type; +import java.util.regex.Pattern; public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttributeOrStatistic<C,T> { @@ -49,6 +50,25 @@ public abstract class ConfiguredObjectAt public abstract String getDescription(); + public abstract Pattern getSecureValueFilter(); + + public boolean isSecureValue(Object value) + { + if (isSecure()) + { + Pattern filter = getSecureValueFilter(); + if (filter == null) + { + return true; + } + else + { + return filter.matcher(String.valueOf(value)).matches(); + } + } + return false; + } + public T convert(final Object value, C object) { final AttributeValueConverter<T> converter = getConverter(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java Tue Mar 3 14:56:40 2015 @@ -156,7 +156,7 @@ public class ConfiguredObjectFactoryImpl factory = categoryFactories.get(_defaultTypes.get(category)); if(factory == null) { - throw new NoFactoryForTypeException(category, _defaultTypes.get(category)); + throw new NoFactoryForTypeException(category, type); } } return factory; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java Tue Mar 3 14:56:40 2015 @@ -385,7 +385,7 @@ public class ConfiguredObjectTypeRegistr return null; } - private Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz) + public Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz) { String typeName = getType(clazz); Class<? extends ConfiguredObject> typeClass = null; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java Tue Mar 3 14:56:40 2015 @@ -32,4 +32,5 @@ public @interface DerivedAttribute boolean persist() default false; String description() default ""; boolean oversize() default false; + String secureValueFilter() default ""; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java Tue Mar 3 14:56:40 2015 @@ -37,4 +37,5 @@ public @interface ManagedAttribute String[] validValues() default {}; boolean oversize() default false; String oversizedAltText() default ""; + String secureValueFilter() default ""; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Mar 3 14:56:40 2015 @@ -21,6 +21,8 @@ package org.apache.qpid.server.model; import java.util.Collection; +import java.util.List; +import java.util.Map; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.store.MessageDurability; @@ -48,6 +50,8 @@ public interface Queue<X extends Queue<X String QUEUE_FLOW_STOPPED = "queueFlowStopped"; String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl"; String MINIMUM_MESSAGE_TTL = "minimumMessageTtl"; + String DEFAULT_FILTERS = "defaultFilters"; + String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers"; String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint"; @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT) @@ -67,6 +71,9 @@ public interface Queue<X extends Queue<X @ManagedAttribute( defaultValue = "NONE" ) ExclusivityPolicy getExclusive(); + @ManagedAttribute( defaultValue = "false" ) + boolean isEnsureNondestructiveConsumers(); + @DerivedAttribute( persist = true ) String getOwner(); @@ -155,6 +162,9 @@ public interface Queue<X extends Queue<X @ManagedAttribute long getMaximumMessageTtl(); + @ManagedAttribute + Map<String, Map<String,List<String>>> getDefaultFilters(); + //children Collection<? extends Binding> getBindings(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java Tue Mar 3 14:56:40 2015 @@ -20,9 +20,6 @@ */ package org.apache.qpid.server.model; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.ManagedObject; - @ManagedObject(category=true, managesChildren=false, creatable=false) public interface RemoteReplicationNode<X extends RemoteReplicationNode<X>> extends ConfiguredObject<X> { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java Tue Mar 3 14:56:40 2015 @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.model; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -37,6 +38,9 @@ public interface SystemConfig<X extends String INITIAL_CONFIGURATION_LOCATION = "initialConfigurationLocation"; String STARTUP_LOGGED_TO_SYSTEM_OUT = "startupLoggedToSystemOut"; + @ManagedContextDefault(name = BrokerProperties.POSIX_FILE_PERMISSIONS) + String DEFAULT_POSIX_FILE_PERMISSIONS = "rw-r-----"; + @ManagedAttribute(defaultValue = "false") boolean isManagementMode(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Tue Mar 3 14:56:40 2015 @@ -22,14 +22,16 @@ package org.apache.qpid.server.model; import java.security.AccessControlException; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.store.MessageStore; -@ManagedObject( managesChildren = true, defaultType = "ProvidedStore") +@ManagedObject( defaultType = "ProvidedStore") public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X> { @@ -42,6 +44,9 @@ public interface VirtualHost<X extends V String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn"; String HOUSE_KEEPING_THREAD_COUNT = "houseKeepingThreadCount"; String MODEL_VERSION = "modelVersion"; + String ENABLED_CONNECTION_VALIDATORS = "enabledConnectionValidators"; + String DISABLED_CONNECTION_VALIDATORS = "disabledConnectionValidators"; + String GLOBAL_ADDRESS_DOMAINS = "globalAddressDomains"; @ManagedContextDefault( name = "queue.deadLetterQueueEnabled") public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false; @@ -88,6 +93,21 @@ public interface VirtualHost<X extends V @DerivedAttribute( persist = true ) String getModelVersion(); + @ManagedContextDefault( name = "virtualhost.enabledConnectionValidators") + String DEFAULT_ENABLED_VALIDATORS = "[]"; + + @ManagedAttribute( defaultValue = "${virtualhost.enabledConnectionValidators}") + List<String> getEnabledConnectionValidators(); + + @ManagedContextDefault( name = "virtualhost.disabledConnectionValidators") + String DEFAULT_DISABLED_VALIDATORS = "[]"; + + @ManagedAttribute( defaultValue = "${virtualhost.disabledConnectionValidators}") + List<String> getDisabledConnectionValidators(); + + @ManagedAttribute( defaultValue = "[]") + List<String> getGlobalAddressDomains(); + @ManagedStatistic long getQueueCount(); @@ -129,6 +149,8 @@ public interface VirtualHost<X extends V void delete(); + String getRedirectHost(AmqpPort<?> port); + public static interface Transaction { void dequeue(MessageInstance entry); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java Tue Mar 3 14:56:40 2015 @@ -24,7 +24,7 @@ import java.util.Collection; import org.apache.qpid.server.store.DurableConfigurationStore; -@ManagedObject(category=true, managesChildren=false) +@ManagedObject(category=true, managesChildren=true) public interface VirtualHostNode<X extends VirtualHostNode<X>> extends ConfiguredObject<X> { String QPID_INITIAL_CONFIG_VIRTUALHOST_CONFIG_VAR = "qpid.initial_config_virtualhost_config"; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Tue Mar 3 14:56:40 2015 @@ -80,7 +80,7 @@ public final class ConnectionAdapter ext { Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put(ID, UUID.randomUUID()); - attributes.put(NAME, _connection.getRemoteAddressString().replaceAll("/", "")); + attributes.put(NAME, "[" + _connection.getConnectionId() + "] " + _connection.getRemoteAddressString().replaceAll("/", "")); attributes.put(DURABLE, false); return attributes; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java Tue Mar 3 14:56:40 2015 @@ -25,7 +25,7 @@ import org.apache.qpid.server.model.Grou import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedObject; -@ManagedObject( category = false, type = "GroupFile" ) +@ManagedObject( category = false, type = "GroupFile", managesChildren = true ) public interface FileBasedGroupProvider<X extends FileBasedGroupProvider<X>> extends GroupProvider<X>, GroupManagingGroupProvider { String PATH="path"; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java Tue Mar 3 14:56:40 2015 @@ -34,6 +34,7 @@ import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; @@ -50,6 +51,7 @@ import org.apache.qpid.server.security.a import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.group.FileGroupDatabase; import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.util.FileHelper; public class FileBasedGroupProviderImpl extends AbstractConfiguredObject<FileBasedGroupProviderImpl> implements FileBasedGroupProvider<FileBasedGroupProviderImpl> @@ -162,9 +164,11 @@ public class FileBasedGroupProviderImpl { throw new IllegalConfigurationException(String.format("Cannot create groups file at '%s'",_path)); } + try { - file.createNewFile(); + String posixFileAttributes = getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS); + new FileHelper().createNewFile(file, posixFileAttributes); } catch (IOException e) { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java Tue Mar 3 14:56:40 2015 @@ -21,14 +21,14 @@ package org.apache.qpid.server.model.adapter; -import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -38,6 +38,9 @@ import java.util.Set; import java.util.TreeMap; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.map.ObjectMapper; @@ -118,7 +121,7 @@ public class FileSystemPreferencesProvid FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path)); // we need to check and create file if it does not exist every time on open - store.createIfNotExist(); + store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS)); store.open(); _store = store; _open = true; @@ -184,6 +187,7 @@ public class FileSystemPreferencesProvid if(_store != null) { + _store.close(); _store.delete(); deleted(); _authenticationProvider.setPreferencesProvider(null); @@ -280,7 +284,7 @@ public class FileSystemPreferencesProvid else { FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path)); - store.createIfNotExist(); + store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS)); store.open(); _store = store; } @@ -334,9 +338,9 @@ public class FileSystemPreferencesProvid { private final ObjectMapper _objectMapper; private final Map<String, Map<String, Object>> _preferences; + private final FileHelper _fileHelper; private File _storeFile; private FileLock _storeLock; - private RandomAccessFile _storeRAF; public FileSystemPreferencesStore(File preferencesFile) { @@ -345,9 +349,10 @@ public class FileSystemPreferencesProvid _objectMapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); _objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true); _preferences = new TreeMap<String, Map<String, Object>>(); + _fileHelper = new FileHelper(); } - public void createIfNotExist() + public void createIfNotExist(String filePermissions) { if (!_storeFile.exists()) { @@ -358,7 +363,8 @@ public class FileSystemPreferencesProvid } try { - if (_storeFile.createNewFile() && !_storeFile.exists()) + Path path = _fileHelper.createNewFile(_storeFile, filePermissions); + if (!Files.exists(path)) { throw new IllegalConfigurationException(String.format("Cannot create preferences store file at '%s'", _storeFile.getAbsolutePath())); } @@ -391,43 +397,20 @@ public class FileSystemPreferencesProvid } try { - _storeRAF = new RandomAccessFile(_storeFile, "rw"); - FileChannel fileChannel = _storeRAF.getChannel(); - try - { - _storeLock = fileChannel.tryLock(); - } - catch (OverlappingFileLockException e) - { - _storeLock = null; - } - if (_storeLock == null) + getFileLock(_storeFile.getPath() + ".lck"); + if (_storeFile.length() > 0) { - throw new IllegalConfigurationException("Cannot get lock on store file " + _storeFile.getName() - + " is another instance running?"); - } - long fileSize = fileChannel.size(); - if (fileSize > 0) - { - ByteBuffer buffer = ByteBuffer.allocate((int) fileSize); - fileChannel.read(buffer); - buffer.rewind(); - buffer.flip(); - byte[] data = buffer.array(); - try - { - Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(data, - new TypeReference<Map<String, Map<String, Object>>>() - { - }); - _preferences.putAll(preferencesMap); - } - catch (JsonProcessingException e) - { - throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e); - } + Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(_storeFile, + new TypeReference<Map<String, Map<String, Object>>>() + { + }); + _preferences.putAll(preferencesMap); } } + catch (JsonProcessingException e) + { + throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e); + } catch (IOException e) { throw new IllegalConfigurationException("Cannot load preferences from " + _storeFile.getName(), e); @@ -443,6 +426,7 @@ public class FileSystemPreferencesProvid if (_storeLock != null) { _storeLock.release(); + _storeLock.channel().close(); } } catch (IOException e) @@ -452,22 +436,7 @@ public class FileSystemPreferencesProvid finally { _storeLock = null; - try - { - if (_storeRAF != null) - { - _storeRAF.close(); - } - } - catch (IOException e) - { - LOGGER.error("Cannot close preferences file", e); - } - finally - { - _storeRAF = null; - _preferences.clear(); - } + _preferences.clear(); } } } @@ -544,16 +513,14 @@ public class FileSystemPreferencesProvid checkStoreOpened(); try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - _objectMapper.writeValue(baos, _preferences); - FileChannel channel = _storeRAF.getChannel(); - long currentSize = channel.size(); - channel.position(0); - channel.write(ByteBuffer.wrap(baos.toByteArray())); - if (currentSize > baos.size()) + _fileHelper.writeFileSafely(_storeFile.toPath(), new BaseAction<File, IOException>() { - channel.truncate(baos.size()); - } + @Override + public void performAction(File file) throws IOException + { + _objectMapper.writeValue(file, _preferences); + } + }); } catch (IOException e) { @@ -569,5 +536,32 @@ public class FileSystemPreferencesProvid } } + private void getFileLock(String lockFilePath) + { + File lockFile = new File(lockFilePath); + try + { + lockFile.createNewFile(); + lockFile.deleteOnExit(); + + @SuppressWarnings("resource") + FileOutputStream out = new FileOutputStream(lockFile); + FileChannel channel = out.getChannel(); + _storeLock = channel.tryLock(); + } + catch (IOException ioe) + { + throw new IllegalStateException("Cannot create the lock file " + lockFile.getName(), ioe); + } + catch(OverlappingFileLockException e) + { + _storeLock = null; + } + + if(_storeLock == null) + { + throw new IllegalStateException("Cannot get lock on file " + lockFile.getAbsolutePath() + ". Is another instance running?"); + } + } } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java Tue Mar 3 14:56:40 2015 @@ -19,8 +19,11 @@ package org.apache.qpid.server.plugin; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.ServiceLoader; import org.apache.log4j.Logger; @@ -47,6 +50,16 @@ public class QpidServiceLoader return instancesOf(clazz, true); } + public <C extends Pluggable> Map<String,C> getInstancesByType(Class<C> clazz) + { + Map<String,C> instances = new HashMap<>(); + for(C instance : instancesOf(clazz)) + { + instances.put(instance.getType(), instance); + } + return Collections.unmodifiableMap(instances); + } + private <C extends Pluggable> Iterable<C> instancesOf(Class<C> clazz, boolean atLeastOne) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Mar 3 14:56:40 2015 @@ -30,7 +30,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -52,6 +54,7 @@ import org.apache.qpid.server.consumer.C import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -75,6 +78,8 @@ import org.apache.qpid.server.model.Queu import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.plugin.MessageFilterFactory; +import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SecurityManager; @@ -186,6 +191,9 @@ public abstract class AbstractQueue<X ex @ManagedAttributeField private MessageDurability _messageDurability; + @ManagedAttributeField + private Map<String, Map<String,List<String>>> _defaultFilters; + private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name private final Set<NotificationCheck> _notificationChecks = @@ -241,12 +249,15 @@ public abstract class AbstractQueue<X ex private long _minimumMessageTtl; @ManagedAttributeField private long _maximumMessageTtl; + @ManagedAttributeField + private boolean _ensureNondestructiveConsumers; private final AtomicBoolean _recovering = new AtomicBoolean(true); private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>(); private final QueueRunner _queueRunner = new QueueRunner(this); private boolean _closing; + private final ConcurrentMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>(); protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) { @@ -283,11 +294,7 @@ public abstract class AbstractQueue<X ex }); } - if (isDurable()) - { - _virtualHost.getDurableConfigurationStore().create(asObjectRecord()); - } - else if(getMessageDurability() != MessageDurability.NEVER) + if(!isDurable() && getMessageDurability() != MessageDurability.NEVER) { Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() @@ -351,17 +358,9 @@ public abstract class AbstractQueue<X ex case PRINCIPAL: _exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal(); - if(isDurable()) - { - _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord()); - } break; case CONTAINER: _exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName(); - if(isDurable()) - { - _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord()); - } break; case CONNECTION: _exclusiveOwner = sessionModel.getConnectionModel(); @@ -450,6 +449,40 @@ public abstract class AbstractQueue<X ex } _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES); + + if(_defaultFilters != null) + { + QpidServiceLoader qpidServiceLoader = new QpidServiceLoader(); + final Map<String, MessageFilterFactory> messageFilterFactories = + qpidServiceLoader.getInstancesByType(MessageFilterFactory.class); + + for (Map.Entry<String,Map<String,List<String>>> entry : _defaultFilters.entrySet()) + { + String name = String.valueOf(entry.getKey()); + Map<String, List<String>> filterValue = entry.getValue(); + if(filterValue.size() == 1) + { + String filterTypeName = String.valueOf(filterValue.keySet().iterator().next()); + MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); + if(filterFactory != null) + { + List<String> filterArguments = filterValue.values().iterator().next(); + _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments)); + } + else + { + throw new IllegalArgumentException("Unknown filter type " + filterTypeName + ", known types are: " + messageFilterFactories.keySet()); + } + } + else + { + throw new IllegalArgumentException("Filter value should be a map with one entry, having the type as key and the value being the filter arguments, not " + filterValue); + + } + + } + } + updateAlertChecks(); } @@ -555,6 +588,12 @@ public abstract class AbstractQueue<X ex } @Override + public Map<String, Map<String, List<String>>> getDefaultFilters() + { + return _defaultFilters; + } + + @Override public final MessageDurability getMessageDurability() { return _messageDurability; @@ -573,6 +612,14 @@ public abstract class AbstractQueue<X ex } @Override + public boolean isEnsureNondestructiveConsumers() + { + return _ensureNondestructiveConsumers; + } + + + + @Override public Collection<String> getAvailableAttributes() { return new ArrayList<String>(_arguments.keySet()); @@ -603,7 +650,7 @@ public abstract class AbstractQueue<X ex @Override public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target, - final FilterManager filters, + FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, EnumSet<ConsumerImpl.Option> optionSet) @@ -699,6 +746,26 @@ public abstract class AbstractQueue<X ex { throw new ExistingConsumerPreventsExclusive(); } + if(!_defaultFiltersMap.isEmpty()) + { + if(filters == null) + { + filters = new FilterManager(); + } + for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet()) + { + if(!filters.hasFilter(filter.getKey())) + { + filters.add(filter.getKey(), filter.getValue()); + } + } + } + + if(_ensureNondestructiveConsumers) + { + optionSet = EnumSet.copyOf(optionSet); + optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES)); + } QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Tue Mar 3 14:56:40 2015 @@ -62,11 +62,16 @@ public class QueueArgumentsConverter public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; + public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters"; + + public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers"; /** * No-local queue argument is used to support the no-local feature of Durable Subscribers. */ public static final String QPID_NO_LOCAL = "no-local"; + static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>(); + static { ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP); @@ -99,6 +104,8 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY); + ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS); + ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Mar 3 14:56:40 2015 @@ -371,11 +371,16 @@ public abstract class QueueEntryImpl imp } } - private void dequeue() + private boolean dequeue() { EntryState state = _state; - if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) + while(state.getState() == State.ACQUIRED && !_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) + { + state = _state; + } + + if(state.getState() == State.ACQUIRED) { if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { @@ -387,7 +392,11 @@ public abstract class QueueEntryImpl imp { notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); } - + return true; + } + else + { + return false; } } @@ -420,9 +429,10 @@ public abstract class QueueEntryImpl imp public void delete() { - dequeue(); - - dispose(); + if(dequeue()) + { + dispose(); + } } public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java Tue Mar 3 14:56:40 2015 @@ -62,7 +62,7 @@ public interface FileKeyStore<X extends @ManagedAttribute(defaultValue = "${this:path}") String getDescription(); - @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT) + @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*") String getStoreUrl(); @DerivedAttribute Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java Tue Mar 3 14:56:40 2015 @@ -31,7 +31,7 @@ public interface NonJavaKeyStore<X exten @ManagedAttribute(defaultValue = "${this:subjectName}") String getDescription(); - @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT ) + @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*") String getPrivateKeyUrl(); @ManagedAttribute( mandatory = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT ) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org