http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java new file mode 100644 index 0000000..e2d6998 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.test.entity.TestApplication; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.Entities; +import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.time.Duration; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; + +public class ConnectionFailureDetectorTest { + + private static final int TIMEOUT_MS = 30*1000; + private static final int OVERHEAD = 250; + private static final int POLL_PERIOD = 100; + + private ManagementContext managementContext; + private TestApplication app; + + private List<SensorEvent<FailureDescriptor>> events; + + private ServerSocket serverSocket; + private HostAndPort serverSocketAddress; + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + events = new CopyOnWriteArrayList<SensorEvent<FailureDescriptor>>(); + + managementContext = new LocalManagementContextForTests(); + app = 
ApplicationBuilder.newManagedApp(TestApplication.class, managementContext); + + app.getManagementContext().getSubscriptionManager().subscribe( + app, + HASensors.CONNECTION_FAILED, + new SensorEventListener<FailureDescriptor>() { + @Override public void onEvent(SensorEvent<FailureDescriptor> event) { + events.add(event); + } + }); + app.getManagementContext().getSubscriptionManager().subscribe( + app, + HASensors.CONNECTION_RECOVERED, + new SensorEventListener<FailureDescriptor>() { + @Override public void onEvent(SensorEvent<FailureDescriptor> event) { + events.add(event); + } + }); + + serverSocketAddress = startServerSocket(); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + stopServerSocket(); + if (managementContext != null) Entities.destroyAll(managementContext); + } + + private HostAndPort startServerSocket() throws Exception { + if (serverSocketAddress != null) { + serverSocket = new ServerSocket(serverSocketAddress.getPort()); + } else { + for (int i = 40000; i < 40100; i++) { + try { + serverSocket = new ServerSocket(i); + } catch (IOException e) { + // try next port + } + } + assertNotNull(serverSocket, "Failed to create server socket; no ports free in range!"); + serverSocketAddress = HostAndPort.fromParts(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort()); + } + return serverSocketAddress; + } + + private void stopServerSocket() throws Exception { + if (serverSocket != null) serverSocket.close(); + } + + @Test(groups="Integration") // Has a 1 second wait + public void testNotNotifiedOfFailuresForHealthy() throws Exception { + // Create members before and after the policy is registered, to test both scenarios + + app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)); + + assertNoEventsContinually(); + } + + @Test + public void testNotifiedOfFailure() throws Exception { + 
app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)); + + stopServerSocket(); + + assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null); + assertEquals(events.size(), 1, "events="+events); + } + + @Test + public void testNotifiedOfRecovery() throws Exception { + app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)); + + stopServerSocket(); + assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null); + + // make the connection recover + startServerSocket(); + assertHasEventEventually(HASensors.CONNECTION_RECOVERED, Predicates.<Object>equalTo(app), null); + assertEquals(events.size(), 2, "events="+events); + } + + @Test + public void testReportsFailureWhenAlreadyDownOnRegisteringPolicy() throws Exception { + stopServerSocket(); + + app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)); + + assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null); + } + + @Test(groups="Integration") // Because slow + public void testNotNotifiedOfTemporaryFailuresDuringStabilisationDelay() throws Exception { + app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress) + .configure(ConnectionFailureDetector.CONNECTION_FAILED_STABILIZATION_DELAY, Duration.ONE_MINUTE)); + + stopServerSocket(); + Thread.sleep(100); + startServerSocket(); + + assertNoEventsContinually(); + } + + @Test(groups="Integration") // Because slow + public void testNotifiedOfFailureAfterStabilisationDelay() throws Exception { + final int stabilisationDelay = 1000; + + app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, 
serverSocketAddress) + .configure(ConnectionFailureDetector.CONNECTION_FAILED_STABILIZATION_DELAY, Duration.of(stabilisationDelay))); + + stopServerSocket(); + + assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD)); + assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null); + } + + @Test(groups="Integration") // Because slow + public void testFailuresThenUpDownResetsStabilisationCount() throws Exception { + final long stabilisationDelay = 1000; + + app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress) + .configure(ConnectionFailureDetector.CONNECTION_FAILED_STABILIZATION_DELAY, Duration.of(stabilisationDelay))); + + stopServerSocket(); + assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD)); + + startServerSocket(); + Thread.sleep(POLL_PERIOD+OVERHEAD); + stopServerSocket(); + assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD)); + + assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null); + } + + @Test(groups="Integration") // Because slow + public void testNotNotifiedOfTemporaryRecoveryDuringStabilisationDelay() throws Exception { + final long stabilisationDelay = 1000; + + app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress) + .configure(ConnectionFailureDetector.CONNECTION_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay))); + + stopServerSocket(); + assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null); + events.clear(); + + startServerSocket(); + Thread.sleep(POLL_PERIOD+OVERHEAD); + stopServerSocket(); + + assertNoEventsContinually(Duration.of(stabilisationDelay + OVERHEAD)); + } + + @Test(groups="Integration") // Because slow + public void testNotifiedOfRecoveryAfterStabilisationDelay() throws Exception { + final int 
stabilisationDelay = 1000; + + app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress) + .configure(ConnectionFailureDetector.CONNECTION_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay))); + + stopServerSocket(); + assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null); + events.clear(); + + startServerSocket(); + assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD)); + assertHasEventEventually(HASensors.CONNECTION_RECOVERED, Predicates.<Object>equalTo(app), null); + } + + @Test(groups="Integration") // Because slow + public void testRecoversThenDownUpResetsStabilisationCount() throws Exception { + final long stabilisationDelay = 1000; + + app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class) + .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress) + .configure(ConnectionFailureDetector.CONNECTION_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay))); + + stopServerSocket(); + assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null); + events.clear(); + + startServerSocket(); + assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD)); + + stopServerSocket(); + Thread.sleep(POLL_PERIOD+OVERHEAD); + startServerSocket(); + assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD)); + + assertHasEventEventually(HASensors.CONNECTION_RECOVERED, Predicates.<Object>equalTo(app), null); + } + + private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? 
super CharSequence> descriptionPredicate) { + for (SensorEvent<FailureDescriptor> event : events) { + if (event.getSensor().equals(sensor) && + (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) && + (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) { + return; + } + } + fail("No matching "+sensor+" event found; events="+events); + } + + private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) { + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + @Override public void run() { + assertHasEvent(sensor, componentPredicate, descriptionPredicate); + }}); + } + + private void assertNoEventsContinually(Duration duration) { + Asserts.succeedsContinually(ImmutableMap.of("timeout", duration), new Runnable() { + @Override public void run() { + assertTrue(events.isEmpty(), "events="+events); + }}); + } + + private void assertNoEventsContinually() { + Asserts.succeedsContinually(new Runnable() { + @Override public void run() { + assertTrue(events.isEmpty(), "events="+events); + }}); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java new file mode 100644 index 0000000..3782ccf --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.fail; + +import java.util.List; +import java.util.Set; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.policy.EnricherSpec; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.test.entity.TestApplication; +import org.apache.brooklyn.test.entity.TestEntity; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.basic.ServiceStateLogic; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.rebind.RebindTestFixtureWithApp; + +import org.apache.brooklyn.location.basic.SimulatedLocation; + +import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class HaPolicyRebindTest extends RebindTestFixtureWithApp { + + private TestEntity origEntity; + private SensorEventListener<FailureDescriptor> eventListener; + private List<SensorEvent<FailureDescriptor>> events; + + @Override + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + super.setUp(); + origEntity = 
origApp.createAndManageChild(EntitySpec.create(TestEntity.class)); + events = Lists.newCopyOnWriteArrayList(); + eventListener = new SensorEventListener<FailureDescriptor>() { + @Override public void onEvent(SensorEvent<FailureDescriptor> event) { + events.add(event); + } + }; + } + + @Test + public void testServiceRestarterWorksAfterRebind() throws Exception { + origEntity.addPolicy(PolicySpec.create(ServiceRestarter.class) + .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)); + + TestApplication newApp = rebind(); + final TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)); + + newEntity.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(origEntity, "simulate failure")); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(newEntity.getCallHistory(), ImmutableList.of("restart")); + }}); + } + + @Test + public void testServiceReplacerWorksAfterRebind() throws Exception { + Location origLoc = origManagementContext.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class)); + DynamicCluster origCluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class)) + .configure(DynamicCluster.INITIAL_SIZE, 3)); + origApp.start(ImmutableList.<Location>of(origLoc)); + + origCluster.addPolicy(PolicySpec.create(ServiceReplacer.class) + .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)); + + // rebind + TestApplication newApp = rebind(); + final DynamicCluster newCluster = (DynamicCluster) Iterables.find(newApp.getChildren(), Predicates.instanceOf(DynamicCluster.class)); + + // stimulate the policy + final Set<Entity> initialMembers = ImmutableSet.copyOf(newCluster.getMembers()); + final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 1); + + 
newApp.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_FAILED, eventListener); + newApp.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_RECOVERED, eventListener); + + e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure")); + + // Expect e1 to be replaced + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + Set<Entity> newMembers = Sets.difference(ImmutableSet.copyOf(newCluster.getMembers()), initialMembers); + Set<Entity> removedMembers = Sets.difference(initialMembers, ImmutableSet.copyOf(newCluster.getMembers())); + assertEquals(removedMembers, ImmutableSet.of(e1)); + assertEquals(newMembers.size(), 1); + assertEquals(((TestEntity)Iterables.getOnlyElement(newMembers)).getCallHistory(), ImmutableList.of("start")); + + // TODO e1 not reporting "start" after rebind because callHistory is a field rather than an attribute, so was not persisted + Asserts.assertEqualsIgnoringOrder(e1.getCallHistory(), ImmutableList.of("stop")); + assertFalse(Entities.isManaged(e1)); + }}); + } + + @Test + public void testServiceFailureDetectorWorksAfterRebind() throws Exception { + origEntity.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)); + + // rebind + TestApplication newApp = rebind(); + final TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)); + + newApp.getManagementContext().getSubscriptionManager().subscribe(newEntity, HASensors.ENTITY_FAILED, eventListener); + + newEntity.setAttribute(TestEntity.SERVICE_UP, true); + ServiceStateLogic.setExpectedState(newEntity, Lifecycle.RUNNING); + + // trigger the failure + newEntity.setAttribute(TestEntity.SERVICE_UP, false); + + assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(newEntity), null); + assertEquals(events.size(), 1, "events="+events); + } + + private void assertHasEventEventually(final Sensor<?> sensor, final 
Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) { + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + @Override public void run() { + assertHasEvent(sensor, componentPredicate, descriptionPredicate); + }}); + } + + private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) { + for (SensorEvent<FailureDescriptor> event : events) { + if (event.getSensor().equals(sensor) && + (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) && + (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) { + return; + } + } + fail("No matching "+sensor+" event found; events="+events); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java new file mode 100644 index 0000000..ca84592 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.policy.ha;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.brooklyn.api.entity.proxying.EntitySpec;
import org.apache.brooklyn.api.event.Sensor;
import org.apache.brooklyn.api.event.SensorEvent;
import org.apache.brooklyn.api.event.SensorEventListener;
import org.apache.brooklyn.api.management.ManagementContext;
import org.apache.brooklyn.api.policy.EnricherSpec;
import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
import org.apache.brooklyn.test.entity.TestApplication;
import org.apache.brooklyn.test.entity.TestEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import brooklyn.entity.basic.ApplicationBuilder;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.Lifecycle;
import brooklyn.entity.basic.ServiceStateLogic;
import brooklyn.entity.basic.ServiceStateLogicTest;
import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
import brooklyn.test.Asserts;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.time.Duration;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;

/**
 * Timing tests for the stabilisation delays of {@link ServiceFailureDetector}: the entity's
 * {@code SERVICE_UP} attribute is flipped and the resulting {@link HASensors#ENTITY_FAILED} /
 * {@link HASensors#ENTITY_RECOVERED} events are recorded and checked.
 *
 * Also see more primitive tests in {@link ServiceStateLogicTest}.
 */
public class ServiceFailureDetectorStabilizationTest {

    private static final Logger LOG = LoggerFactory.getLogger(ServiceFailureDetectorStabilizationTest.class);

    /** Upper bound, in milliseconds, for "eventually" assertions. */
    private static final int TIMEOUT_MS = 10*1000;
    /** Slack, in milliseconds, applied around stabilisation delays in timing assertions. */
    private static final int OVERHEAD = 250;

    private ManagementContext managementContext;
    private TestApplication app;
    private TestEntity e1;

    private List<SensorEvent<FailureDescriptor>> events;

    /** Builds an app with one running, service-up entity and records its failed/recovered events. */
    @BeforeMethod(alwaysRun=true)
    public void setUp() throws Exception {
        events = new CopyOnWriteArrayList<SensorEvent<FailureDescriptor>>();

        managementContext = new LocalManagementContextForTests();
        app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
        e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
        setServiceUp(true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);

        app.getManagementContext().getSubscriptionManager().subscribe(
                e1, HASensors.ENTITY_FAILED, newRecordingListener());
        app.getManagementContext().getSubscriptionManager().subscribe(
                e1, HASensors.ENTITY_RECOVERED, newRecordingListener());
    }

    @AfterMethod(alwaysRun=true)
    public void tearDown() throws Exception {
        if (managementContext == null) {
            return;
        }
        Entities.destroyAll(managementContext);
    }

    /** Returns a new listener that appends each received event to {@link #events}. */
    private SensorEventListener<FailureDescriptor> newRecordingListener() {
        return new SensorEventListener<FailureDescriptor>() {
            @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
                events.add(event);
            }
        };
    }

    /** Sets the {@code SERVICE_UP} attribute of the entity under test. */
    private void setServiceUp(boolean up) {
        e1.setAttribute(TestEntity.SERVICE_UP, up);
    }

    @Test(groups="Integration") // Because slow
    public void testNotNotifiedOfTemporaryFailuresDuringStabilisationDelay() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.ONE_MINUTE));

        setServiceUp(false);
        Thread.sleep(100);
        setServiceUp(true);

        assertNoEventsContinually();
    }

    @Test(groups="Integration") // Because slow
    public void testNotifiedOfFailureAfterStabilisationDelay() throws Exception {
        final int delayMillis = 1000;

        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.of(delayMillis)));

        setServiceUp(false);

        assertNoEventsContinually(Duration.of(delayMillis - OVERHEAD));
        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
    }

    @Test(groups="Integration") // Because slow
    public void testFailuresThenUpDownResetsStabilisationCount() throws Exception {
        LOG.debug("Running testFailuresThenUpDownResetsStabilisationCount");
        final long delayMillis = 1000;

        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.of(delayMillis)));

        setServiceUp(false);
        assertNoEventsContinually(Duration.of(delayMillis - OVERHEAD));

        // up briefly, then down again: still no event until the delay has passed once more
        setServiceUp(true);
        Thread.sleep(OVERHEAD);
        setServiceUp(false);
        assertNoEventsContinually(Duration.of(delayMillis - OVERHEAD));

        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
    }

    @Test(groups="Integration") // Because slow
    public void testNotNotifiedOfTemporaryRecoveryDuringStabilisationDelay() throws Exception {
        final long delayMillis = 1000;

        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.of(delayMillis)));

        setServiceUp(false);
        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
        events.clear();

        setServiceUp(true);
        Thread.sleep(100);
        setServiceUp(false);

        assertNoEventsContinually(Duration.of(delayMillis + OVERHEAD));
    }

    @Test(groups="Integration") // Because slow
    public void testNotifiedOfRecoveryAfterStabilisationDelay() throws Exception {
        final int delayMillis = 1000;

        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.of(delayMillis)));

        setServiceUp(false);
        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
        events.clear();

        setServiceUp(true);
        assertNoEventsContinually(Duration.of(delayMillis - OVERHEAD));
        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
    }

    @Test(groups="Integration") // Because slow
    public void testRecoversThenDownUpResetsStabilisationCount() throws Exception {
        final long delayMillis = 1000;

        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.of(delayMillis)));

        setServiceUp(false);
        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
        events.clear();

        setServiceUp(true);
        assertNoEventsContinually(Duration.of(delayMillis - OVERHEAD));

        // down briefly, then up again: still no event until the delay has passed once more
        setServiceUp(false);
        Thread.sleep(OVERHEAD);
        setServiceUp(true);
        assertNoEventsContinually(Duration.of(delayMillis - OVERHEAD));

        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
    }

    /**
     * Fails unless a recorded event was published on {@code sensor} and its descriptor
     * satisfies both predicates; a {@code null} predicate matches anything.
     */
    private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
        for (SensorEvent<FailureDescriptor> candidate : events) {
            if (!candidate.getSensor().equals(sensor)) {
                continue;
            }
            if (componentPredicate != null && !componentPredicate.apply(candidate.getValue().getComponent())) {
                continue;
            }
            if (descriptionPredicate != null && !descriptionPredicate.apply(candidate.getValue().getDescription())) {
                continue;
            }
            return;
        }
        fail("No matching "+sensor+" event found; events="+events);
    }

    /** Retries {@link #assertHasEvent} until it passes or {@link #TIMEOUT_MS} elapses. */
    private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
        Runnable check = new Runnable() {
            @Override public void run() {
                assertHasEvent(sensor, componentPredicate, descriptionPredicate);
            }};
        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), check);
    }

    /** Returns a check that fails if any event has been recorded. */
    private Runnable noEventsCheck() {
        return new Runnable() {
            @Override public void run() {
                assertTrue(events.isEmpty(), "events="+events);
            }};
    }

    /** Asserts that no event is recorded for the whole of {@code duration}. */
    private void assertNoEventsContinually(Duration duration) {
        Asserts.succeedsContinually(ImmutableMap.of("timeout", duration), noEventsCheck());
    }

    /** Asserts that no event is recorded for the default period used by {@code Asserts.succeedsContinually}. */
    private void assertNoEventsContinually() {
        Asserts.succeedsContinually(noEventsCheck());
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java new file mode 100644 index 0000000..2ad2d79 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.api.policy.EnricherSpec; +import org.apache.brooklyn.test.EntityTestUtils; +import org.apache.brooklyn.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.test.entity.TestApplication; +import org.apache.brooklyn.test.entity.TestEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.basic.ServiceStateLogic; +import 
brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic; +import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; + +
/**
 * Tests for {@link ServiceFailureDetector}: attaches the enricher to a {@link TestEntity},
 * drives the entity's service-up / expected-state / problem indicators, and checks which
 * {@link HASensors#ENTITY_FAILED} and {@link HASensors#ENTITY_RECOVERED} events are recorded.
 */
public class ServiceFailureDetectorTest {
    private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorTest.class);

    /** Upper bound, in milliseconds, for the "eventually" assertions in this class. */
    private static final int TIMEOUT_MS = 10*1000;

    private ManagementContext managementContext;
    private TestApplication app;
    private TestEntity e1;

    /** Every ENTITY_FAILED / ENTITY_RECOVERED event seen on {@link #e1}, in arrival order. */
    private List<SensorEvent<FailureDescriptor>> events;
    private SensorEventListener<FailureDescriptor> eventListener;

    @BeforeMethod(alwaysRun=true)
    public void setUp() throws Exception {
        events = new CopyOnWriteArrayList<SensorEvent<FailureDescriptor>>();
        eventListener = new SensorEventListener<FailureDescriptor>() {
            @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
                events.add(event);
            }
        };

        managementContext = new LocalManagementContextForTests();
        app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
        e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
        e1.addEnricher(ServiceStateLogic.newEnricherForServiceStateFromProblemsAndUp());

        app.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_FAILED, eventListener);
        app.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_RECOVERED, eventListener);
    }

    @AfterMethod(alwaysRun=true)
    public void tearDown() throws Exception {
        if (managementContext != null) Entities.destroyAll(managementContext);
    }

    @Test(groups="Integration") // Has a 1 second wait
    public void testNotNotifiedOfFailuresForHealthy() throws Exception {
        // Entity is marked up and expected-running before the detector is attached;
        // no failure/recovery event should be published for it.
        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);

        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));

        assertNoEventsContinually();
        assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
    }

    @Test
    public void testNotifiedOfFailure() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));

        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);

        assertEquals(events.size(), 0, "events="+events);

        e1.setAttribute(TestEntity.SERVICE_UP, false);

        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
        assertEquals(events.size(), 1, "events="+events);
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
    }

    @Test
    public void testNotifiedOfFailureOnProblem() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));

        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);

        assertEquals(events.size(), 0, "events="+events);

        ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");

        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
        assertEquals(events.size(), 1, "events="+events);
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
    }

    @Test
    public void testNotifiedOfFailureOnStateOnFire() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.ON_FIRE);

        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
        assertEquals(events.size(), 1, "events="+events);
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
    }

    @Test
    public void testNotifiedOfRecovery() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));

        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
        // Make the entity fail
        e1.setAttribute(TestEntity.SERVICE_UP, false);

        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);

        // And make the entity recover
        e1.setAttribute(TestEntity.SERVICE_UP, true);
        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
        assertEquals(events.size(), 2, "events="+events);
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
    }

    @Test
    public void testNotifiedOfRecoveryFromProblems() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));

        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
        // Make the entity fail
        ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");

        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);

        // And make the entity recover
        ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
        assertEquals(events.size(), 2, "events="+events);
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
    }

    @Test(groups="Integration") // Has a 1 second wait
    public void testEmitsEntityFailureOnlyIfPreviouslyUp() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));

        // Make the entity fail
        e1.setAttribute(TestEntity.SERVICE_UP, false);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);

        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
        assertNoEventsContinually();
    }

    @Test
    public void testDisablingPreviouslyUpRequirementForEntityFailed() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP, false));

        e1.setAttribute(TestEntity.SERVICE_UP, false);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);

        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
    }

    @Test
    public void testDisablingOnFire() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.PRACTICALLY_FOREVER));

        // Make the entity fail
        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
        e1.setAttribute(TestEntity.SERVICE_UP, false);

        assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
    }

    @Test(groups="Integration") // Has a 1 second wait
    public void testOnFireAfterDelay() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND));

        // Make the entity fail
        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);

        e1.setAttribute(TestEntity.SERVICE_UP, false);

        // Still RUNNING straight away and shortly afterwards; ON_FIRE is only expected later.
        assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
        Time.sleep(Duration.millis(100));
        assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
    }

    @Test(groups="Integration") // Has a 1 second wait
    public void testOnFailureDelayFromProblemAndRecover() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND)
                .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_SECOND));

        // Set the entity to healthy
        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);

        // Make the entity fail; won't set on-fire for 1s but will publish FAILED immediately.
        ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
        EntityTestUtils.assertAttributeEqualsContinually(ImmutableMap.of("timeout", 100), e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
        assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);

        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);

        // Now recover: will publish RUNNING immediately, but has 1s stabilisation for RECOVERED
        ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);

        assertEquals(events.size(), 1, "events="+events);

        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
        assertEquals(events.size(), 2, "events="+events);
    }

    @Test(groups="Integration") // Has a 1 second wait
    public void testAttendsToServiceState() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));

        e1.setAttribute(TestEntity.SERVICE_UP, true);
        // not counted as failed because not expected to be running
        e1.setAttribute(TestEntity.SERVICE_UP, false);

        assertNoEventsContinually();
    }

    @Test(groups="Integration") // Has a 1 second wait
    public void testOnlyReportsFailureIfRunning() throws Exception {
        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));

        // Make the entity fail
        ServiceStateLogic.setExpectedState(e1, Lifecycle.STARTING);
        e1.setAttribute(TestEntity.SERVICE_UP, true);
        e1.setAttribute(TestEntity.SERVICE_UP, false);

        assertNoEventsContinually();
    }

    @Test
    public void testReportsFailureWhenAlreadyDownOnRegisteringPolicy() throws Exception {
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
        e1.setAttribute(TestEntity.SERVICE_UP, false);

        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP, false));

        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
    }

    @Test
    public void testReportsFailureWhenAlreadyOnFireOnRegisteringPolicy() throws Exception {
        ServiceStateLogic.setExpectedState(e1, Lifecycle.ON_FIRE);

        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP, false));

        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
    }

    @Test(groups="Integration") // Has a 1.5 second wait
    public void testRepublishedFailure() throws Exception {
        Duration republishPeriod = Duration.millis(100);

        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
                .configure(ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, republishPeriod));

        // Set the entity to healthy
        e1.setAttribute(TestEntity.SERVICE_UP, true);
        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);

        // Make the entity fail;
        ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);

        //wait for at least 10 republish events (~1 sec)
        assertEventsSizeEventually(10);

        // Now recover
        ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);

        //once recovered check no more failed events emitted periodically
        assertEventsSizeContinually(events.size());

        // Timing deviations between consecutive events are only logged, not asserted.
        SensorEvent<FailureDescriptor> prevEvent = null;
        for (SensorEvent<FailureDescriptor> event : events) {
            if (prevEvent != null) {
                long repeatOffset = event.getTimestamp() - prevEvent.getTimestamp();
                long deviation = Math.abs(repeatOffset - republishPeriod.toMilliseconds());
                if (deviation > republishPeriod.toMilliseconds()/10 &&
                        //warn only if recovered is too far away from the last failure
                        (!event.getSensor().equals(HASensors.ENTITY_RECOVERED) ||
                        repeatOffset > republishPeriod.toMilliseconds())) {
                    log.error("The time between failure republish (" + repeatOffset + "ms) deviates too much from the expected " + republishPeriod + ". prevEvent=" + prevEvent + ", event=" + event);
                }
            }
            prevEvent = event;
        }

        //make sure no republish takes place after recovered
        assertEquals(prevEvent.getSensor(), HASensors.ENTITY_RECOVERED);
    }

    /** Asserts that the number of recorded events stays exactly {@code size} for 500ms. */
    private void assertEventsSizeContinually(final int size) {
        Asserts.succeedsContinually(MutableMap.of("timeout", 500), new Runnable() {
            @Override
            public void run() {
                assertEquals(events.size(), size, "assertEventsSizeContinually expects " + size + " events but found " + events.size() + ": " + events);
            }
        });
    }

    /** Asserts that at least {@code size} events are recorded within {@link #TIMEOUT_MS}. */
    private void assertEventsSizeEventually(final int size) {
        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
            @Override
            public void run() {
                // Message names this helper (it previously named the "continually" variant).
                assertTrue(events.size() >= size, "assertEventsSizeEventually expects at least " + size + " events but found " + events.size() + ": " + events);
            }
        });
    }

    /**
     * Fails unless some recorded event is for {@code sensor} and satisfies both predicates.
     *
     * @param sensor the sensor the event must have been published on
     * @param componentPredicate applied to the descriptor's component; {@code null} matches anything
     * @param descriptionPredicate applied to the descriptor's description; {@code null} matches anything
     */
    private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
        for (SensorEvent<FailureDescriptor> event : events) {
            if (event.getSensor().equals(sensor) &&
                    (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) &&
                    (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) {
                return;
            }
        }
        fail("No matching "+sensor+" event found; events="+events);
    }

    /** Retries {@link #assertHasEvent} until it passes or {@link #TIMEOUT_MS} elapses. */
    private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
            @Override public void run() {
                assertHasEvent(sensor, componentPredicate, descriptionPredicate);
            }});
    }

    /** Asserts that no event is recorded for the default duration of {@code Asserts.succeedsContinually}. */
    private void assertNoEventsContinually() {
        Asserts.succeedsContinually(new Runnable() {
            @Override public void run() {
                assertTrue(events.isEmpty(), "events="+events);
            }});
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java new file mode 100644 index 0000000..f92603a --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.apache.brooklyn.test.EntityTestUtils; +import org.apache.brooklyn.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.test.entity.TestApplication; +import org.apache.brooklyn.test.entity.TestEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.basic.QuorumCheck; +import 
brooklyn.entity.basic.ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.trait.FailingEntity; + +import org.apache.brooklyn.location.basic.SimulatedLocation; + +import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor; +import brooklyn.test.Asserts; +import brooklyn.util.javalang.JavaClassNames; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +
/**
 * Tests for {@link ServiceReplacer}: attaches the policy to a {@link DynamicCluster}, emits
 * {@link HASensors#ENTITY_FAILED} on a member, and checks how the cluster's membership,
 * service state and {@link ServiceReplacer#ENTITY_REPLACEMENT_FAILED} events respond.
 */
public class ServiceReplacerTest {

    private static final Logger log = LoggerFactory.getLogger(ServiceReplacerTest.class);

    private ManagementContext managementContext;
    private TestApplication app;
    private SimulatedLocation loc;
    private SensorEventListener<Object> eventListener;
    /** Events captured by {@link #eventListener}, in arrival order. */
    private List<SensorEvent<?>> events;

    @BeforeMethod(alwaysRun=true)
    public void setUp() throws Exception {
        managementContext = new LocalManagementContextForTests();
        app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
        loc = managementContext.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class));
        events = Lists.newCopyOnWriteArrayList();
        eventListener = new SensorEventListener<Object>() {
            @Override public void onEvent(SensorEvent<Object> event) {
                events.add(event);
            }
        };
    }

    @AfterMethod(alwaysRun=true)
    public void tearDown() throws Exception {
        if (managementContext != null) Entities.destroyAll(managementContext);
    }

    @Test
    public void testReplacesFailedMember() throws Exception {
        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class))
                .configure(DynamicCluster.INITIAL_SIZE, 3));
        app.start(ImmutableList.<Location>of(loc));

        ServiceReplacer policy = new ServiceReplacer(new ConfigBag().configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
        cluster.addPolicy(policy);

        final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
        final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 1);

        e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));

        // Expect e1 to be replaced
        assertMemberReplacedEventually(cluster, initialMembers, e1);
    }

    @Test(invocationCount=100)
    public void testSetsOnFireWhenFailToReplaceMemberManyTimes() throws Exception {
        testSetsOnFireWhenFailToReplaceMember();
    }

    // fails the startup of the replacement entity (but not the original).
    @Test
    public void testSetsOnFireWhenFailToReplaceMember() throws Exception {
        app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);

        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
                        .configure(FailingEntity.FAIL_ON_START_CONDITION, predicateOnlyTrueForCallAtOrAfter(2)))
                .configure(DynamicCluster.INITIAL_SIZE, 1)
                .configure(DynamicCluster.QUARANTINE_FAILED_ENTITIES, true)
                .configure(ComputeServiceIndicatorsFromChildrenAndMembers.UP_QUORUM_CHECK, QuorumCheck.QuorumChecks.alwaysTrue())
                .configure(ComputeServiceIndicatorsFromChildrenAndMembers.RUNNING_QUORUM_CHECK, QuorumCheck.QuorumChecks.alwaysTrue()));
        app.start(ImmutableList.<Location>of(loc));

        // should not be on fire
        assertNotEquals(cluster.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
        // and should eventually be running
        EntityTestUtils.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);

        log.info("started "+app+" for "+JavaClassNames.niceClassAndMethod());

        ServiceReplacer policy = new ServiceReplacer(new ConfigBag().configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
        cluster.addPolicy(policy);

        final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
        final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 0);

        e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));

        // Expect cluster to go on-fire when fails to start replacement
        // Note that we've set up-quorum and running-quorum to be "alwaysTrue" so that we don't get a transient onFire
        // when the failed node fails to start (but before it has been removed from the group to be put in quarantine).
        EntityTestUtils.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);

        // Expect to have the second failed entity still kicking around as proof (in quarantine)
        // The cluster should NOT go on fire until after the 2nd failure
        Iterable<Entity> members = Iterables.filter(managementContext.getEntityManager().getEntities(), Predicates.instanceOf(FailingEntity.class));
        assertEquals(Iterables.size(members), 2);

        // e2 failed to start, so it won't have called stop on e1
        TestEntity e2 = (TestEntity) Iterables.getOnlyElement(Sets.difference(ImmutableSet.copyOf(members), initialMembers));
        assertEquals(e1.getCallHistory(), ImmutableList.of("start"), "e1.history="+e1.getCallHistory());
        assertEquals(e2.getCallHistory(), ImmutableList.of("start"), "e2.history="+e2.getCallHistory());

        // And will have received notification event about it
        assertEventuallyHasEntityReplacementFailedEvent(cluster);
    }

    @Test(groups="Integration") // has a 1 second wait
    public void testDoesNotOnFireWhenFailToReplaceMember() throws Exception {
        app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);

        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
                        .configure(FailingEntity.FAIL_ON_START_CONDITION, predicateOnlyTrueForCallAtOrAfter(2)))
                .configure(DynamicCluster.INITIAL_SIZE, 1)
                .configure(DynamicCluster.QUARANTINE_FAILED_ENTITIES, true));
        app.start(ImmutableList.<Location>of(loc));

        ServiceReplacer policy = new ServiceReplacer(new ConfigBag()
                .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)
                .configure(ServiceReplacer.SET_ON_FIRE_ON_FAILURE, false));
        cluster.addPolicy(policy);

        final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
        final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 0);

        e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));

        // Configured to not mark cluster as on fire
        assertNotOnFireContinually(cluster);

        // And will have received notification event about it
        assertEventuallyHasEntityReplacementFailedEvent(cluster);
    }

    @Test(groups="Integration") // 1s wait
    public void testStopFailureOfOldEntityDoesNotSetClusterOnFire() throws Exception {
        app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);

        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
                        .configure(FailingEntity.FAIL_ON_STOP_CONDITION, predicateOnlyTrueForCallAt(1)))
                .configure(DynamicCluster.INITIAL_SIZE, 2));
        app.start(ImmutableList.<Location>of(loc));

        cluster.addPolicy(PolicySpec.create(ServiceReplacer.class)
                .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));

        final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
        final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 0);

        e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));

        // Expect e1 to be replaced
        assertMemberReplacedEventually(cluster, initialMembers, e1);

        // Failure to stop the failed member should not cause "on-fire" of cluster
        assertNotOnFireContinually(cluster);
    }

    /**
     * If we keep on getting failure reports, never managing to replace the failed node, then don't keep trying
     * (i.e. avoid infinite loop).
     *
     * TODO This code + configuration needs some work; it's not testing quite the scenarios that I
     * was thinking of!
     * I saw problem where a node failed, and the replacements failed, and we ended up trying thousands of times.
     * (describing this scenario is made more complex by me having temporarily disabled the cluster from
     * removing failed members, for debugging purposes!)
     * Imagine these two scenarios:
     * <ol>
     *   <li>Entity fails during call to start().
     *       Here, the cluster removes it as a member (either unmanages it or puts it in quarantine)
     *       So the ENTITY_FAILED is ignored because the entity is not a member at that point.
     *   <li>Entity returns from start(), but quickly goes to service-down.
     *       Here we'll keep trying to replace that entity. Depending how long that takes, we'll either
     *       enter a horrible infinite loop, or we'll just provision a huge number of VMs over a long
     *       time period.
     *       Unfortunately this scenario is not catered for in the code yet.
     * </ol>
     */
    @Test(groups="Integration") // because takes 1.2 seconds
    public void testAbandonsReplacementAfterNumFailures() throws Exception {
        app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);

        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
                        .configure(FailingEntity.FAIL_ON_START_CONDITION, predicateOnlyTrueForCallAtOrAfter(11)))
                .configure(DynamicCluster.INITIAL_SIZE, 10)
                .configure(DynamicCluster.QUARANTINE_FAILED_ENTITIES, true));
        app.start(ImmutableList.<Location>of(loc));

        ServiceReplacer policy = new ServiceReplacer(new ConfigBag()
                .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)
                .configure(ServiceReplacer.FAIL_ON_NUM_RECURRING_FAILURES, 3));
        cluster.addPolicy(policy);

        final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
        for (int i = 0; i < 5; i++) {
            final int counter = i+1;
            EntityInternal entity = (EntityInternal) Iterables.get(initialMembers, i);
            entity.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(entity, "simulate failure"));
            if (i <= 3) {
                Asserts.succeedsEventually(new Runnable() {
                    @Override public void run() {
                        Set<FailingEntity> replacements = findReplacements(initialMembers);
                        // copyOf (not of): compare the individual members, not a one-element set
                        // holding the members collection, which could never intersect.
                        Set<?> replacementMembers = Sets.intersection(ImmutableSet.copyOf(cluster.getMembers()), replacements);
                        assertTrue(replacementMembers.isEmpty(), "replacementMembers="+replacementMembers);
                        assertEquals(replacements.size(), counter);
                    }});
            } else {
                Asserts.succeedsContinually(new Runnable() {
                    @Override public void run() {
                        Set<FailingEntity> replacements = findReplacements(initialMembers);
                        assertEquals(replacements.size(), 4);
                    }});
            }
        }
    }

    /** Returns every managed {@link FailingEntity} that was not one of {@code initialMembers}. */
    private Set<FailingEntity> findReplacements(Set<Entity> initialMembers) {
        Set<FailingEntity> all = ImmutableSet.copyOf(Iterables.filter(managementContext.getEntityManager().getEntities(), FailingEntity.class));
        return Sets.difference(all, initialMembers);
    }

    /**
     * Asserts that, eventually, {@code failed} is the only initial member gone from the cluster,
     * exactly one new member has been started in its place, and {@code failed} has been stopped
     * and unmanaged.
     */
    private void assertMemberReplacedEventually(final DynamicCluster cluster, final Set<Entity> initialMembers, final TestEntity failed) {
        Asserts.succeedsEventually(new Runnable() {
            @Override public void run() {
                Set<Entity> currentMembers = ImmutableSet.copyOf(cluster.getMembers());
                Set<Entity> newMembers = Sets.difference(currentMembers, initialMembers);
                Set<Entity> removedMembers = Sets.difference(initialMembers, currentMembers);
                assertEquals(removedMembers, ImmutableSet.of(failed));
                assertEquals(newMembers.size(), 1);
                assertEquals(((TestEntity)Iterables.getOnlyElement(newMembers)).getCallHistory(), ImmutableList.of("start"));
                assertEquals(failed.getCallHistory(), ImmutableList.of("start", "stop"));
                assertFalse(Entities.isManaged(failed));
            }});
    }

    /** Asserts that the cluster's actual service state is never ON_FIRE while {@code Asserts.succeedsContinually} keeps checking. */
    private void assertNotOnFireContinually(final DynamicCluster cluster) {
        Asserts.succeedsContinually(new Runnable() {
            @Override public void run() {
                assertNotEquals(cluster.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
            }});
    }

    /** Predicate that is true only on its {@code callNumber}-th invocation (1-based). */
    private Predicate<Object> predicateOnlyTrueForCallAt(final int callNumber) {
        return predicateOnlyTrueForCallRange(callNumber, callNumber);
    }

    /** Predicate that is true on its {@code callLowerNumber}-th invocation (1-based) and every one after. */
    private Predicate<Object> predicateOnlyTrueForCallAtOrAfter(final int callLowerNumber) {
        return predicateOnlyTrueForCallRange(callLowerNumber, Integer.MAX_VALUE);
    }

    /**
     * Predicate that counts its own invocations (starting at 1) and is true only while the
     * count lies within {@code [callLowerNumber, callUpperNumber]}; the input is ignored.
     */
    private Predicate<Object> predicateOnlyTrueForCallRange(final int callLowerNumber, final int callUpperNumber) {
        return new Predicate<Object>() {
            private final AtomicInteger counter = new AtomicInteger(0);
            @Override public boolean apply(Object input) {
                int num = counter.incrementAndGet();
                return num >= callLowerNumber && num <= callUpperNumber;
            }
        };
    }

    /**
     * Asserts that, eventually, exactly one event has been captured and that it is an
     * ENTITY_REPLACEMENT_FAILED event whose source and failure component are {@code expectedCluster}.
     */
    private void assertEventuallyHasEntityReplacementFailedEvent(final Entity expectedCluster) {
        Asserts.succeedsEventually(new Runnable() {
            @Override public void run() {
                // Read the single event once so all three checks see the same event.
                SensorEvent<?> event = Iterables.getOnlyElement(events);
                assertEquals(event.getSensor(), ServiceReplacer.ENTITY_REPLACEMENT_FAILED, "events="+events);
                assertEquals(event.getSource(), expectedCluster, "events="+events);
                assertEquals(((FailureDescriptor)event.getValue()).getComponent(), expectedCluster, "events="+events);
            }});
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java new file mode 100644 index 0000000..c93612b --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.ha; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.apache.brooklyn.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.test.entity.TestApplication; +import org.apache.brooklyn.test.entity.TestEntity; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.trait.FailingEntity; +import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor; +import brooklyn.test.Asserts; +import 
brooklyn.util.exceptions.Exceptions; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +public class ServiceRestarterTest { + + private static final int TIMEOUT_MS = 10*1000; + + private ManagementContext managementContext; + private TestApplication app; + private TestEntity e1; + private ServiceRestarter policy; + private SensorEventListener<Object> eventListener; + private List<SensorEvent<?>> events; + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + managementContext = new LocalManagementContextForTests(); + app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext); + e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + events = Lists.newCopyOnWriteArrayList(); + eventListener = new SensorEventListener<Object>() { + @Override public void onEvent(SensorEvent<Object> event) { + events.add(event); + } + }; + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (managementContext != null) Entities.destroyAll(managementContext); + } + + @Test + public void testRestartsOnFailure() throws Exception { + policy = new ServiceRestarter(new ConfigBag().configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)); + e1.addPolicy(policy); + + e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure")); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(e1.getCallHistory(), ImmutableList.of("restart")); + }}); + } + + @Test(groups="Integration") // Has a 1 second wait + public void testDoesNotRestartsWhenHealthy() throws Exception { + policy = new ServiceRestarter(new ConfigBag().configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)); + e1.addPolicy(policy); + + e1.emit(HASensors.ENTITY_RECOVERED, new FailureDescriptor(e1, "not a failure")); + + 
Asserts.succeedsContinually(new Runnable() { + @Override public void run() { + assertEquals(e1.getCallHistory(), ImmutableList.of()); + }}); + } + + @Test + public void testEmitsFailureEventWhenRestarterFails() throws Exception { + final FailingEntity e2 = app.createAndManageChild(EntitySpec.create(FailingEntity.class) + .configure(FailingEntity.FAIL_ON_RESTART, true)); + app.subscribe(e2, ServiceRestarter.ENTITY_RESTART_FAILED, eventListener); + + policy = new ServiceRestarter(new ConfigBag().configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)); + e2.addPolicy(policy); + + e2.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e2, "simulate failure")); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(Iterables.getOnlyElement(events).getSensor(), ServiceRestarter.ENTITY_RESTART_FAILED, "events="+events); + assertEquals(Iterables.getOnlyElement(events).getSource(), e2, "events="+events); + assertEquals(((FailureDescriptor)Iterables.getOnlyElement(events).getValue()).getComponent(), e2, "events="+events); + }}); + + assertEquals(e2.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE); + } + + @Test + public void testDoesNotSetOnFireOnFailure() throws Exception { + final FailingEntity e2 = app.createAndManageChild(EntitySpec.create(FailingEntity.class) + .configure(FailingEntity.FAIL_ON_RESTART, true)); + app.subscribe(e2, ServiceRestarter.ENTITY_RESTART_FAILED, eventListener); + + policy = new ServiceRestarter(new ConfigBag() + .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED) + .configure(ServiceRestarter.SET_ON_FIRE_ON_FAILURE, false)); + e2.addPolicy(policy); + + e2.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e2, "simulate failure")); + + Asserts.succeedsContinually(new Runnable() { + @Override public void run() { + assertNotEquals(e2.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE); + }}); + } + + // Previously 
RestarterPolicy called entity.restart inside the event-listener thread. + // That caused all other events for that entity's subscriptions to be queued until that + // entity's single event handler thread was free again. + @Test + public void testRestartDoesNotBlockOtherSubscriptions() throws Exception { + final CountDownLatch inRestartLatch = new CountDownLatch(1); + final CountDownLatch continueRestartLatch = new CountDownLatch(1); + + final FailingEntity e2 = app.createAndManageChild(EntitySpec.create(FailingEntity.class) + .configure(FailingEntity.FAIL_ON_RESTART, true) + .configure(FailingEntity.EXEC_ON_FAILURE, new Function<Object, Void>() { + @Override public Void apply(Object input) { + inRestartLatch.countDown(); + try { + continueRestartLatch.await(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + return null; + }})); + + e2.addPolicy(PolicySpec.create(ServiceRestarter.class) + .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)); + e2.subscribe(e2, TestEntity.SEQUENCE, eventListener); + + // Cause failure, and wait for entity.restart to be blocking + e2.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure")); + assertTrue(inRestartLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + + // Expect other notifications to continue to get through + e2.setAttribute(TestEntity.SEQUENCE, 1); + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(Iterables.getOnlyElement(events).getValue(), 1); + }}); + + // Allow restart to finish + continueRestartLatch.countDown(); + } +}