Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Oct 20 16:23:01 2009 @@ -19,13 +19,11 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.PrincipalHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.subjects.QueueLogSubject; @@ -33,7 +31,7 @@ import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.txn.Transaction; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; @@ -61,62 +59,40 @@ { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); - private int unused; - - private PrincipalHolder _prinicpalHolder; - - - private Object _exclusiveOwner; - - private Exchange _alternateExchange; - - - static final class QueueContext implements Context - { - volatile QueueEntry _lastSeenEntry; - volatile QueueEntry _releasedEntry; - - public QueueContext(QueueEntry head) - { - _lastSeenEntry = head; - } - - public QueueEntry getLastSeenEntry() - { - return _lastSeenEntry; - } - } - - - static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry> - _lastSeenUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueContext.class, QueueEntry.class, "_lastSeenEntry"); - - static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry> - _releasedUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueContext.class, QueueEntry.class, "_releasedEntry"); + private final VirtualHost _virtualHost; private final AMQShortString _name; + private final String _resourceName; /** null means shared */ private final AMQShortString _owner; + private PrincipalHolder _prinicpalHolder; + + private Object _exclusiveOwner; + + private final boolean _durable; /** If true, this queue is deleted when the last subscriber is removed */ private final boolean _autoDelete; - private final VirtualHost _virtualHost; + private Exchange _alternateExchange; /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ private final ExchangeBindings _bindings = new ExchangeBindings(this); - private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); + protected final QueueEntryList _entries; + + protected final SubscriptionList _subscriptionList = new SubscriptionList(this); + + private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); + + private volatile Subscription _exclusiveSubscriber; + + private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); @@ -124,18 +100,10 @@ private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); - protected final SubscriptionList _subscriptionList = new SubscriptionList(this); - private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); + private final AtomicLong _totalMessagesReceived = new AtomicLong(); - private volatile Subscription _exclusiveSubscriber; - protected final QueueEntryList _entries; - private final AMQQueueMBean _managedObject; - private final Executor _asyncDelivery; - private final AtomicLong _totalMessagesReceived = new AtomicLong(); - - private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); /** max allowed size(KB) of a single message */ public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); @@ -152,23 +120,37 @@ /** the minimum interval between sending out consecutive alerts of the same type */ public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); - private static final int MAX_ASYNC_DELIVERIES = 10; + private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); + + private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + + static final int MAX_ASYNC_DELIVERIES = 10; + + private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); private AtomicReference _asynchronousRunner = new AtomicReference(null); + private final Executor _asyncDelivery; private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); + + private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); + + private final AtomicBoolean _deleted = new AtomicBoolean(false); + private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); + + private LogSubject _logSubject; private LogActor _logActor; + private AMQQueueMBean _managedObject; + private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER"; + private boolean _nolocal; - private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); - private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) - throws AMQException { this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory()); } @@ -179,7 +161,6 @@ boolean autoDelete, VirtualHost virtualHost, QueueEntryListFactory entryListFactory) - throws AMQException { if (name == null) @@ -193,6 +174,7 @@ } _name = name; + _resourceName = String.valueOf(name); _durable = durable; _owner = owner; _autoDelete = autoDelete; @@ -231,7 +213,7 @@ } catch (JMException e) { - throw new AMQException("AMQQueue MBean creation has failed ", e); + _logger.error("AMQQueue MBean creation has failed ", e); } resetNotifications(); @@ -255,11 +237,21 @@ // ------ Getters and Setters + public void execute(ReadWriteRunnable runnable) + { + _asyncDelivery.execute(runnable); + } + public AMQShortString getName() { return _name; } + public void setNoLocal(boolean nolocal) + { + _nolocal = nolocal; + } + public boolean isDurable() { return _durable; @@ -401,6 +393,7 @@ if (!isDeleted()) { subscription.setQueue(this, exclusive); + subscription.setNoLocal(_nolocal); _subscriptionList.add(subscription); if (isDeleted()) { @@ -540,7 +533,10 @@ deliverAsync(); } - _managedObject.checkForNotification(entry.getMessage()); + if(_managedObject != null) + { + _managedObject.checkForNotification(entry.getMessage()); + } return entry; } @@ -612,10 +608,10 @@ QueueContext subContext = (QueueContext) sub.getQueueContext(); QueueEntry releasedEntry = subContext._releasedEntry; - _lastSeenUpdater.set(subContext, entry); + QueueContext._lastSeenUpdater.set(subContext, entry); if(releasedEntry == entry) { - _releasedUpdater.compareAndSet(subContext, releasedEntry, null); + QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null); } } @@ -629,7 +625,7 @@ while((oldEntry = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0) { - if(_releasedUpdater.compareAndSet(subContext, oldEntry, entry)) + if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry)) { break; } @@ -939,11 +935,11 @@ public void moveMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, String queueName, - StoreContext storeContext) + ServerTransaction txn) { - AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - TransactionLog txnLog = getVirtualHost().getTransactionLog(); + final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -962,62 +958,48 @@ } }); - try + + + // Move the messages in on the message store. + for (final QueueEntry entry : entries) { - txnLog.beginTran(storeContext); + final ServerMessage message = entry.getMessage(); + txn.enqueue(toQueue, message, + new ServerTransaction.Action() + { - // Move the messages in on the message store. - for (QueueEntry entry : entries) - { - ServerMessage message = entry.getMessage(); + public void postCommit() + { + try + { + toQueue.enqueue(message); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } - if (message.isPersistent() && toQueue.isDurable()) - { - txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); - } - // dequeue does not decrement the refence count - entry.dequeue(); - } + public void onRollback() + { + entry.release(); + } + }); + txn.dequeue(this, message, + new ServerTransaction.Action() + { - // Commit and flush the move transcations. - try - { - txnLog.commitTran(storeContext); - } - catch (AMQException e) - { - throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); - } - } - catch (AMQException e) - { - try - { - txnLog.abortTran(storeContext); - } - catch (AMQException rollbackEx) - { - _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx); - } - throw new RuntimeException(e); - } + public void postCommit() + { + entry.discard(); + } - try - { + public void onRollback() + { + + } + }); - for (QueueEntry entry : entries) - { - toQueue.enqueue(entry.getMessage()); - entry.delete(); - } - } - catch (MessageCleanupException e) - { - throw new RuntimeException(e); - } - catch (AMQException e) - { - throw new RuntimeException(e); } } @@ -1025,10 +1007,9 @@ public void copyMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, String queueName, - final StoreContext storeContext) + final ServerTransaction txn) { - AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - TransactionLog txnLog = getVirtualHost().getTransactionLog(); + final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -1046,66 +1027,37 @@ } }); - try + + // Move the messages in on the message store. + for (QueueEntry entry : entries) { - txnLog.beginTran(storeContext); + final ServerMessage message = entry.getMessage(); - // Move the messages in on the message store. - for (QueueEntry entry : entries) + if (message.isPersistent() && toQueue.isDurable()) { - ServerMessage message = entry.getMessage(); - if (message.isPersistent() && toQueue.isDurable()) - { - - txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); - - } - } + txn.enqueue(toQueue, message, new ServerTransaction.Action() + { + public void postCommit() + { + try + { + toQueue.enqueue(message); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } - // Commit and flush the move transcations. - try - { - txnLog.commitTran(storeContext); - } - catch (AMQException e) - { - throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); - } - } - catch (AMQException e) - { - try - { - txnLog.abortTran(storeContext); - } - catch (AMQException rollbackEx) - { - _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx); - } - throw new RuntimeException(e); - } + public void onRollback() + { - try - { - for (QueueEntry entry : entries) - { + } + }); - ServerMessage message = entry.getMessage(); - if (message != null) - { - toQueue.enqueue(entry.getMessage()); - } } } - catch (MessageCleanupException e) - { - throw new RuntimeException(e); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } } @@ -1160,7 +1112,7 @@ QueueEntryIterator queueListIterator = _entries.iterator(); long count = 0; - Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); while (queueListIterator.advance()) { @@ -1181,14 +1133,14 @@ private void dequeueEntry(final QueueEntry node) { - Transaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog()); + ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog()); dequeueEntry(node, txn); } - private void dequeueEntry(final QueueEntry node, Transaction txn) + private void dequeueEntry(final QueueEntry node, ServerTransaction txn) { txn.dequeue(this, node.getMessage(), - new Transaction.Action() + new ServerTransaction.Action() { public void postCommit() @@ -1241,7 +1193,7 @@ } }); - Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); + ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); if(_alternateExchange != null) { @@ -1255,7 +1207,7 @@ if(rerouteQueues != null & rerouteQueues.size() != 0) { txn.enqueue(rerouteQueues, entry.getMessage(), - new Transaction.Action() + new ServerTransaction.Action() { public void postCommit() @@ -1280,7 +1232,7 @@ } }); txn.dequeue(this, entry.getMessage(), - new Transaction.Action() + new ServerTransaction.Action() { public void postCommit() @@ -1308,7 +1260,7 @@ if(message != null) { txn.dequeue(this, message, - new Transaction.Action() + new ServerTransaction.Action() { public void postCommit() @@ -1327,7 +1279,10 @@ txn.commit(); - _managedObject.unregister(); + if(_managedObject!=null) + { + _managedObject.unregister(); + } for (Task task : _deleteTaskList) { @@ -1417,7 +1372,13 @@ public void deliverAsync(Subscription sub) { - _asyncDelivery.execute(new SubFlushRunner(sub)); + SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER); + if(flusher == null) + { + flusher = new SubFlushRunner(sub); + sub.set(SUB_FLUSH_RUNNER, flusher); + } + _asyncDelivery.execute(flusher); } @@ -1466,66 +1427,12 @@ } } - private class SubFlushRunner implements ReadWriteRunnable - { - private final Subscription _sub; - - public SubFlushRunner(Subscription sub) - { - _sub = sub; - } - - public void run() - { - - String originalName = Thread.currentThread().getName(); - try{ - Thread.currentThread().setName("SubFlushRunner-"+_sub); - - boolean complete = false; - try - { - CurrentActor.set(_sub.getLogActor()); - complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); - - } - catch (AMQException e) - { - _logger.error(e); - } - finally - { - CurrentActor.remove(); - } - if (!complete && !_sub.isSuspended()) - { - _asyncDelivery.execute(this); - } - } - finally - { - Thread.currentThread().setName(originalName); - } - - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - } - public void flushSubscription(Subscription sub) throws AMQException { flushSubscription(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException + public boolean flushSubscription(Subscription sub, long iterations) throws AMQException { boolean atTail = false; @@ -1655,9 +1562,9 @@ } } - if(_lastSeenUpdater.compareAndSet(context, lastSeen, node)) + if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node)) { - _releasedUpdater.compareAndSet(context, releasedNode, null); + QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null); } lastSeen = context._lastSeenEntry; @@ -1774,7 +1681,10 @@ } else { - _managedObject.checkForNotification(node.getMessage()); + if(_managedObject!=null) + { + _managedObject.checkForNotification(node.getMessage()); + } } } @@ -1969,4 +1879,9 @@ _flowResumeCapacity = config.getFlowResumeCapacity(); } } + + public String getResourceName() + { + return _resourceName; + } }
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Tue Oct 20 16:23:01 2009 @@ -4,6 +4,7 @@ import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; /* * @@ -44,9 +45,7 @@ _nextUpdater = AtomicReferenceFieldUpdater.newUpdater (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); - - - + private AtomicLong _deletes = new AtomicLong(0L); public SimpleQueueEntryList(AMQQueue queue) @@ -56,21 +55,77 @@ _tail = _head; } + + void advanceHead() { + _deletes.incrementAndGet(); QueueEntryImpl head = _head.nextNode(); + boolean deleted = head.isDeleted(); while(head._next != null && head.isDeleted()) { + deleted = true; final QueueEntryImpl newhead = head.nextNode(); if(newhead != null) { - _nextUpdater.compareAndSet(_head,head, newhead); + if(_nextUpdater.compareAndSet(_head,head, newhead)) + { + _deletes.decrementAndGet(); + } } head = _head.nextNode(); } + + if(!deleted) + { + deleted = true; + } + + if(_deletes.get() > 1000L) + { + _deletes.set(0L); + scavenge(); + } } + void scavenge() + { + QueueEntryImpl root = _head; + QueueEntryImpl next = root.nextNode(); + + do + { + + + while(next._next != null && next.isDeleted()) + { + + final QueueEntryImpl newhead = next.nextNode(); + if(newhead != null) + { + _nextUpdater.compareAndSet(root,next, newhead); + } + next = root.nextNode(); + } + if(next._next != null) + { + if(!next.isDeleted()) + { + root = next; + next = root.nextNode(); + } + } + else + { + break; + } + + } while (next != null && next._next != null); + + } + + public AMQQueue getQueue() { Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=827724&view=auto ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (added) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Tue Oct 20 16:23:01 2009 @@ -0,0 +1,68 @@ +package org.apache.qpid.server.queue; + +import org.apache.qpid.pool.ReadWriteRunnable; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + + +class SubFlushRunner implements ReadWriteRunnable +{ + private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); + + + private final Subscription _sub; + private final String _name; + private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES; + + public SubFlushRunner(Subscription sub) + { + _sub = sub; + _name = "SubFlushRunner-"+_sub; + } + + public void run() + { + + + Thread.currentThread().setName(_name); + + boolean complete = false; + try + { + CurrentActor.set(_sub.getLogActor()); + complete = getQueue().flushSubscription(_sub, ITERATIONS); + + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + } + if (!complete && !_sub.isSuspended()) + { + getQueue().execute(this); + } + + + } + + private SimpleAMQQueue getQueue() + { + return (SimpleAMQQueue) _sub.getQueue(); + } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } +} Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java ------------------------------------------------------------------------------ svn:executable = * Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Tue Oct 20 16:23:01 2009 @@ -35,8 +35,8 @@ import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; import java.io.File; @@ -51,15 +51,15 @@ public void initialise(int instanceID) throws Exception { - _rootMessageLogger = new RootMessageLoggerImpl(_configuration, + _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger()); - + _registryName = String.valueOf(instanceID); // Set the Actor for current log messages CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger)); - CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion())); + CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion())); initialiseManagedObjectRegistry(); @@ -68,7 +68,7 @@ _pluginManager = new PluginManager(_configuration.getPluginDirectory()); _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager); - + _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration); _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); @@ -99,10 +99,10 @@ } private void initialiseVirtualHosts() throws Exception - { + { for (String name : _configuration.getVirtualHosts()) { - _virtualHostRegistry.registerVirtualHost(new VirtualHost(_configuration.getVirtualHostConfig(name))); + _virtualHostRegistry.registerVirtualHost(new VirtualHostImpl(_configuration.getVirtualHostConfig(name))); } getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost()); } Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java (from r821930, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java) URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java&r1=821930&r2=827724&rev=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java Tue Oct 20 16:23:01 2009 @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server; +package org.apache.qpid.server.security; import java.security.Principal; Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java ------------------------------------------------------------------------------ svn:executable = * Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java Tue Oct 20 16:23:01 2009 @@ -14,9 +14,9 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.server.security.access; @@ -32,16 +32,12 @@ import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.SecurityConfiguration; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugins.PluginManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; -import org.apache.qpid.server.security.access.plugins.SimpleXML; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.PrincipalHolder; public class ACLManager { @@ -79,7 +75,7 @@ { _hostPlugins = configurePlugins(hostConfig); } - + public Map<String, ACLPlugin> configurePlugins(SecurityConfiguration hostConfig) throws ConfigurationException { Configuration securityConfig = hostConfig.getConfiguration(); @@ -109,7 +105,7 @@ } } return plugins; - } + } public static Logger getLogger() { @@ -132,18 +128,18 @@ if (result == AuthzResult.DENIED) { // Something vetoed the access, we're done - return false; + return false; } else if (result == AuthzResult.ALLOWED) { - // Remove plugin from global check list since + // Remove plugin from global check list since // host allow overrides global allow remainingPlugins.remove(plugin.getKey()); } } - + for (ACLPlugin plugin : remainingPlugins.values()) - { + { result = checker.allowed(plugin); if (result == AuthzResult.DENIED) { @@ -271,7 +267,7 @@ }); } - + public boolean authorisePublish(final PrincipalHolder session, final boolean immediate, final boolean mandatory, final AMQShortString routingKey, final Exchange e) { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java Tue Oct 20 16:23:01 2009 @@ -24,10 +24,9 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.PrincipalHolder; +import org.apache.qpid.server.security.PrincipalHolder; public interface ACLPlugin { @@ -35,13 +34,13 @@ { ALLOWED, DENIED, - ABSTAIN + ABSTAIN } void setConfiguration(Configuration config) throws ConfigurationException; - // These return true if the plugin thinks the action should be allowed, and false if not. - + // These return true if the plugin thinks the action should be allowed, and false if not. + AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey); AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable, Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java Tue Oct 20 16:23:01 2009 @@ -22,14 +22,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.PrincipalHolder; +import org.apache.qpid.server.security.PrincipalHolder; /** - * This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations. + * This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations. */ public abstract class AbstractACLPlugin implements ACLPlugin { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java Tue Oct 20 16:23:01 2009 @@ -21,21 +21,19 @@ package org.apache.qpid.server.security.access.plugins; import org.apache.commons.configuration.Configuration; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.PrincipalHolder; public abstract class BasicACLPlugin implements ACLPlugin { - // Returns true or false if the plugin should authorise or deny the request + // Returns true or false if the plugin should authorise or deny the request protected abstract AuthzResult getResult(); - + public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey) { @@ -51,7 +49,7 @@ public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue) { - return getResult(); + return getResult(); } public AuthzResult authoriseConsume(PrincipalHolder session, Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java Tue Oct 20 16:23:01 2009 @@ -25,15 +25,14 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.ACLPluginFactory; import org.apache.qpid.server.security.access.AccessResult; import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.security.access.PrincipalPermissions; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.PrincipalHolder; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -57,7 +56,7 @@ return plugin; } }; - + private Map<String, PrincipalPermissions> _users; private final AccessResult GRANTED = new AccessResult(this, AccessResult.AccessStatus.GRANTED); @@ -82,7 +81,7 @@ /** * Publish format takes Exchange + Routing Key Pairs - * + * * @param config * XML Configuration */ Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java Tue Oct 20 16:23:01 2009 @@ -32,12 +32,11 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.ACLPluginFactory; import org.apache.qpid.server.security.access.plugins.AbstractACLPlugin; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.PrincipalHolder; import org.apache.qpid.util.NetMatcher; public class FirewallPlugin extends AbstractACLPlugin @@ -59,7 +58,7 @@ return plugin; } }; - + public class FirewallRule { @@ -71,13 +70,13 @@ public FirewallRule(String access, List networks, List hostnames) { _access = (access.equals("allow")) ? AuthzResult.ALLOWED : AuthzResult.DENIED; - + if (networks != null && networks.size() > 0) { String[] networkStrings = objListToStringArray(networks); _network = new NetMatcher(networkStrings); } - + if (hostnames != null && hostnames.size() > 0) { int i = 0; @@ -87,7 +86,7 @@ _hostnamePatterns[i++] = Pattern.compile(hostname); } } - + } private String[] objListToStringArray(List objList) @@ -149,7 +148,7 @@ thread.run(); long endTime = System.currentTimeMillis() + DNS_TIMEOUT; - + while (System.currentTimeMillis() < endTime && !done.get()) { try @@ -183,7 +182,7 @@ if(!(principalHolder instanceof ProtocolEngine)) { return AuthzResult.ABSTAIN; // We only deal with tcp sessions - } + } ProtocolEngine session = (ProtocolEngine) principalHolder; @@ -237,7 +236,7 @@ _default = AuthzResult.DENIED; } CompositeConfiguration finalConfig = new CompositeConfiguration(config); - + List subFiles = config.getList("x...@filename]"); for (Object subFile : subFiles) { @@ -245,7 +244,7 @@ } // all rules must have an access attribute - int numRules = finalConfig.getList("ru...@access]").size(); + int numRules = finalConfig.getList("ru...@access]").size(); _rules = new FirewallRule[numRules]; for (int i = 0; i < numRules; i++) { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java Tue Oct 20 16:23:01 2009 @@ -215,14 +215,7 @@ _logger.warn("Unable to load access file:" + jmxaccesssFile); } - try - { - _mbean.register(); - } - catch (AMQException e) - { - _logger.warn("Unable to register user management MBean"); - } + _mbean.register(); } catch (JMException e) { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java?rev=827724&r1=827723&r2=827724&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java Tue Oct 20 16:23:01 2009 @@ -31,12 +31,12 @@ { protected LogSubject _logSubject; - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception + public void configure(VirtualHost virtualHost) throws Exception { _logSubject = new MessageStoreLogSubject(virtualHost, this); CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName())); } - + public void close() throws Exception { CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003()); Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=827724&view=auto ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (added) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Tue Oct 20 16:23:01 2009 @@ -0,0 +1,57 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +import java.nio.ByteBuffer; +import org.apache.qpid.framing.FieldTable; + +public interface ConfigurationRecoveryHandler +{ + QueueRecoveryHandler begin(MessageStore store); + + public static interface QueueRecoveryHandler + { + void queue(String queueName, String owner, FieldTable arguments); + ExchangeRecoveryHandler completeQueueRecovery(); + } + + public static interface ExchangeRecoveryHandler + { + void exchange(String exchangeName, String type, boolean autoDelete); + BindingRecoveryHandler completeExchangeRecovery(); + } + + public static interface BindingRecoveryHandler + { + void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf); + void completeBindingRecovery(); + } + + public static interface QueueEntryRecoveryHandler + { + void complete(); + + void queueEntry(String queueName, long messageId); + } + + + +} Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java ------------------------------------------------------------------------------ svn:executable = * --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org