Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java Tue Mar 3 14:56:40 2015 @@ -22,6 +22,8 @@ package org.apache.qpid.server.security. import org.apache.log4j.Logger; import org.apache.qpid.server.security.auth.UsernamePrincipal; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.login.AccountNotFoundException; @@ -36,7 +38,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; @@ -45,9 +46,9 @@ public abstract class AbstractPasswordFi protected static final String DEFAULT_ENCODING = "utf-8"; private final Pattern _regexp = Pattern.compile(":"); - private final Map<String, U> _userMap = new HashMap<String, U>(); + private final Map<String, U> _userMap = new HashMap<>(); private final ReentrantLock _userUpdate = new ReentrantLock(); - private final Random _random = new Random(); + private final FileHelper _fileHelper = new FileHelper(); private File _passwordFile; public final void open(File passwordFile) throws IOException @@ -181,7 +182,7 @@ public abstract class AbstractPasswordFi try { _userUpdate.lock(); - final Map<String, U> newUserMap = new HashMap<String, U>(); + final Map<String, U> newUserMap = new HashMap<>(); BufferedReader reader = null; try @@ -224,71 +225,33 @@ public abstract class AbstractPasswordFi protected abstract Logger getLogger(); - protected File createTempFileOnSameFilesystem() - { - File liveFile = _passwordFile; - File tmp; - - do - { - tmp = new File(liveFile.getPath() + _random.nextInt() + ".tmp"); - } - while(tmp.exists()); - tmp.deleteOnExit(); - return tmp; - } - - protected void swapTempFileToLive(final File temp) throws IOException + protected void savePasswordFile() throws IOException { - File live = _passwordFile; - // Remove any existing ".old" file - final File old = new File(live.getAbsoluteFile() + ".old"); - if (old.exists()) + try { - old.delete(); - } + _userUpdate.lock(); - // Create an new ".old" file - if(!live.renameTo(old)) - { - //unable to rename the existing file to the backup name - getLogger().error("Could not backup the existing password file"); - throw new IOException("Could not backup the existing password file"); + _fileHelper.writeFileSafely(_passwordFile.toPath(), new BaseAction<File,IOException>() + { + @Override + public void performAction(File file) throws IOException + { + writeToFile(file); + } + }); } - - // Move temp file to be the new "live" file - if(!temp.renameTo(live)) + finally { - //failed to rename the new file to the required filename - if(!old.renameTo(live)) - { - //unable to return the backup to required filename - getLogger().error( - "Could not rename the new password file into place, and unable to restore original file"); - throw new IOException("Could not rename the new password file into place, and unable to restore original file"); - } - - getLogger().error("Could not rename the new password file into place"); - throw new IOException("Could not rename the new password file into place"); + _userUpdate.unlock(); } } - protected void savePasswordFile() throws IOException + private void writeToFile(File tmp) throws IOException { - try - { - _userUpdate.lock(); - - BufferedReader reader = null; - PrintStream writer = null; - - File tmp = createTempFileOnSameFilesystem(); - - try + try(PrintStream writer = new PrintStream(tmp); + BufferedReader reader = new BufferedReader(new FileReader(_passwordFile))) { - writer = new PrintStream(tmp); - reader = new BufferedReader(new FileReader(_passwordFile)); String line; while ((line = reader.readLine()) != null) @@ -346,32 +309,6 @@ public abstract class AbstractPasswordFi getLogger().error("Unable to create the new password file: " + e); throw new IOException("Unable to create the new password file",e); } - finally - { - - try - { - if (reader != null) - { - reader.close(); - } - } - finally - { - if (writer != null) - { - writer.close(); - } - } - - } - - swapTempFileToLive(tmp); - } - finally - { - _userUpdate.unlock(); - } } protected abstract U createUserFromPassword(Principal principal, char[] passwd);
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java Tue Mar 3 14:56:40 2015 @@ -28,7 +28,7 @@ import org.apache.qpid.server.model.Mana import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -@ManagedObject( category = false, type = "Base64MD5PasswordFile" ) +@ManagedObject( category = false, managesChildren = true, type = "Base64MD5PasswordFile" ) public class Base64MD5PasswordDatabaseAuthenticationManager extends PrincipalDatabaseAuthenticationManager<Base64MD5PasswordDatabaseAuthenticationManager> { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java Tue Mar 3 14:56:40 2015 @@ -28,7 +28,7 @@ import org.apache.qpid.server.model.Mana import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -@ManagedObject( category = false, type = "PlainPasswordFile" ) +@ManagedObject( category = false, managesChildren = true, type = "PlainPasswordFile" ) public class PlainPasswordDatabaseAuthenticationManager extends PrincipalDatabaseAuthenticationManager<PlainPasswordDatabaseAuthenticationManager> { public static final String PROVIDER_TYPE = "PlainPasswordFile"; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Tue Mar 3 14:56:40 2015 @@ -23,6 +23,8 @@ package org.apache.qpid.server.security. import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.security.AccessControlException; import java.security.Principal; import java.util.Collection; @@ -40,6 +42,7 @@ import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; @@ -56,6 +59,7 @@ import org.apache.qpid.server.security.a import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.util.FileHelper; public abstract class PrincipalDatabaseAuthenticationManager<T extends PrincipalDatabaseAuthenticationManager<T>> extends AbstractAuthenticationManager<T> @@ -96,7 +100,11 @@ public abstract class PrincipalDatabaseA { try { - passwordFile.createNewFile(); + Path path = new FileHelper().createNewFile(passwordFile, getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS)); + if (!Files.exists(path)) + { + throw new IllegalConfigurationException(String.format("Cannot create password file at '%s'", _path)); + } } catch (IOException e) { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java Tue Mar 3 14:56:40 2015 @@ -34,6 +34,8 @@ import java.util.concurrent.ConcurrentSk import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; import org.apache.qpid.server.util.ServerScopedRuntimeException; /** @@ -232,9 +234,9 @@ public class FileGroupDatabase implement } } - private synchronized void writeGroupFile(String groupFile) throws IOException + private synchronized void writeGroupFile(final String groupFile) throws IOException { - Properties propertiesFile = new Properties(); + final Properties propertiesFile = new Properties(); for (String group : _groupToUserMap.keySet()) { @@ -244,19 +246,19 @@ public class FileGroupDatabase implement propertiesFile.setProperty(group + ".users", userList); } - String comment = "Written " + new Date(); - FileOutputStream fileOutputStream = new FileOutputStream(groupFile); - try - { - propertiesFile.store(fileOutputStream, comment); - } - finally + + new FileHelper().writeFileSafely(new File(groupFile).toPath(), new BaseAction<File, IOException>() { - if(fileOutputStream != null) + @Override + public void performAction(File file) throws IOException { - fileOutputStream.close(); + String comment = "Written " + new Date(); + try(FileOutputStream fileOutputStream = new FileOutputStream(file)) + { + propertiesFile.store(fileOutputStream, comment); + } } - } + }); } private void validatePropertyNameIsGroupName(String propertyName) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java Tue Mar 3 14:56:40 2015 @@ -27,6 +27,8 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -40,6 +42,9 @@ import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.Version; @@ -85,6 +90,7 @@ public class JsonFileConfigStore impleme private final Map<String, List<UUID>> _idsByType = new HashMap<String, List<UUID>>(); private final ObjectMapper _objectMapper = new ObjectMapper(); private final Class<? extends ConfiguredObject> _rootClass; + private final FileHelper _fileHelper; private Map<String,Class<? extends ConfiguredObject>> _classNameMapping; private String _directoryName; @@ -123,6 +129,7 @@ public class JsonFileConfigStore impleme _objectMapper.registerModule(_module); _objectMapper.enable(SerializationConfig.Feature.INDENT_OUTPUT); _rootClass = rootClass; + _fileHelper = new FileHelper(); } @Override @@ -173,7 +180,7 @@ public class JsonFileConfigStore impleme _directoryName = fileFromSettings.getParent(); _configFileName = fileFromSettings.getName(); _backupFileName = fileFromSettings.getName() + ".bak"; - _tempFileName = fileFromSettings.getName() + ".tmp";; + _tempFileName = fileFromSettings.getName() + ".tmp"; _lockFileName = fileFromSettings.getName() + ".lck"; } @@ -191,56 +198,45 @@ public class JsonFileConfigStore impleme checkDirectoryIsWritable(_directoryName); getFileLock(); - if(!fileExists(_configFileName)) + Path storeFile = new File(_directoryName, _configFileName).toPath(); + Path backupFile = new File(_directoryName, _backupFileName).toPath(); + if(!Files.exists(storeFile)) { - if(!fileExists(_backupFileName)) + if(!Files.exists(backupFile)) { - File newFile = new File(_directoryName, _configFileName); try { - _objectMapper.writeValue(newFile, Collections.emptyMap()); + String posixFileAttributes = _parent.getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS); + storeFile = _fileHelper.createNewFile(storeFile, posixFileAttributes); + _objectMapper.writeValue(storeFile.toFile(), Collections.emptyMap()); } catch (IOException e) { - throw new StoreException("Could not write configuration file " + newFile, e); + throw new StoreException("Could not write configuration file " + storeFile, e); } } else { - renameFile(_backupFileName, _configFileName); + try + { + _fileHelper.atomicFileMoveOrReplace(backupFile, storeFile); + } + catch (IOException e) + { + throw new StoreException("Could not move backup to configuration file " + storeFile, e); + } } } - deleteFileIfExists(_backupFileName); - } - private void renameFile(String fromFileName, String toFileName) - { - File toFile = deleteFileIfExists(toFileName); - File fromFile = new File(_directoryName, fromFileName); - - if(!fromFile.renameTo(toFile)) + try { - throw new StoreException("Cannot rename file " + fromFile.getAbsolutePath() + " to " + toFile.getAbsolutePath()); + Files.deleteIfExists(backupFile); } - } - - private File deleteFileIfExists(final String toFileName) - { - File toFile = new File(_directoryName, toFileName); - if(toFile.exists()) + catch (IOException e) { - if(!toFile.delete()) - { - throw new StoreException("Cannot delete file " + toFile.getAbsolutePath()); - } + throw new StoreException("Could not delete backup file " + backupFile, e); } - return toFile; - } - private boolean fileExists(String fileName) - { - File file = new File(_directoryName, fileName); - return file.exists(); } private void getFileLock() @@ -396,7 +392,7 @@ public class JsonFileConfigStore impleme private void save() { UUID rootId = getRootId(); - Map<String, Object> data = null; + final Map<String, Object> data; if (rootId == null) { @@ -409,15 +405,18 @@ public class JsonFileConfigStore impleme try { - deleteFileIfExists(_tempFileName); - deleteFileIfExists(_backupFileName); - - File tmpFile = new File(_directoryName, _tempFileName); - _objectMapper.writeValue(tmpFile, data); - renameFile(_configFileName, _backupFileName); - renameFile(tmpFile.getName(),_configFileName); - tmpFile.delete(); - deleteFileIfExists(_backupFileName); + Path tmpFile = new File(_directoryName, _tempFileName).toPath(); + _fileHelper.writeFileSafely( new File(_directoryName, _configFileName).toPath(), + new File(_directoryName, _backupFileName).toPath(), + tmpFile, + new BaseAction<File, IOException>() + { + @Override + public void performAction(File file) throws IOException + { + _objectMapper.writeValue(file, data); + } + }); } catch (IOException e) { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Tue Mar 3 14:56:40 2015 @@ -20,8 +20,10 @@ */ package org.apache.qpid.server.store; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -30,14 +32,19 @@ import java.util.UUID; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.util.Action; public class VirtualHostStoreUpgraderAndRecoverer { @@ -509,12 +516,100 @@ public class VirtualHostStoreUpgraderAnd } - public void perform(DurableConfigurationStore durableConfigurationStore) + public void perform(final DurableConfigurationStore durableConfigurationStore) { String virtualHostCategory = VirtualHost.class.getSimpleName(); GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders); upgraderHandler.upgrade(); new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords()); + + final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(durableConfigurationStore); + if(_virtualHostNode.getVirtualHost() != null) + { + applyRecursively(_virtualHostNode.getVirtualHost(), new Action<ConfiguredObject<?>>() + { + @Override + public void performAction(final ConfiguredObject<?> object) + { + object.addChangeListener(configChangeListener); + } + }); + } + _virtualHostNode.addChangeListener(new ConfigurationChangeListener() + { + @Override + public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState) + { + + } + + @Override + public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + if(child instanceof VirtualHost) + { + applyRecursively(child, new Action<ConfiguredObject<?>>() + { + @Override + public void performAction(final ConfiguredObject<?> object) + { + if(object.isDurable()) + { + durableConfigurationStore.update(true, object.asObjectRecord()); + object.addChangeListener(configChangeListener); + } + } + }); + + } + } + + @Override + public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + if(child instanceof VirtualHost) + { + child.removeChangeListener(configChangeListener); + } + } + + @Override + public void attributeSet(final ConfiguredObject<?> object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + }); + } + + private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action) + { + applyRecursively(object, action, new HashSet<ConfiguredObject<?>>()); + } + + private void applyRecursively(final ConfiguredObject<?> object, + final Action<ConfiguredObject<?>> action, + final HashSet<ConfiguredObject<?>> visited) + { + if(!visited.contains(object)) + { + visited.add(object); + action.performAction(object); + for(Class<? extends ConfiguredObject> childClass : object.getModel().getChildTypes(object.getCategoryClass())) + { + Collection<? extends ConfiguredObject> children = object.getChildren(childClass); + if(children != null) + { + for(ConfiguredObject<?> child : children) + { + applyRecursively(child, action, visited); + } + } + } + } } + } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Tue Mar 3 14:56:40 2015 @@ -36,8 +36,13 @@ import java.util.concurrent.ScheduledThr import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.LoggerFactory; + + public class SelectorThread extends Thread { + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class); + public static final String IO_THREAD_NAME_PREFIX = "NCS-"; private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); @@ -165,7 +170,8 @@ public class SelectorThread extends Thre NonBlockingConnection connection = iterator.next(); int period = connection.getTicker().getTimeToNextTick(currentTime); - if (period < 0 || connection.isStateChanged()) + + if (period <= 0 || connection.isStateChanged()) { toBeScheduled.add(connection); connection.getSocketChannel().register(_selector, 0).cancel(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java Tue Mar 3 14:56:40 2015 @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.util; -public interface Action<T> +public interface Action<T> extends BaseAction<T, RuntimeException> { void performAction(T object); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Mar 3 14:56:40 2015 @@ -63,6 +63,8 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.adapter.ConnectionAdapter; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.plugin.ConnectionValidator; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -75,7 +77,6 @@ import org.apache.qpid.server.security.S import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; @@ -94,6 +95,8 @@ import org.apache.qpid.server.util.MapVa public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X> implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener { + private final Collection<ConnectionValidator> _connectionValidators = new ArrayList<>(); + private static enum BlockingType { STORE, FILESYSTEM }; private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery"; @@ -162,6 +165,14 @@ public abstract class AbstractVirtualHos @ManagedAttributeField private int _housekeepingThreadCount; + @ManagedAttributeField + private List<String> _enabledConnectionValidators; + + @ManagedAttributeField + private List<String> _disabledConnectionValidators; + + @ManagedAttributeField + private List<String> _globalAddressDomains; private boolean _useAsyncRecoverer; @@ -212,6 +223,13 @@ public abstract class AbstractVirtualHos { throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable"); } + if(getGlobalAddressDomains() != null) + { + for(String domain : getGlobalAddressDomains()) + { + validateGlobalAddressDomain(domain); + } + } } @Override @@ -230,6 +248,26 @@ public abstract class AbstractVirtualHos throw new IntegrityViolationException("Cannot delete default virtual host '" + getName() + "'"); } } + if(changedAttributes.contains(GLOBAL_ADDRESS_DOMAINS)) + { + VirtualHost<?, ?, ?> virtualHost = (VirtualHost<?, ?, ?>) proxyForValidation; + if(virtualHost.getGlobalAddressDomains() != null) + { + for(String name : virtualHost.getGlobalAddressDomains()) + { + validateGlobalAddressDomain(name); + } + } + } + } + + private void validateGlobalAddressDomain(final String name) + { + String regex = "/(/?)([\\w_\\-:.\\$]+/)*[\\w_\\-:.\\$]+"; + if(!name.matches(regex)) + { + throw new IllegalArgumentException("'"+name+"' is not a valid global address domain"); + } } @Override @@ -243,8 +281,17 @@ public abstract class AbstractVirtualHos { super.validateOnCreate(); validateMessageStoreCreation(); + if(getGlobalAddressDomains() != null) + { + for(String name : getGlobalAddressDomains()) + { + validateGlobalAddressDomain(name); + } + } } + + private void validateMessageStoreCreation() { MessageStore store = createMessageStore(); @@ -293,10 +340,20 @@ public abstract class AbstractVirtualHos _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - addChangeListener(new StoreUpdatingChangeListener()); + _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT); - _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT); + QpidServiceLoader serviceLoader = new QpidServiceLoader(); + for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class)) + { + if((_enabledConnectionValidators.isEmpty() + && (_disabledConnectionValidators.isEmpty()) || !_disabledConnectionValidators.contains(validator.getType())) + || _enabledConnectionValidators.contains(validator.getType())) + { + _connectionValidators.add(validator); + } + + } } private void checkVHostStateIsActive() @@ -438,6 +495,20 @@ public abstract class AbstractVirtualHos return _eventLogger; } + @Override + public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection) + { + getSecurityManager().authoriseCreateConnection(connection); + for(ConnectionValidator validator : _connectionValidators) + { + if(!validator.validateConnectionCreation(connection)) + { + return false; + } + } + return true; + } + /** * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers * and checking for idle or open transactions that have exceeded the permitted thresholds. @@ -526,9 +597,42 @@ public abstract class AbstractVirtualHos } @Override + public List<String> getEnabledConnectionValidators() + { + return _enabledConnectionValidators; + } + + @Override + public List<String> getDisabledConnectionValidators() + { + return _disabledConnectionValidators; + } + + @Override + public List<String> getGlobalAddressDomains() + { + return _globalAddressDomains; + } + + @Override public AMQQueue<?> getQueue(String name) { - return (AMQQueue<?>) getChildByName(Queue.class, name); + AMQQueue<?> childByName = (AMQQueue<?>) getChildByName(Queue.class, name); + if(childByName == null && getGlobalAddressDomains() != null) + { + for(String domain : getGlobalAddressDomains()) + { + if(name.startsWith(domain + "/")) + { + childByName = (AMQQueue<?>) getChildByName(Queue.class,name.substring(domain.length())); + if(childByName != null) + { + break; + } + } + } + } + return childByName; } @Override @@ -556,14 +660,6 @@ public abstract class AbstractVirtualHos { int purged = queue.deleteAndReturnCount(); - if (queue.isDurable() && !(queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_SESSION_END)) - { - DurableConfigurationStore store = getDurableConfigurationStore(); - store.remove(queue.asObjectRecord()); - } return purged; } @@ -614,7 +710,22 @@ public abstract class AbstractVirtualHos @Override public ExchangeImpl getExchange(String name) { - return getChildByName(ExchangeImpl.class,name); + ExchangeImpl childByName = getChildByName(ExchangeImpl.class, name); + if(childByName == null && getGlobalAddressDomains() != null) + { + for(String domain : getGlobalAddressDomains()) + { + if(name.startsWith(domain + "/")) + { + childByName = getChildByName(ExchangeImpl.class,name.substring(domain.length())); + if(childByName != null) + { + break; + } + } + } + } + return childByName; } @Override @@ -671,6 +782,23 @@ public abstract class AbstractVirtualHos exchange.deleteWithChecks(); } + @Override + public String getLocalAddress(final String routingAddress) + { + String localAddress = routingAddress; + if(getGlobalAddressDomains() != null) + { + for(String domain : getGlobalAddressDomains()) + { + if(localAddress.length() > routingAddress.length() - domain.length() && routingAddress.startsWith(domain + "/")) + { + localAddress = routingAddress.substring(domain.length()); + } + } + } + return localAddress; + } + public SecurityManager getSecurityManager() { return _broker.getSecurityManager(); @@ -886,6 +1014,12 @@ public abstract class AbstractVirtualHos } } + @Override + public String getRedirectHost(final AmqpPort<?> port) + { + return null; + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() @@ -1390,14 +1524,6 @@ public abstract class AbstractVirtualHos return total; } - @Override - protected void onCreate() - { - super.onCreate(); - ConfiguredObjectRecord record = asObjectRecord(); - getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes())); - } - @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE ) private void onActivate() { @@ -1509,44 +1635,6 @@ public abstract class AbstractVirtualHos onActivate(); } - private class StoreUpdatingChangeListener implements ConfigurationChangeListener - { - @Override - public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState) - { - if (object == AbstractVirtualHost.this && isDurable() && newState == State.DELETED) - { - getDurableConfigurationStore().remove(asObjectRecord()); - object.removeChangeListener(this); - } - } - - @Override - public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) - { - - } - - @Override - public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) - { - - } - - @Override - public void attributeSet(final ConfiguredObject<?> object, - final String attributeName, - final Object oldAttributeValue, - final Object newAttributeValue) - { - if (object == AbstractVirtualHost.this && isDurable() && getState() != State.DELETED && isAttributePersisted(attributeName) - && !(attributeName.equals(VirtualHost.DESIRED_STATE) && newAttributeValue.equals(State.DELETED))) - { - getDurableConfigurationStore().update(false, asObjectRecord()); - } - } - } - private class FileSystemSpaceChecker extends HouseKeepingTask { private boolean _fileSystemFull; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Tue Mar 3 14:56:40 2015 @@ -33,6 +33,7 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.NoFactoryForTypeException; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; @@ -108,4 +109,7 @@ public interface VirtualHostImpl< X exte EventLogger getEventLogger(); + boolean authoriseCreateConnection(AMQConnectionModel<?, ?> connection); + + String getLocalAddress(String routingAddress); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java Tue Mar 3 14:56:40 2015 @@ -309,4 +309,14 @@ public class BrokerOptionsTest extends Q assertEquals("unexpected number of entries", 2, props.keySet().size()); assertEquals("value", props.get("name")); } + + + public void testSetInitialSystemProperties() + { + assertNull("Unexpected default value for initial system properties", _options.getInitialSystemProperties()); + + _options.setInitialSystemProperties("test.properties"); + + assertEquals("test.properties", _options.getInitialSystemProperties()); + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java Tue Mar 3 14:56:40 2015 @@ -35,6 +35,8 @@ import org.apache.qpid.server.model.Bind import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; public class BindingImplTest extends QpidTestCase @@ -57,7 +59,11 @@ public class BindingImplTest extends Qpi attributes.put(Binding.ARGUMENTS, arguments); attributes.put(Binding.NAME, getTestName()); AMQQueue queue = mock(AMQQueue.class); + VirtualHostImpl vhost = mock(VirtualHostImpl.class); + SecurityManager securityManager = mock(SecurityManager.class); + when(vhost.getSecurityManager()).thenReturn(securityManager); when(queue.getTaskExecutor()).thenReturn(_taskExecutor); + when(queue.getVirtualHost()).thenReturn(vhost); when(queue.getModel()).thenReturn(_model); ExchangeImpl exchange = mock(ExchangeImpl.class); when(exchange.getTaskExecutor()).thenReturn(_taskExecutor); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java Tue Mar 3 14:56:40 2015 @@ -57,6 +57,7 @@ public class StoreConfigurationChangeLis notifyBrokerStarted(); UUID id = UUID.randomUUID(); ConfiguredObject object = mock(VirtualHost.class); + when(object.isDurable()).thenReturn(true); when(object.getId()).thenReturn(id); ConfiguredObjectRecord record = mock(ConfiguredObjectRecord.class); when(object.asObjectRecord()).thenReturn(record); @@ -69,11 +70,13 @@ public class StoreConfigurationChangeLis notifyBrokerStarted(); Broker broker = mock(Broker.class); when(broker.getCategoryClass()).thenReturn(Broker.class); + when(broker.isDurable()).thenReturn(true); VirtualHost child = mock(VirtualHost.class); when(child.getCategoryClass()).thenReturn(VirtualHost.class); Model model = mock(Model.class); when(model.getChildTypes(any(Class.class))).thenReturn(Collections.<Class<? extends ConfiguredObject>>emptyList()); when(child.getModel()).thenReturn(model); + when(child.isDurable()).thenReturn(true); _listener.childAdded(broker, child); verify(_store).update(eq(true), any(ConfiguredObjectRecord.class)); } @@ -83,15 +86,17 @@ public class StoreConfigurationChangeLis notifyBrokerStarted(); Broker broker = mock(Broker.class); when(broker.getCategoryClass()).thenReturn(Broker.class); + when(broker.isDurable()).thenReturn(true); _listener.attributeSet(broker, Broker.CONNECTION_SESSION_COUNT_LIMIT, null, 1); verify(_store).update(eq(false),any(ConfiguredObjectRecord.class)); } - public void testChildAddedForVirtualHostNode() + public void testChildAddedWhereParentManagesChildStorage() { notifyBrokerStarted(); VirtualHostNode<?> object = mock(VirtualHostNode.class); + when(object.managesChildStorage()).thenReturn(true); VirtualHost<?,?,?> virtualHost = mock(VirtualHost.class); _listener.childAdded(object, virtualHost); verifyNoMoreInteractions(_store); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Tue Mar 3 14:56:40 2015 @@ -38,7 +38,6 @@ import org.apache.qpid.server.configurat import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -103,16 +102,23 @@ public class MockConsumer implements Con { if(_messageIds != null) { - SimpleFilterManager filters = new SimpleFilterManager(); - filters.add(new MessageFilter() + FilterManager filters = new FilterManager(); + MessageFilter filter = new MessageFilter() { @Override + public String getName() + { + return ""; + } + + @Override public boolean matches(final Filterable message) { final String messageId = message.getMessageHeader().getMessageId(); return _messageIds.contains(messageId); } - }); + }; + filters.add(filter.getName(), filter); return filters; } else Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Tue Mar 3 14:56:40 2015 @@ -45,6 +45,7 @@ import org.mockito.stubbing.Answer; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener; @@ -66,6 +67,7 @@ public class VirtualHostTest extends Qpi private VirtualHostNode<?> _virtualHostNode; private DurableConfigurationStore _configStore; private VirtualHost<?, ?, ?> _virtualHost; + private StoreConfigurationChangeListener _storeConfigurationChangeListener; @Override protected void setUp() throws Exception @@ -79,9 +81,13 @@ public class VirtualHostTest extends Qpi when(_broker.getTaskExecutor()).thenReturn(_taskExecutor); _virtualHostNode = mock(VirtualHostNode.class); + when(_virtualHostNode.isDurable()).thenReturn(true); _configStore = mock(DurableConfigurationStore.class); + _storeConfigurationChangeListener = new StoreConfigurationChangeListener(_configStore); + when(_virtualHostNode.getConfigurationStore()).thenReturn(_configStore); + // Virtualhost needs the EventLogger from the SystemContext. when(_virtualHostNode.getParent(Broker.class)).thenReturn(_broker); @@ -122,7 +128,7 @@ public class VirtualHostTest extends Qpi assertEquals("Unexpected name", virtualHostName, virtualHost.getName()); assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); - verify(_configStore).create(matchesRecord(virtualHost.getId(), virtualHost.getType())); + verify(_configStore).update(eq(true),matchesRecord(virtualHost.getId(), virtualHost.getType())); } public void testDeleteVirtualHost() @@ -170,7 +176,7 @@ public class VirtualHostTest extends Qpi virtualHost.start(); assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); - verify(_configStore, times(1)).create(matchesRecord(virtualHost.getId(), virtualHost.getType())); + verify(_configStore, times(1)).update(eq(true), matchesRecord(virtualHost.getId(), virtualHost.getType())); verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType())); } @@ -293,7 +299,7 @@ public class VirtualHostTest extends Qpi assertNotNull(queue.getId()); assertEquals(queueName, queue.getName()); - verify(_configStore).create(matchesRecord(queue.getId(), queue.getType())); + verify(_configStore).update(eq(true),matchesRecord(queue.getId(), queue.getType())); } public void testCreateNonDurableQueue() @@ -396,7 +402,10 @@ public class VirtualHostTest extends Qpi attributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE); TestMemoryVirtualHost host = new TestMemoryVirtualHost(attributes, _virtualHostNode); + host.addChangeListener(_storeConfigurationChangeListener); host.create(); + // Fire the child added event on the node + _storeConfigurationChangeListener.childAdded(_virtualHostNode,host); _virtualHost = host; return host; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java Tue Mar 3 14:56:40 2015 @@ -22,14 +22,18 @@ import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import javax.security.auth.Subject; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Model; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.test.utils.QpidTestCase; @@ -476,4 +480,84 @@ public class AbstractConfiguredObjectTes } + + public void testAttributeSetListenerFiring() + { + final String objectName = "listenerFiring"; + + Map<String, Object> attributes = new HashMap<>(); + attributes.put(ConfiguredObject.NAME, objectName); + attributes.put(TestSingleton.STRING_VALUE, "first"); + + final TestSingleton object = _model.getObjectFactory().create(TestSingleton.class, attributes); + + final AtomicInteger listenerCount = new AtomicInteger(); + final LinkedHashMap<String, String> updates = new LinkedHashMap<>(); + object.addChangeListener(new NoopConfigurationChangeListener() + { + @Override + public void attributeSet(final ConfiguredObject<?> object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + listenerCount.incrementAndGet(); + String delta = String.valueOf(oldAttributeValue) + "=>" + String.valueOf(newAttributeValue); + updates.put(attributeName, delta); + } + }); + + // Set updated value (should cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, "second")); + + assertEquals(1, listenerCount.get()); + String delta = updates.remove(TestSingleton.STRING_VALUE); + assertEquals("first=>second", delta); + + // Set unchanged value (should not cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, "second")); + assertEquals(1, listenerCount.get()); + + // Set value to null (should cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, null)); + assertEquals(2, listenerCount.get()); + delta = updates.remove(TestSingleton.STRING_VALUE); + assertEquals("second=>null", delta); + + // Set to null again (should not cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, null)); + assertEquals(2, listenerCount.get()); + + // Set updated value (should cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, "third")); + assertEquals(3, listenerCount.get()); + delta = updates.remove(TestSingleton.STRING_VALUE); + assertEquals("null=>third", delta); + } + + private static class NoopConfigurationChangeListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState) + { + } + + @Override + public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + } + + @Override + public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + } + + @Override + public void attributeSet(final ConfiguredObject<?> object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + } + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java Tue Mar 3 14:56:40 2015 @@ -93,6 +93,7 @@ public class LastValueQueueListTest exte ServerMessage message = createTestServerMessage(null); QueueEntry addedEntry = _list.add(message); + addedEntry.acquire(); addedEntry.delete(); int numberOfEntries = countEntries(_list); @@ -113,6 +114,7 @@ public class LastValueQueueListTest exte ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); QueueEntry addedEntry = _list.add(message); + addedEntry.acquire(); addedEntry.delete(); int numberOfEntries = countEntries(_list); @@ -173,6 +175,7 @@ public class LastValueQueueListTest exte assertEquals(1, countEntries(_list)); assertEquals(1, _list.getLatestValuesMap().size()); + addedEntry.acquire(); addedEntry.delete(); assertEquals(0, countEntries(_list)); @@ -193,7 +196,9 @@ public class LastValueQueueListTest exte assertEquals(2, countEntries(_list)); assertEquals(2, _list.getLatestValuesMap().size()); + addedEntry1.acquire(); addedEntry1.delete(); + addedEntry2.acquire(); addedEntry2.delete(); assertEquals(0, countEntries(_list)); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Tue Mar 3 14:56:40 2015 @@ -113,6 +113,7 @@ public abstract class QueueEntryImplTest */ private void delete() { + _queueEntry.acquire(); _queueEntry.delete(); assertTrue("Queue entry should be in DELETED state after invoking of delete method", _queueEntry.isDeleted()); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java Tue Mar 3 14:56:40 2015 @@ -196,6 +196,7 @@ public abstract class QueueEntryListTest final QueueEntry head = getTestList().getHead(); final QueueEntry first = getTestList().next(head); + first.acquire(); first.delete(); final QueueEntry second = getTestList().next(head); @@ -226,6 +227,7 @@ public abstract class QueueEntryListTest assertNull(list.next(queueEntry2)); //'delete' the 2nd QueueEntry + queueEntry2.acquire(); queueEntry2.delete(); assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted()); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Tue Mar 3 14:56:40 2015 @@ -109,6 +109,7 @@ public class SimpleQueueEntryImplTest ex public void testTraverseWithDeletedEntries() { // Delete 2nd queue entry + _queueEntry2.acquire(); _queueEntry2.delete(); assertTrue(_queueEntry2.isDeleted()); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java Tue Mar 3 14:56:40 2015 @@ -137,6 +137,7 @@ public class SortedQueueEntryTest extend public void testTraverseWithDeletedEntries() { // Delete 2nd queue entry + _queueEntry3.acquire(); _queueEntry3.delete(); assertTrue(_queueEntry3.isDeleted()); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Tue Mar 3 14:56:40 2015 @@ -155,7 +155,7 @@ public class StandardQueueEntryListTest public void testScavenge() throws Exception { - OrderedQueueEntryList sqel = new StandardQueueEntryList(null); + OrderedQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueueImpl.class)); ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>(); @@ -215,6 +215,7 @@ public class StandardQueueEntryListTest { QueueEntry entry = entriesMap.remove(pos); boolean wasDeleted = entry.isDeleted(); + entry.acquire(); entry.delete(); return entry.isDeleted() && !wasDeleted; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Tue Mar 3 14:56:40 2015 @@ -184,7 +184,8 @@ public class ServerConnectionDelegate ex vhostName = ""; } - vhost = ((AmqpPort)sconn.getPort()).getVirtualHost(vhostName); + AmqpPort port = (AmqpPort) sconn.getPort(); + vhost = port.getVirtualHost(vhostName); @@ -193,14 +194,28 @@ public class ServerConnectionDelegate ex if (vhost.getState() != State.ACTIVE) { sconn.setState(Connection.State.CLOSING); - sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active")); + final String redirectHost = vhost.getRedirectHost(port); + if(redirectHost == null) + { + sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, + "Virtual host '" + vhostName + "' is not active")); + } + else + { + sconn.invoke(new ConnectionRedirect(redirectHost, new ArrayList<Object>())); + } return; } sconn.setVirtualHost(vhost); try { - vhost.getSecurityManager().authoriseCreateConnection(sconn); + if(!vhost.authoriseCreateConnection(sconn)) + { + sconn.setState(Connection.State.CLOSING); + sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized")); + return; + } } catch (AccessControlException e) { @@ -215,7 +230,8 @@ public class ServerConnectionDelegate ex else { sconn.setState(Connection.State.CLOSING); - sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'")); + sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, + "Unknown virtualhost '" + vhostName + "'")); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Tue Mar 3 14:56:40 2015 @@ -33,6 +33,7 @@ import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.ServerProtocolEngine; @@ -40,8 +41,10 @@ import org.apache.qpid.server.consumer.C import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; @@ -259,6 +262,43 @@ public class ServerSessionDelegate exten return; } + + if(method.hasArguments() && method.getArguments().containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString())) + { + Object value = method.getArguments().get(AMQPFilterTypes.REPLAY_PERIOD.toString()); + final long period; + if(value instanceof Number) + { + period = ((Number)value).longValue(); + } + else if(value instanceof String) + { + try + { + period = Long.parseLong(value.toString()); + } + catch (NumberFormatException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + return; + } + } + else + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + return; + } + final long startingFrom = System.currentTimeMillis() - (1000l * period); + if(filterManager == null) + { + filterManager = new FilterManager(); + } + MessageFilter filter = new ArrivalTimeFilter(startingFrom); + filterManager.add(filter.getName(), filter); + + } + + ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination, method.getAcceptMode(), method.getAcquireMode(), @@ -1609,4 +1649,5 @@ public class ServerSessionDelegate exten { } } + } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Mar 3 14:56:40 2015 @@ -59,11 +59,11 @@ import org.apache.qpid.server.consumer.C import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -673,7 +673,7 @@ public class AMQChannel * @param tag the tag chosen by the client (if null, server will generate one) * @param sources the queues to subscribe to * @param acks Are acks enabled for this subscriber - * @param filters Filters to apply to this subscriber + * @param arguments Filters to apply to this subscriber * * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests @@ -681,7 +681,7 @@ public class AMQChannel * @throws org.apache.qpid.AMQException if something goes wrong */ public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, - FieldTable filters, boolean exclusive, boolean noLocal) + FieldTable arguments, boolean exclusive, boolean noLocal) throws MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, @@ -700,19 +700,19 @@ public class AMQChannel ConsumerTarget_0_8 target; EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); - if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) + if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { - target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _noAckCreditManager); + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager); } else if(acks) { - target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } else { - target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _noAckCreditManager); + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } @@ -732,23 +732,66 @@ public class AMQChannel try { - FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters)); + FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments)); if(noLocal) { if(filterManager == null) { - filterManager = new SimpleFilterManager(); + filterManager = new FilterManager(); } final Object connectionReference = getConnectionReference(); - filterManager.add(new MessageFilter() + MessageFilter filter = new MessageFilter() { + + @Override + public String getName() + { + return AMQPFilterTypes.NO_LOCAL.toString(); + } + @Override public boolean matches(final Filterable message) { return message.getConnectionReference() != connectionReference; } - }); + }; + filterManager.add(filter.getName(), filter); + } + + if(arguments != null && arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString())) + { + Object value = arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString()); + final long period; + if(value instanceof Number) + { + period = ((Number)value).longValue(); + } + else if(value instanceof String) + { + try + { + period = Long.parseLong(value.toString()); + } + catch (NumberFormatException e) + { + throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + } + } + else + { + throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + } + + final long startingFrom = System.currentTimeMillis() - (1000l * period); + if(filterManager == null) + { + filterManager = new FilterManager(); + } + MessageFilter filter = new ArrivalTimeFilter(startingFrom); + filterManager.add(filter.getName(), filter); + } + for(MessageSource source : sources) { ConsumerImpl sub = Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1663717&r1=1663716&r2=1663717&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Tue Mar 3 14:56:40 2015 @@ -1565,7 +1565,7 @@ public class AMQProtocolEngine implement virtualHostStr = virtualHostName == null ? null : virtualHostName.toString(); } - VirtualHostImpl virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr); + VirtualHostImpl<?,?,?> virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr); if (virtualHost == null) { @@ -1578,8 +1578,16 @@ public class AMQProtocolEngine implement // Check virtualhost access if (virtualHost.getState() != State.ACTIVE) { - closeConnection(AMQConstant.CONNECTION_FORCED, - "Virtual host '" + virtualHost.getName() + "' is not active",0); + String redirectHost = virtualHost.getRedirectHost(getPort()); + if(redirectHost != null) + { + closeConnection(0, new AMQFrame(0,new ConnectionRedirectBody(getProtocolVersion(),AMQShortString.valueOf(redirectHost), null))); + } + else + { + closeConnection(AMQConstant.CONNECTION_FORCED, + "Virtual host '" + virtualHost.getName() + "' is not active", 0); + } } else @@ -1587,21 +1595,29 @@ public class AMQProtocolEngine implement setVirtualHost(virtualHost); try { - virtualHost.getSecurityManager().authoriseCreateConnection(this); - if (getContextKey() == null) + + if(virtualHost.authoriseCreateConnection(this)) { - setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); - } + if (getContextKey() == null) + { + setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); + } + + MethodRegistry methodRegistry = getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName); - MethodRegistry methodRegistry = getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName); + writeFrame(responseBody.generateFrame(0)); + _state = ConnectionState.OPEN; - writeFrame(responseBody.generateFrame(0)); - _state = ConnectionState.OPEN; + } + else + { + closeConnection(AMQConstant.ACCESS_REFUSED, "Connection refused",0); + } } catch (AccessControlException e) { - closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(),0); + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), 0); } } } @@ -1767,7 +1783,7 @@ public class AMQProtocolEngine implement _logger.info("Locale selected: " + locale); SubjectCreator subjectCreator = getSubjectCreator(); - SaslServer ss = null; + SaslServer ss; try { ss = subjectCreator.createSaslServer(String.valueOf(mechanism), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org