http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java new file mode 100644 index 0000000..7e34071 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.management.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; +import org.apache.brooklyn.api.management.ha.ManagementNodeState; +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; +import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl; +import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore; +import org.apache.brooklyn.core.management.ha.TestEntityFailingRebind.RebindException; +import org.apache.brooklyn.core.management.internal.ManagementContextInternal; +import org.apache.brooklyn.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.test.entity.TestApplication; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl; +import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore; +import brooklyn.entity.rebind.persister.InMemoryObjectStore; +import brooklyn.entity.rebind.persister.ListeningObjectStore; +import brooklyn.entity.rebind.persister.PersistMode; +import brooklyn.entity.rebind.persister.PersistenceObjectStore; +import brooklyn.internal.BrooklynFeatureEnablement; +import brooklyn.test.Asserts; +import 
brooklyn.util.collections.MutableList;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;

import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;

/**
 * Exercises {@link HighAvailabilityManagerImpl} with several in-process management nodes which all
 * persist to the same in-memory backing store, using a manually advanced clock so that heartbeat
 * timeouts can be triggered deterministically.
 */
@Test
public class HighAvailabilityManagerSplitBrainTest {

    private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerSplitBrainTest.class);

    /** Nodes created by {@link #newNode()} during the current test method; torn down in {@link #tearDown()}. */
    private final List<HaMgmtNode> nodes = MutableList.of();
    /** Backing maps handed to every {@link InMemoryObjectStore}, so all nodes see the same persisted state. */
    Map<String,String> sharedBackingStore = MutableMap.of();
    Map<String,Date> sharedBackingStoreDates = MutableMap.of();
    private AtomicLong sharedTime; // used to set the ticker's return value
    private ClassLoader classLoader = getClass().getClassLoader();

    /** One management node: a management context, its HA manager and the (listening) object store it persists to. */
    public class HaMgmtNode {
        // TODO share with HotStandbyTest and WarmStandbyTest and a few others (minor differences but worth it ultimately)

        private ManagementContextInternal mgmt;
        private String ownNodeId;
        private String nodeName;
        private ListeningObjectStore objectStore;
        private ManagementPlaneSyncRecordPersister persister;
        private HighAvailabilityManagerImpl ha;
        private Ticker ticker;
        private AtomicLong currentTime; // used to set the ticker's return value

        /**
         * Creates the management context, object store, persisters and HA manager for this node.
         * The HA manager is configured but not started; tests call {@code ha.start(...)} themselves.
         */
        public void setUp() throws Exception {
            // a private clock is only needed when the test has not opted in to the shared one
            if (sharedTime==null)
                currentTime = new AtomicLong(System.currentTimeMillis());

            ticker = new Ticker() {
                // strictly not a ticker because returns millis UTC, but it works fine even so
                @Override public long read() {
                    if (sharedTime!=null) return sharedTime.get();
                    return currentTime.get();
                }
            };

            nodeName = "node "+nodes.size();
            mgmt = newLocalManagementContext();
            ownNodeId = mgmt.getManagementNodeId();
            objectStore = new ListeningObjectStore(newPersistenceObjectStore());
            objectStore.injectManagementContext(mgmt);
            objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
            persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
            ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
            BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(
                    objectStore, mgmt.getBrooklynProperties(), classLoader);
            mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
            // polling is effectively disabled; tests drive the manager explicitly via publishAndCheck
            ha = ((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager())
                .setPollPeriod(Duration.PRACTICALLY_FOREVER)
                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
                .setLocalTicker(ticker)
                .setRemoteTicker(ticker)
                .setPersister(persister);
            log.info("Created "+nodeName+" "+ownNodeId);
        }

        /**
         * Stops the HA manager, destroys the management context and deletes the object store.
         * Each step is attempted even if an earlier one throws, so a failure in one does not leak the others.
         */
        public void tearDown() throws Exception {
            try {
                if (ha != null) ha.stop();
            } finally {
                try {
                    if (mgmt != null) Entities.destroyAll(mgmt);
                } finally {
                    if (objectStore != null) objectStore.deleteCompletely();
                }
            }
        }

        private long tickerCurrentMillis() {
            return ticker.read();
        }

        /**
         * Advances this node's private clock.
         *
         * @return the clock value after advancing
         * @throws IllegalStateException if the test is using the shared clock
         */
        private long tickerAdvance(Duration duration) {
            if (sharedTime!=null)
                throw new IllegalStateException("Using shared ticker; cannot advance private node clock");
            currentTime.addAndGet(duration.toMilliseconds());
            return tickerCurrentMillis();
        }

        @Override
        public String toString() {
            return nodeName+" "+ownNodeId;
        }
    }

    /** Value of the {@link TestEntityFailingRebind} throw-on-rebind flag before the test, restored in {@link #tearDown()}. */
    private Boolean prevThrowOnRebind;

    @BeforeMethod(alwaysRun=true)
    public void setUp() throws Exception {
        prevThrowOnRebind = TestEntityFailingRebind.getThrowOnRebind();
        TestEntityFailingRebind.setThrowOnRebind(true);
        nodes.clear();
        // both backing maps are shared across nodes, so both must be reset between test methods
        sharedBackingStore.clear();
        sharedBackingStoreDates.clear();
    }

    /**
     * Tears down every node created by the test, continuing past individual failures, then restores
     * the throw-on-rebind flag. The first teardown failure (if any) is rethrown once all nodes were attempted.
     */
    @AfterMethod(alwaysRun=true)
    public void tearDown() throws Exception {
        Exception firstFailure = null;
        try {
            for (HaMgmtNode n: nodes) {
                try {
                    n.tearDown();
                } catch (Exception e) {
                    log.warn("Error tearing down "+n+" (continuing with remaining nodes): "+e, e);
                    if (firstFailure == null) firstFailure = e;
                }
            }
        } finally {
            if (prevThrowOnRebind != null) TestEntityFailingRebind.setThrowOnRebind(prevThrowOnRebind);
        }
        if (firstFailure != null) throw firstFailure;
    }

    /** Creates and sets up a new node, registering it for teardown. */
    public HaMgmtNode newNode() throws Exception {
        HaMgmtNode node = new HaMgmtNode();
        node.setUp();
        nodes.add(node);
        return node;
    }

    /** Advances the shared clock if in use, otherwise advances every node's private clock by the same amount. */
    private void sharedTickerAdvance(Duration duration) {
        if (sharedTime==null) {
            for (HaMgmtNode n: nodes)
                n.tickerAdvance(duration);
        } else {
            sharedTime.addAndGet(duration.toMilliseconds());
        }
    }

    /**
     * @return the current value of the shared clock
     * @throws IllegalStateException if {@link #useSharedTime()} has not been called
     */
    private long sharedTickerCurrentMillis() {
        if (sharedTime==null)
            throw new IllegalStateException("Shared ticker not in use; call useSharedTime() before reading it");
        return sharedTime.get();
    }

    /**
     * Switches the test to a single clock shared by all nodes, initialised to the current wall-clock time.
     *
     * @throws IllegalStateException if any node has already been created
     */
    protected void useSharedTime() {
        if (!nodes.isEmpty())
            throw new IllegalStateException("shared time must be set up before any nodes created");
        sharedTime = new AtomicLong(System.currentTimeMillis());
    }

    protected ManagementContextInternal newLocalManagementContext() {
        return new LocalManagementContextForTests();
    }

    protected PersistenceObjectStore newPersistenceObjectStore() {
        return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates);
    }

    /**
     * With rebind forced to throw: n2 fails when it tries to take over after n1's heartbeat times out,
     * and n1 subsequently fails too when it is moved to hot backup and then asked to become master again.
     */
    @Test
    public void testDoubleRebindFails() throws Exception {
        useSharedTime();
        HaMgmtNode n1 = newNode();
        HaMgmtNode n2 = newNode();

        // first auto should become master
        n1.ha.start(HighAvailabilityMode.AUTO);
        n2.ha.start(HighAvailabilityMode.AUTO);
        assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER);

        TestApplication app = ApplicationBuilder.newManagedApp(
                EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), n1.mgmt);
        app.start(ImmutableList.<Location>of());

        n1.mgmt.getRebindManager().forcePersistNow(false, null);

        //don't publish state for a while (i.e. long store delays, failures)
        sharedTickerAdvance(Duration.ONE_MINUTE);

        try {
            n2.ha.publishAndCheck(false);
            fail("n2 rebind failure expected");
        } catch (Exception e) {
            assertNestedRebindException(e);
        }

        // re-check should re-assert successfully, no rebind expected as he was previously master
        n1.ha.publishAndCheck(false);
        ManagementPlaneSyncRecord memento;
        memento = n1.ha.loadManagementPlaneSyncRecord(true);
        assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
        assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED);

        // hot backup permitted by the TestEntityFailingRebind
        n1.ha.changeMode(HighAvailabilityMode.HOT_BACKUP);
        memento = n1.ha.loadManagementPlaneSyncRecord(true);
        assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.HOT_BACKUP);
        try {
            n1.ha.changeMode(HighAvailabilityMode.MASTER);
            fail("n1 rebind failure expected");
        } catch (Exception e) {
            assertNestedRebindException(e);
        }

        memento = n1.ha.loadManagementPlaneSyncRecord(true);
        assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.FAILED);
        assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED);
    }

    /**
     * n2 fails to rebind when it tries to take over; once rebind failures are switched off, n1 re-publishes
     * and is recorded as master while n2 remains recorded as failed.
     */
    @Test
    public void testStandbyRebind() throws Exception {
        useSharedTime();
        HaMgmtNode n1 = newNode();
        HaMgmtNode n2 = newNode();

        // first auto should become master
        n1.ha.start(HighAvailabilityMode.AUTO);
        n2.ha.start(HighAvailabilityMode.AUTO);

        TestApplication app = ApplicationBuilder.newManagedApp(
                EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), n1.mgmt);
        app.start(ImmutableList.<Location>of());

        n1.mgmt.getRebindManager().forcePersistNow(false, null);

        //don't publish state for a while (i.e. long store delays, failures)
        sharedTickerAdvance(Duration.ONE_MINUTE);

        try {
            n2.ha.publishAndCheck(false);
            fail("n2 rebind failure expected");
        } catch (Exception e) {
            assertNestedRebindException(e);
        }

        TestEntityFailingRebind.setThrowOnRebind(false);
        n1.ha.publishAndCheck(false);

        ManagementPlaneSyncRecord memento = n1.ha.loadManagementPlaneSyncRecord(true);
        assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
        assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED);
    }

    /** Passes if a {@link RebindException} appears anywhere in the cause chain of {@code t}; otherwise rethrows {@code t}. */
    private void assertNestedRebindException(Throwable t) {
        for (Throwable ptr = t; ptr != null; ptr = ptr.getCause()) {
            if (ptr instanceof RebindException) {
                return;
            }
        }
        Exceptions.propagate(t);
    }

    /**
     * n1 (master) silently loses the ability to write; after the heartbeat timeout n2 promotes itself
     * and records n1 as failed. When n1's writes are restored it demotes itself to standby.
     */
    @Test
    public void testIfNodeStopsBeingAbleToWrite() throws Exception {
        useSharedTime();
        log.info("time at start "+sharedTickerCurrentMillis());

        HaMgmtNode n1 = newNode();
        HaMgmtNode n2 = newNode();

        // first auto should become master
        n1.ha.start(HighAvailabilityMode.AUTO);
        ManagementPlaneSyncRecord memento1 = n1.ha.loadManagementPlaneSyncRecord(true);

        log.info(n1+" HA: "+memento1);
        assertEquals(memento1.getMasterNodeId(), n1.ownNodeId);
        Long time0 = sharedTickerCurrentMillis();
        assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
        assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);

        // second - make explicit hot; that's a strictly more complex case than cold standby, so provides pretty good coverage
        n2.ha.start(HighAvailabilityMode.HOT_STANDBY);
        ManagementPlaneSyncRecord memento2 = n2.ha.loadManagementPlaneSyncRecord(true);

        log.info(n2+" HA: "+memento2);
        assertEquals(memento2.getMasterNodeId(), n1.ownNodeId);
        assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
        assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.HOT_STANDBY);
        assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
        assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time0);

        // and no entities at either
        assertEquals(n1.mgmt.getApplications().size(), 0);
        assertEquals(n2.mgmt.getApplications().size(), 0);

        // create
        TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class), n1.mgmt);
        app.start(ImmutableList.<Location>of());
        app.setAttribute(TestApplication.MY_ATTRIBUTE, "hello");

        assertEquals(n1.mgmt.getApplications().size(), 1);
        assertEquals(n2.mgmt.getApplications().size(), 0);
        log.info("persisting "+n1.ownNodeId);
        n1.mgmt.getRebindManager().forcePersistNow(false, null);

        n1.objectStore.setWritesFailSilently(true);
        log.info(n1+" writes off");
        sharedTickerAdvance(Duration.ONE_MINUTE);
        log.info("time now "+sharedTickerCurrentMillis());
        Long time1 = sharedTickerCurrentMillis();

        log.info("publish "+n2.ownNodeId);
        n2.ha.publishAndCheck(false);
        ManagementPlaneSyncRecord memento2b = n2.ha.loadManagementPlaneSyncRecord(true);
        log.info(n2+" HA now: "+memento2b);

        // n2 infers n1 as failed
        assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.FAILED);
        assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
        assertEquals(memento2b.getMasterNodeId(), n2.ownNodeId);
        assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
        assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1);

        assertEquals(n1.mgmt.getApplications().size(), 1);
        assertEquals(n2.mgmt.getApplications().size(), 1);
        assertEquals(n1.mgmt.getApplications().iterator().next().getAttribute(TestApplication.MY_ATTRIBUTE), "hello");

        n1.objectStore.setWritesFailSilently(false);
        log.info(n1+" writes on");

        sharedTickerAdvance(Duration.ONE_SECOND);
        log.info("time now "+sharedTickerCurrentMillis());
        Long time2 = sharedTickerCurrentMillis();

        log.info("publish "+n1.ownNodeId);
        n1.ha.publishAndCheck(false);
        ManagementPlaneSyncRecord memento1b = n1.ha.loadManagementPlaneSyncRecord(true);
        log.info(n1+" HA now: "+memento1b);

        ManagementNodeState expectedStateAfterDemotion = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ?
            ManagementNodeState.HOT_STANDBY : ManagementNodeState.STANDBY;

        // n1 comes back and demotes himself
        assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getStatus(), expectedStateAfterDemotion);
        assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
        assertEquals(memento1b.getMasterNodeId(), n2.ownNodeId);
        assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2);
        assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1);

        // n2 now sees itself as master, with n1 in standby again
        ManagementPlaneSyncRecord memento2c = n2.ha.loadManagementPlaneSyncRecord(true);
        log.info(n2+" HA now: "+memento2c);
        assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getStatus(), expectedStateAfterDemotion);
        assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
        assertEquals(memento2c.getMasterNodeId(), n2.ownNodeId);
        assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2);
        assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time2);

        // right number of entities at n2; n1 may or may not depending whether hot standby is default
        assertEquals(n2.mgmt.getApplications().size(), 1);
        assertEquals(n1.mgmt.getApplications().size(),
            BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ? 1 : 0);
    }

    @Test(invocationCount=50, groups="Integration")
    public void testIfNodeStopsBeingAbleToWriteManyTimes() throws Exception {
        testIfNodeStopsBeingAbleToWrite();
    }

    @Test
    public void testSimultaneousStartup() throws Exception {
        doTestConcurrentStartup(5, null);
    }

    @Test
    public void testNearSimultaneousStartup() throws Exception {
        doTestConcurrentStartup(20, Duration.millis(20));
    }

    @Test(invocationCount=50, groups="Integration")
    public void testNearSimultaneousStartupManyTimes() throws Exception {
        doTestConcurrentStartup(20, Duration.millis(20));
    }

    /**
     * Starts {@code size} nodes in AUTO mode on separate threads and waits until exactly one is master and
     * all the others are (hot) standby, both in the nodes' own state and in the persisted sync record.
     *
     * @param size number of nodes to start
     * @param staggerStart if non-null, each thread first sleeps for a random fraction of this duration
     */
    protected void doTestConcurrentStartup(int size, final Duration staggerStart) throws Exception {
        useSharedTime();

        List<Thread> spawned = MutableList.of();
        for (int i=0; i<size; i++) {
            final HaMgmtNode n = newNode();
            Thread t = new Thread() {
                @Override public void run() {
                    if (staggerStart!=null) Time.sleep(staggerStart.multiply(Math.random()));
                    n.ha.start(HighAvailabilityMode.AUTO);
                    n.ha.setPollPeriod(Duration.millis(20));
                }
            };
            spawned.add(t);
            t.start();
        }

        try {
            final Stopwatch timer = Stopwatch.createStarted();
            Asserts.succeedsEventually(new Runnable() {
                @Override public void run() {
                    ManagementPlaneSyncRecord memento = nodes.get(0).ha.loadManagementPlaneSyncRecord(true);
                    List<ManagementNodeState> counts = MutableList.of(), savedCounts = MutableList.of();
                    for (HaMgmtNode n: nodes) {
                        counts.add(n.ha.getNodeState());
                        ManagementNodeSyncRecord m = memento.getManagementNodes().get(n.ownNodeId);
                        if (m!=null) {
                            savedCounts.add(m.getStatus());
                        }
                    }
                    log.info("while starting "+nodes.size()+" nodes: "
                        +Collections.frequency(counts, ManagementNodeState.MASTER)+" M + "
                        +Collections.frequency(counts, ManagementNodeState.HOT_STANDBY)+" hot + "
                        +Collections.frequency(counts, ManagementNodeState.STANDBY)+" warm + "
                        +Collections.frequency(counts, ManagementNodeState.INITIALIZING)+" init; "
                        + memento.getManagementNodes().size()+" saved, "
                        +Collections.frequency(savedCounts, ManagementNodeState.MASTER)+" M + "
                        +Collections.frequency(savedCounts, ManagementNodeState.HOT_STANDBY)+" hot + "
                        +Collections.frequency(savedCounts, ManagementNodeState.STANDBY)+" warm + "
                        +Collections.frequency(savedCounts, ManagementNodeState.INITIALIZING)+" init");

                    // warn once only: the stopwatch is stopped after the first warning
                    if (timer.isRunning() && Duration.of(timer).compareTo(Duration.TEN_SECONDS)>0) {
                        log.warn("we seem to have a problem stabilizing");  //handy place to set a suspend-VM breakpoint!
                        timer.stop();
                    }
                    assertEquals(Collections.frequency(counts, ManagementNodeState.MASTER), 1);
                    assertEquals(Collections.frequency(counts, ManagementNodeState.HOT_STANDBY)+Collections.frequency(counts, ManagementNodeState.STANDBY), nodes.size()-1);
                    assertEquals(Collections.frequency(savedCounts, ManagementNodeState.MASTER), 1);
                    assertEquals(Collections.frequency(savedCounts, ManagementNodeState.HOT_STANDBY)+Collections.frequency(savedCounts, ManagementNodeState.STANDBY), nodes.size()-1);
                }});
        } catch (Throwable t) {
            log.warn("Failed to stabilize (rethrowing): "+t, t);
            throw Exceptions.propagate(t);
        } finally {
            // always wait for the starter threads, so they cannot leak into the next test when this one fails
            for (Thread t: spawned) {
                t.join(Duration.THIRTY_SECONDS.toMilliseconds());
                if (t.isAlive()) log.warn("Node starter thread "+t+" still running after join timeout");
            }
        }
    }


}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java new file mode 100644 index 0000000..5a7f79a --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.management.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; +import org.apache.brooklyn.api.management.ha.ManagementNodeState; +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; +import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl; +import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordDeltaImpl; +import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore; +import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl.PromotionListener; +import org.apache.brooklyn.core.management.internal.ManagementContextInternal; +import org.apache.brooklyn.test.entity.LocalManagementContextForTests; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.BrooklynVersion; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl; +import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore; +import brooklyn.entity.rebind.persister.PersistMode; +import brooklyn.entity.rebind.persister.PersistenceObjectStore; +import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord; +import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord.Builder; +import brooklyn.test.Asserts; +import brooklyn.util.time.Duration; + +import 
com.google.common.base.Ticker;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;

/**
 * Shared tests for {@link HighAvailabilityManagerImpl} running against a single management context
 * with a manually advanced clock. Subclasses supply the object store to persist to via
 * {@link #newPersistenceObjectStore()} and may override the poll period and the remote ticker.
 */
@Test
public abstract class HighAvailabilityManagerTestFixture {

    @SuppressWarnings("unused")
    private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerTestFixture.class);

    private ManagementPlaneSyncRecordPersister persister;
    protected ManagementContextInternal managementContext;
    private String ownNodeId;
    private HighAvailabilityManagerImpl manager;
    private Ticker ticker;
    private AtomicLong currentTime; // used to set the ticker's return value
    private RecordingPromotionListener promotionListener;
    private ClassLoader classLoader = getClass().getClassLoader();
    private PersistenceObjectStore objectStore;

    /**
     * Builds the management context, object store, persisters and (unstarted) HA manager, then writes an
     * initial sync record listing this node as hot standby.
     */
    @BeforeMethod(alwaysRun=true)
    public void setUp() throws Exception {
        currentTime = new AtomicLong(1000000000L);
        ticker = new Ticker() {
            // strictly not a ticker because returns millis UTC, but it works fine even so
            @Override public long read() {
                return currentTime.get();
            }
        };
        promotionListener = new RecordingPromotionListener();
        managementContext = newLocalManagementContext();
        ownNodeId = managementContext.getManagementNodeId();
        objectStore = newPersistenceObjectStore();
        objectStore.injectManagementContext(managementContext);
        objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
        persister = new ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, classLoader);
        ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
        BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(
                objectStore,
                managementContext.getBrooklynProperties(),
                classLoader);
        managementContext.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
        manager = ((HighAvailabilityManagerImpl)managementContext.getHighAvailabilityManager())
            .setPollPeriod(getPollPeriod())
            .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
            .setPromotionListener(promotionListener)
            .setLocalTicker(ticker)
            .setRemoteTicker(getRemoteTicker())
            .setPersister(persister);
        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
            .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
            .build());

    }

    protected ManagementContextInternal newLocalManagementContext() {
        return LocalManagementContextForTests.newInstance();
    }

    /** Supplies the object store that the persisters created in {@link #setUp()} write to. */
    protected abstract PersistenceObjectStore newPersistenceObjectStore();

    /**
     * Stops the manager, destroys the management context and deletes the object store.
     * Each step is attempted even if an earlier one throws.
     */
    @AfterMethod(alwaysRun=true)
    public void tearDown() throws Exception {
        try {
            if (manager != null) manager.stop();
        } finally {
            try {
                if (managementContext != null) Entities.destroyAll(managementContext);
            } finally {
                if (objectStore != null) objectStore.deleteCompletely();
            }
        }
    }

    // The web-console could still be polling (e.g. if have just restarted brooklyn), before the persister is set.
    // Must not throw NPE, but instead return something sensible (e.g. an empty state record).
    @Test
    public void testGetManagementPlaneSyncStateDoesNotThrowNpeBeforePersisterSet() throws Exception {
        HighAvailabilityManagerImpl manager2 = new HighAvailabilityManagerImpl(managementContext)
            .setPollPeriod(Duration.millis(10))
            .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
            .setPromotionListener(promotionListener)
            .setLocalTicker(ticker)
            .setRemoteTicker(ticker);
        try {
            ManagementPlaneSyncRecord state = manager2.loadManagementPlaneSyncRecord(true);
            assertNotNull(state);
        } finally {
            manager2.stop();
        }

    }

    // Can get a log.error about our management node's heartbeat being out of date. Caused by
    // poller first writing a heartbeat record, and then the clock being incremented. But the
    // next poll fixes it.
    @Test
    public void testPromotes() throws Exception {
        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
            .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
            .node(newManagerMemento("node1", ManagementNodeState.MASTER))
            .setMaster("node1")
            .build());

        manager.start(HighAvailabilityMode.AUTO);

        // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish
        // its own heartbeat with the new time; but node1's record is now out-of-date.
        tickerAdvance(Duration.seconds(31));

        // Expect to be notified of our promotion, as the only other node
        promotionListener.assertCalledEventually();
    }

    @Test(groups="Integration") // because one second wait in succeedsContinually
    public void testDoesNotPromoteIfMasterTimeoutNotExpired() throws Exception {
        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
            .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
            .node(newManagerMemento("node1", ManagementNodeState.MASTER))
            .setMaster("node1")
            .build());

        manager.start(HighAvailabilityMode.AUTO);

        tickerAdvance(Duration.seconds(25));

        // Expect not to be notified, as 25s < 30s timeout
        // (it's normally a fake clock so won't hit 30, even waiting 1s below - but in "IntegrationTest" subclasses it is real!)
        Asserts.succeedsContinually(new Runnable() {
            @Override public void run() {
                promotionListener.assertNotCalled();
            }});
    }

    @Test
    public void testGetManagementPlaneStatus() throws Exception {
        // with the name zzzzz the mgr created here should never be promoted by the alphabetical strategy!

        tickerAdvance(Duration.FIVE_SECONDS);
        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
            .node(newManagerMemento(ownNodeId, ManagementNodeState.STANDBY))
            .node(newManagerMemento("zzzzzzz_node1", ManagementNodeState.STANDBY))
            .build());
        persister.loadSyncRecord();
        long zzzTime = tickerCurrentMillis();
        tickerAdvance(Duration.FIVE_SECONDS);

        manager.start(HighAvailabilityMode.AUTO);
        ManagementPlaneSyncRecord memento = manager.loadManagementPlaneSyncRecord(true);

        // Note can assert timestamp because not "real" time; it's using our own Ticker
        assertEquals(memento.getMasterNodeId(), ownNodeId);
        assertEquals(memento.getManagementNodes().keySet(), ImmutableSet.of(ownNodeId, "zzzzzzz_node1"));
        assertEquals(memento.getManagementNodes().get(ownNodeId).getNodeId(), ownNodeId);
        assertEquals(memento.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.MASTER);
        assertEquals(memento.getManagementNodes().get(ownNodeId).getLocalTimestamp(), tickerCurrentMillis());
        assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getNodeId(), "zzzzzzz_node1");
        assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getStatus(), ManagementNodeState.STANDBY);
        assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getLocalTimestamp(), zzzTime);
    }

    @Test(groups="Integration", invocationCount=50) //because we have had non-deterministic failures
    public void testGetManagementPlaneStatusManyTimes() throws Exception {
        testGetManagementPlaneStatus();
    }

    @Test
    public void testGetManagementPlaneSyncStateInfersTimedOutNodeAsFailed() throws Exception {
        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
            .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
            .node(newManagerMemento("node1", ManagementNodeState.MASTER))
            .setMaster("node1")
            .build());

        manager.start(HighAvailabilityMode.HOT_STANDBY);

        ManagementPlaneSyncRecord state = manager.loadManagementPlaneSyncRecord(true);
        assertEquals(state.getManagementNodes().get("node1").getStatus(), ManagementNodeState.MASTER);
        assertEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.HOT_STANDBY);

        // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish
        // its own heartbeat with the new time; but node1's record is now out-of-date.
        tickerAdvance(Duration.seconds(31));

        ManagementPlaneSyncRecord state2 = manager.loadManagementPlaneSyncRecord(true);
        assertEquals(state2.getManagementNodes().get("node1").getStatus(), ManagementNodeState.FAILED);
        // check the record loaded *after* the timeout; the earlier one was already asserted above
        assertNotEquals(state2.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.FAILED);
    }

    /** Poll period given to the HA manager in {@link #setUp()}; 10ms unless overridden. */
    protected Duration getPollPeriod() {
        return Duration.millis(10);
    }

    protected long tickerCurrentMillis() {
        return ticker.read();
    }

    /**
     * Advances the fake clock.
     *
     * @return the clock value after advancing
     */
    protected long tickerAdvance(Duration duration) {
        currentTime.addAndGet(duration.toMilliseconds());
        return tickerCurrentMillis();
    }

    /**
     * Ticker used as the HA manager's remote ticker and for remote timestamps in
     * {@link #newManagerMemento(String, ManagementNodeState)}; the local fake ticker unless overridden.
     * A null return causes mementos to be built without a remote timestamp.
     */
    protected Ticker getRemoteTicker() {
        return ticker;
    }

    /**
     * Builds a node sync record with the given id and status, stamped with the current fake-clock time
     * (and the remote ticker's time, when a remote ticker is available).
     */
    protected ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status) {
        Builder rb = BasicManagementNodeSyncRecord.builder();
        rb.brooklynVersion(BrooklynVersion.get()).nodeId(nodeId).status(status);
        rb.localTimestamp(tickerCurrentMillis());
        // read the overridable accessor once so the null check and the use see the same ticker
        Ticker remoteTicker = getRemoteTicker();
        if (remoteTicker!=null)
            rb.remoteTimestamp(remoteTicker.read());
        return rb.build();
    }

    /** Promotion listener that records the wall-clock time of every {@link #promotingToMaster()} call. */
    public static class RecordingPromotionListener implements PromotionListener {
        public final List<Long> callTimestamps = Lists.newCopyOnWriteArrayList();

        @Override
        public void promotingToMaster() {
            callTimestamps.add(System.currentTimeMillis());
        }

        public void assertNotCalled() {
            assertTrue(callTimestamps.isEmpty(), "calls="+callTimestamps);
        }

        public void assertCalled() {
            assertFalse(callTimestamps.isEmpty(), "calls="+callTimestamps);
        }

        public void assertCalledEventually() {
            Asserts.succeedsEventually(new Runnable() {
                @Override public void run() {
                    assertCalled();
                }});
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java new file mode 100644 index 0000000..da4f998 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java @@ -0,0 +1,667 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.management.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.util.ArrayDeque; +import java.util.Date; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.entity.Application; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Feed; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; +import org.apache.brooklyn.api.management.ha.ManagementNodeState; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; +import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl; +import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore; +import org.apache.brooklyn.core.management.internal.AbstractManagementContext; +import org.apache.brooklyn.core.management.internal.ManagementContextInternal; +import org.apache.brooklyn.test.EntityTestUtils; +import org.apache.brooklyn.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.test.entity.TestApplication; +import org.apache.brooklyn.test.entity.TestEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl; +import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithFunctionFeedImpl; +import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithNewFeedsEachTimeImpl; +import brooklyn.entity.rebind.RebindManagerImpl; +import 
brooklyn.entity.rebind.RebindTestFixture; +import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore; +import brooklyn.entity.rebind.persister.InMemoryObjectStore; +import brooklyn.entity.rebind.persister.ListeningObjectStore; +import brooklyn.entity.rebind.persister.PersistMode; +import brooklyn.entity.rebind.persister.PersistenceObjectStore; + +import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation.LocalhostMachine; + +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.javalang.JavaClassNames; +import brooklyn.util.repeat.Repeater; +import brooklyn.util.text.ByteSizeStrings; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.collect.Iterables; + +public class HotStandbyTest { + + private static final Logger log = LoggerFactory.getLogger(HotStandbyTest.class); + + private List<HaMgmtNode> nodes = new MutableList<HotStandbyTest.HaMgmtNode>(); + Map<String,String> sharedBackingStore = MutableMap.of(); + Map<String,Date> sharedBackingStoreDates = MutableMap.of(); + private ClassLoader classLoader = getClass().getClassLoader(); + + public class HaMgmtNode { + // TODO share with WarmStandbyTest and SplitBrainTest and a few others (minor differences but worth it ultimately) + + private ManagementContextInternal mgmt; + private String ownNodeId; + private String nodeName; + private ListeningObjectStore objectStore; + private ManagementPlaneSyncRecordPersister persister; + private HighAvailabilityManagerImpl ha; + private Duration persistOrRebindPeriod = Duration.ONE_SECOND; + + public void setUp() throws Exception { + nodeName = "node "+nodes.size(); + mgmt = newLocalManagementContext(); + ownNodeId = mgmt.getManagementNodeId(); + objectStore = new ListeningObjectStore(newPersistenceObjectStore()); + objectStore.injectManagementContext(mgmt); + objectStore.prepareForSharedUse(PersistMode.CLEAN, 
HighAvailabilityMode.DISABLED); + persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader); + ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento(); + BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader); + ((RebindManagerImpl)mgmt.getRebindManager()).setPeriodicPersistPeriod(persistOrRebindPeriod); + mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build()); + ha = ((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager()) + .setPollPeriod(Duration.PRACTICALLY_FOREVER) + .setHeartbeatTimeout(Duration.THIRTY_SECONDS) + .setPersister(persister); + log.info("Created "+nodeName+" "+ownNodeId); + } + + public void tearDownThisOnly() throws Exception { + if (ha != null) ha.stop(); + if (mgmt!=null) mgmt.getRebindManager().stop(); + if (mgmt != null) Entities.destroyAll(mgmt); + } + + public void tearDownAll() throws Exception { + tearDownThisOnly(); + // can't delete the object store until all being torn down + if (objectStore != null) objectStore.deleteCompletely(); + } + + @Override + public String toString() { + return nodeName+" "+ownNodeId; + } + + public RebindManagerImpl rebinder() { + return (RebindManagerImpl)mgmt.getRebindManager(); + } + } + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + nodes.clear(); + sharedBackingStore.clear(); + } + + public HaMgmtNode newNode(Duration persistOrRebindPeriod) throws Exception { + HaMgmtNode node = new HaMgmtNode(); + node.persistOrRebindPeriod = persistOrRebindPeriod; + node.setUp(); + nodes.add(node); + return node; + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + for (HaMgmtNode n: nodes) + n.tearDownAll(); + } + + protected ManagementContextInternal newLocalManagementContext() { + return new LocalManagementContextForTests(); + } + + 
protected PersistenceObjectStore newPersistenceObjectStore() { + return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates); + } + + private HaMgmtNode createMaster(Duration persistOrRebindPeriod) throws Exception { + HaMgmtNode n1 = newNode(persistOrRebindPeriod); + n1.ha.start(HighAvailabilityMode.AUTO); + assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER); + return n1; + } + + private HaMgmtNode createHotStandby(Duration rebindPeriod) throws Exception { + HaMgmtNode n2 = newNode(rebindPeriod); + n2.ha.start(HighAvailabilityMode.HOT_STANDBY); + assertEquals(n2.ha.getNodeState(), ManagementNodeState.HOT_STANDBY); + return n2; + } + + private TestApplication createFirstAppAndPersist(HaMgmtNode n1) throws Exception { + TestApplication app = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt); + // for testing without enrichers, if desired: +// TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class).impl(TestApplicationNoEnrichersImpl.class), n1.mgmt); + app.setDisplayName("First App"); + app.start(MutableList.<Location>of()); + app.config().set(TestEntity.CONF_NAME, "first-app"); + app.setAttribute(TestEntity.SEQUENCE, 3); + + forcePersistNow(n1); + return app; + } + + protected void forcePersistNow(HaMgmtNode n1) { + n1.mgmt.getRebindManager().forcePersistNow(false, null); + } + + private Application expectRebindSequenceNumber(HaMgmtNode master, HaMgmtNode hotStandby, Application app, int expectedSensorSequenceValue, boolean immediate) { + Application appRO = hotStandby.mgmt.lookup(app.getId(), Application.class); + + if (immediate) { + forcePersistNow(master); + forceRebindNow(hotStandby); + EntityTestUtils.assertAttributeEquals(appRO, TestEntity.SEQUENCE, expectedSensorSequenceValue); + } else { + EntityTestUtils.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, expectedSensorSequenceValue); + } + + log.info("got sequence number "+expectedSensorSequenceValue+" from "+appRO); + 
+ // make sure the instance (proxy) is unchanged + Application appRO2 = hotStandby.mgmt.lookup(app.getId(), Application.class); + Assert.assertTrue(appRO2==appRO); + + return appRO; + } + + private void forceRebindNow(HaMgmtNode hotStandby) { + hotStandby.mgmt.getRebindManager().rebind(null, null, ManagementNodeState.HOT_STANDBY); + } + + @Test + public void testHotStandbySeesInitialCustomNameConfigAndSensorValueButDoesntAllowChange() throws Exception { + HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); + TestApplication app = createFirstAppAndPersist(n1); + HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); + + assertEquals(n2.mgmt.getApplications().size(), 1); + Application appRO = n2.mgmt.lookup(app.getId(), Application.class); + Assert.assertNotNull(appRO); + Assert.assertTrue(appRO instanceof TestApplication); + assertEquals(appRO.getDisplayName(), "First App"); + assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app"); + assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3); + + try { + ((TestApplication)appRO).setAttribute(TestEntity.SEQUENCE, 4); + Assert.fail("Should not have allowed sensor to be set"); + } catch (Exception e) { + Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e); + } + assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3); + } + + @Test + public void testHotStandbySeesChangesToNameConfigAndSensorValue() throws Exception { + HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); + TestApplication app = createFirstAppAndPersist(n1); + HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); + + assertEquals(n2.mgmt.getApplications().size(), 1); + Application appRO = n2.mgmt.lookup(app.getId(), Application.class); + Assert.assertNotNull(appRO); + assertEquals(appRO.getChildren().size(), 0); + + // test changes + + app.setDisplayName("First App Renamed"); + app.config().set(TestEntity.CONF_NAME, 
"first-app-renamed"); + app.setAttribute(TestEntity.SEQUENCE, 4); + + appRO = expectRebindSequenceNumber(n1, n2, app, 4, true); + assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1); + assertEquals(appRO.getDisplayName(), "First App Renamed"); + assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app-renamed"); + + // and change again for good measure! + + app.setDisplayName("First App"); + app.config().set(TestEntity.CONF_NAME, "first-app-restored"); + app.setAttribute(TestEntity.SEQUENCE, 5); + + appRO = expectRebindSequenceNumber(n1, n2, app, 5, true); + assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1); + assertEquals(appRO.getDisplayName(), "First App"); + assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app-restored"); + } + + @Test // immediate variant: persist and rebind are forced rather than awaited in the background + public void testHotStandbySeesStructuralChangesIncludingRemoval() throws Exception { + doTestHotStandbySeesStructuralChangesIncludingRemoval(true); + } + + @Test(groups="Integration") // due to time (it waits for background persistence) + public void testHotStandbyUnforcedSeesStructuralChangesIncludingRemoval() throws Exception { + doTestHotStandbySeesStructuralChangesIncludingRemoval(false); + } + + public void doTestHotStandbySeesStructuralChangesIncludingRemoval(boolean immediate) throws Exception { + HaMgmtNode n1 = createMaster(immediate ? Duration.PRACTICALLY_FOREVER : Duration.millis(200)); + TestApplication app = createFirstAppAndPersist(n1); + HaMgmtNode n2 = createHotStandby(immediate ? 
Duration.PRACTICALLY_FOREVER : Duration.millis(200)); + + assertEquals(n2.mgmt.getApplications().size(), 1); + Application appRO = n2.mgmt.lookup(app.getId(), Application.class); + Assert.assertNotNull(appRO); + assertEquals(appRO.getChildren().size(), 0); + + // test additions - new child, new app + + TestEntity child = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child")); + Entities.manage(child); + TestApplication app2 = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt); + app2.config().set(TestEntity.CONF_NAME, "second-app"); + + app.setAttribute(TestEntity.SEQUENCE, 4); + appRO = expectRebindSequenceNumber(n1, n2, app, 4, immediate); + + assertEquals(appRO.getChildren().size(), 1); + Entity childRO = Iterables.getOnlyElement(appRO.getChildren()); + assertEquals(childRO.getId(), child.getId()); + assertEquals(childRO.getConfig(TestEntity.CONF_NAME), "first-child"); + + assertEquals(n2.mgmt.getApplications().size(), 2); + Application app2RO = n2.mgmt.lookup(app2.getId(), Application.class); + Assert.assertNotNull(app2RO); + assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app"); + + assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 3); + + // now test removals + + Entities.unmanage(child); + Entities.unmanage(app2); + + app.setAttribute(TestEntity.SEQUENCE, 5); + appRO = expectRebindSequenceNumber(n1, n2, app, 5, immediate); + + EntityTestUtils.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, 5); + assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1); + assertEquals(appRO.getChildren().size(), 0); + assertEquals(n2.mgmt.getApplications().size(), 1); + Assert.assertNull(n2.mgmt.lookup(app2.getId(), Application.class)); + Assert.assertNull(n2.mgmt.lookup(child.getId(), Application.class)); + } + + @Test(groups="Integration", invocationCount=50) + public void testHotStandbySeesStructuralChangesIncludingRemovalManyTimes() throws Exception { + 
doTestHotStandbySeesStructuralChangesIncludingRemoval(true); + } + + Deque<Long> usedMemory = new ArrayDeque<Long>(); + protected long noteUsedMemory(String message) { + Time.sleep(Duration.millis(200)); + for (HaMgmtNode n: nodes) { + ((AbstractManagementContext)n.mgmt).getGarbageCollector().gcIteration(); + } + System.gc(); System.gc(); + Time.sleep(Duration.millis(50)); + System.gc(); System.gc(); + long mem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + usedMemory.addLast(mem); + log.info("Memory used - "+message+": "+ByteSizeStrings.java().apply(mem)); + return mem; + } + public void assertUsedMemoryLessThan(String event, long max) { + noteUsedMemory(event); + long nowUsed = usedMemory.peekLast(); + if (nowUsed > max) { + // aggressively try to force GC + Time.sleep(Duration.ONE_SECOND); + usedMemory.removeLast(); + noteUsedMemory(event+" (extra GC)"); + nowUsed = usedMemory.peekLast(); + if (nowUsed > max) { + Assert.fail("Too much memory used - "+ByteSizeStrings.java().apply(nowUsed)+" > max "+ByteSizeStrings.java().apply(max)); + } + } + } + public void assertUsedMemoryMaxDelta(String event, long deltaMegabytes) { + assertUsedMemoryLessThan(event, usedMemory.peekLast() + deltaMegabytes*1024*1024); + } + + @Test(groups="Integration") + public void testHotStandbyDoesNotLeakLotsOfRebinds() throws Exception { + log.info("Starting test "+JavaClassNames.niceClassAndMethod()); + final int DELTA = 2; + + HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); + TestApplication app = createFirstAppAndPersist(n1); + long initialUsed = noteUsedMemory("Created app"); + + HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); + assertUsedMemoryMaxDelta("Standby created", DELTA); + + forcePersistNow(n1); + forceRebindNow(n2); + assertUsedMemoryMaxDelta("Persisted and rebinded once", DELTA); + + for (int i=0; i<10; i++) { + forcePersistNow(n1); + forceRebindNow(n2); + } + assertUsedMemoryMaxDelta("Persisted and rebinded 10x", 
DELTA); + + for (int i=0; i<1000; i++) { + if ((i+1)%100==0) { + noteUsedMemory("iteration "+(i+1)); + usedMemory.removeLast(); + } + forcePersistNow(n1); + forceRebindNow(n2); + } + assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA); + + Entities.unmanage(app); + forcePersistNow(n1); + forceRebindNow(n2); + + assertUsedMemoryLessThan("And now all unmanaged", initialUsed + DELTA*1000*1000); + } + + static class BigObject { + public BigObject(int sizeBytes) { array = new byte[sizeBytes]; } + byte[] array; + } + + @Test(groups="Integration") + public void testHotStandbyDoesNotLeakBigObjects() throws Exception { + log.info("Starting test "+JavaClassNames.niceClassAndMethod()); + final int SIZE = 5; + final int SIZE_UP_BOUND = SIZE+2; + final int SIZE_DOWN_BOUND = SIZE-1; + final int GRACE = 2; + // the XML persistence uses a lot of space, we approx at between 2x and 3c + final int SIZE_IN_XML = 3*SIZE; + final int SIZE_IN_XML_DOWN = 2*SIZE; + + HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); + TestApplication app = createFirstAppAndPersist(n1); + noteUsedMemory("Finished seeding"); + Long initialUsed = usedMemory.peekLast(); + app.config().set(TestEntity.CONF_OBJECT, new BigObject(SIZE*1000*1000)); + assertUsedMemoryMaxDelta("Set a big config object", SIZE_UP_BOUND); + forcePersistNow(n1); + assertUsedMemoryMaxDelta("Persisted a big config object", SIZE_IN_XML); + + HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); + forceRebindNow(n2); + assertUsedMemoryMaxDelta("Rebinded", SIZE_UP_BOUND); + + for (int i=0; i<10; i++) + forceRebindNow(n2); + assertUsedMemoryMaxDelta("Several more rebinds", GRACE); + for (int i=0; i<10; i++) { + forcePersistNow(n1); + forceRebindNow(n2); + } + assertUsedMemoryMaxDelta("And more rebinds and more persists", GRACE); + + app.config().set(TestEntity.CONF_OBJECT, "big is now small"); + assertUsedMemoryMaxDelta("Big made small at primary", -SIZE_DOWN_BOUND); + forcePersistNow(n1); + 
assertUsedMemoryMaxDelta("And persisted", -SIZE_IN_XML_DOWN); + + forceRebindNow(n2); + assertUsedMemoryMaxDelta("And at secondary", -SIZE_DOWN_BOUND); + + Entities.unmanage(app); + forcePersistNow(n1); + forceRebindNow(n2); + + assertUsedMemoryLessThan("And now all unmanaged", initialUsed+GRACE*1000*1000); + } + + @Test(groups="Integration") // because it's slow + // Sept 2014 - there is a small leak, of 200 bytes per child created and destroyed; + // but this goes away when the app is destroyed; it may be a benign record + public void testHotStandbyDoesNotLeakLotsOfRebindsCreatingAndDestroyingAChildEntity() throws Exception { + log.info("Starting test "+JavaClassNames.niceClassAndMethod()); + final int DELTA = 2; + + HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); + TestApplication app = createFirstAppAndPersist(n1); + long initialUsed = noteUsedMemory("Created app"); + + HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); + assertUsedMemoryMaxDelta("Standby created", DELTA); + + TestEntity lastChild = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child")); + Entities.manage(lastChild); + forcePersistNow(n1); + forceRebindNow(n2); + assertUsedMemoryMaxDelta("Child created and rebinded once", DELTA); + + for (int i=0; i<1000; i++) { + if (i==9 || (i+1)%100==0) { + noteUsedMemory("iteration "+(i+1)); + usedMemory.removeLast(); + } + TestEntity newChild = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child")); + Entities.manage(newChild); + Entities.unmanage(lastChild); + lastChild = newChild; + + forcePersistNow(n1); + forceRebindNow(n2); + } + assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA); + + Entities.unmanage(app); + forcePersistNow(n1); + forceRebindNow(n2); + + assertUsedMemoryLessThan("And now all unmanaged", initialUsed + DELTA*1000*1000); + } + + protected void assertHotStandby(HaMgmtNode n1) { + assertEquals(n1.ha.getNodeState(), 
ManagementNodeState.HOT_STANDBY); + Assert.assertTrue(n1.rebinder().isReadOnlyRunning()); + Assert.assertFalse(n1.rebinder().isPersistenceRunning()); + } + + protected void assertMaster(HaMgmtNode n1) { + assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER); + Assert.assertFalse(n1.rebinder().isReadOnlyRunning()); + Assert.assertTrue(n1.rebinder().isPersistenceRunning()); + } + + @Test + public void testChangeMode() throws Exception { + HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); + TestApplication app = createFirstAppAndPersist(n1); + HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); + + TestEntity child = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child")); + Entities.manage(child); + TestApplication app2 = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt); + app2.config().set(TestEntity.CONF_NAME, "second-app"); + + forcePersistNow(n1); + n2.ha.setPriority(1); + n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY); + + // both now hot standby + assertHotStandby(n1); + assertHotStandby(n2); + + assertEquals(n1.mgmt.getApplications().size(), 2); + Application app2RO = n1.mgmt.lookup(app2.getId(), Application.class); + Assert.assertNotNull(app2RO); + assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app"); + try { + ((TestApplication)app2RO).setAttribute(TestEntity.SEQUENCE, 4); + Assert.fail("Should not have allowed sensor to be set"); + } catch (Exception e) { + Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e); + } + + n1.ha.changeMode(HighAvailabilityMode.AUTO); + n2.ha.changeMode(HighAvailabilityMode.HOT_STANDBY, true, false); + // both still hot standby (n1 will defer to n2 as it has higher priority) + assertHotStandby(n1); + assertHotStandby(n2); + + // with priority 1, n2 will now be elected + n2.ha.changeMode(HighAvailabilityMode.AUTO); + assertHotStandby(n1); + assertMaster(n2); + + 
assertEquals(n2.mgmt.getApplications().size(), 2); + Application app2B = n2.mgmt.lookup(app2.getId(), Application.class); + Assert.assertNotNull(app2B); + assertEquals(app2B.getConfig(TestEntity.CONF_NAME), "second-app"); + ((TestApplication)app2B).setAttribute(TestEntity.SEQUENCE, 4); + + forcePersistNow(n2); + forceRebindNow(n1); + Application app2BRO = n1.mgmt.lookup(app2.getId(), Application.class); + Assert.assertNotNull(app2BRO); + EntityTestUtils.assertAttributeEquals(app2BRO, TestEntity.SEQUENCE, 4); + } + + @Test(groups="Integration", invocationCount=20) + public void testChangeModeManyTimes() throws Exception { + testChangeMode(); + } + + @Test + public void testChangeModeToDisabledAndBack() throws Exception { + HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); + n1.mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachine.class)); + @SuppressWarnings("unused") + TestApplication app = createFirstAppAndPersist(n1); + + HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); + + // disabled n1 allows n2 to become master when next we tell it to check + n1.ha.changeMode(HighAvailabilityMode.DISABLED); + n2.ha.changeMode(HighAvailabilityMode.AUTO); + assertMaster(n2); + assertEquals(n1.ha.getNodeState(), ManagementNodeState.FAILED); + Assert.assertTrue(n1.mgmt.getApplications().isEmpty(), "n1 should have had no apps; instead had: "+n1.mgmt.getApplications()); + Assert.assertTrue(n1.mgmt.getEntityManager().getEntities().isEmpty(), "n1 should have had no entities; instead had: "+n1.mgmt.getEntityManager().getEntities()); + Assert.assertTrue(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 should have had no locations; instead had: "+n1.mgmt.getLocationManager().getLocations()); + + // we can now change n1 back to hot_standby + n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY); + assertHotStandby(n1); + // and it sees apps + Assert.assertFalse(n1.mgmt.getApplications().isEmpty(), "n1 should have had apps now"); + 
Assert.assertFalse(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 should have had locations now"); + // and if n2 is disabled, n1 promotes + n2.ha.changeMode(HighAvailabilityMode.DISABLED); + n1.ha.changeMode(HighAvailabilityMode.AUTO); + assertMaster(n1); + assertEquals(n2.ha.getNodeState(), ManagementNodeState.FAILED); + } + + @Test + public void testHotStandbyDoesNotStartFeeds() throws Exception { + HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); + TestApplication app = createFirstAppAndPersist(n1); + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithFunctionFeedImpl.class)); + forcePersistNow(n1); + Assert.assertTrue(entity.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds()); + for (Feed feed : entity.feeds().getFeeds()) { + assertTrue(feed.isRunning(), "Feed expected running, but it is non-running"); + } + + HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER); + TestEntity entityRO = (TestEntity) n2.mgmt.lookup(entity.getId(), Entity.class); + Assert.assertTrue(entityRO.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds()); + for (Feed feedRO : entityRO.feeds().getFeeds()) { + assertFalse(feedRO.isRunning(), "Feed expected non-active, but it is running"); + } + } + + @Test(groups="Integration") + public void testHotStandbyDoesNotStartFeedsRebindingManyTimes() throws Exception { + testHotStandbyDoesNotStartFeeds(); + final HaMgmtNode hsb = createHotStandby(Duration.millis(10)); + Repeater.create("until 10 rebinds").every(Duration.millis(100)).until( + new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10; + } + }).runRequiringTrue(); + // make sure not too many tasks (allowing 5 for rebind etc; currently just 2) + RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5); + } + + @Test(groups="Integration") + public void 
testHotStandbyDoesNotStartFeedsRebindingManyTimesWithAnotherFeedGenerator() throws Exception { + HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER); + TestApplication app = createFirstAppAndPersist(n1); + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithNewFeedsEachTimeImpl.class)); + forcePersistNow(n1); + Assert.assertTrue(entity.feeds().getFeeds().size() == 4, "Feeds: "+entity.feeds().getFeeds()); + + final HaMgmtNode hsb = createHotStandby(Duration.millis(10)); + Repeater.create("until 10 rebinds").every(Duration.millis(100)).until( + new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10; + } + }).runRequiringTrue(); + // make sure not too many tasks (allowing 5 for rebind etc; currently just 2) + RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java new file mode 100644 index 0000000..2f08057 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.ha; + +import java.util.Map; + +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; + +/** + * Fixed view of the management plane: the master node id plus an immutable copy of the per-node records. + */ +public class ImmutableManagementPlaneSyncRecord implements ManagementPlaneSyncRecord { + private final String masterNodeId; + private final Map<String, ManagementNodeSyncRecord> managementNodes; + + /** Stores the master id as given and keeps its own copy of {@code nodes}, made with {@link ImmutableMap#copyOf}. */ + ImmutableManagementPlaneSyncRecord(String masterNodeId, Map<String, ManagementNodeSyncRecord> nodes) { + this.managementNodes = ImmutableMap.copyOf(nodes); + this.masterNodeId = masterNodeId; + } + + @Override + public String getMasterNodeId() { + return masterNodeId; + } + + @Override + public Map<String, ManagementNodeSyncRecord> getManagementNodes() { + return managementNodes; + } + + /** Short form: the master id and only the ids of the known nodes. */ + @Override + public String toString() { + return describe(managementNodes.keySet()); + } + + /** Long form: the master id and the full per-node records. */ + @Override + public String toVerboseString() { + return describe(managementNodes); + } + + /** Renders the "master"/"nodes" layout shared by both string forms. */ + private String describe(Object nodesDetail) { + return Objects.toStringHelper(this).add("master", masterNodeId).add("nodes", nodesDetail).toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java ---------------------------------------------------------------------- diff --git
a/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java new file mode 100644 index 0000000..5791c88 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.management.ha; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; +import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.rebind.persister.InMemoryObjectStore; +import brooklyn.util.time.Duration; + +import com.google.common.annotations.VisibleForTesting; + +/** @deprecated since 0.7.0 use {@link ManagementPlaneSyncRecordPersisterToObjectStore} + * with {@link InMemoryObjectStore} + * <code> + * new ManagementPlaneSyncRecordPersisterToObjectStore(new InMemoryObjectStore(), classLoader) + * </code> */ +@Deprecated +public class ManagementPlaneSyncRecordPersisterInMemory implements ManagementPlaneSyncRecordPersister { + + private static final Logger LOG = LoggerFactory.getLogger(ManagementPlaneSyncRecordPersisterInMemory.class); + + private final MutableManagementPlaneSyncRecord memento = new MutableManagementPlaneSyncRecord(); + + private volatile boolean running = true; + + @Override + public synchronized void stop() { + running = false; + } + + /** + * Returns a snapshot of the in-memory record. + * Holds the same monitor as {@link #delta(Delta)}, so a snapshot is never taken while a delta is only partly applied. + * + * @throws IllegalStateException if {@link #stop()} has already been called + */ + @Override + public synchronized ManagementPlaneSyncRecord loadSyncRecord() throws IOException { + if (!running) { + throw new IllegalStateException("Persister not running; cannot load memento"); + } + + return memento.snapshot(); + } + + @VisibleForTesting + @Override + public synchronized void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException { + // The synchronized is sufficient - guarantee that no concurrent calls + return; + } + + /** + * Applies the delta to the in-memory record: added nodes first, then removed node ids, then the master change. + * Once the persister has been stopped the delta is ignored (logged at debug level). + * + * @throws IllegalStateException if the delta reports a master-change value this switch does not handle + */ + @Override + public synchronized void delta(Delta delta) { + if (!running) { + if 
(LOG.isDebugEnabled()) LOG.debug("Persister not running; ignoring checkpointed delta of manager-memento"); + return; + } + + for (ManagementNodeSyncRecord m : delta.getNodes()) { + memento.addNode(m); + } + for (String id : delta.getRemovedNodeIds()) { + memento.deleteNode(id); + } + switch (delta.getMasterChange()) { + case NO_CHANGE: + break; // no-op + case SET_MASTER: + memento.setMasterNodeId(checkNotNull(delta.getNewMasterOrNull())); + break; + case CLEAR_MASTER: + memento.setMasterNodeId(null); + break; + default: + throw new IllegalStateException("Unknown state for master-change: "+delta.getMasterChange()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java new file mode 100644 index 0000000..8958c4b --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + +import java.util.List; + +import org.apache.brooklyn.api.management.ha.ManagementNodeState; +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.core.management.ha.BasicMasterChooser.AlphabeticMasterChooser; +import org.apache.brooklyn.core.management.ha.BasicMasterChooser.ScoredRecord; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.BrooklynVersion; +import brooklyn.entity.basic.EntityFunctions; +import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord; +import brooklyn.util.time.Duration; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +/** + * Tests for {@link AlphabeticMasterChooser}: which node {@code choose} returns, and how + * {@code filterHealthy} followed by {@code sort} filters and orders node records. + */ +public class MasterChooserTest { + + private MutableManagementPlaneSyncRecord memento; + private AlphabeticMasterChooser chooser; + /** Time captured in {@link #setUp()}; tests use it (or an offset from it) as each node's heartbeat timestamp. */ + private long now; + + /** Gives every test an empty record, a default-constructed chooser and a fresh {@code now}. */ + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + memento = new MutableManagementPlaneSyncRecord(); + chooser = new AlphabeticMasterChooser(); + now = System.currentTimeMillis(); + } + + @Test + public void testChoosesFirstAlphanumeric() throws Exception { + memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now)); + memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now)); + memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now)); + Duration heartbeatTimeout = Duration.THIRTY_SECONDS; + String ownNodeId = "node2"; + assertEquals(chooser.choose(memento, heartbeatTimeout, ownNodeId).getNodeId(), "node1"); + } + + @Test + public void testReturnsNullIfNoValid() throws Exception { + /* the only node's timestamp is 31s before now, older than the 30s timeout used below */ + memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now - 31*1000)); + Duration 
heartbeatTimeout = Duration.THIRTY_SECONDS; + assertNull(chooser.choose(memento, heartbeatTimeout, "node2")); + } + + @Test + public void testFiltersOutByHeartbeat() throws Exception { + memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now - 31*1000)); + memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now - 20*1000)); + memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now)); + Duration heartbeatTimeout = Duration.THIRTY_SECONDS; + assertEquals(getIds(chooser.sort(chooser.filterHealthy(memento, heartbeatTimeout, now))), ImmutableList.of("node2", "node3")); + } + + /** Maps each scored record through {@code EntityFunctions.id()} and returns the results as an immutable list in the same order. */ + protected static List<String> getIds(List<ScoredRecord<?>> filterHealthy) { + return ImmutableList.copyOf(Iterables.transform(filterHealthy, EntityFunctions.id())); + } + + @Test + public void testFiltersOutByStatusNotPreferringMaster() throws Exception { + assertEquals(doTestFiltersOutByStatus(false, false), ImmutableList.of("node4", "node5")); + } + @Test + public void testFiltersOutByStatusPreferringMaster() throws Exception { + assertEquals(doTestFiltersOutByStatus(true, false), ImmutableList.of("node5", "node4")); + } + + @Test + public void testFiltersOutByStatusNotPreferringHot() throws Exception { + assertEquals(doTestFiltersOutByStatus(false, true), ImmutableList.of("node4", "node5", "node6")); + } + @Test + public void testFiltersOutByStatusPreferringHot() throws Exception { + assertEquals(doTestFiltersOutByStatus(true, true), ImmutableList.of("node5", "node6", "node4")); + } + + /** + * Shared body of the filter-by-status tests: registers node1..node5 in FAILED, TERMINATED, + * null, STANDBY and MASTER state (plus node6 in HOT_STANDBY when requested), all with + * timestamp {@code now}, and returns the ids left after {@code filterHealthy} and {@code sort}. + * + * @param preferHot passed to the {@link AlphabeticMasterChooser} constructor; the callers' expected + * lists put the MASTER and HOT_STANDBY nodes ahead of the STANDBY node when this is true + * @param includeHot whether to also register the HOT_STANDBY node6 + * @return ids of the remaining nodes, in the order produced by {@code sort} + */ + protected List<String> doTestFiltersOutByStatus(boolean preferHot, boolean includeHot) throws Exception { + chooser = new AlphabeticMasterChooser(preferHot); + memento.addNode(newManagerMemento("node1", ManagementNodeState.FAILED, now)); + memento.addNode(newManagerMemento("node2", ManagementNodeState.TERMINATED, now)); + memento.addNode(newManagerMemento("node3", null, now)); + memento.addNode(newManagerMemento("node4", ManagementNodeState.STANDBY, 
now)); + memento.addNode(newManagerMemento("node5", ManagementNodeState.MASTER, now)); + if (includeHot) + memento.addNode(newManagerMemento("node6", ManagementNodeState.HOT_STANDBY, now)); + return getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, now))); + } + + /** Expects priority 2 to sort ahead of a null priority, and a null priority ahead of priority -1. */ + @Test + public void testExplicityPriority() throws Exception { + chooser = new AlphabeticMasterChooser(); + memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now, BrooklynVersion.get(), 2L)); + memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now, BrooklynVersion.get(), -1L)); + memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now, BrooklynVersion.get(), null)); + List<String> order = getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, now))); + assertEquals(order, ImmutableList.of("node1", "node3", "node2")); + } + + /** + * Expects the node with priority -1 to sort last whatever its version, and, among the nodes + * with null priority, the node with a null version to sort after all nodes with a version string. + */ + @Test + public void testVersionsMaybeNull() throws Exception { + chooser = new AlphabeticMasterChooser(); + memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now, "v10", null)); + memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now, "v3-snapshot", null)); + memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now, "v3-snapshot", -1L)); + memento.addNode(newManagerMemento("node4", ManagementNodeState.STANDBY, now, "v3-snapshot", null)); + memento.addNode(newManagerMemento("node5", ManagementNodeState.STANDBY, now, "v3-stable", null)); + memento.addNode(newManagerMemento("node6", ManagementNodeState.STANDBY, now, "v1", null)); + memento.addNode(newManagerMemento("node7", ManagementNodeState.STANDBY, now, null, null)); + List<String> order = getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, now))); + assertEquals(order, ImmutableList.of("node1", "node5", "node6", "node2", "node4", "node7", "node3")); + } + + /** Builds a record with {@code BrooklynVersion.get()} as its version and a null priority. */ + private ManagementNodeSyncRecord newManagerMemento(String nodeId, 
ManagementNodeState status, long timestamp) { + return newManagerMemento(nodeId, status, timestamp, BrooklynVersion.get(), null); + } + /** Builds a record from the given fields, using {@code timestamp} as both the local and the remote timestamp. */ + private ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status, long timestamp, + String version, Long priority) { + return BasicManagementNodeSyncRecord.builder().brooklynVersion(version).nodeId(nodeId).status(status).localTimestamp(timestamp).remoteTimestamp(timestamp). + priority(priority).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java new file mode 100644 index 0000000..d9c8943 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.brooklyn.core.management.ha; + +import java.util.Map; + +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; + +import com.google.common.collect.Maps; + +/** + * Mutable {@link ManagementPlaneSyncRecord}: a master node id plus a map of node records keyed by node id. + * The map comes from {@code Maps.newConcurrentMap()}; {@code masterNodeId} is a plain field with no + * synchronization. NOTE(review): confirm whether callers on different threads need the master id to be volatile. + */ +public class MutableManagementPlaneSyncRecord implements ManagementPlaneSyncRecord { + private String masterNodeId; + private Map<String, ManagementNodeSyncRecord> managementNodes = Maps.newConcurrentMap(); + + /** Returns the id last passed to {@link #setMasterNodeId(String)}, or null if none has been set. */ + @Override + public String getMasterNodeId() { + return masterNodeId; + } + + /** Returns the internal map itself, not a copy; later {@link #addNode}/{@link #deleteNode} calls are visible through it. */ + @Override + public Map<String, ManagementNodeSyncRecord> getManagementNodes() { + return managementNodes; + } + + /** Delegates to {@code toString()}, which this class does not override. */ + @Override + public String toVerboseString() { + return toString(); + } + + /** + * Creates an {@code ImmutableManagementPlaneSyncRecord} from the current master id and node map. + * NOTE(review): the map is passed as-is; whether the result is a true point-in-time copy depends on that constructor - confirm. + */ + public ImmutableManagementPlaneSyncRecord snapshot() { + return new ImmutableManagementPlaneSyncRecord(masterNodeId, managementNodes); + } + + /** Sets the master node id; null is stored as given. */ + public void setMasterNodeId(String masterNodeId) { + this.masterNodeId = masterNodeId; + } + + /** Stores the record under {@code memento.getNodeId()}, replacing any record already held for that id. */ + public void addNode(ManagementNodeSyncRecord memento) { + managementNodes.put(memento.getNodeId(), memento); + } + + /** Removes the record stored under the given node id, if there is one. */ + public void deleteNode(String nodeId) { + managementNodes.remove(nodeId); + } +} \ No newline at end of file
