Add option to ServiceFailureDetector to republish failed events
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/75f00025 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/75f00025 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/75f00025 Branch: refs/heads/master Commit: 75f00025980c240c8bd37c5ed1b74198bda76d99 Parents: dd0c234 Author: Svetoslav Neykov <[email protected]> Authored: Fri Mar 27 19:10:29 2015 +0200 Committer: Svetoslav Neykov <[email protected]> Committed: Tue Apr 7 14:38:11 2015 +0300 ---------------------------------------------------------------------- .../policy/ha/ServiceFailureDetector.java | 14 +++- .../policy/ha/ServiceFailureDetectorTest.java | 71 +++++++++++++++++++- 2 files changed, 83 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java index f3264e3..7f5c142 100644 --- a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java +++ b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java @@ -102,6 +102,12 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat .defaultValue(Duration.ZERO) .build(); + @SetFromFlag("entityFailedRepublishTime") + public static final ConfigKey<Duration> ENTITY_FAILED_REPUBLISH_TIME = BasicConfigKey.builder(Duration.class) + .name("entityFailed.republishTime") + .description("Publish failed state periodically at the specified intervals, null to disable.") + .build(); + protected Long firstUpTime; protected Long currentFailureStartTime = null; @@ -215,7 +221,13 @@ public class 
ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat if (delayBeforeCheck<=0) { if (LOG.isDebugEnabled()) LOG.debug("{} publishing failed (state={}; currentFailureStartTime={}; now={}", new Object[] {this, state, Time.makeDateString(currentFailureStartTime), Time.makeDateString(now)}); - publishEntityFailedTime = null; + Duration republishDelay = getConfig(ENTITY_FAILED_REPUBLISH_TIME); + if (republishDelay == null) { + publishEntityFailedTime = null; + } else { + publishEntityFailedTime = now + republishDelay.toMilliseconds(); + recomputeIn = Math.min(recomputeIn, republishDelay.toMilliseconds()); + } lastPublished = LastPublished.FAILED; entity.emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now))); } else { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java index c31bbda..7250e8e 100644 --- a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java +++ b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java @@ -24,7 +24,10 @@ import static org.testng.Assert.fail; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -56,6 +59,7 @@ import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; public class ServiceFailureDetectorTest { + private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorTest.class); private static final int TIMEOUT_MS = 10*1000; @@ -245,7 +249,7 
@@ public class ServiceFailureDetectorTest { .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND) .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_SECOND)); - // Set the entit to healthy + // Set the entity to healthy e1.setAttribute(TestEntity.SERVICE_UP, true); ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING); EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); @@ -312,6 +316,71 @@ public class ServiceFailureDetectorTest { assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null); } + @Test(groups="Integration") // Has a 1.5 second wait + public void testRepublishedFailure() throws Exception { + Duration republishPeriod = Duration.millis(100); + + e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class) + .configure(ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, republishPeriod)); + + // Set the entity to healthy + e1.setAttribute(TestEntity.SERVICE_UP, true); + ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING); + EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + + // Make the entity fail; + ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo"); + EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null); + + //wait for at least 10 republish events (~1 sec) + assertEventsSizeEventually(10); + + // Now recover + ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(e1, "test"); + EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null); + + //once recovered check no more failed events emitted periodically + 
assertEventsSizeContinually(events.size()); + + SensorEvent<FailureDescriptor> prevEvent = null; + for (SensorEvent<FailureDescriptor> event : events) { + if (prevEvent != null) { + long repeatOffset = event.getTimestamp() - prevEvent.getTimestamp(); + long deviation = Math.abs(repeatOffset - republishPeriod.toMilliseconds()); + if (deviation > republishPeriod.toMilliseconds()/10 && + //warn only if recovered is too far away from the last failure + (!event.getSensor().equals(HASensors.ENTITY_RECOVERED) || + repeatOffset > republishPeriod.toMilliseconds())) { + log.warn("The time between failure republish (" + repeatOffset + "ms) deviates too much from the expected " + republishPeriod + ". prevEvent=" + prevEvent + ", event=" + event); + } + } + prevEvent = event; + } + + //make sure no republish takes place after recovered + assertEquals(prevEvent.getSensor(), HASensors.ENTITY_RECOVERED); + } + + /** Asserts that the recorded event count stays exactly {@code size} for the whole of a 500ms window. */ private void assertEventsSizeContinually(final int size) { + Asserts.succeedsContinually(MutableMap.of("timeout", 500), new Runnable() { + @Override + public void run() { + assertTrue(events.size() == size, "assertEventsSizeContinually expects " + size + " events but found " + events.size() + ": " + events); + } + }); + } + + /** Asserts that at least {@code size} events get recorded within TIMEOUT_MS milliseconds. */ private void assertEventsSizeEventually(final int size) { + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + @Override + public void run() { + assertTrue(events.size() >= size, "assertEventsSizeEventually expects at least " + size + " events but found " + events.size() + ": " + events); + } + }); + } + private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) { for (SensorEvent<FailureDescriptor> event : events) { if (event.getSensor().equals(sensor) &&
