This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit fd1d4cd8830c3ddcbf7e5962f2733021208969b1 Author: Alex Heneveld <a...@cloudsoft.io> AuthorDate: Fri Oct 6 15:55:36 2023 +0100 add support for `skip_initial_run` on workflow policies and sensors --- .../brooklyn/camp/brooklyn/WorkflowYamlTest.java | 28 ++++++++++----- .../org/apache/brooklyn/core/feed/PollConfig.java | 4 +++ .../java/org/apache/brooklyn/core/feed/Poller.java | 40 +++++++++++++++------- .../core/sensor/AbstractAddTriggerableSensor.java | 2 ++ .../brooklyn/core/workflow/WorkflowPolicy.java | 8 +++-- .../brooklynnode/SelectMasterEffectorTest.java | 3 +- 6 files changed, 59 insertions(+), 26 deletions(-) diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java index fd9b157708..7779fc6d63 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java @@ -1068,23 +1068,35 @@ public class WorkflowYamlTest extends AbstractYamlTest { EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newIntegerSensor("x"), 7); } - @Test - public void testAddPolicyStep() throws Exception { - Entity app = createAndStartApplication( - "services:", - "- type: " + BasicEntity.class.getName()); - Entity entity = Iterables.getOnlyElement(app.getChildren()); + private void addSetXPolicy(Entity entity, String value, boolean ignoreTrigger, boolean skipInitial) { WorkflowExecutionContext x = WorkflowBasicTest.runWorkflow(entity, Strings.lines( "steps:", " - type: add-policy", " blueprint:", " type: workflow-policy", " brooklyn.config:", - " triggers: [ other_sensor ]", - " steps: [ set-sensor integer x = 1 ]" + " triggers: [ other_sensor" + (ignoreTrigger ? "_ignored" : "") + " ]", + " " + (skipInitial ? "skip_initial_run: true" : ""), + " steps: [ set-sensor integer x = "+value+" ]" ), "add-policy"); x.getTask(false).get().getUnchecked(); + } + + @Test + public void testAddPolicyStep() throws Exception { + Entity app = createAndStartApplication( + "services:", + "- type: " + BasicEntity.class.getName()); + Entity entity = Iterables.getOnlyElement(app.getChildren()); + addSetXPolicy(entity, "1", false, false); EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newIntegerSensor("x"), 1); + addSetXPolicy(entity, "2", true, false); // runs initially + EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newIntegerSensor("x"), 2); + addSetXPolicy(entity, "3", false, true); // does not run initially + // Time.sleep(Duration.millis(250)); // uncomment this to really test it + EntityAsserts.assertAttribute(entity, Sensors.newIntegerSensor("x"), v -> !new Integer(3).equals(v)); + entity.sensors().set(Sensors.newStringSensor("other_sensor"), "go"); // now it will run + EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newIntegerSensor("x"), 3); } @Test diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java index 983ee71f20..3358162283 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java @@ -35,6 +35,7 @@ import org.apache.brooklyn.util.time.Duration; */ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> { + private Boolean skipInitialRun = null; // null default is false private long period = -1; private Object otherTriggers; private String description; @@ -52,6 +53,9 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig< this.description = other.description; } + public Boolean getSkipInitialRun() { return skipInitialRun; } + public F skipInitialRun(Boolean val) { this.skipInitialRun = val; return self(); } + public long getPeriod() { return period; } diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java index 440228be9b..57f70983b9 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java @@ -26,7 +26,12 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import com.google.common.collect.*; +import com.google.common.base.MoreObjects; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.SubscriptionHandle; import org.apache.brooklyn.api.mgmt.Task; @@ -51,8 +56,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.MoreObjects; - /** * For executing periodic polls. @@ -91,6 +94,7 @@ public class Poller<V> { public void schedulePoll(AbstractEntityAdjunct feed, Set<? extends PollConfig> pollConfigs, Callable pollCallable, PollHandler pollHandler) { boolean subscribed = false; long minPeriodMillis = Long.MAX_VALUE; + boolean overallSkipInitialRun = false; Set<Supplier<DslPredicates.DslPredicate>> conditions = MutableSet.of(); for (PollConfig pc: pollConfigs) { @@ -108,9 +112,10 @@ public class Poller<V> { } for (Pair<Entity, Sensor> pair : triggersResolved) { - subscribe(pollCallable, pollHandler, pair.getLeft(), pair.getRight(), pc.getCondition()); + subscribe(pollCallable, pollHandler, pair.getLeft(), pair.getRight(), Boolean.TRUE.equals(pc.getSkipInitialRun()), pc.getCondition()); subscribed = true; } + overallSkipInitialRun |= Boolean.TRUE.equals(pc.getSkipInitialRun()); } if (minPeriodMillis >0 && (minPeriodMillis < Duration.PRACTICALLY_FOREVER.toMilliseconds() || !subscribed)) { @@ -124,13 +129,14 @@ public class Poller<V> { return aggregate; }; } - scheduleAtFixedRate(pollCallable, pollHandler, Duration.millis(minPeriodMillis), condition); + scheduleAtFixedRate(pollCallable, pollHandler, Duration.millis(minPeriodMillis), overallSkipInitialRun, condition); } } private static class PollJob<V> { final PollHandler<? super V> handler; final Duration pollPeriod; + boolean skipInitialRun = false; final Callable<?> job; final Runnable wrappedJob; final Entity pollTriggerEntity; @@ -140,14 +146,15 @@ public class Poller<V> { private boolean loggedPreviousException = false; PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) { - this(job, handler, period, null, null, null); + this(job, handler, period, null, null, false, null); } - PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period, Entity sensorSource, Sensor<?> sensor, Supplier<DslPredicates.DslPredicate> pollCondition) { + PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period, Entity sensorSource, Sensor<?> sensor, boolean skipInitialRun, Supplier<DslPredicates.DslPredicate> pollCondition) { this.handler = handler; this.pollPeriod = period; this.pollTriggerEntity = sensorSource; this.pollTriggerSensor = sensor; + this.skipInitialRun = skipInitialRun; this.pollCondition = pollCondition; this.job = job; wrappedJob = new Runnable() { @@ -200,21 +207,27 @@ public class Poller<V> { } public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long periodMillis) { - scheduleAtFixedRate(job, handler, Duration.millis(periodMillis), null); + scheduleAtFixedRate(job, handler, Duration.millis(periodMillis), false, null); } public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) { - scheduleAtFixedRate(job, handler, period, null); + scheduleAtFixedRate(job, handler, period, false, null); } public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period, Supplier<DslPredicates.DslPredicate> pollCondition) { + scheduleAtFixedRate(job, handler, period, false, pollCondition); + } + public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period, boolean skipInitialRun, Supplier<DslPredicates.DslPredicate> pollCondition) { if (started) { throw new IllegalStateException("Cannot schedule additional tasks after poller has started"); } - PollJob<V> foo = new PollJob<V>(job, handler, period, null, null, pollCondition); + PollJob<V> foo = new PollJob<V>(job, handler, period, null, null, skipInitialRun, pollCondition); pollJobs.add(foo); } public void subscribe(Callable<V> job, PollHandler<? super V> handler, Entity sensorSource, Sensor<?> sensor, Supplier<DslPredicates.DslPredicate> condition) { - pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor, condition)); + subscribe(job, handler, sensorSource, sensor, false, condition); + } + public void subscribe(Callable<V> job, PollHandler<? super V> handler, Entity sensorSource, Sensor<?> sensor, boolean skipInitialRun, Supplier<DslPredicates.DslPredicate> condition) { + pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor, skipInitialRun, condition)); } @SuppressWarnings({ "unchecked" }) @@ -254,7 +267,7 @@ public class Poller<V> { return task; }; Multimap<Callable,PollJob> nonScheduledJobs = Multimaps.newSetMultimap(MutableMap.of(), MutableSet::of); - pollJobs.forEach(pollJob -> nonScheduledJobs.put(pollJob.job, pollJob)); + pollJobs.stream().filter(pj -> !pj.skipInitialRun).forEach(pollJob -> nonScheduledJobs.put(pollJob.job, pollJob)); // 'runInitially' could be an option on the job; currently we always do // if it's a scheduled task, that happens automatically; if it's a triggered task @@ -272,6 +285,7 @@ public class Poller<V> { added = true; tb.displayName("Periodic: " + scheduleName); tb.period(pollJob.pollPeriod); + if (pollJob.skipInitialRun) tb.delay(pollJob.pollPeriod); if (minPeriod==null || (pollJob.pollPeriod.isShorterThan(minPeriod))) { minPeriod = pollJob.pollPeriod; @@ -309,7 +323,7 @@ public class Poller<V> { } // no period for these, but we do need to run them initially, but combine if the Callable is the same (e.g. multiple triggers) - // not the PollJob is one per trigger, and the wrappedJob is specific to the poll job, but doesn't depend on the trigger, so we can just take the first + // note the PollJob is one per trigger, and the wrappedJob is specific to the poll job, but doesn't depend on the trigger, so we can just take the first nonScheduledJobs.asMap().forEach( (jobC,jobP) -> { Runnable job = jobP.iterator().next().wrappedJob; String jobSummaries = jobP.stream().map(j -> j.handler.getDescription()).filter(Strings::isNonBlank).collect(Collectors.joining(", ")); diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java index a5c2c5a241..afa88dc796 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java @@ -58,6 +58,7 @@ public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorF public static final ConfigKey<Duration> SENSOR_PERIOD = ConfigKeys.newConfigKey(Duration.class, "period", "Period, including units e.g. 1m or 5s or 200ms", null); public static final ConfigKey<Object> SENSOR_TRIGGERS = ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers", "Sensors which should trigger this feed, supplied with list of maps containing sensor (name or sensor instance) and entity (ID or entity instance), or just sensor names or just one sensor"); + public static final ConfigKey<Boolean> SKIP_INITIAL_RUN = ConfigKeys.newConfigKey(Boolean.class, "skip_initial_run", "If set, skips running when added; runs only after the period or on a subsequent trigger"); public static final ConfigKey<DslPredicates.DslPredicate> CONDITION = ConfigKeys.newConfigKey(DslPredicates.DslPredicate.class, "condition", "Optional condition required for this sensor feed to run"); public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("onlyIfServiceUp", "Whether to run only if service is up.", null); @@ -203,6 +204,7 @@ public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorF .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup) .logWarningGraceTime(logWarningGraceTime) .period(getPeriod(entity, initParams())) + .skipInitialRun(initParam(SKIP_INITIAL_RUN)) .otherTriggers(getTriggersMaybe(entity, configBag).orNull()) .condition(new ConditionSupplierFromConfigBag(configBag, entity)); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java index 02043513b0..2baefe05e1 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java @@ -28,6 +28,7 @@ import org.apache.brooklyn.core.feed.PollConfig; import org.apache.brooklyn.core.feed.PollHandler; import org.apache.brooklyn.core.feed.Poller; import org.apache.brooklyn.core.policy.AbstractPolicy; +import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.predicates.DslPredicates; import org.apache.brooklyn.util.exceptions.Exceptions; @@ -50,9 +51,9 @@ public class WorkflowPolicy<T> extends AbstractPolicy implements WorkflowCommonC private static final Logger LOG = LoggerFactory.getLogger(WorkflowPolicy.class); - public static final ConfigKey<Duration> POLICY_PERIOD = ConfigKeys.newConfigKey(Duration.class, "period", "Period, including units e.g. 1m or 5s or 200ms", null); - public static final ConfigKey<Object> POLICY_TRIGGERS_SENSORS = ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers", - "Sensors which should trigger this policy, supplied with list of maps containing sensor (name or sensor instance) and entity (ID or entity instance), or just sensor names or just one sensor"); + public static final ConfigKey<Duration> POLICY_PERIOD = AbstractAddTriggerableSensor.SENSOR_PERIOD; + public static final ConfigKey<Object> POLICY_TRIGGERS_SENSORS = AbstractAddTriggerableSensor.SENSOR_TRIGGERS; + public static final ConfigKey<Boolean> SKIP_INITIAL_RUN = AbstractAddTriggerableSensor.SKIP_INITIAL_RUN; public static final ConfigKey<DslPredicates.DslPredicate> CONDITION = ConfigKeys.newConfigKey(DslPredicates.DslPredicate.class, "condition", "Optional condition required for this sensor feed to run"); @@ -137,6 +138,7 @@ public class WorkflowPolicy<T> extends AbstractPolicy implements WorkflowCommonC PollConfig pc = new PollConfig( (AttributeSensor) null ) .period(getConfig(POLICY_PERIOD)) + .skipInitialRun(getConfig(SKIP_INITIAL_RUN)) .otherTriggers(getConfig(POLICY_TRIGGERS_SENSORS)) .condition(new ConditionSupplierFromAdjunct()); diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java index f17e527c86..2846625e1a 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java @@ -79,8 +79,7 @@ public class SelectMasterEffectorTest extends BrooklynAppUnitTestSupport { } }, new DelegatingPollHandler<Void>(Collections.<AttributePollHandler<? super Void>>emptyList()), - Duration.millis(20), - null); + Duration.millis(20)); poller.start(); }