Repository: qpid-broker-j Updated Branches: refs/heads/master 37209c589 -> d27237d87
QPID-7933: [Java Broker] Ensure changes made to existing durable children of virtualhost after virtualhost restart are persisted Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d27237d8 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d27237d8 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d27237d8 Branch: refs/heads/master Commit: d27237d87672dff846603e5d04a849116ee47fac Parents: 37209c5 Author: Lorenz Quack <lqu...@apache.org> Authored: Mon Oct 9 16:51:26 2017 +0100 Committer: Lorenz Quack <lqu...@apache.org> Committed: Mon Oct 9 16:51:26 2017 +0100 ---------------------------------------------------------------------- .../VirtualHostStoreUpgraderAndRecoverer.java | 97 +++++++++++--------- .../server/virtualhost/AbstractVirtualHost.java | 13 +-- .../qpid/server/model/VirtualHostTest.java | 51 ++++++++++ 3 files changed, 108 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d27237d8/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index f212068..62fc386 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -1023,28 +1023,34 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS records, VirtualHost.class.getSimpleName(), VirtualHost.MODEL_VERSION); - recover(durableConfigurationStore, upgradedRecords, isNew); + recover(_virtualHostNode, durableConfigurationStore, upgradedRecords, isNew); return isNew; } public void reloadAndRecover(final DurableConfigurationStore durableConfigurationStore) { + reloadAndRecoverInternal(_virtualHostNode, durableConfigurationStore); + } + + public void reloadAndRecoverVirtualHost(final DurableConfigurationStore durableConfigurationStore) + { + reloadAndRecoverInternal(_virtualHostNode.getVirtualHost(), durableConfigurationStore); + } + + private void reloadAndRecoverInternal(final ConfiguredObject<?> recoveryRoot, + final DurableConfigurationStore durableConfigurationStore) + { final List<ConfiguredObjectRecord> records = new ArrayList<>(); - durableConfigurationStore.reload(new ConfiguredObjectRecordHandler() - { - @Override - public void handle(final ConfiguredObjectRecord record) - { - records.add(record); - } - }); - recover(durableConfigurationStore, records, false); + durableConfigurationStore.reload(records::add); + recover(recoveryRoot, durableConfigurationStore, records, false); } - private void recover(final DurableConfigurationStore durableConfigurationStore, - final List<ConfiguredObjectRecord> records, final boolean isNew) + private void recover(final ConfiguredObject<?> recoveryRoot, + final DurableConfigurationStore durableConfigurationStore, + final List<ConfiguredObjectRecord> records, + final boolean isNew) { - new GenericRecoverer(_virtualHostNode).recover(records, isNew); + new GenericRecoverer(recoveryRoot).recover(records, isNew); final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(durableConfigurationStore); @@ -1065,50 +1071,53 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS } }); } - _virtualHostNode.addChangeListener(new AbstractConfigurationChangeListener() + + if (recoveryRoot instanceof VirtualHostNode) { - @Override - public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + _virtualHostNode.addChangeListener(new AbstractConfigurationChangeListener() { - if(child instanceof VirtualHost) + @Override + public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) { - applyRecursively(child, new RecursiveAction<ConfiguredObject<?>>() + if (child instanceof VirtualHost) { - @Override - public boolean applyToChildren(final ConfiguredObject<?> object) + applyRecursively(child, new RecursiveAction<ConfiguredObject<?>>() { - return object.isDurable(); - } - - @Override - public void performAction(final ConfiguredObject<?> object) - { - if(object.isDurable()) + @Override + public boolean applyToChildren(final ConfiguredObject<?> object) { - durableConfigurationStore.update(true, object.asObjectRecord()); - object.addChangeListener(configChangeListener); + return object.isDurable(); } - } - }); + @Override + public void performAction(final ConfiguredObject<?> object) + { + if (object.isDurable()) + { + durableConfigurationStore.update(true, object.asObjectRecord()); + object.addChangeListener(configChangeListener); + } + } + }); + } } - } - @Override - public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) - { - if(child instanceof VirtualHost) + @Override + public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) { - child.removeChangeListener(configChangeListener); - removeVirtualHostConfiguration((VirtualHost<?>) child, durableConfigurationStore); + if (child instanceof VirtualHost) + { + child.removeChangeListener(configChangeListener); + removeVirtualHostConfiguration((VirtualHost<?>) child, durableConfigurationStore); + } } - } - }); - if(isNew) - { - if(_virtualHostNode instanceof AbstractConfiguredObject) + }); + if (isNew) { - ((AbstractConfiguredObject)_virtualHostNode).forceUpdateAllSecureAttributes(); + if (_virtualHostNode instanceof AbstractConfiguredObject) + { + ((AbstractConfiguredObject) _virtualHostNode).forceUpdateAllSecureAttributes(); + } } } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d27237d8/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 4126c84..2a473e6 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -119,12 +119,12 @@ import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.GenericRecoverer; import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreProvider; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; @@ -2630,14 +2630,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte resetStatistics(); createHousekeepingExecutor(); - final List<ConfiguredObjectRecord> records = new ArrayList<>(); - - // Transitioning to STOPPED will have closed all our children. Now we are transition - // back to ACTIVE, we need to recover and re-open them. - - getDurableConfigurationStore().reload(records::add); - - new GenericRecoverer(this).recover(records, false); + final VirtualHostStoreUpgraderAndRecoverer virtualHostStoreUpgraderAndRecoverer = + new VirtualHostStoreUpgraderAndRecoverer((VirtualHostNode<?>) getParent()); + virtualHostStoreUpgraderAndRecoverer.reloadAndRecoverVirtualHost(getDurableConfigurationStore()); final Collection<VirtualHostAccessControlProvider> accessControlProviders = getChildren(VirtualHostAccessControlProvider.class); if (!accessControlProviders.isEmpty()) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d27237d8/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 3dce931..b303944 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -35,6 +35,7 @@ import static org.mockito.Mockito.when; import java.security.AccessControlException; import java.security.Principal; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -250,6 +251,55 @@ public class VirtualHostTest extends QpidTestCase } + public void testModifyDurableChildAfterRestartingVirtualHost() + { + String virtualHostName = getName(); + + VirtualHost<?> virtualHost = createVirtualHost(virtualHostName); + final ConfiguredObjectRecord virtualHostCor = virtualHost.asObjectRecord(); + + // Give virtualhost a queue and an exchange + Queue queue = virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, "myQueue")); + final ConfiguredObjectRecord queueCor = queue.asObjectRecord(); + + final List<ConfiguredObjectRecord> allObjects = new ArrayList<>(); + allObjects.add(virtualHostCor); + allObjects.add(queueCor); + + ((AbstractConfiguredObject<?>)virtualHost).stop(); + assertEquals("Unexpected state", State.STOPPED, virtualHost.getState()); + + // Setup an answer that will return the configured object records + doAnswer(new Answer() + { + final Iterator<ConfiguredObjectRecord> corIterator = allObjects.iterator(); + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0]; + while (corIterator.hasNext()) + { + handler.handle(corIterator.next()); + } + + return null; + } + }).when(_configStore).reload(any(ConfiguredObjectRecordHandler.class)); + + ((AbstractConfiguredObject<?>)virtualHost).start(); + assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); + final Collection<Queue> queues = virtualHost.getChildren(Queue.class); + assertEquals("Unexpected number of queues after restart", 1, queues.size()); + + final Queue recoveredQueue = queues.iterator().next(); + recoveredQueue.setAttributes(Collections.singletonMap(ConfiguredObject.DESCRIPTION, "testDescription")); + final ConfiguredObjectRecord recoveredQueueCor = queue.asObjectRecord(); + + verify(_configStore).update(eq(false), matchesRecord(recoveredQueueCor.getId(), recoveredQueueCor.getType())); + } + + public void testStopVirtualHost_ClosesConnections() { String virtualHostName = getName(); @@ -555,6 +605,7 @@ public class VirtualHostTest extends QpidTestCase // Fire the child added event on the node _storeConfigurationChangeListener.childAdded(_virtualHostNode,host); _virtualHost = host; + when(_virtualHostNode.getVirtualHost()).thenReturn(_virtualHost); return host; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org