Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java
 Tue Mar  3 14:56:40 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.security.
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
 
 import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.login.AccountNotFoundException;
@@ -36,7 +38,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
@@ -45,9 +46,9 @@ public abstract class AbstractPasswordFi
     protected static final String DEFAULT_ENCODING = "utf-8";
 
     private final Pattern _regexp = Pattern.compile(":");
-    private final Map<String, U> _userMap = new HashMap<String, U>();
+    private final Map<String, U> _userMap = new HashMap<>();
     private final ReentrantLock _userUpdate = new ReentrantLock();
-    private final Random _random = new Random();
+    private final FileHelper _fileHelper = new FileHelper();
     private File _passwordFile;
 
     public final void open(File passwordFile) throws IOException
@@ -181,7 +182,7 @@ public abstract class AbstractPasswordFi
         try
         {
             _userUpdate.lock();
-            final Map<String, U> newUserMap = new HashMap<String, U>();
+            final Map<String, U> newUserMap = new HashMap<>();
 
             BufferedReader reader = null;
             try
@@ -224,71 +225,33 @@ public abstract class AbstractPasswordFi
 
     protected abstract Logger getLogger();
 
-    protected File createTempFileOnSameFilesystem()
-    {
-        File liveFile = _passwordFile;
-        File tmp;
-
-        do
-        {
-            tmp = new File(liveFile.getPath() + _random.nextInt() + ".tmp");
-        }
-        while(tmp.exists());
 
-        tmp.deleteOnExit();
-        return tmp;
-    }
-
-    protected void swapTempFileToLive(final File temp) throws IOException
+    protected void savePasswordFile() throws IOException
     {
-        File live = _passwordFile;
-        // Remove any existing ".old" file
-        final File old = new File(live.getAbsoluteFile() + ".old");
-        if (old.exists())
+        try
         {
-            old.delete();
-        }
+            _userUpdate.lock();
 
-        // Create an new ".old" file
-        if(!live.renameTo(old))
-        {
-            //unable to rename the existing file to the backup name
-            getLogger().error("Could not backup the existing password file");
-            throw new IOException("Could not backup the existing password 
file");
+            _fileHelper.writeFileSafely(_passwordFile.toPath(), new 
BaseAction<File,IOException>()
+            {
+                @Override
+                public void performAction(File file) throws IOException
+                {
+                    writeToFile(file);
+                }
+            });
         }
-
-        // Move temp file to be the new "live" file
-        if(!temp.renameTo(live))
+        finally
         {
-            //failed to rename the new file to the required filename
-            if(!old.renameTo(live))
-            {
-                //unable to return the backup to required filename
-                getLogger().error(
-                        "Could not rename the new password file into place, 
and unable to restore original file");
-                throw new IOException("Could not rename the new password file 
into place, and unable to restore original file");
-            }
-
-            getLogger().error("Could not rename the new password file into 
place");
-            throw new IOException("Could not rename the new password file into 
place");
+            _userUpdate.unlock();
         }
     }
 
-    protected void savePasswordFile() throws IOException
+    private void writeToFile(File tmp) throws IOException
     {
-        try
-        {
-            _userUpdate.lock();
-
-            BufferedReader reader = null;
-            PrintStream writer = null;
-
-            File tmp = createTempFileOnSameFilesystem();
-
-            try
+            try(PrintStream writer = new PrintStream(tmp);
+                BufferedReader reader = new BufferedReader(new 
FileReader(_passwordFile)))
             {
-                writer = new PrintStream(tmp);
-                reader = new BufferedReader(new FileReader(_passwordFile));
                 String line;
 
                 while ((line = reader.readLine()) != null)
@@ -346,32 +309,6 @@ public abstract class AbstractPasswordFi
                 getLogger().error("Unable to create the new password file: " + 
e);
                 throw new IOException("Unable to create the new password 
file",e);
             }
-            finally
-            {
-
-                try
-                {
-                    if (reader != null)
-                    {
-                        reader.close();
-                    }
-                }
-                finally
-                {
-                    if (writer != null)
-                    {
-                        writer.close();
-                    }
-                }
-
-            }
-
-            swapTempFileToLive(tmp);
-        }
-        finally
-        {
-            _userUpdate.unlock();
-        }
     }
 
     protected abstract U createUserFromPassword(Principal principal, char[] 
passwd);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java
 Tue Mar  3 14:56:40 2015
@@ -28,7 +28,7 @@ import org.apache.qpid.server.model.Mana
 import 
org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
 
-@ManagedObject( category = false, type = "Base64MD5PasswordFile" )
+@ManagedObject( category = false, managesChildren = true, type = 
"Base64MD5PasswordFile" )
 public class Base64MD5PasswordDatabaseAuthenticationManager
         extends 
PrincipalDatabaseAuthenticationManager<Base64MD5PasswordDatabaseAuthenticationManager>
 {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java
 Tue Mar  3 14:56:40 2015
@@ -28,7 +28,7 @@ import org.apache.qpid.server.model.Mana
 import 
org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
 
-@ManagedObject( category = false, type = "PlainPasswordFile" )
+@ManagedObject( category = false, managesChildren = true, type = 
"PlainPasswordFile" )
 public class PlainPasswordDatabaseAuthenticationManager extends 
PrincipalDatabaseAuthenticationManager<PlainPasswordDatabaseAuthenticationManager>
 {
     public static final String PROVIDER_TYPE = "PlainPasswordFile";

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
 Tue Mar  3 14:56:40 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.security.
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.security.AccessControlException;
 import java.security.Principal;
 import java.util.Collection;
@@ -40,6 +42,7 @@ import javax.security.sasl.SaslServer;
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
@@ -56,6 +59,7 @@ import org.apache.qpid.server.security.a
 import 
org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.util.FileHelper;
 
 public abstract class PrincipalDatabaseAuthenticationManager<T extends 
PrincipalDatabaseAuthenticationManager<T>>
         extends AbstractAuthenticationManager<T>
@@ -96,7 +100,11 @@ public abstract class PrincipalDatabaseA
         {
             try
             {
-                passwordFile.createNewFile();
+                Path path = new FileHelper().createNewFile(passwordFile, 
getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS));
+                if (!Files.exists(path))
+                {
+                    throw new 
IllegalConfigurationException(String.format("Cannot create password file at 
'%s'", _path));
+                }
             }
             catch (IOException e)
             {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java
 Tue Mar  3 14:56:40 2015
@@ -34,6 +34,8 @@ import java.util.concurrent.ConcurrentSk
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 /**
@@ -232,9 +234,9 @@ public class FileGroupDatabase implement
         }
     }
 
-    private synchronized void writeGroupFile(String groupFile) throws 
IOException
+    private synchronized void writeGroupFile(final String groupFile) throws 
IOException
     {
-        Properties propertiesFile = new Properties();
+        final Properties propertiesFile = new Properties();
 
         for (String group : _groupToUserMap.keySet())
         {
@@ -244,19 +246,19 @@ public class FileGroupDatabase implement
             propertiesFile.setProperty(group + ".users", userList);
         }
 
-        String comment = "Written " + new Date();
-        FileOutputStream fileOutputStream = new FileOutputStream(groupFile);
-        try
-        {
-            propertiesFile.store(fileOutputStream, comment);
-        }
-        finally
+
+        new FileHelper().writeFileSafely(new File(groupFile).toPath(), new 
BaseAction<File, IOException>()
         {
-            if(fileOutputStream != null)
+            @Override
+            public void performAction(File file) throws IOException
             {
-                fileOutputStream.close();
+                String comment = "Written " + new Date();
+                try(FileOutputStream fileOutputStream = new 
FileOutputStream(file))
+                {
+                    propertiesFile.store(fileOutputStream, comment);
+                }
             }
-        }
+        });
     }
 
     private void validatePropertyNameIsGroupName(String propertyName)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
 Tue Mar  3 14:56:40 2015
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -40,6 +42,9 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonProcessingException;
 import org.codehaus.jackson.Version;
@@ -85,6 +90,7 @@ public class JsonFileConfigStore impleme
     private final Map<String, List<UUID>> _idsByType = new HashMap<String, 
List<UUID>>();
     private final ObjectMapper _objectMapper = new ObjectMapper();
     private final Class<? extends ConfiguredObject> _rootClass;
+    private final FileHelper _fileHelper;
 
     private Map<String,Class<? extends ConfiguredObject>> _classNameMapping;
     private String _directoryName;
@@ -123,6 +129,7 @@ public class JsonFileConfigStore impleme
         _objectMapper.registerModule(_module);
         _objectMapper.enable(SerializationConfig.Feature.INDENT_OUTPUT);
         _rootClass = rootClass;
+        _fileHelper = new FileHelper();
     }
 
     @Override
@@ -173,7 +180,7 @@ public class JsonFileConfigStore impleme
             _directoryName = fileFromSettings.getParent();
             _configFileName = fileFromSettings.getName();
             _backupFileName = fileFromSettings.getName() + ".bak";
-            _tempFileName = fileFromSettings.getName() + ".tmp";;
+            _tempFileName = fileFromSettings.getName() + ".tmp";
 
             _lockFileName = fileFromSettings.getName() + ".lck";
         }
@@ -191,56 +198,45 @@ public class JsonFileConfigStore impleme
         checkDirectoryIsWritable(_directoryName);
         getFileLock();
 
-        if(!fileExists(_configFileName))
+        Path storeFile = new File(_directoryName, _configFileName).toPath();
+        Path backupFile = new File(_directoryName, _backupFileName).toPath();
+        if(!Files.exists(storeFile))
         {
-            if(!fileExists(_backupFileName))
+            if(!Files.exists(backupFile))
             {
-                File newFile = new File(_directoryName, _configFileName);
                 try
                 {
-                    _objectMapper.writeValue(newFile, Collections.emptyMap());
+                    String posixFileAttributes = 
_parent.getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS);
+                    storeFile = _fileHelper.createNewFile(storeFile, 
posixFileAttributes);
+                    _objectMapper.writeValue(storeFile.toFile(), 
Collections.emptyMap());
                 }
                 catch (IOException e)
                 {
-                    throw new StoreException("Could not write configuration 
file " + newFile, e);
+                    throw new StoreException("Could not write configuration 
file " + storeFile, e);
                 }
             }
             else
             {
-                renameFile(_backupFileName, _configFileName);
+                try
+                {
+                    _fileHelper.atomicFileMoveOrReplace(backupFile, storeFile);
+                }
+                catch (IOException e)
+                {
+                    throw new StoreException("Could not move backup to 
configuration file " + storeFile, e);
+                }
             }
         }
-        deleteFileIfExists(_backupFileName);
-    }
 
-    private void renameFile(String fromFileName, String toFileName)
-    {
-        File toFile = deleteFileIfExists(toFileName);
-        File fromFile = new File(_directoryName, fromFileName);
-
-        if(!fromFile.renameTo(toFile))
+        try
         {
-            throw new StoreException("Cannot rename file " + 
fromFile.getAbsolutePath() + " to " + toFile.getAbsolutePath());
+            Files.deleteIfExists(backupFile);
         }
-    }
-
-    private File deleteFileIfExists(final String toFileName)
-    {
-        File toFile = new File(_directoryName, toFileName);
-        if(toFile.exists())
+        catch (IOException e)
         {
-            if(!toFile.delete())
-            {
-                throw new StoreException("Cannot delete file " + 
toFile.getAbsolutePath());
-            }
+            throw new StoreException("Could not delete backup file " + 
backupFile, e);
         }
-        return toFile;
-    }
 
-    private boolean fileExists(String fileName)
-    {
-        File file = new File(_directoryName, fileName);
-        return file.exists();
     }
 
     private void getFileLock()
@@ -396,7 +392,7 @@ public class JsonFileConfigStore impleme
     private void save()
     {
         UUID rootId = getRootId();
-        Map<String, Object> data = null;
+        final Map<String, Object> data;
 
         if (rootId == null)
         {
@@ -409,15 +405,18 @@ public class JsonFileConfigStore impleme
 
         try
         {
-            deleteFileIfExists(_tempFileName);
-            deleteFileIfExists(_backupFileName);
-
-            File tmpFile = new File(_directoryName, _tempFileName);
-            _objectMapper.writeValue(tmpFile, data);
-            renameFile(_configFileName, _backupFileName);
-            renameFile(tmpFile.getName(),_configFileName);
-            tmpFile.delete();
-            deleteFileIfExists(_backupFileName);
+            Path tmpFile = new File(_directoryName, _tempFileName).toPath();
+            _fileHelper.writeFileSafely( new File(_directoryName, 
_configFileName).toPath(),
+                    new File(_directoryName, _backupFileName).toPath(),
+                    tmpFile,
+                    new BaseAction<File, IOException>()
+                    {
+                        @Override
+                        public void performAction(File file) throws IOException
+                        {
+                            _objectMapper.writeValue(file, data);
+                        }
+                    });
         }
         catch (IOException e)
         {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
 Tue Mar  3 14:56:40 2015
@@ -20,8 +20,10 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -30,14 +32,19 @@ import java.util.UUID;
 
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import 
org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.util.Action;
 
 public class VirtualHostStoreUpgraderAndRecoverer
 {
@@ -509,12 +516,100 @@ public class VirtualHostStoreUpgraderAnd
 
     }
 
-    public void perform(DurableConfigurationStore durableConfigurationStore)
+    public void perform(final DurableConfigurationStore 
durableConfigurationStore)
     {
         String virtualHostCategory = VirtualHost.class.getSimpleName();
         GenericStoreUpgrader upgraderHandler = new 
GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, 
durableConfigurationStore, _upgraders);
         upgraderHandler.upgrade();
 
         new 
GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords());
+
+        final StoreConfigurationChangeListener configChangeListener = new 
StoreConfigurationChangeListener(durableConfigurationStore);
+        if(_virtualHostNode.getVirtualHost() != null)
+        {
+            applyRecursively(_virtualHostNode.getVirtualHost(), new 
Action<ConfiguredObject<?>>()
+            {
+                @Override
+                public void performAction(final ConfiguredObject<?> object)
+                {
+                    object.addChangeListener(configChangeListener);
+                }
+            });
+        }
+        _virtualHostNode.addChangeListener(new ConfigurationChangeListener()
+        {
+            @Override
+            public void stateChanged(final ConfiguredObject<?> object, final 
State oldState, final State newState)
+            {
+
+            }
+
+            @Override
+            public void childAdded(final ConfiguredObject<?> object, final 
ConfiguredObject<?> child)
+            {
+                if(child instanceof VirtualHost)
+                {
+                    applyRecursively(child, new Action<ConfiguredObject<?>>()
+                    {
+                        @Override
+                        public void performAction(final ConfiguredObject<?> 
object)
+                        {
+                            if(object.isDurable())
+                            {
+                                durableConfigurationStore.update(true, 
object.asObjectRecord());
+                                object.addChangeListener(configChangeListener);
+                            }
+                        }
+                    });
+
+                }
+            }
+
+            @Override
+            public void childRemoved(final ConfiguredObject<?> object, final 
ConfiguredObject<?> child)
+            {
+                if(child instanceof VirtualHost)
+                {
+                    child.removeChangeListener(configChangeListener);
+                }
+            }
+
+            @Override
+            public void attributeSet(final ConfiguredObject<?> object,
+                                     final String attributeName,
+                                     final Object oldAttributeValue,
+                                     final Object newAttributeValue)
+            {
+
+            }
+        });
+    }
+
+    private void applyRecursively(final ConfiguredObject<?> object, final 
Action<ConfiguredObject<?>> action)
+    {
+        applyRecursively(object, action, new HashSet<ConfiguredObject<?>>());
+    }
+
+    private void applyRecursively(final ConfiguredObject<?> object,
+                                  final Action<ConfiguredObject<?>> action,
+                                  final HashSet<ConfiguredObject<?>> visited)
+    {
+        if(!visited.contains(object))
+        {
+            visited.add(object);
+            action.performAction(object);
+            for(Class<? extends ConfiguredObject> childClass : 
object.getModel().getChildTypes(object.getCategoryClass()))
+            {
+                Collection<? extends ConfiguredObject> children = 
object.getChildren(childClass);
+                if(children != null)
+                {
+                    for(ConfiguredObject<?> child : children)
+                    {
+                        applyRecursively(child, action, visited);
+                    }
+                }
+            }
+        }
     }
+
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
 Tue Mar  3 14:56:40 2015
@@ -36,8 +36,13 @@ import java.util.concurrent.ScheduledThr
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.LoggerFactory;
+
+
 public class SelectorThread extends Thread
 {
+    private static final org.slf4j.Logger LOGGER = 
LoggerFactory.getLogger(SelectorThread.class);
+
     public static final String IO_THREAD_NAME_PREFIX  = "NCS-";
     private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
     private final Queue<NonBlockingConnection> _unregisteredConnections = new 
ConcurrentLinkedQueue<>();
@@ -165,7 +170,8 @@ public class SelectorThread extends Thre
                     NonBlockingConnection connection = iterator.next();
 
                     int period = 
connection.getTicker().getTimeToNextTick(currentTime);
-                    if (period < 0 || connection.isStateChanged())
+
+                    if (period <= 0 || connection.isStateChanged())
                     {
                         toBeScheduled.add(connection);
                         connection.getSocketChannel().register(_selector, 
0).cancel();

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
 Tue Mar  3 14:56:40 2015
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.util;
 
-public interface Action<T>
+public interface Action<T> extends BaseAction<T, RuntimeException>
 {
     void performAction(T object);
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 Tue Mar  3 14:56:40 2015
@@ -63,6 +63,8 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.*;
 import org.apache.qpid.server.model.adapter.ConnectionAdapter;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.plugin.ConnectionValidator;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.plugin.SystemNodeCreator;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -75,7 +77,6 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
@@ -94,6 +95,8 @@ import org.apache.qpid.server.util.MapVa
 public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> 
extends AbstractConfiguredObject<X>
         implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, 
IConnectionRegistry.RegistryChangeListener, EventListener
 {
+    private final Collection<ConnectionValidator> _connectionValidators = new 
ArrayList<>();
+
     private static enum BlockingType { STORE, FILESYSTEM };
 
     private static final String USE_ASYNC_RECOVERY = 
"use_async_message_store_recovery";
@@ -162,6 +165,14 @@ public abstract class AbstractVirtualHos
     @ManagedAttributeField
     private int _housekeepingThreadCount;
 
+    @ManagedAttributeField
+    private List<String> _enabledConnectionValidators;
+
+    @ManagedAttributeField
+    private List<String> _disabledConnectionValidators;
+
+    @ManagedAttributeField
+    private List<String> _globalAddressDomains;
 
     private boolean _useAsyncRecoverer;
 
@@ -212,6 +223,13 @@ public abstract class AbstractVirtualHos
         {
             throw new IllegalArgumentException(getClass().getSimpleName() + " 
must be durable");
         }
+        if(getGlobalAddressDomains() != null)
+        {
+            for(String domain : getGlobalAddressDomains())
+            {
+                validateGlobalAddressDomain(domain);
+            }
+        }
     }
 
     @Override
@@ -230,6 +248,26 @@ public abstract class AbstractVirtualHos
                 throw new IntegrityViolationException("Cannot delete default 
virtual host '" + getName() + "'");
             }
         }
+        if(changedAttributes.contains(GLOBAL_ADDRESS_DOMAINS))
+        {
+            VirtualHost<?, ?, ?> virtualHost = (VirtualHost<?, ?, ?>) 
proxyForValidation;
+            if(virtualHost.getGlobalAddressDomains() != null)
+            {
+                for(String name : virtualHost.getGlobalAddressDomains())
+                {
+                    validateGlobalAddressDomain(name);
+                }
+            }
+        }
+    }
+
+    private void validateGlobalAddressDomain(final String name)
+    {
+        String regex = "/(/?)([\\w_\\-:.\\$]+/)*[\\w_\\-:.\\$]+";
+        if(!name.matches(regex))
+        {
+            throw new IllegalArgumentException("'"+name+"' is not a valid 
global address domain");
+        }
     }
 
     @Override
@@ -243,8 +281,17 @@ public abstract class AbstractVirtualHos
     {
         super.validateOnCreate();
         validateMessageStoreCreation();
+        if(getGlobalAddressDomains() != null)
+        {
+            for(String name : getGlobalAddressDomains())
+            {
+                validateGlobalAddressDomain(name);
+            }
+        }
     }
 
+
+
     private void validateMessageStoreCreation()
     {
         MessageStore store = createMessageStore();
@@ -293,10 +340,20 @@ public abstract class AbstractVirtualHos
         _messageStore.addEventListener(this, 
Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
         _messageStore.addEventListener(this, 
Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
 
-        addChangeListener(new StoreUpdatingChangeListener());
+        _fileSystemMaxUsagePercent = getContextValue(Integer.class, 
Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
 
 
-        _fileSystemMaxUsagePercent = getContextValue(Integer.class, 
Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
+        QpidServiceLoader serviceLoader = new QpidServiceLoader();
+        for(ConnectionValidator validator : 
serviceLoader.instancesOf(ConnectionValidator.class))
+        {
+            if((_enabledConnectionValidators.isEmpty()
+                && (_disabledConnectionValidators.isEmpty()) || 
!_disabledConnectionValidators.contains(validator.getType()))
+               || _enabledConnectionValidators.contains(validator.getType()))
+            {
+                _connectionValidators.add(validator);
+            }
+
+        }
     }
 
     private void checkVHostStateIsActive()
@@ -438,6 +495,20 @@ public abstract class AbstractVirtualHos
         return _eventLogger;
     }
 
+    @Override
+    public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> 
connection)
+    {
+        getSecurityManager().authoriseCreateConnection(connection);
+        for(ConnectionValidator validator : _connectionValidators)
+        {
+            if(!validator.validateConnectionCreation(connection))
+            {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
      * Initialise a housekeeping task to iterate over queues cleaning expired 
messages with no consumers
      * and checking for idle or open transactions that have exceeded the 
permitted thresholds.
@@ -526,9 +597,42 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
+    public List<String> getEnabledConnectionValidators()
+    {
+        return _enabledConnectionValidators;
+    }
+
+    @Override
+    public List<String> getDisabledConnectionValidators()
+    {
+        return _disabledConnectionValidators;
+    }
+
+    @Override
+    public List<String> getGlobalAddressDomains()
+    {
+        return _globalAddressDomains;
+    }
+
+    @Override
     public AMQQueue<?> getQueue(String name)
     {
-        return (AMQQueue<?>) getChildByName(Queue.class, name);
+        AMQQueue<?> childByName = (AMQQueue<?>) getChildByName(Queue.class, 
name);
+        if(childByName == null && getGlobalAddressDomains() != null)
+        {
+            for(String domain : getGlobalAddressDomains())
+            {
+                if(name.startsWith(domain + "/"))
+                {
+                    childByName = (AMQQueue<?>) 
getChildByName(Queue.class,name.substring(domain.length()));
+                    if(childByName != null)
+                    {
+                        break;
+                    }
+                }
+            }
+        }
+        return childByName;
     }
 
     @Override
@@ -556,14 +660,6 @@ public abstract class AbstractVirtualHos
     {
         int purged = queue.deleteAndReturnCount();
 
-        if (queue.isDurable() && !(queue.getLifetimePolicy()
-                                   == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
-                                   || queue.getLifetimePolicy()
-                                      == LifetimePolicy.DELETE_ON_SESSION_END))
-        {
-            DurableConfigurationStore store = getDurableConfigurationStore();
-            store.remove(queue.asObjectRecord());
-        }
         return purged;
 }
 
@@ -614,7 +710,22 @@ public abstract class AbstractVirtualHos
     @Override
     public ExchangeImpl getExchange(String name)
     {
-        return getChildByName(ExchangeImpl.class,name);
+        ExchangeImpl childByName = getChildByName(ExchangeImpl.class, name);
+        if(childByName == null && getGlobalAddressDomains() != null)
+        {
+            for(String domain : getGlobalAddressDomains())
+            {
+                if(name.startsWith(domain + "/"))
+                {
+                    childByName = 
getChildByName(ExchangeImpl.class,name.substring(domain.length()));
+                    if(childByName != null)
+                    {
+                        break;
+                    }
+                }
+            }
+        }
+        return childByName;
     }
 
     @Override
@@ -671,6 +782,23 @@ public abstract class AbstractVirtualHos
         exchange.deleteWithChecks();
     }
 
+    @Override
+    public String getLocalAddress(final String routingAddress)
+    {
+        String localAddress = routingAddress;
+        if(getGlobalAddressDomains() != null)
+        {
+            for(String domain : getGlobalAddressDomains())
+            {
+                if(localAddress.length() > routingAddress.length() - 
domain.length() && routingAddress.startsWith(domain + "/"))
+                {
+                    localAddress = routingAddress.substring(domain.length());
+                }
+            }
+        }
+        return localAddress;
+    }
+
     public SecurityManager getSecurityManager()
     {
         return _broker.getSecurityManager();
@@ -886,6 +1014,12 @@ public abstract class AbstractVirtualHos
         }
     }
 
+    @Override
+    public String getRedirectHost(final AmqpPort<?> port)
+    {
+        return null;
+    }
+
     private class VirtualHostHouseKeepingTask extends HouseKeepingTask
     {
         public VirtualHostHouseKeepingTask()
@@ -1390,14 +1524,6 @@ public abstract class AbstractVirtualHos
         return total;
     }
 
-    @Override
-    protected void onCreate()
-    {
-        super.onCreate();
-        ConfiguredObjectRecord record = asObjectRecord();
-        getDurableConfigurationStore().create(new 
ConfiguredObjectRecordImpl(record.getId(), record.getType(), 
record.getAttributes()));
-    }
-
     @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, 
desiredState = State.ACTIVE )
     private void onActivate()
     {
@@ -1509,44 +1635,6 @@ public abstract class AbstractVirtualHos
         onActivate();
     }
 
-    private class StoreUpdatingChangeListener implements 
ConfigurationChangeListener
-    {
-        @Override
-        public void stateChanged(final ConfiguredObject<?> object, final State 
oldState, final State newState)
-        {
-            if (object == AbstractVirtualHost.this && isDurable() && newState 
== State.DELETED)
-            {
-                getDurableConfigurationStore().remove(asObjectRecord());
-                object.removeChangeListener(this);
-            }
-        }
-
-        @Override
-        public void childAdded(final ConfiguredObject<?> object, final 
ConfiguredObject<?> child)
-        {
-
-        }
-
-        @Override
-        public void childRemoved(final ConfiguredObject<?> object, final 
ConfiguredObject<?> child)
-        {
-
-        }
-
-        @Override
-        public void attributeSet(final ConfiguredObject<?> object,
-                                 final String attributeName,
-                                 final Object oldAttributeValue,
-                                 final Object newAttributeValue)
-        {
-            if (object == AbstractVirtualHost.this && isDurable() && 
getState() != State.DELETED && isAttributePersisted(attributeName)
-                    && !(attributeName.equals(VirtualHost.DESIRED_STATE) && 
newAttributeValue.equals(State.DELETED)))
-            {
-                getDurableConfigurationStore().update(false, asObjectRecord());
-            }
-        }
-    }
-
     private class FileSystemSpaceChecker extends HouseKeepingTask
     {
         private boolean _fileSystemFull;

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
 Tue Mar  3 14:56:40 2015
@@ -33,6 +33,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.NoFactoryForTypeException;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.SecurityManager;
@@ -108,4 +109,7 @@ public interface VirtualHostImpl< X exte
 
     EventLogger getEventLogger();
 
+    boolean authoriseCreateConnection(AMQConnectionModel<?, ?> connection);
+
+    String getLocalAddress(String routingAddress);
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java
 Tue Mar  3 14:56:40 2015
@@ -309,4 +309,14 @@ public class BrokerOptionsTest extends Q
         assertEquals("unexpected number of entries", 2, props.keySet().size());
         assertEquals("value", props.get("name"));
     }
+
+
+    public void testSetInitialSystemProperties()
+    {
+        assertNull("Unexpected default value for initial system properties", 
_options.getInitialSystemProperties());
+
+        _options.setInitialSystemProperties("test.properties");
+
+        assertEquals("test.properties", _options.getInitialSystemProperties());
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java
 Tue Mar  3 14:56:40 2015
@@ -35,6 +35,8 @@ import org.apache.qpid.server.model.Bind
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class BindingImplTest extends QpidTestCase
@@ -57,7 +59,11 @@ public class BindingImplTest extends Qpi
         attributes.put(Binding.ARGUMENTS, arguments);
         attributes.put(Binding.NAME, getTestName());
         AMQQueue queue = mock(AMQQueue.class);
+        VirtualHostImpl vhost = mock(VirtualHostImpl.class);
+        SecurityManager securityManager = mock(SecurityManager.class);
+        when(vhost.getSecurityManager()).thenReturn(securityManager);
         when(queue.getTaskExecutor()).thenReturn(_taskExecutor);
+        when(queue.getVirtualHost()).thenReturn(vhost);
         when(queue.getModel()).thenReturn(_model);
         ExchangeImpl exchange = mock(ExchangeImpl.class);
         when(exchange.getTaskExecutor()).thenReturn(_taskExecutor);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
 Tue Mar  3 14:56:40 2015
@@ -57,6 +57,7 @@ public class StoreConfigurationChangeLis
         notifyBrokerStarted();
         UUID id = UUID.randomUUID();
         ConfiguredObject object = mock(VirtualHost.class);
+        when(object.isDurable()).thenReturn(true);
         when(object.getId()).thenReturn(id);
         ConfiguredObjectRecord record = mock(ConfiguredObjectRecord.class);
         when(object.asObjectRecord()).thenReturn(record);
@@ -69,11 +70,13 @@ public class StoreConfigurationChangeLis
         notifyBrokerStarted();
         Broker broker = mock(Broker.class);
         when(broker.getCategoryClass()).thenReturn(Broker.class);
+        when(broker.isDurable()).thenReturn(true);
         VirtualHost child = mock(VirtualHost.class);
         when(child.getCategoryClass()).thenReturn(VirtualHost.class);
         Model model = mock(Model.class);
         
when(model.getChildTypes(any(Class.class))).thenReturn(Collections.<Class<? 
extends ConfiguredObject>>emptyList());
         when(child.getModel()).thenReturn(model);
+        when(child.isDurable()).thenReturn(true);
         _listener.childAdded(broker, child);
         verify(_store).update(eq(true), any(ConfiguredObjectRecord.class));
     }
@@ -83,15 +86,17 @@ public class StoreConfigurationChangeLis
         notifyBrokerStarted();
         Broker broker = mock(Broker.class);
         when(broker.getCategoryClass()).thenReturn(Broker.class);
+        when(broker.isDurable()).thenReturn(true);
         _listener.attributeSet(broker, Broker.CONNECTION_SESSION_COUNT_LIMIT, 
null, 1);
         verify(_store).update(eq(false),any(ConfiguredObjectRecord.class));
     }
 
-    public void testChildAddedForVirtualHostNode()
+    public void testChildAddedWhereParentManagesChildStorage()
     {
         notifyBrokerStarted();
 
         VirtualHostNode<?> object = mock(VirtualHostNode.class);
+        when(object.managesChildStorage()).thenReturn(true);
         VirtualHost<?,?,?> virtualHost = mock(VirtualHost.class);
         _listener.childAdded(object, virtualHost);
         verifyNoMoreInteractions(_store);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
 Tue Mar  3 14:56:40 2015
@@ -38,7 +38,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
@@ -103,16 +102,23 @@ public class MockConsumer implements Con
     {
         if(_messageIds != null)
         {
-            SimpleFilterManager filters = new SimpleFilterManager();
-            filters.add(new MessageFilter()
+            FilterManager filters = new FilterManager();
+            MessageFilter filter = new MessageFilter()
             {
                 @Override
+                public String getName()
+                {
+                    return "";
+                }
+
+                @Override
                 public boolean matches(final Filterable message)
                 {
                     final String messageId = 
message.getMessageHeader().getMessageId();
                     return _messageIds.contains(messageId);
                 }
-            });
+            };
+            filters.add(filter.getName(), filter);
             return filters;
         }
         else

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
 Tue Mar  3 14:56:40 2015
@@ -45,6 +45,7 @@ import org.mockito.stubbing.Answer;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
+import 
org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import 
org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener;
@@ -66,6 +67,7 @@ public class VirtualHostTest extends Qpi
     private VirtualHostNode<?> _virtualHostNode;
     private DurableConfigurationStore _configStore;
     private VirtualHost<?, ?, ?> _virtualHost;
+    private StoreConfigurationChangeListener _storeConfigurationChangeListener;
 
     @Override
     protected void setUp() throws Exception
@@ -79,9 +81,13 @@ public class VirtualHostTest extends Qpi
         when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
 
         _virtualHostNode = mock(VirtualHostNode.class);
+        when(_virtualHostNode.isDurable()).thenReturn(true);
         _configStore = mock(DurableConfigurationStore.class);
+        _storeConfigurationChangeListener = new 
StoreConfigurationChangeListener(_configStore);
+
         
when(_virtualHostNode.getConfigurationStore()).thenReturn(_configStore);
 
+
         // Virtualhost needs the EventLogger from the SystemContext.
         when(_virtualHostNode.getParent(Broker.class)).thenReturn(_broker);
 
@@ -122,7 +128,7 @@ public class VirtualHostTest extends Qpi
         assertEquals("Unexpected name", virtualHostName, 
virtualHost.getName());
         assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
 
-        verify(_configStore).create(matchesRecord(virtualHost.getId(), 
virtualHost.getType()));
+        
verify(_configStore).update(eq(true),matchesRecord(virtualHost.getId(), 
virtualHost.getType()));
     }
 
     public void testDeleteVirtualHost()
@@ -170,7 +176,7 @@ public class VirtualHostTest extends Qpi
         virtualHost.start();
         assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
 
-        verify(_configStore, 
times(1)).create(matchesRecord(virtualHost.getId(), virtualHost.getType()));
+        verify(_configStore, times(1)).update(eq(true), 
matchesRecord(virtualHost.getId(), virtualHost.getType()));
         verify(_configStore, times(2)).update(eq(false), 
matchesRecord(virtualHost.getId(), virtualHost.getType()));
     }
 
@@ -293,7 +299,7 @@ public class VirtualHostTest extends Qpi
         assertNotNull(queue.getId());
         assertEquals(queueName, queue.getName());
 
-        verify(_configStore).create(matchesRecord(queue.getId(), 
queue.getType()));
+        verify(_configStore).update(eq(true),matchesRecord(queue.getId(), 
queue.getType()));
     }
 
     public void testCreateNonDurableQueue()
@@ -396,7 +402,10 @@ public class VirtualHostTest extends Qpi
         attributes.put(VirtualHost.TYPE, 
TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
 
         TestMemoryVirtualHost host = new TestMemoryVirtualHost(attributes, 
_virtualHostNode);
+        host.addChangeListener(_storeConfigurationChangeListener);
         host.create();
+        // Fire the child added event on the node
+        _storeConfigurationChangeListener.childAdded(_virtualHostNode,host);
         _virtualHost = host;
         return host;
     }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java
 Tue Mar  3 14:56:40 2015
@@ -22,14 +22,18 @@ import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.auth.Subject;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -476,4 +480,84 @@ public class AbstractConfiguredObjectTes
 
 
     }
+
+    public void testAttributeSetListenerFiring()
+    {
+        final String objectName = "listenerFiring";
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(ConfiguredObject.NAME, objectName);
+        attributes.put(TestSingleton.STRING_VALUE, "first");
+
+        final TestSingleton object = 
_model.getObjectFactory().create(TestSingleton.class, attributes);
+
+        final AtomicInteger listenerCount = new AtomicInteger();
+        final LinkedHashMap<String, String> updates = new LinkedHashMap<>();
+        object.addChangeListener(new NoopConfigurationChangeListener()
+        {
+            @Override
+            public void attributeSet(final ConfiguredObject<?> object,
+                                     final String attributeName,
+                                     final Object oldAttributeValue,
+                                     final Object newAttributeValue)
+            {
+                listenerCount.incrementAndGet();
+                String delta = String.valueOf(oldAttributeValue) + "=>" + 
String.valueOf(newAttributeValue);
+                updates.put(attributeName, delta);
+            }
+        });
+
+        // Set updated value (should cause listener to fire)
+        
object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, 
"second"));
+
+        assertEquals(1, listenerCount.get());
+        String delta = updates.remove(TestSingleton.STRING_VALUE);
+        assertEquals("first=>second", delta);
+
+        // Set unchanged value (should not cause listener to fire)
+        
object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, 
"second"));
+        assertEquals(1, listenerCount.get());
+
+        // Set value to null (should cause listener to fire)
+        
object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, 
null));
+        assertEquals(2, listenerCount.get());
+        delta = updates.remove(TestSingleton.STRING_VALUE);
+        assertEquals("second=>null", delta);
+
+        // Set to null again (should not cause listener to fire)
+        
object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, 
null));
+        assertEquals(2, listenerCount.get());
+
+        // Set updated value (should cause listener to fire)
+        
object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, 
"third"));
+        assertEquals(3, listenerCount.get());
+        delta = updates.remove(TestSingleton.STRING_VALUE);
+        assertEquals("null=>third", delta);
+    }
+
+    private static class NoopConfigurationChangeListener implements 
ConfigurationChangeListener
+    {
+        @Override
+        public void stateChanged(final ConfiguredObject<?> object, final State 
oldState, final State newState)
+        {
+        }
+
+        @Override
+        public void childAdded(final ConfiguredObject<?> object, final 
ConfiguredObject<?> child)
+        {
+        }
+
+        @Override
+        public void childRemoved(final ConfiguredObject<?> object, final 
ConfiguredObject<?> child)
+        {
+        }
+
+        @Override
+        public void attributeSet(final ConfiguredObject<?> object,
+                                 final String attributeName,
+                                 final Object oldAttributeValue,
+                                 final Object newAttributeValue)
+        {
+        }
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
 Tue Mar  3 14:56:40 2015
@@ -93,6 +93,7 @@ public class LastValueQueueListTest exte
         ServerMessage message = createTestServerMessage(null);
 
         QueueEntry addedEntry = _list.add(message);
+        addedEntry.acquire();
         addedEntry.delete();
 
         int numberOfEntries = countEntries(_list);
@@ -113,6 +114,7 @@ public class LastValueQueueListTest exte
         ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
 
         QueueEntry addedEntry = _list.add(message);
+        addedEntry.acquire();
         addedEntry.delete();
 
         int numberOfEntries = countEntries(_list);
@@ -173,6 +175,7 @@ public class LastValueQueueListTest exte
         assertEquals(1, countEntries(_list));
         assertEquals(1, _list.getLatestValuesMap().size());
 
+        addedEntry.acquire();
         addedEntry.delete();
 
         assertEquals(0, countEntries(_list));
@@ -193,7 +196,9 @@ public class LastValueQueueListTest exte
         assertEquals(2, countEntries(_list));
         assertEquals(2, _list.getLatestValuesMap().size());
 
+        addedEntry1.acquire();
         addedEntry1.delete();
+        addedEntry2.acquire();
         addedEntry2.delete();
 
         assertEquals(0, countEntries(_list));

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
 Tue Mar  3 14:56:40 2015
@@ -113,6 +113,7 @@ public abstract class QueueEntryImplTest
      */
     private void delete()
     {
+        _queueEntry.acquire();
         _queueEntry.delete();
         assertTrue("Queue entry should be in DELETED state after invoking of 
delete method",
                 _queueEntry.isDeleted());

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
 Tue Mar  3 14:56:40 2015
@@ -196,6 +196,7 @@ public abstract class QueueEntryListTest
         final QueueEntry head = getTestList().getHead();
 
         final QueueEntry first = getTestList().next(head);
+        first.acquire();
         first.delete();
 
         final QueueEntry second = getTestList().next(head);
@@ -226,6 +227,7 @@ public abstract class QueueEntryListTest
         assertNull(list.next(queueEntry2));
 
         //'delete' the 2nd QueueEntry
+        queueEntry2.acquire();
         queueEntry2.delete();
         assertTrue("Deleting node should have succeeded", 
queueEntry2.isDeleted());
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
 Tue Mar  3 14:56:40 2015
@@ -109,6 +109,7 @@ public class SimpleQueueEntryImplTest ex
     public void testTraverseWithDeletedEntries()
     {
         // Delete 2nd queue entry
+        _queueEntry2.acquire();
         _queueEntry2.delete();
         assertTrue(_queueEntry2.isDeleted());
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
 Tue Mar  3 14:56:40 2015
@@ -137,6 +137,7 @@ public class SortedQueueEntryTest extend
     public void testTraverseWithDeletedEntries()
     {
         // Delete 2nd queue entry
+        _queueEntry3.acquire();
         _queueEntry3.delete();
         assertTrue(_queueEntry3.isDeleted());
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
 Tue Mar  3 14:56:40 2015
@@ -155,7 +155,7 @@ public class StandardQueueEntryListTest
 
     public void testScavenge() throws Exception
     {
-        OrderedQueueEntryList sqel = new StandardQueueEntryList(null);
+        OrderedQueueEntryList sqel = new 
StandardQueueEntryList(mock(StandardQueueImpl.class));
         ConcurrentMap<Integer,QueueEntry> entriesMap = new 
ConcurrentHashMap<Integer,QueueEntry>();
 
 
@@ -215,6 +215,7 @@ public class StandardQueueEntryListTest
     {
         QueueEntry entry = entriesMap.remove(pos);
         boolean wasDeleted = entry.isDeleted();
+        entry.acquire();
         entry.delete();
         return entry.isDeleted() && !wasDeleted;
     }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 Tue Mar  3 14:56:40 2015
@@ -184,7 +184,8 @@ public class ServerConnectionDelegate ex
             vhostName = "";
         }
 
-        vhost = ((AmqpPort)sconn.getPort()).getVirtualHost(vhostName);
+        AmqpPort port = (AmqpPort) sconn.getPort();
+        vhost = port.getVirtualHost(vhostName);
 
 
 
@@ -193,14 +194,28 @@ public class ServerConnectionDelegate ex
             if (vhost.getState() != State.ACTIVE)
             {
                 sconn.setState(Connection.State.CLOSING);
-                sconn.invoke(new 
ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host 
'"+vhostName+"' is not active"));
+                final String redirectHost = vhost.getRedirectHost(port);
+                if(redirectHost == null)
+                {
+                    sconn.invoke(new 
ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
+                                                     "Virtual host '" + 
vhostName + "' is not active"));
+                }
+                else
+                {
+                    sconn.invoke(new ConnectionRedirect(redirectHost, new 
ArrayList<Object>()));
+                }
                 return;
             }
 
             sconn.setVirtualHost(vhost);
             try
             {
-                vhost.getSecurityManager().authoriseCreateConnection(sconn);
+                if(!vhost.authoriseCreateConnection(sconn))
+                {
+                    sconn.setState(Connection.State.CLOSING);
+                    sconn.invoke(new 
ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not 
authorized"));
+                    return;
+                }
             }
             catch (AccessControlException e)
             {
@@ -215,7 +230,8 @@ public class ServerConnectionDelegate ex
         else
         {
             sconn.setState(Connection.State.CLOSING);
-            sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, 
"Unknown virtualhost '"+vhostName+"'"));
+            sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH,
+                                             "Unknown virtualhost '" + 
vhostName + "'"));
         }
 
     }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
 Tue Mar  3 14:56:40 2015
@@ -33,6 +33,7 @@ import java.util.UUID;
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.ServerProtocolEngine;
@@ -40,8 +41,10 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.ArrivalTimeFilter;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
@@ -259,6 +262,43 @@ public class ServerSessionDelegate exten
                         return;
                     }
 
+
+                    if(method.hasArguments() && 
method.getArguments().containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString()))
+                    {
+                        Object value = 
method.getArguments().get(AMQPFilterTypes.REPLAY_PERIOD.toString());
+                        final long period;
+                        if(value instanceof Number)
+                        {
+                            period = ((Number)value).longValue();
+                        }
+                        else if(value instanceof String)
+                        {
+                            try
+                            {
+                                period = Long.parseLong(value.toString());
+                            }
+                            catch (NumberFormatException e)
+                            {
+                                exception(session, method, 
ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a 
number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+                                return;
+                            }
+                        }
+                        else
+                        {
+                            exception(session, method, 
ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a 
number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+                            return;
+                        }
+                        final long startingFrom = System.currentTimeMillis() - 
(1000l * period);
+                        if(filterManager == null)
+                        {
+                            filterManager = new FilterManager();
+                        }
+                        MessageFilter filter = new 
ArrivalTimeFilter(startingFrom);
+                        filterManager.add(filter.getName(), filter);
+
+                    }
+
+
                     ConsumerTarget_0_10 target = new 
ConsumerTarget_0_10((ServerSession)session, destination,
                                                                                
  method.getAcceptMode(),
                                                                                
  method.getAcquireMode(),
@@ -1609,4 +1649,5 @@ public class ServerSessionDelegate exten
         {
         }
     }
+
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Tue Mar  3 14:56:40 2015
@@ -59,11 +59,11 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.ArrivalTimeFilter;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -673,7 +673,7 @@ public class AMQChannel
      * @param tag       the tag chosen by the client (if null, server will 
generate one)
      * @param sources     the queues to subscribe to
      * @param acks      Are acks enabled for this subscriber
-     * @param filters   Filters to apply to this subscriber
+     * @param arguments   Filters to apply to this subscriber
      *
      * @param exclusive Flag requesting exclusive access to the queue
      * @return the consumer tag. This is returned to the subscriber and used 
in subsequent unsubscribe requests
@@ -681,7 +681,7 @@ public class AMQChannel
      * @throws org.apache.qpid.AMQException                  if something goes 
wrong
      */
     public AMQShortString consumeFromSource(AMQShortString tag, 
Collection<MessageSource> sources, boolean acks,
-                                            FieldTable filters, boolean 
exclusive, boolean noLocal)
+                                            FieldTable arguments, boolean 
exclusive, boolean noLocal)
             throws MessageSource.ExistingConsumerPreventsExclusive,
                    MessageSource.ExistingExclusiveConsumer,
                    AMQInvalidArgumentException,
@@ -700,19 +700,19 @@ public class AMQChannel
         ConsumerTarget_0_8 target;
         EnumSet<ConsumerImpl.Option> options = 
EnumSet.noneOf(ConsumerImpl.Option.class);
 
-        if(filters != null && 
Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+        if(arguments != null && 
Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
         {
-            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, 
filters, _noAckCreditManager);
+            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, 
arguments, _noAckCreditManager);
         }
         else if(acks)
         {
-            target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, 
_creditManager);
+            target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, 
_creditManager);
             options.add(ConsumerImpl.Option.ACQUIRES);
             options.add(ConsumerImpl.Option.SEES_REQUEUES);
         }
         else
         {
-            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, 
_noAckCreditManager);
+            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, 
arguments, _noAckCreditManager);
             options.add(ConsumerImpl.Option.ACQUIRES);
             options.add(ConsumerImpl.Option.SEES_REQUEUES);
         }
@@ -732,23 +732,66 @@ public class AMQChannel
 
         try
         {
-            FilterManager filterManager = 
FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+            FilterManager filterManager = 
FilterManagerFactory.createManager(FieldTable.convertToMap(arguments));
             if(noLocal)
             {
                 if(filterManager == null)
                 {
-                    filterManager = new SimpleFilterManager();
+                    filterManager = new FilterManager();
                 }
                 final Object connectionReference = getConnectionReference();
-                filterManager.add(new MessageFilter()
+                MessageFilter filter = new MessageFilter()
                 {
+
+                    @Override
+                    public String getName()
+                    {
+                        return AMQPFilterTypes.NO_LOCAL.toString();
+                    }
+
                     @Override
                     public boolean matches(final Filterable message)
                     {
                         return message.getConnectionReference() != 
connectionReference;
                     }
-                });
+                };
+                filterManager.add(filter.getName(), filter);
+            }
+
+            if(arguments != null && 
arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString()))
+            {
+                Object value = 
arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString());
+                final long period;
+                if(value instanceof Number)
+                {
+                    period = ((Number)value).longValue();
+                }
+                else if(value instanceof String)
+                {
+                    try
+                    {
+                        period = Long.parseLong(value.toString());
+                    }
+                    catch (NumberFormatException e)
+                    {
+                        throw new AMQInvalidArgumentException("Cannot parse 
value " + value + " as a number for filter " + 
AMQPFilterTypes.REPLAY_PERIOD.toString());
+                    }
+                }
+                else
+                {
+                    throw new AMQInvalidArgumentException("Cannot parse value 
" + value + " as a number for filter " + 
AMQPFilterTypes.REPLAY_PERIOD.toString());
+                }
+
+                final long startingFrom = System.currentTimeMillis() - (1000l 
* period);
+                if(filterManager == null)
+                {
+                    filterManager = new FilterManager();
+                }
+                MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+                filterManager.add(filter.getName(), filter);
+
             }
+
             for(MessageSource source : sources)
             {
                 ConsumerImpl sub =

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 Tue Mar  3 14:56:40 2015
@@ -1565,7 +1565,7 @@ public class AMQProtocolEngine implement
             virtualHostStr = virtualHostName == null ? null : 
virtualHostName.toString();
         }
 
-        VirtualHostImpl virtualHost = 
((AmqpPort)getPort()).getVirtualHost(virtualHostStr);
+        VirtualHostImpl<?,?,?> virtualHost = 
((AmqpPort)getPort()).getVirtualHost(virtualHostStr);
 
         if (virtualHost == null)
         {
@@ -1578,8 +1578,16 @@ public class AMQProtocolEngine implement
             // Check virtualhost access
             if (virtualHost.getState() != State.ACTIVE)
             {
-                closeConnection(AMQConstant.CONNECTION_FORCED,
-                                "Virtual host '" + virtualHost.getName() + "' 
is not active",0);
+                String redirectHost = virtualHost.getRedirectHost(getPort());
+                if(redirectHost != null)
+                {
+                    closeConnection(0, new AMQFrame(0,new 
ConnectionRedirectBody(getProtocolVersion(),AMQShortString.valueOf(redirectHost),
 null)));
+                }
+                else
+                {
+                    closeConnection(AMQConstant.CONNECTION_FORCED,
+                                    "Virtual host '" + virtualHost.getName() + 
"' is not active", 0);
+                }
 
             }
             else
@@ -1587,21 +1595,29 @@ public class AMQProtocolEngine implement
                 setVirtualHost(virtualHost);
                 try
                 {
-                    
virtualHost.getSecurityManager().authoriseCreateConnection(this);
-                    if (getContextKey() == null)
+
+                    if(virtualHost.authoriseCreateConnection(this))
                     {
-                        setContextKey(new 
AMQShortString(Long.toString(System.currentTimeMillis())));
-                    }
+                        if (getContextKey() == null)
+                        {
+                            setContextKey(new 
AMQShortString(Long.toString(System.currentTimeMillis())));
+                        }
+
+                        MethodRegistry methodRegistry = getMethodRegistry();
+                        AMQMethodBody responseBody = 
methodRegistry.createConnectionOpenOkBody(virtualHostName);
 
-                    MethodRegistry methodRegistry = getMethodRegistry();
-                    AMQMethodBody responseBody = 
methodRegistry.createConnectionOpenOkBody(virtualHostName);
+                        writeFrame(responseBody.generateFrame(0));
+                        _state = ConnectionState.OPEN;
 
-                    writeFrame(responseBody.generateFrame(0));
-                    _state = ConnectionState.OPEN;
+                    }
+                    else
+                    {
+                        closeConnection(AMQConstant.ACCESS_REFUSED, 
"Connection refused",0);
+                    }
                 }
                 catch (AccessControlException e)
                 {
-                    closeConnection(AMQConstant.ACCESS_REFUSED, 
e.getMessage(),0);
+                    closeConnection(AMQConstant.ACCESS_REFUSED, 
e.getMessage(), 0);
                 }
             }
         }
@@ -1767,7 +1783,7 @@ public class AMQProtocolEngine implement
         _logger.info("Locale selected: " + locale);
 
         SubjectCreator subjectCreator = getSubjectCreator();
-        SaslServer ss = null;
+        SaslServer ss;
         try
         {
             ss = subjectCreator.createSaslServer(String.valueOf(mechanism),



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to