http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java deleted file mode 100644 index e747a82..0000000 --- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java +++ /dev/null @@ -1,472 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.fail; - -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; -import org.apache.brooklyn.api.management.ha.ManagementNodeState; -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; -import org.apache.brooklyn.test.entity.LocalManagementContextForTests; -import org.apache.brooklyn.test.entity.TestApplication; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.basic.ApplicationBuilder; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl; -import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore; -import brooklyn.entity.rebind.persister.InMemoryObjectStore; -import brooklyn.entity.rebind.persister.ListeningObjectStore; -import brooklyn.entity.rebind.persister.PersistMode; -import brooklyn.entity.rebind.persister.PersistenceObjectStore; -import brooklyn.internal.BrooklynFeatureEnablement; -import brooklyn.management.ha.TestEntityFailingRebind.RebindException; -import brooklyn.management.internal.ManagementContextInternal; -import brooklyn.test.Asserts; -import brooklyn.util.collections.MutableList; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; -import com.google.common.collect.ImmutableList; - -@Test -public class HighAvailabilityManagerSplitBrainTest { - - private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerSplitBrainTest.class); - - private List<HaMgmtNode> nodes = new MutableList<HighAvailabilityManagerSplitBrainTest.HaMgmtNode>(); - Map<String,String> sharedBackingStore = MutableMap.of(); - Map<String,Date> sharedBackingStoreDates = MutableMap.of(); - private AtomicLong sharedTime; // used to set the ticker's return value - private ClassLoader classLoader = getClass().getClassLoader(); - - public class HaMgmtNode { - // TODO share with HotStandbyTest and WarmStandbyTest and a few others (minor differences but worth it ultimately) - - private ManagementContextInternal mgmt; - private String ownNodeId; - private String nodeName; - private ListeningObjectStore objectStore; - private ManagementPlaneSyncRecordPersister persister; - private HighAvailabilityManagerImpl ha; - private Ticker ticker; - private AtomicLong currentTime; // used to set the ticker's return value - - public void setUp() throws Exception { - if (sharedTime==null) - currentTime = new AtomicLong(System.currentTimeMillis()); - - ticker = new Ticker() { - // strictly not a ticker because returns millis UTC, but it works fine even so - @Override public long read() { - if (sharedTime!=null) return sharedTime.get(); - return currentTime.get(); - } - }; - - nodeName = "node "+nodes.size(); - mgmt = newLocalManagementContext(); - ownNodeId = mgmt.getManagementNodeId(); - objectStore = new ListeningObjectStore(newPersistenceObjectStore()); - objectStore.injectManagementContext(mgmt); - objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED); - persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader); - ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento(); - BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader); - mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build()); - ha = ((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager()) - .setPollPeriod(Duration.PRACTICALLY_FOREVER) - .setHeartbeatTimeout(Duration.THIRTY_SECONDS) - .setLocalTicker(ticker) - .setRemoteTicker(ticker) - .setPersister(persister); - log.info("Created "+nodeName+" "+ownNodeId); - } - - public void tearDown() throws Exception { - if (ha != null) ha.stop(); - if (mgmt != null) Entities.destroyAll(mgmt); - if (objectStore != null) objectStore.deleteCompletely(); - } - - private long tickerCurrentMillis() { - return ticker.read(); - } - - private long tickerAdvance(Duration duration) { - if (sharedTime!=null) - throw new IllegalStateException("Using shared ticker; cannot advance private node clock"); - currentTime.addAndGet(duration.toMilliseconds()); - return tickerCurrentMillis(); - } - - @Override - public String toString() { - return nodeName+" "+ownNodeId; - } - } - - private Boolean prevThrowOnRebind; - - @BeforeMethod(alwaysRun=true) - public void setUp() throws Exception { - prevThrowOnRebind = TestEntityFailingRebind.getThrowOnRebind(); - TestEntityFailingRebind.setThrowOnRebind(true); - nodes.clear(); - sharedBackingStore.clear(); - } - - @AfterMethod(alwaysRun=true) - public void tearDown() throws Exception { - try { - for (HaMgmtNode n: nodes) - n.tearDown(); - } finally { - if (prevThrowOnRebind != null) TestEntityFailingRebind.setThrowOnRebind(prevThrowOnRebind); - } - } - - public HaMgmtNode newNode() throws Exception { - HaMgmtNode node = new HaMgmtNode(); - node.setUp(); - nodes.add(node); - return node; - } - - private void sharedTickerAdvance(Duration duration) { - if (sharedTime==null) { - for (HaMgmtNode n: nodes) - n.tickerAdvance(duration); - } else { - sharedTime.addAndGet(duration.toMilliseconds()); - } - } - - private long sharedTickerCurrentMillis() { - return sharedTime.get(); - } - - protected void useSharedTime() { - if (!nodes.isEmpty()) - throw new IllegalStateException("shared time must be set up before any nodes created"); - sharedTime = new AtomicLong(System.currentTimeMillis()); - } - - protected ManagementContextInternal newLocalManagementContext() { - return new LocalManagementContextForTests(); - } - - protected PersistenceObjectStore newPersistenceObjectStore() { - return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates); - } - - @Test - public void testDoubleRebindFails() throws Exception { - useSharedTime(); - HaMgmtNode n1 = newNode(); - HaMgmtNode n2 = newNode(); - - // first auto should become master - n1.ha.start(HighAvailabilityMode.AUTO); - n2.ha.start(HighAvailabilityMode.AUTO); - assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER); - - TestApplication app = ApplicationBuilder.newManagedApp( - EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), n1.mgmt); - app.start(ImmutableList.<Location>of()); - - n1.mgmt.getRebindManager().forcePersistNow(false, null); - - //don't publish state for a while (i.e. long store delays, failures) - sharedTickerAdvance(Duration.ONE_MINUTE); - - try { - n2.ha.publishAndCheck(false); - fail("n2 rebind failure expected"); - } catch (Exception e) { - assertNestedRebindException(e); - } - - // re-check should re-assert successfully, no rebind expected as he was previously master - n1.ha.publishAndCheck(false); - ManagementPlaneSyncRecord memento; - memento = n1.ha.loadManagementPlaneSyncRecord(true); - assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER); - assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED); - - // hot backup permitted by the TestEntityFailingRebind - n1.ha.changeMode(HighAvailabilityMode.HOT_BACKUP); - memento = n1.ha.loadManagementPlaneSyncRecord(true); - assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.HOT_BACKUP); - try { - n1.ha.changeMode(HighAvailabilityMode.MASTER); - fail("n1 rebind failure expected"); - } catch (Exception e) { - assertNestedRebindException(e); - } - - memento = n1.ha.loadManagementPlaneSyncRecord(true); - assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.FAILED); - assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED); - } - - @Test - public void testStandbyRebind() throws Exception { - useSharedTime(); - HaMgmtNode n1 = newNode(); - HaMgmtNode n2 = newNode(); - - // first auto should become master - n1.ha.start(HighAvailabilityMode.AUTO); - n2.ha.start(HighAvailabilityMode.AUTO); - - TestApplication app = ApplicationBuilder.newManagedApp( - EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), n1.mgmt); - app.start(ImmutableList.<Location>of()); - - n1.mgmt.getRebindManager().forcePersistNow(false, null); - - //don't publish state for a while (i.e. long store delays, failures) - sharedTickerAdvance(Duration.ONE_MINUTE); - - try { - n2.ha.publishAndCheck(false); - fail("n2 rebind failure expected"); - } catch (Exception e) { - assertNestedRebindException(e); - } - - TestEntityFailingRebind.setThrowOnRebind(false); - n1.ha.publishAndCheck(false); - - ManagementPlaneSyncRecord memento = n1.ha.loadManagementPlaneSyncRecord(true); - assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER); - assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED); - } - - private void assertNestedRebindException(Throwable t) { - Throwable ptr = t; - while (ptr != null) { - if (ptr instanceof RebindException) { - return; - } - ptr = ptr.getCause(); - } - Exceptions.propagate(t); - } - - @Test - public void testIfNodeStopsBeingAbleToWrite() throws Exception { - useSharedTime(); - log.info("time at start "+sharedTickerCurrentMillis()); - - HaMgmtNode n1 = newNode(); - HaMgmtNode n2 = newNode(); - - // first auto should become master - n1.ha.start(HighAvailabilityMode.AUTO); - ManagementPlaneSyncRecord memento1 = n1.ha.loadManagementPlaneSyncRecord(true); - - log.info(n1+" HA: "+memento1); - assertEquals(memento1.getMasterNodeId(), n1.ownNodeId); - Long time0 = sharedTickerCurrentMillis(); - assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0); - assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER); - - // second - make explicit hot; that's a strictly more complex case than cold standby, so provides pretty good coverage - n2.ha.start(HighAvailabilityMode.HOT_STANDBY); - ManagementPlaneSyncRecord memento2 = n2.ha.loadManagementPlaneSyncRecord(true); - - log.info(n2+" HA: "+memento2); - assertEquals(memento2.getMasterNodeId(), n1.ownNodeId); - assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER); - assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.HOT_STANDBY); - assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0); - assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time0); - - // and no entities at either - assertEquals(n1.mgmt.getApplications().size(), 0); - assertEquals(n2.mgmt.getApplications().size(), 0); - - // create - TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class), n1.mgmt); - app.start(ImmutableList.<Location>of()); - app.setAttribute(TestApplication.MY_ATTRIBUTE, "hello"); - - assertEquals(n1.mgmt.getApplications().size(), 1); - assertEquals(n2.mgmt.getApplications().size(), 0); - log.info("persisting "+n1.ownNodeId); - n1.mgmt.getRebindManager().forcePersistNow(false, null); - - n1.objectStore.setWritesFailSilently(true); - log.info(n1+" writes off"); - sharedTickerAdvance(Duration.ONE_MINUTE); - log.info("time now "+sharedTickerCurrentMillis()); - Long time1 = sharedTickerCurrentMillis(); - - log.info("publish "+n2.ownNodeId); - n2.ha.publishAndCheck(false); - ManagementPlaneSyncRecord memento2b = n2.ha.loadManagementPlaneSyncRecord(true); - log.info(n2+" HA now: "+memento2b); - - // n2 infers n1 as failed - assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.FAILED); - assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER); - assertEquals(memento2b.getMasterNodeId(), n2.ownNodeId); - assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0); - assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1); - - assertEquals(n1.mgmt.getApplications().size(), 1); - assertEquals(n2.mgmt.getApplications().size(), 1); - assertEquals(n1.mgmt.getApplications().iterator().next().getAttribute(TestApplication.MY_ATTRIBUTE), "hello"); - - n1.objectStore.setWritesFailSilently(false); - log.info(n1+" writes on"); - - sharedTickerAdvance(Duration.ONE_SECOND); - log.info("time now "+sharedTickerCurrentMillis()); - Long time2 = sharedTickerCurrentMillis(); - - log.info("publish "+n1.ownNodeId); - n1.ha.publishAndCheck(false); - ManagementPlaneSyncRecord memento1b = n1.ha.loadManagementPlaneSyncRecord(true); - log.info(n1+" HA now: "+memento1b); - - ManagementNodeState expectedStateAfterDemotion = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ? - ManagementNodeState.HOT_STANDBY : ManagementNodeState.STANDBY; - - // n1 comes back and demotes himself - assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getStatus(), expectedStateAfterDemotion); - assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER); - assertEquals(memento1b.getMasterNodeId(), n2.ownNodeId); - assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2); - assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1); - - // n2 now sees itself as master, with n1 in standby again - ManagementPlaneSyncRecord memento2c = n2.ha.loadManagementPlaneSyncRecord(true); - log.info(n2+" HA now: "+memento2c); - assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getStatus(), expectedStateAfterDemotion); - assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER); - assertEquals(memento2c.getMasterNodeId(), n2.ownNodeId); - assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2); - assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time2); - - // right number of entities at n2; n1 may or may not depending whether hot standby is default - assertEquals(n2.mgmt.getApplications().size(), 1); - assertEquals(n1.mgmt.getApplications().size(), BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ? 1 : 0); - } - - @Test(invocationCount=50, groups="Integration") - public void testIfNodeStopsBeingAbleToWriteManyTimes() throws Exception { - testIfNodeStopsBeingAbleToWrite(); - } - - @Test - public void testSimultaneousStartup() throws Exception { - doTestConcurrentStartup(5, null); - } - - @Test - public void testNearSimultaneousStartup() throws Exception { - doTestConcurrentStartup(20, Duration.millis(20)); - } - - @Test(invocationCount=50, groups="Integration") - public void testNearSimultaneousStartupManyTimes() throws Exception { - doTestConcurrentStartup(20, Duration.millis(20)); - } - - protected void doTestConcurrentStartup(int size, final Duration staggerStart) throws Exception { - useSharedTime(); - - List<Thread> spawned = MutableList.of(); - for (int i=0; i<size; i++) { - final HaMgmtNode n = newNode(); - Thread t = new Thread() { public void run() { - if (staggerStart!=null) Time.sleep(staggerStart.multiply(Math.random())); - n.ha.start(HighAvailabilityMode.AUTO); - n.ha.setPollPeriod(Duration.millis(20)); - } }; - spawned.add(t); - t.start(); - } - - try { - final Stopwatch timer = Stopwatch.createStarted(); - Asserts.succeedsEventually(new Runnable() { - @Override public void run() { - ManagementPlaneSyncRecord memento = nodes.get(0).ha.loadManagementPlaneSyncRecord(true); - List<ManagementNodeState> counts = MutableList.of(), savedCounts = MutableList.of(); - for (HaMgmtNode n: nodes) { - counts.add(n.ha.getNodeState()); - ManagementNodeSyncRecord m = memento.getManagementNodes().get(n.ownNodeId); - if (m!=null) { - savedCounts.add(m.getStatus()); - } - } - log.info("while starting "+nodes.size()+" nodes: " - +Collections.frequency(counts, ManagementNodeState.MASTER)+" M + " - +Collections.frequency(counts, ManagementNodeState.HOT_STANDBY)+" hot + " - +Collections.frequency(counts, ManagementNodeState.STANDBY)+" warm + " - +Collections.frequency(counts, ManagementNodeState.INITIALIZING)+" init; " - + memento.getManagementNodes().size()+" saved, " - +Collections.frequency(savedCounts, ManagementNodeState.MASTER)+" M + " - +Collections.frequency(savedCounts, ManagementNodeState.HOT_STANDBY)+" hot + " - +Collections.frequency(savedCounts, ManagementNodeState.STANDBY)+" warm + " - +Collections.frequency(savedCounts, ManagementNodeState.INITIALIZING)+" init"); - - if (timer.isRunning() && Duration.of(timer).compareTo(Duration.TEN_SECONDS)>0) { - log.warn("we seem to have a problem stabilizing"); //handy place to set a suspend-VM breakpoint! - timer.stop(); - } - assertEquals(Collections.frequency(counts, ManagementNodeState.MASTER), 1); - assertEquals(Collections.frequency(counts, ManagementNodeState.HOT_STANDBY)+Collections.frequency(counts, ManagementNodeState.STANDBY), nodes.size()-1); - assertEquals(Collections.frequency(savedCounts, ManagementNodeState.MASTER), 1); - assertEquals(Collections.frequency(savedCounts, ManagementNodeState.HOT_STANDBY)+Collections.frequency(savedCounts, ManagementNodeState.STANDBY), nodes.size()-1); - }}); - } catch (Throwable t) { - log.warn("Failed to stabilize (rethrowing): "+t, t); - throw Exceptions.propagate(t); - } - - for (Thread t: spawned) - t.join(Duration.THIRTY_SECONDS.toMilliseconds()); - } - - -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java deleted file mode 100644 index 57a5d2e..0000000 --- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; -import org.apache.brooklyn.api.management.ha.ManagementNodeState; -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; -import org.apache.brooklyn.test.entity.LocalManagementContextForTests; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.BrooklynVersion; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl; -import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore; -import brooklyn.entity.rebind.persister.PersistMode; -import brooklyn.entity.rebind.persister.PersistenceObjectStore; -import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord; -import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord.Builder; -import brooklyn.management.ha.HighAvailabilityManagerImpl.PromotionListener; -import brooklyn.management.internal.ManagementContextInternal; -import brooklyn.test.Asserts; -import brooklyn.util.time.Duration; - -import com.google.common.base.Ticker; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; - -@Test -public abstract class HighAvailabilityManagerTestFixture { - - @SuppressWarnings("unused") - private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerTestFixture.class); - - private ManagementPlaneSyncRecordPersister persister; - protected ManagementContextInternal managementContext; - private String ownNodeId; - private HighAvailabilityManagerImpl manager; - private Ticker ticker; - private AtomicLong currentTime; // used to set the ticker's return value - private RecordingPromotionListener promotionListener; - private ClassLoader classLoader = getClass().getClassLoader(); - private PersistenceObjectStore objectStore; - - @BeforeMethod(alwaysRun=true) - public void setUp() throws Exception { - currentTime = new AtomicLong(1000000000L); - ticker = new Ticker() { - // strictly not a ticker because returns millis UTC, but it works fine even so - @Override public long read() { - return currentTime.get(); - } - }; - promotionListener = new RecordingPromotionListener(); - managementContext = newLocalManagementContext(); - ownNodeId = managementContext.getManagementNodeId(); - objectStore = newPersistenceObjectStore(); - objectStore.injectManagementContext(managementContext); - objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED); - persister = new ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, classLoader); - ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento(); - BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore( - objectStore, - managementContext.getBrooklynProperties(), - classLoader); - managementContext.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build()); - manager = ((HighAvailabilityManagerImpl)managementContext.getHighAvailabilityManager()) - .setPollPeriod(getPollPeriod()) - .setHeartbeatTimeout(Duration.THIRTY_SECONDS) - .setPromotionListener(promotionListener) - .setLocalTicker(ticker) - .setRemoteTicker(getRemoteTicker()) - .setPersister(persister); - persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() - .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY)) - .build()); - - } - - protected ManagementContextInternal newLocalManagementContext() { - return LocalManagementContextForTests.newInstance(); - } - - protected abstract PersistenceObjectStore newPersistenceObjectStore(); - - @AfterMethod(alwaysRun=true) - public void tearDown() throws Exception { - if (manager != null) manager.stop(); - if (managementContext != null) Entities.destroyAll(managementContext); - if (objectStore != null) objectStore.deleteCompletely(); - } - - // The web-console could still be polling (e.g. if have just restarted brooklyn), before the persister is set. - // Must not throw NPE, but instead return something sensible (e.g. an empty state record). - @Test - public void testGetManagementPlaneSyncStateDoesNotThrowNpeBeforePersisterSet() throws Exception { - HighAvailabilityManagerImpl manager2 = new HighAvailabilityManagerImpl(managementContext) - .setPollPeriod(Duration.millis(10)) - .setHeartbeatTimeout(Duration.THIRTY_SECONDS) - .setPromotionListener(promotionListener) - .setLocalTicker(ticker) - .setRemoteTicker(ticker); - try { - ManagementPlaneSyncRecord state = manager2.loadManagementPlaneSyncRecord(true); - assertNotNull(state); - } finally { - manager2.stop(); - } - - } - // Can get a log.error about our management node's heartbeat being out of date. Caused by - // poller first writing a heartbeat record, and then the clock being incremented. But the - // next poll fixes it. - public void testPromotes() throws Exception { - persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() - .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY)) - .node(newManagerMemento("node1", ManagementNodeState.MASTER)) - .setMaster("node1") - .build()); - - manager.start(HighAvailabilityMode.AUTO); - - // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish - // its own heartbeat with the new time; but node1's record is now out-of-date. - tickerAdvance(Duration.seconds(31)); - - // Expect to be notified of our promotion, as the only other node - promotionListener.assertCalledEventually(); - } - - @Test(groups="Integration") // because one second wait in succeedsContinually - public void testDoesNotPromoteIfMasterTimeoutNotExpired() throws Exception { - persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() - .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY)) - .node(newManagerMemento("node1", ManagementNodeState.MASTER)) - .setMaster("node1") - .build()); - - manager.start(HighAvailabilityMode.AUTO); - - tickerAdvance(Duration.seconds(25)); - - // Expect not to be notified, as 25s < 30s timeout - // (it's normally a fake clock so won't hit 30, even waiting 1s below - but in "IntegrationTest" subclasses it is real!) - Asserts.succeedsContinually(new Runnable() { - @Override public void run() { - assertTrue(promotionListener.callTimestamps.isEmpty(), "calls="+promotionListener.callTimestamps); - }}); - } - - public void testGetManagementPlaneStatus() throws Exception { - // with the name zzzzz the mgr created here should never be promoted by the alphabetical strategy! - - tickerAdvance(Duration.FIVE_SECONDS); - persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() - .node(newManagerMemento(ownNodeId, ManagementNodeState.STANDBY)) - .node(newManagerMemento("zzzzzzz_node1", ManagementNodeState.STANDBY)) - .build()); - persister.loadSyncRecord(); - long zzzTime = tickerCurrentMillis(); - tickerAdvance(Duration.FIVE_SECONDS); - - manager.start(HighAvailabilityMode.AUTO); - ManagementPlaneSyncRecord memento = manager.loadManagementPlaneSyncRecord(true); - - // Note can assert timestamp because not "real" time; it's using our own Ticker - assertEquals(memento.getMasterNodeId(), ownNodeId); - assertEquals(memento.getManagementNodes().keySet(), ImmutableSet.of(ownNodeId, "zzzzzzz_node1")); - assertEquals(memento.getManagementNodes().get(ownNodeId).getNodeId(), ownNodeId); - assertEquals(memento.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.MASTER); - assertEquals(memento.getManagementNodes().get(ownNodeId).getLocalTimestamp(), tickerCurrentMillis()); - assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getNodeId(), "zzzzzzz_node1"); - assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getStatus(), ManagementNodeState.STANDBY); - assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getLocalTimestamp(), zzzTime); - } - - @Test(groups="Integration", invocationCount=50) //because we have had non-deterministic failures - public void testGetManagementPlaneStatusManyTimes() throws Exception { - testGetManagementPlaneStatus(); - } - - @Test - public void testGetManagementPlaneSyncStateInfersTimedOutNodeAsFailed() throws Exception { - persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder() - .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY)) - .node(newManagerMemento("node1", ManagementNodeState.MASTER)) - .setMaster("node1") - .build()); - - manager.start(HighAvailabilityMode.HOT_STANDBY); - - ManagementPlaneSyncRecord state = manager.loadManagementPlaneSyncRecord(true); - assertEquals(state.getManagementNodes().get("node1").getStatus(), ManagementNodeState.MASTER); - assertEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.HOT_STANDBY); - - // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish - // its own heartbeat with the new time; but node1's record is now out-of-date. - tickerAdvance(Duration.seconds(31)); - - ManagementPlaneSyncRecord state2 = manager.loadManagementPlaneSyncRecord(true); - assertEquals(state2.getManagementNodes().get("node1").getStatus(), ManagementNodeState.FAILED); - assertNotEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.FAILED); - } - - protected Duration getPollPeriod() { - return Duration.millis(10); - } - - protected long tickerCurrentMillis() { - return ticker.read(); - } - - protected long tickerAdvance(Duration duration) { - currentTime.addAndGet(duration.toMilliseconds()); - return tickerCurrentMillis(); - } - - protected Ticker getRemoteTicker() { - return ticker; - } - - protected ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status) { - Builder rb = BasicManagementNodeSyncRecord.builder(); - rb.brooklynVersion(BrooklynVersion.get()).nodeId(nodeId).status(status); - rb.localTimestamp(tickerCurrentMillis()); - if (getRemoteTicker()!=null) - rb.remoteTimestamp(getRemoteTicker().read()); - return rb.build(); - } - - public static class RecordingPromotionListener implements PromotionListener { - public final List<Long> callTimestamps = Lists.newCopyOnWriteArrayList(); - - @Override - public void promotingToMaster() { - callTimestamps.add(System.currentTimeMillis()); - } - - public void assertNotCalled() { - assertTrue(callTimestamps.isEmpty(), "calls="+callTimestamps); - } - - public void assertCalled() { - assertFalse(callTimestamps.isEmpty(), "calls="+callTimestamps); - } - - public void assertCalledEventually() { - Asserts.succeedsEventually(new Runnable() { - @Override public void run() { - assertCalled(); - }}); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/brooklyn/management/ha/HotStandbyTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/ha/HotStandbyTest.java b/core/src/test/java/brooklyn/management/ha/HotStandbyTest.java deleted file mode 100644 index f061289..0000000 --- a/core/src/test/java/brooklyn/management/ha/HotStandbyTest.java +++ /dev/null @@ -1,665 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; - -import java.util.ArrayDeque; -import java.util.Date; -import java.util.Deque; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.entity.Application; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Feed; -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.location.LocationSpec; -import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; -import org.apache.brooklyn.api.management.ha.ManagementNodeState; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; -import org.apache.brooklyn.test.EntityTestUtils; -import org.apache.brooklyn.test.entity.LocalManagementContextForTests; -import org.apache.brooklyn.test.entity.TestApplication; -import org.apache.brooklyn.test.entity.TestEntity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.basic.Entities; -import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl; -import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithFunctionFeedImpl; -import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithNewFeedsEachTimeImpl; -import brooklyn.entity.rebind.RebindManagerImpl; -import brooklyn.entity.rebind.RebindTestFixture; -import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore; -import brooklyn.entity.rebind.persister.InMemoryObjectStore; -import brooklyn.entity.rebind.persister.ListeningObjectStore; -import brooklyn.entity.rebind.persister.PersistMode; -import brooklyn.entity.rebind.persister.PersistenceObjectStore; - -import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation.LocalhostMachine; - -import brooklyn.management.internal.AbstractManagementContext; -import brooklyn.management.internal.ManagementContextInternal; -import brooklyn.util.collections.MutableList; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.javalang.JavaClassNames; -import brooklyn.util.repeat.Repeater; -import brooklyn.util.text.ByteSizeStrings; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.collect.Iterables; - -public class HotStandbyTest { - - private static final Logger log = LoggerFactory.getLogger(HotStandbyTest.class); - - private List<HaMgmtNode> nodes = new MutableList<HotStandbyTest.HaMgmtNode>(); - Map<String,String> sharedBackingStore = MutableMap.of(); - Map<String,Date> sharedBackingStoreDates = MutableMap.of(); - private ClassLoader classLoader = getClass().getClassLoader(); - - public class HaMgmtNode { - // TODO share with WarmStandbyTest and SplitBrainTest and a few others (minor differences but worth it ultimately) - - private ManagementContextInternal mgmt; - private String ownNodeId; - private String nodeName; - private ListeningObjectStore objectStore; - private ManagementPlaneSyncRecordPersister persister; - private HighAvailabilityManagerImpl ha; - private Duration persistOrRebindPeriod = Duration.ONE_SECOND; - - public void setUp() throws Exception { - nodeName = "node "+nodes.size(); - mgmt = newLocalManagementContext(); - ownNodeId = mgmt.getManagementNodeId(); - objectStore = new ListeningObjectStore(newPersistenceObjectStore()); - objectStore.injectManagementContext(mgmt); - objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED); - persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader); - ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento(); - BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader); - ((RebindManagerImpl)mgmt.getRebindManager()).setPeriodicPersistPeriod(persistOrRebindPeriod); - mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build()); - ha = ((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager()) - .setPollPeriod(Duration.PRACTICALLY_FOREVER) - .setHeartbeatTimeout(Duration.THIRTY_SECONDS) - .setPersister(persister); - log.info("Created "+nodeName+" "+ownNodeId); - } - - public void tearDownThisOnly() throws Exception { - if (ha != null) ha.stop(); - if (mgmt!=null) mgmt.getRebindManager().stop(); - if (mgmt != null) Entities.destroyAll(mgmt); - } - - public void tearDownAll() throws Exception { - tearDownThisOnly(); - // can't delete the object store until all being torn down - if (objectStore != null) objectStore.deleteCompletely(); - } - - @Override - public String toString() { - return nodeName+" "+ownNodeId; - } - - public RebindManagerImpl rebinder() { - return (RebindManagerImpl)mgmt.getRebindManager(); - } - } - - @BeforeMethod(alwaysRun=true) - public void setUp() throws Exception { - nodes.clear(); - sharedBackingStore.clear(); - } - - public HaMgmtNode newNode(Duration persistOrRebindPeriod) throws Exception { - HaMgmtNode node = new HaMgmtNode(); - node.persistOrRebindPeriod = persistOrRebindPeriod; - node.setUp(); - nodes.add(node); - return node; - } - - @AfterMethod(alwaysRun=true) - public void tearDown() throws Exception { - for (HaMgmtNode n: nodes) - n.tearDownAll(); - } - - protected ManagementContextInternal newLocalManagementContext() { - return new LocalManagementContextForTests(); - } - - protected PersistenceObjectStore newPersistenceObjectStore() { - return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates); - } - - private HaMgmtNode createMaster(Duration persistOrRebindPeriod) throws Exception { - HaMgmtNode n1 = newNode(persistOrRebindPeriod); - n1.ha.start(HighAvailabilityMode.AUTO); - assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER); - return n1; - } - - private HaMgmtNode createHotStandby(Duration rebindPeriod) throws Exception { - HaMgmtNode n2 = newNode(rebindPeriod); - n2.ha.start(HighAvailabilityMode.HOT_STANDBY); - assertEquals(n2.ha.getNodeState(), ManagementNodeState.HOT_STANDBY); - return n2; - } - - private TestApplication createFirstAppAndPersist(HaMgmtNode n1) throws Exception { - TestApplication app = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt); - // for testing without enrichers, if desired: -// TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class).impl(TestApplicationNoEnrichersImpl.class), n1.mgmt); - app.setDisplayName("First App"); - app.start(MutableList.<Location>of()); - app.config().set(TestEntity.CONF_NAME, "first-app"); - app.setAttribute(TestEntity.SEQUENCE, 3); - - forcePersistNow(n1); - return app; - } - - protected void forcePersistNow(HaMgmtNode n1) { - n1.mgmt.getRebindManager().forcePersistNow(false, null); - } - - private Application expectRebindSequenceNumber(HaMgmtNode master, HaMgmtNode hotStandby, Application app, int expectedSensorSequenceValue, boolean immediate) { - Application appRO = hotStandby.mgmt.lookup(app.getId(), Application.class); - - if (immediate) { - forcePersistNow(master); - forceRebindNow(hotStandby); - EntityTestUtils.assertAttributeEquals(appRO, TestEntity.SEQUENCE, expectedSensorSequenceValue); - } else { - EntityTestUtils.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, expectedSensorSequenceValue); - } - - log.info("got sequence number "+expectedSensorSequenceValue+" from "+appRO); - - // make sure the instance (proxy) is unchanged - Application appRO2 = hotStandby.mgmt.lookup(app.getId(), Application.class); - Assert.assertTrue(appRO2==appRO); - - return appRO; - } - - private void forceRebindNow(HaMgmtNode hotStandby) { - hotStandby.mgmt.getRebindManager().rebind(null, null, ManagementNodeState.HOT_STANDBY); - } - - @Test - public void testHotStandbySeesInitialCustomNameConfigAndSensorValueButDoesntAllowChange() throws Exception { - HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); - TestApplication app = createFirstAppAndPersist(n1); - HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); - - assertEquals(n2.mgmt.getApplications().size(), 1); - Application appRO = n2.mgmt.lookup(app.getId(), Application.class); - Assert.assertNotNull(appRO); - Assert.assertTrue(appRO instanceof TestApplication); - assertEquals(appRO.getDisplayName(), "First App"); - assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app"); - assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3); - - try { - ((TestApplication)appRO).setAttribute(TestEntity.SEQUENCE, 4); - Assert.fail("Should not have allowed sensor to be set"); - } catch (Exception e) { - Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e); - } - assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3); - } - - @Test - public void testHotStandbySeesChangesToNameConfigAndSensorValue() throws Exception { - HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); - TestApplication app = createFirstAppAndPersist(n1); - HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); - - assertEquals(n2.mgmt.getApplications().size(), 1); - Application appRO = n2.mgmt.lookup(app.getId(), Application.class); - Assert.assertNotNull(appRO); - assertEquals(appRO.getChildren().size(), 0); - - // test changes - - app.setDisplayName("First App Renamed"); - app.config().set(TestEntity.CONF_NAME, "first-app-renamed"); - app.setAttribute(TestEntity.SEQUENCE, 4); - - appRO = expectRebindSequenceNumber(n1, n2, app, 4, true); - assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1); - assertEquals(appRO.getDisplayName(), "First App Renamed"); - assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app-renamed"); - - // and change again for good measure! - - app.setDisplayName("First App"); - app.config().set(TestEntity.CONF_NAME, "first-app-restored"); - app.setAttribute(TestEntity.SEQUENCE, 5); - - appRO = expectRebindSequenceNumber(n1, n2, app, 5, true); - assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1); - assertEquals(appRO.getDisplayName(), "First App"); - assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app-restored"); - } - - - public void testHotStandbySeesStructuralChangesIncludingRemoval() throws Exception { - doTestHotStandbySeesStructuralChangesIncludingRemoval(true); - } - - @Test(groups="Integration") // due to time (it waits for background persistence) - public void testHotStandbyUnforcedSeesStructuralChangesIncludingRemoval() throws Exception { - doTestHotStandbySeesStructuralChangesIncludingRemoval(false); - } - - public void doTestHotStandbySeesStructuralChangesIncludingRemoval(boolean immediate) throws Exception { - HaMgmtNode n1 = createMaster(immediate ? Duration.PRACTICALLY_FOREVER : Duration.millis(200)); - TestApplication app = createFirstAppAndPersist(n1); - HaMgmtNode n2 = createHotStandby(immediate ? Duration.PRACTICALLY_FOREVER : Duration.millis(200)); - - assertEquals(n2.mgmt.getApplications().size(), 1); - Application appRO = n2.mgmt.lookup(app.getId(), Application.class); - Assert.assertNotNull(appRO); - assertEquals(appRO.getChildren().size(), 0); - - // test additions - new child, new app - - TestEntity child = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child")); - Entities.manage(child); - TestApplication app2 = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt); - app2.config().set(TestEntity.CONF_NAME, "second-app"); - - app.setAttribute(TestEntity.SEQUENCE, 4); - appRO = expectRebindSequenceNumber(n1, n2, app, 4, immediate); - - assertEquals(appRO.getChildren().size(), 1); - Entity childRO = Iterables.getOnlyElement(appRO.getChildren()); - assertEquals(childRO.getId(), child.getId()); - assertEquals(childRO.getConfig(TestEntity.CONF_NAME), "first-child"); - - assertEquals(n2.mgmt.getApplications().size(), 2); - Application app2RO = n2.mgmt.lookup(app2.getId(), Application.class); - Assert.assertNotNull(app2RO); - assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app"); - - assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 3); - - // now test removals - - Entities.unmanage(child); - Entities.unmanage(app2); - - app.setAttribute(TestEntity.SEQUENCE, 5); - appRO = expectRebindSequenceNumber(n1, n2, app, 5, immediate); - - EntityTestUtils.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, 5); - assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1); - assertEquals(appRO.getChildren().size(), 0); - assertEquals(n2.mgmt.getApplications().size(), 1); - Assert.assertNull(n2.mgmt.lookup(app2.getId(), Application.class)); - Assert.assertNull(n2.mgmt.lookup(child.getId(), Application.class)); - } - - @Test(groups="Integration", invocationCount=50) - public void testHotStandbySeesStructuralChangesIncludingRemovalManyTimes() throws Exception { - doTestHotStandbySeesStructuralChangesIncludingRemoval(true); - } - - Deque<Long> usedMemory = new ArrayDeque<Long>(); - protected long noteUsedMemory(String message) { - Time.sleep(Duration.millis(200)); - for (HaMgmtNode n: nodes) { - ((AbstractManagementContext)n.mgmt).getGarbageCollector().gcIteration(); - } - System.gc(); System.gc(); - Time.sleep(Duration.millis(50)); - System.gc(); System.gc(); - long mem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - usedMemory.addLast(mem); - log.info("Memory used - "+message+": "+ByteSizeStrings.java().apply(mem)); - return mem; - } - public void assertUsedMemoryLessThan(String event, long max) { - noteUsedMemory(event); - long nowUsed = usedMemory.peekLast(); - if (nowUsed > max) { - // aggressively try to force GC - Time.sleep(Duration.ONE_SECOND); - usedMemory.removeLast(); - noteUsedMemory(event+" (extra GC)"); - nowUsed = usedMemory.peekLast(); - if (nowUsed > max) { - Assert.fail("Too much memory used - "+ByteSizeStrings.java().apply(nowUsed)+" > max "+ByteSizeStrings.java().apply(max)); - } - } - } - public void assertUsedMemoryMaxDelta(String event, long deltaMegabytes) { - assertUsedMemoryLessThan(event, usedMemory.peekLast() + deltaMegabytes*1024*1024); - } - - @Test(groups="Integration") - public void testHotStandbyDoesNotLeakLotsOfRebinds() throws Exception { - log.info("Starting test "+JavaClassNames.niceClassAndMethod()); - final int DELTA = 2; - - HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); - TestApplication app = createFirstAppAndPersist(n1); - long initialUsed = noteUsedMemory("Created app"); - - HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); - assertUsedMemoryMaxDelta("Standby created", DELTA); - - forcePersistNow(n1); - forceRebindNow(n2); - assertUsedMemoryMaxDelta("Persisted and rebinded once", DELTA); - - for (int i=0; i<10; i++) { - forcePersistNow(n1); - forceRebindNow(n2); - } - assertUsedMemoryMaxDelta("Persisted and rebinded 10x", DELTA); - - for (int i=0; i<1000; i++) { - if ((i+1)%100==0) { - noteUsedMemory("iteration "+(i+1)); - usedMemory.removeLast(); - } - forcePersistNow(n1); - forceRebindNow(n2); - } - assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA); - - Entities.unmanage(app); - forcePersistNow(n1); - forceRebindNow(n2); - - assertUsedMemoryLessThan("And now all unmanaged", initialUsed + DELTA*1000*1000); - } - - static class BigObject { - public BigObject(int sizeBytes) { array = new byte[sizeBytes]; } - byte[] array; - } - - @Test(groups="Integration") - public void testHotStandbyDoesNotLeakBigObjects() throws Exception { - log.info("Starting test "+JavaClassNames.niceClassAndMethod()); - final int SIZE = 5; - final int SIZE_UP_BOUND = SIZE+2; - final int SIZE_DOWN_BOUND = SIZE-1; - final int GRACE = 2; - // the XML persistence uses a lot of space, we approx at between 2x and 3c - final int SIZE_IN_XML = 3*SIZE; - final int SIZE_IN_XML_DOWN = 2*SIZE; - - HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); - TestApplication app = createFirstAppAndPersist(n1); - noteUsedMemory("Finished seeding"); - Long initialUsed = usedMemory.peekLast(); - app.config().set(TestEntity.CONF_OBJECT, new BigObject(SIZE*1000*1000)); - assertUsedMemoryMaxDelta("Set a big config object", SIZE_UP_BOUND); - forcePersistNow(n1); - assertUsedMemoryMaxDelta("Persisted a big config object", SIZE_IN_XML); - - HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); - forceRebindNow(n2); - assertUsedMemoryMaxDelta("Rebinded", SIZE_UP_BOUND); - - for (int i=0; i<10; i++) - forceRebindNow(n2); - assertUsedMemoryMaxDelta("Several more rebinds", GRACE); - for (int i=0; i<10; i++) { - forcePersistNow(n1); - forceRebindNow(n2); - } - assertUsedMemoryMaxDelta("And more rebinds and more persists", GRACE); - - app.config().set(TestEntity.CONF_OBJECT, "big is now small"); - assertUsedMemoryMaxDelta("Big made small at primary", -SIZE_DOWN_BOUND); - forcePersistNow(n1); - assertUsedMemoryMaxDelta("And persisted", -SIZE_IN_XML_DOWN); - - forceRebindNow(n2); - assertUsedMemoryMaxDelta("And at secondary", -SIZE_DOWN_BOUND); - - Entities.unmanage(app); - forcePersistNow(n1); - forceRebindNow(n2); - - assertUsedMemoryLessThan("And now all unmanaged", initialUsed+GRACE*1000*1000); - } - - @Test(groups="Integration") // because it's slow - // Sept 2014 - there is a small leak, of 200 bytes per child created and destroyed; - // but this goes away when the app is destroyed; it may be a benign record - public void testHotStandbyDoesNotLeakLotsOfRebindsCreatingAndDestroyingAChildEntity() throws Exception { - log.info("Starting test "+JavaClassNames.niceClassAndMethod()); - final int DELTA = 2; - - HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); - TestApplication app = createFirstAppAndPersist(n1); - long initialUsed = noteUsedMemory("Created app"); - - HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); - assertUsedMemoryMaxDelta("Standby created", DELTA); - - TestEntity lastChild = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child")); - Entities.manage(lastChild); - forcePersistNow(n1); - forceRebindNow(n2); - assertUsedMemoryMaxDelta("Child created and rebinded once", DELTA); - - for (int i=0; i<1000; i++) { - if (i==9 || (i+1)%100==0) { - noteUsedMemory("iteration "+(i+1)); - usedMemory.removeLast(); - } - TestEntity newChild = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child")); - Entities.manage(newChild); - Entities.unmanage(lastChild); - lastChild = newChild; - - forcePersistNow(n1); - forceRebindNow(n2); - } - assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA); - - Entities.unmanage(app); - forcePersistNow(n1); - forceRebindNow(n2); - - assertUsedMemoryLessThan("And now all unmanaged", initialUsed + DELTA*1000*1000); - } - - protected void assertHotStandby(HaMgmtNode n1) { - assertEquals(n1.ha.getNodeState(), ManagementNodeState.HOT_STANDBY); - Assert.assertTrue(n1.rebinder().isReadOnlyRunning()); - Assert.assertFalse(n1.rebinder().isPersistenceRunning()); - } - - protected void assertMaster(HaMgmtNode n1) { - assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER); - Assert.assertFalse(n1.rebinder().isReadOnlyRunning()); - Assert.assertTrue(n1.rebinder().isPersistenceRunning()); - } - - @Test - public void testChangeMode() throws Exception { - HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); - TestApplication app = createFirstAppAndPersist(n1); - HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); - - TestEntity child = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child")); - Entities.manage(child); - TestApplication app2 = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt); - app2.config().set(TestEntity.CONF_NAME, "second-app"); - - forcePersistNow(n1); - n2.ha.setPriority(1); - n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY); - - // both now hot standby - assertHotStandby(n1); - assertHotStandby(n2); - - assertEquals(n1.mgmt.getApplications().size(), 2); - Application app2RO = n1.mgmt.lookup(app2.getId(), Application.class); - Assert.assertNotNull(app2RO); - assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app"); - try { - ((TestApplication)app2RO).setAttribute(TestEntity.SEQUENCE, 4); - Assert.fail("Should not have allowed sensor to be set"); - } catch (Exception e) { - Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e); - } - - n1.ha.changeMode(HighAvailabilityMode.AUTO); - n2.ha.changeMode(HighAvailabilityMode.HOT_STANDBY, true, false); - // both still hot standby (n1 will defer to n2 as it has higher priority) - assertHotStandby(n1); - assertHotStandby(n2); - - // with priority 1, n2 will now be elected - n2.ha.changeMode(HighAvailabilityMode.AUTO); - assertHotStandby(n1); - assertMaster(n2); - - assertEquals(n2.mgmt.getApplications().size(), 2); - Application app2B = n2.mgmt.lookup(app2.getId(), Application.class); - Assert.assertNotNull(app2B); - assertEquals(app2B.getConfig(TestEntity.CONF_NAME), "second-app"); - ((TestApplication)app2B).setAttribute(TestEntity.SEQUENCE, 4); - - forcePersistNow(n2); - forceRebindNow(n1); - Application app2BRO = n1.mgmt.lookup(app2.getId(), Application.class); - Assert.assertNotNull(app2BRO); - EntityTestUtils.assertAttributeEquals(app2BRO, TestEntity.SEQUENCE, 4); - } - - @Test(groups="Integration", invocationCount=20) - public void testChangeModeManyTimes() throws Exception { - testChangeMode(); - } - - @Test - public void testChangeModeToDisabledAndBack() throws Exception { - HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); - n1.mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachine.class)); - @SuppressWarnings("unused") - TestApplication app = createFirstAppAndPersist(n1); - - HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); - - // disabled n1 allows n2 to become master when next we tell it to check - n1.ha.changeMode(HighAvailabilityMode.DISABLED); - n2.ha.changeMode(HighAvailabilityMode.AUTO); - assertMaster(n2); - assertEquals(n1.ha.getNodeState(), ManagementNodeState.FAILED); - Assert.assertTrue(n1.mgmt.getApplications().isEmpty(), "n1 should have had no apps; instead had: "+n1.mgmt.getApplications()); - Assert.assertTrue(n1.mgmt.getEntityManager().getEntities().isEmpty(), "n1 should have had no entities; instead had: "+n1.mgmt.getEntityManager().getEntities()); - Assert.assertTrue(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 should have had no locations; instead had: "+n1.mgmt.getLocationManager().getLocations()); - - // we can now change n1 back to hot_standby - n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY); - assertHotStandby(n1); - // and it sees apps - Assert.assertFalse(n1.mgmt.getApplications().isEmpty(), "n1 should have had apps now"); - Assert.assertFalse(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 should have had locations now"); - // and if n2 is disabled, n1 promotes - n2.ha.changeMode(HighAvailabilityMode.DISABLED); - n1.ha.changeMode(HighAvailabilityMode.AUTO); - assertMaster(n1); - assertEquals(n2.ha.getNodeState(), ManagementNodeState.FAILED); - } - - @Test - public void testHotStandbyDoesNotStartFeeds() throws Exception { - HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); - TestApplication app = createFirstAppAndPersist(n1); - TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithFunctionFeedImpl.class)); - forcePersistNow(n1); - Assert.assertTrue(entity.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds()); - for (Feed feed : entity.feeds().getFeeds()) { - assertTrue(feed.isRunning(), "Feed expected running, but it is non-running"); - } - - HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); - TestEntity entityRO = (TestEntity) n2.mgmt.lookup(entity.getId(), Entity.class); - Assert.assertTrue(entityRO.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds()); - for (Feed feedRO : entityRO.feeds().getFeeds()) { - assertFalse(feedRO.isRunning(), "Feed expected non-active, but it is running"); - } - } - - @Test(groups="Integration") - public void testHotStandbyDoesNotStartFeedsRebindingManyTimes() throws Exception { - testHotStandbyDoesNotStartFeeds(); - final HaMgmtNode hsb = createHotStandby(Duration.millis(10)); - Repeater.create("until 10 rebinds").every(Duration.millis(100)).until( - new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10; - } - }).runRequiringTrue(); - // make sure not too many tasks (allowing 5 for rebind etc; currently just 2) - RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5); - } - - @Test(groups="Integration") - public void testHotStandbyDoesNotStartFeedsRebindingManyTimesWithAnotherFeedGenerator() throws Exception { - HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); - TestApplication app = createFirstAppAndPersist(n1); - TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithNewFeedsEachTimeImpl.class)); - forcePersistNow(n1); - Assert.assertTrue(entity.feeds().getFeeds().size() == 4, "Feeds: "+entity.feeds().getFeeds()); - - final HaMgmtNode hsb = createHotStandby(Duration.millis(10)); - Repeater.create("until 10 rebinds").every(Duration.millis(100)).until( - new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10; - } - }).runRequiringTrue(); - // make sure not too many tasks (allowing 5 for rebind etc; currently just 2) - RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/brooklyn/management/ha/ImmutableManagementPlaneSyncRecord.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/ha/ImmutableManagementPlaneSyncRecord.java b/core/src/test/java/brooklyn/management/ha/ImmutableManagementPlaneSyncRecord.java deleted file mode 100644 index 10c47f9..0000000 --- a/core/src/test/java/brooklyn/management/ha/ImmutableManagementPlaneSyncRecord.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import java.util.Map; - -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; - -import com.google.common.base.Objects; -import com.google.common.collect.ImmutableMap; - -public class ImmutableManagementPlaneSyncRecord implements ManagementPlaneSyncRecord { - private final String masterNodeId; - private final Map<String, ManagementNodeSyncRecord> managementNodes; - - ImmutableManagementPlaneSyncRecord(String masterNodeId, Map<String, ManagementNodeSyncRecord> nodes) { - this.masterNodeId = masterNodeId; - this.managementNodes = ImmutableMap.copyOf(nodes); - } - - @Override - public String getMasterNodeId() { - return masterNodeId; - } - - @Override - public Map<String, ManagementNodeSyncRecord> getManagementNodes() { - return managementNodes; - } - - @Override - public String toString() { - return Objects.toStringHelper(this).add("master", masterNodeId).add("nodes", managementNodes.keySet()).toString(); - } - - @Override - public String toVerboseString() { - return Objects.toStringHelper(this).add("master", masterNodeId).add("nodes", managementNodes).toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/brooklyn/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java b/core/src/test/java/brooklyn/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java deleted file mode 100644 index bc8b418..0000000 --- a/core/src/test/java/brooklyn/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.rebind.persister.InMemoryObjectStore; -import brooklyn.util.time.Duration; - -import com.google.common.annotations.VisibleForTesting; - -/** @deprecated since 0.7.0 use {@link ManagementPlaneSyncRecordPersisterToObjectStore} - * with {@link InMemoryObjectStore} - * <code> - * new ManagementPlaneSyncRecordPersisterToObjectStore(new InMemoryObjectStore(), classLoader) - * </code> */ -@Deprecated -public class ManagementPlaneSyncRecordPersisterInMemory implements ManagementPlaneSyncRecordPersister { - - private static final Logger LOG = LoggerFactory.getLogger(ManagementPlaneSyncRecordPersisterInMemory.class); - - private final MutableManagementPlaneSyncRecord memento = new MutableManagementPlaneSyncRecord(); - - private volatile boolean running = true; - - @Override - public synchronized void stop() { - running = false; - } - - @Override - public ManagementPlaneSyncRecord loadSyncRecord() throws IOException { - if (!running) { - throw new IllegalStateException("Persister not running; cannot load memento"); - } - - return memento.snapshot(); - } - - @VisibleForTesting - @Override - public synchronized void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException { - // The synchronized is sufficient - guarantee that no concurrent calls - return; - } - - @Override - public synchronized void delta(Delta delta) { - if (!running) { - if (LOG.isDebugEnabled()) LOG.debug("Persister not running; ignoring checkpointed delta of manager-memento"); - return; - } - - for (ManagementNodeSyncRecord m : delta.getNodes()) { - memento.addNode(m); - } - for (String id : delta.getRemovedNodeIds()) { - memento.deleteNode(id); - } - switch (delta.getMasterChange()) { - case NO_CHANGE: - break; // no-op - case SET_MASTER: - memento.setMasterNodeId(checkNotNull(delta.getNewMasterOrNull())); - break; - case CLEAR_MASTER: - memento.setMasterNodeId(null); - break; // no-op - default: - throw new IllegalStateException("Unknown state for master-change: "+delta.getMasterChange()); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/brooklyn/management/ha/MasterChooserTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/ha/MasterChooserTest.java b/core/src/test/java/brooklyn/management/ha/MasterChooserTest.java deleted file mode 100644 index 14804b1..0000000 --- a/core/src/test/java/brooklyn/management/ha/MasterChooserTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; - -import java.util.List; - -import org.apache.brooklyn.api.management.ha.ManagementNodeState; -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.BrooklynVersion; -import brooklyn.entity.basic.EntityFunctions; -import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord; -import brooklyn.management.ha.BasicMasterChooser.AlphabeticMasterChooser; -import brooklyn.management.ha.BasicMasterChooser.ScoredRecord; -import brooklyn.util.time.Duration; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -public class MasterChooserTest { - - private MutableManagementPlaneSyncRecord memento; - private AlphabeticMasterChooser chooser; - private long now; - - @BeforeMethod(alwaysRun=true) - public void setUp() throws Exception { - memento = new MutableManagementPlaneSyncRecord(); - chooser = new AlphabeticMasterChooser(); - now = System.currentTimeMillis(); - } - - @Test - public void testChoosesFirstAlphanumeric() throws Exception { - memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now)); - memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now)); - memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now)); - Duration heartbeatTimeout = Duration.THIRTY_SECONDS; - String ownNodeId = "node2"; - assertEquals(chooser.choose(memento, heartbeatTimeout, ownNodeId).getNodeId(), "node1"); - } - - @Test - public void testReturnsNullIfNoValid() throws Exception { - memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now - 31*1000)); - Duration heartbeatTimeout = Duration.THIRTY_SECONDS; - assertNull(chooser.choose(memento, heartbeatTimeout, "node2")); - } - - @Test - public void testFiltersOutByHeartbeat() throws Exception { - memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now - 31*1000)); - memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now - 20*1000)); - memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now)); - Duration heartbeatTimeout = Duration.THIRTY_SECONDS; - assertEquals(getIds(chooser.sort(chooser.filterHealthy(memento, heartbeatTimeout, now))), ImmutableList.of("node2", "node3")); - } - - protected static List<String> getIds(List<ScoredRecord<?>> filterHealthy) { - return ImmutableList.copyOf(Iterables.transform(filterHealthy, EntityFunctions.id())); - } - - @Test - public void testFiltersOutByStatusNotPreferringMaster() throws Exception { - assertEquals(doTestFiltersOutByStatus(false, false), ImmutableList.of("node4", "node5")); - } - @Test - public void testFiltersOutByStatusPreferringMaster() throws Exception { - assertEquals(doTestFiltersOutByStatus(true, false), ImmutableList.of("node5", "node4")); - } - - @Test - public void testFiltersOutByStatusNotPreferringHot() throws Exception { - assertEquals(doTestFiltersOutByStatus(false, true), ImmutableList.of("node4", "node5", "node6")); - } - @Test - public void testFiltersOutByStatusPreferringHot() throws Exception { - assertEquals(doTestFiltersOutByStatus(true, true), ImmutableList.of("node5", "node6", "node4")); - } - - protected List<String> doTestFiltersOutByStatus(boolean preferHot, boolean includeHot) throws Exception { - chooser = new AlphabeticMasterChooser(preferHot); - memento.addNode(newManagerMemento("node1", ManagementNodeState.FAILED, now)); - memento.addNode(newManagerMemento("node2", ManagementNodeState.TERMINATED, now)); - memento.addNode(newManagerMemento("node3", null, now)); - memento.addNode(newManagerMemento("node4", ManagementNodeState.STANDBY, now)); - memento.addNode(newManagerMemento("node5", ManagementNodeState.MASTER, now)); - if (includeHot) - memento.addNode(newManagerMemento("node6", ManagementNodeState.HOT_STANDBY, now)); - return getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, now))); - } - - @Test - public void testExplicityPriority() throws Exception { - chooser = new AlphabeticMasterChooser(); - memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now, BrooklynVersion.get(), 2L)); - memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now, BrooklynVersion.get(), -1L)); - memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now, BrooklynVersion.get(), null)); - List<String> order = getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, now))); - assertEquals(order, ImmutableList.of("node1", "node3", "node2")); - } - - @Test - public void testVersionsMaybeNull() throws Exception { - chooser = new AlphabeticMasterChooser(); - memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now, "v10", null)); - memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now, "v3-snapshot", null)); - memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now, "v3-snapshot", -1L)); - memento.addNode(newManagerMemento("node4", ManagementNodeState.STANDBY, now, "v3-snapshot", null)); - memento.addNode(newManagerMemento("node5", ManagementNodeState.STANDBY, now, "v3-stable", null)); - memento.addNode(newManagerMemento("node6", ManagementNodeState.STANDBY, now, "v1", null)); - memento.addNode(newManagerMemento("node7", ManagementNodeState.STANDBY, now, null, null)); - List<String> order = getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, now))); - assertEquals(order, ImmutableList.of("node1", "node5", "node6", "node2", "node4", "node7", "node3")); - } - - private ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status, long timestamp) { - return newManagerMemento(nodeId, status, timestamp, BrooklynVersion.get(), null); - } - private ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status, long timestamp, - String version, Long priority) { - return BasicManagementNodeSyncRecord.builder().brooklynVersion(version).nodeId(nodeId).status(status).localTimestamp(timestamp).remoteTimestamp(timestamp). - priority(priority).build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/brooklyn/management/ha/MutableManagementPlaneSyncRecord.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/ha/MutableManagementPlaneSyncRecord.java b/core/src/test/java/brooklyn/management/ha/MutableManagementPlaneSyncRecord.java deleted file mode 100644 index a68cac3..0000000 --- a/core/src/test/java/brooklyn/management/ha/MutableManagementPlaneSyncRecord.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import java.util.Map; - -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; - -import com.google.common.collect.Maps; - -public class MutableManagementPlaneSyncRecord implements ManagementPlaneSyncRecord { - private String masterNodeId; - private Map<String, ManagementNodeSyncRecord> managementNodes = Maps.newConcurrentMap(); - - @Override - public String getMasterNodeId() { - return masterNodeId; - } - - @Override - public Map<String, ManagementNodeSyncRecord> getManagementNodes() { - return managementNodes; - } - - @Override - public String toVerboseString() { - return toString(); - } - - public ImmutableManagementPlaneSyncRecord snapshot() { - return new ImmutableManagementPlaneSyncRecord(masterNodeId, managementNodes); - } - - public void setMasterNodeId(String masterNodeId) { - this.masterNodeId = masterNodeId; - } - - public void addNode(ManagementNodeSyncRecord memento) { - managementNodes.put(memento.getNodeId(), memento); - } - - public void deleteNode(String nodeId) { - managementNodes.remove(nodeId); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/brooklyn/management/ha/TestEntityFailingRebind.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/ha/TestEntityFailingRebind.java b/core/src/test/java/brooklyn/management/ha/TestEntityFailingRebind.java deleted file mode 100644 index a8cfced..0000000 --- a/core/src/test/java/brooklyn/management/ha/TestEntityFailingRebind.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import org.apache.brooklyn.test.entity.TestApplicationImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestEntityFailingRebind extends TestApplicationImpl { - - private static final Logger LOG = LoggerFactory.getLogger(TestEntityFailingRebind.class); - - public static class RebindException extends RuntimeException { - private static final long serialVersionUID = 1L; - - public RebindException(String message) { - super(message); - } - } - - private static boolean throwOnRebind = true; - - public static void setThrowOnRebind(boolean state) { - throwOnRebind = state; - } - - public static boolean getThrowOnRebind() { - return throwOnRebind; - } - - @Override - public void rebind() { - if (throwOnRebind) { - LOG.warn("Throwing intentional exception to simulate failure of rebinding " + this); - throw new RebindException("Intentional exception thrown when rebinding " + this); - } - } - -}
