This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit fd1d4cd8830c3ddcbf7e5962f2733021208969b1
Author: Alex Heneveld <a...@cloudsoft.io>
AuthorDate: Fri Oct 6 15:55:36 2023 +0100

    add support for `skip_initial_run` on workflow policies and sensors
---
 .../brooklyn/camp/brooklyn/WorkflowYamlTest.java   | 28 ++++++++++-----
 .../org/apache/brooklyn/core/feed/PollConfig.java  |  4 +++
 .../java/org/apache/brooklyn/core/feed/Poller.java | 40 +++++++++++++++-------
 .../core/sensor/AbstractAddTriggerableSensor.java  |  2 ++
 .../brooklyn/core/workflow/WorkflowPolicy.java     |  8 +++--
 .../brooklynnode/SelectMasterEffectorTest.java     |  3 +-
 6 files changed, 59 insertions(+), 26 deletions(-)

diff --git 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
index fd9b157708..7779fc6d63 100644
--- 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
+++ 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
@@ -1068,23 +1068,35 @@ public class WorkflowYamlTest extends AbstractYamlTest {
         EntityAsserts.assertAttributeEqualsEventually(entity, 
Sensors.newIntegerSensor("x"), 7);
     }
 
-    @Test
-    public void testAddPolicyStep() throws Exception {
-        Entity app = createAndStartApplication(
-                "services:",
-                "- type: " + BasicEntity.class.getName());
-        Entity entity = Iterables.getOnlyElement(app.getChildren());
+    private void addSetXPolicy(Entity entity, String value, boolean 
ignoreTrigger, boolean skipInitial) {
         WorkflowExecutionContext x = WorkflowBasicTest.runWorkflow(entity, 
Strings.lines(
                 "steps:",
                 "  - type: add-policy",
                 "    blueprint:",
                 "      type: workflow-policy",
                 "      brooklyn.config:",
-                "        triggers: [ other_sensor ]",
-                "        steps: [ set-sensor integer x = 1 ]"
+                "        triggers: [ other_sensor" + (ignoreTrigger ? 
"_ignored" : "") + " ]",
+                "        " + (skipInitial ? "skip_initial_run: true" : ""),
+                "        steps: [ set-sensor integer x = "+value+" ]"
         ), "add-policy");
         x.getTask(false).get().getUnchecked();
+    }
+
+    @Test
+    public void testAddPolicyStep() throws Exception {
+        Entity app = createAndStartApplication(
+                "services:",
+                "- type: " + BasicEntity.class.getName());
+        Entity entity = Iterables.getOnlyElement(app.getChildren());
+        addSetXPolicy(entity, "1", false, false);
         EntityAsserts.assertAttributeEqualsEventually(entity, 
Sensors.newIntegerSensor("x"), 1);
+        addSetXPolicy(entity, "2", true, false);  // runs initially
+        EntityAsserts.assertAttributeEqualsEventually(entity, 
Sensors.newIntegerSensor("x"), 2);
+        addSetXPolicy(entity, "3", false, true);  // does not run initially
+        // Time.sleep(Duration.millis(250));  // uncomment this to really test 
it
+        EntityAsserts.assertAttribute(entity, Sensors.newIntegerSensor("x"), v 
-> !new Integer(3).equals(v));
+        entity.sensors().set(Sensors.newStringSensor("other_sensor"), "go"); 
// now it will run
+        EntityAsserts.assertAttributeEqualsEventually(entity, 
Sensors.newIntegerSensor("x"), 3);
     }
 
     @Test
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java 
b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
index 983ee71f20..3358162283 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
@@ -35,6 +35,7 @@ import org.apache.brooklyn.util.time.Duration;
  */
 public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends 
FeedConfig<V, T, F> {
 
+    private Boolean skipInitialRun = null;  // null default is false
     private long period = -1;
     private Object otherTriggers;
     private String description;
@@ -52,6 +53,9 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> 
extends FeedConfig<
         this.description = other.description;
     }
 
+    public Boolean getSkipInitialRun() { return skipInitialRun; }
+    public F skipInitialRun(Boolean val) { this.skipInitialRun = val; return 
self(); }
+
     public long getPeriod() {
         return period;
     }
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java 
b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 440228be9b..57f70983b9 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -26,7 +26,12 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.*;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
@@ -51,8 +56,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.MoreObjects;
-
 
 /** 
  * For executing periodic polls.
@@ -91,6 +94,7 @@ public class Poller<V> {
     public void schedulePoll(AbstractEntityAdjunct feed, Set<? extends 
PollConfig> pollConfigs, Callable pollCallable, PollHandler pollHandler) {
         boolean subscribed = false;
         long minPeriodMillis = Long.MAX_VALUE;
+        boolean overallSkipInitialRun = false;
         Set<Supplier<DslPredicates.DslPredicate>> conditions = MutableSet.of();
 
         for (PollConfig pc: pollConfigs) {
@@ -108,9 +112,10 @@ public class Poller<V> {
             }
 
             for (Pair<Entity, Sensor> pair : triggersResolved) {
-                subscribe(pollCallable, pollHandler, pair.getLeft(), 
pair.getRight(), pc.getCondition());
+                subscribe(pollCallable, pollHandler, pair.getLeft(), 
pair.getRight(), Boolean.TRUE.equals(pc.getSkipInitialRun()), 
pc.getCondition());
                 subscribed = true;
             }
+            overallSkipInitialRun |= 
Boolean.TRUE.equals(pc.getSkipInitialRun());
         }
 
         if (minPeriodMillis >0 && (minPeriodMillis < 
Duration.PRACTICALLY_FOREVER.toMilliseconds() || !subscribed)) {
@@ -124,13 +129,14 @@ public class Poller<V> {
                     return aggregate;
                 };
             }
-            scheduleAtFixedRate(pollCallable, pollHandler, 
Duration.millis(minPeriodMillis), condition);
+            scheduleAtFixedRate(pollCallable, pollHandler, 
Duration.millis(minPeriodMillis), overallSkipInitialRun, condition);
         }
     }
 
     private static class PollJob<V> {
         final PollHandler<? super V> handler;
         final Duration pollPeriod;
+        boolean skipInitialRun = false;
         final Callable<?> job;
         final Runnable wrappedJob;
         final Entity pollTriggerEntity;
@@ -140,14 +146,15 @@ public class Poller<V> {
         private boolean loggedPreviousException = false;
 
         PollJob(final Callable<V> job, final PollHandler<? super V> handler, 
Duration period) {
-            this(job, handler, period, null, null, null);
+            this(job, handler, period, null, null, false, null);
         }
 
-        PollJob(final Callable<V> job, final PollHandler<? super V> handler, 
Duration period, Entity sensorSource, Sensor<?> sensor, 
Supplier<DslPredicates.DslPredicate> pollCondition) {
+        PollJob(final Callable<V> job, final PollHandler<? super V> handler, 
Duration period, Entity sensorSource, Sensor<?> sensor, boolean skipInitialRun, 
Supplier<DslPredicates.DslPredicate> pollCondition) {
             this.handler = handler;
             this.pollPeriod = period;
             this.pollTriggerEntity = sensorSource;
             this.pollTriggerSensor = sensor;
+            this.skipInitialRun = skipInitialRun;
             this.pollCondition = pollCondition;
             this.job = job;
             wrappedJob = new Runnable() {
@@ -200,21 +207,27 @@ public class Poller<V> {
     }
 
     public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> 
handler, long periodMillis) {
-        scheduleAtFixedRate(job, handler, Duration.millis(periodMillis), null);
+        scheduleAtFixedRate(job, handler, Duration.millis(periodMillis), 
false, null);
     }
     public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> 
handler, Duration period) {
-        scheduleAtFixedRate(job, handler, period, null);
+        scheduleAtFixedRate(job, handler, period, false, null);
     }
     public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> 
handler, Duration period, Supplier<DslPredicates.DslPredicate> pollCondition) {
+        scheduleAtFixedRate(job, handler, period, false, pollCondition);
+    }
+    public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> 
handler, Duration period, boolean skipInitialRun, 
Supplier<DslPredicates.DslPredicate> pollCondition) {
         if (started) {
             throw new IllegalStateException("Cannot schedule additional tasks 
after poller has started");
         }
-        PollJob<V> foo = new PollJob<V>(job, handler, period, null, null, 
pollCondition);
+        PollJob<V> foo = new PollJob<V>(job, handler, period, null, null, 
skipInitialRun, pollCondition);
         pollJobs.add(foo);
     }
 
     public void subscribe(Callable<V> job, PollHandler<? super V> handler, 
Entity sensorSource, Sensor<?> sensor, Supplier<DslPredicates.DslPredicate> 
condition) {
-        pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor, 
condition));
+        subscribe(job, handler, sensorSource, sensor, false, condition);
+    }
+    public void subscribe(Callable<V> job, PollHandler<? super V> handler, 
Entity sensorSource, Sensor<?> sensor, boolean skipInitialRun, 
Supplier<DslPredicates.DslPredicate> condition) {
+        pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor, 
skipInitialRun, condition));
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -254,7 +267,7 @@ public class Poller<V> {
             return task;
         };
         Multimap<Callable,PollJob> nonScheduledJobs = 
Multimaps.newSetMultimap(MutableMap.of(), MutableSet::of);
-        pollJobs.forEach(pollJob -> nonScheduledJobs.put(pollJob.job, 
pollJob));
+        pollJobs.stream().filter(pj -> !pj.skipInitialRun).forEach(pollJob -> 
nonScheduledJobs.put(pollJob.job, pollJob));
 
         // 'runInitially' could be an option on the job; currently we always do
         // if it's a scheduled task, that happens automatically; if it's a 
triggered task
@@ -272,6 +285,7 @@ public class Poller<V> {
                 added = true;
                 tb.displayName("Periodic: " + scheduleName);
                 tb.period(pollJob.pollPeriod);
+                if (pollJob.skipInitialRun) tb.delay(pollJob.pollPeriod);
 
                 if (minPeriod==null || 
(pollJob.pollPeriod.isShorterThan(minPeriod))) {
                     minPeriod = pollJob.pollPeriod;
@@ -309,7 +323,7 @@ public class Poller<V> {
         }
 
         // no period for these, but we do need to run them initially, but 
combine if the Callable is the same (e.g. multiple triggers)
-        // not the PollJob is one per trigger, and the wrappedJob is specific 
to the poll job, but doesn't depend on the trigger, so we can just take the 
first
+        // note the PollJob is one per trigger, and the wrappedJob is specific 
to the poll job, but doesn't depend on the trigger, so we can just take the 
first
         nonScheduledJobs.asMap().forEach( (jobC,jobP) -> {
             Runnable job = jobP.iterator().next().wrappedJob;
             String jobSummaries = jobP.stream().map(j -> 
j.handler.getDescription()).filter(Strings::isNonBlank).collect(Collectors.joining(",
 "));
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
 
b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
index a5c2c5a241..afa88dc796 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
@@ -58,6 +58,7 @@ public abstract class AbstractAddTriggerableSensor<T> extends 
AbstractAddSensorF
     public static final ConfigKey<Duration> SENSOR_PERIOD = 
ConfigKeys.newConfigKey(Duration.class, "period", "Period, including units e.g. 
1m or 5s or 200ms", null);
     public static final ConfigKey<Object> SENSOR_TRIGGERS = 
ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers",
             "Sensors which should trigger this feed, supplied with list of 
maps containing sensor (name or sensor instance) and entity (ID or entity 
instance), or just sensor names or just one sensor");
+    public static final ConfigKey<Boolean> SKIP_INITIAL_RUN = 
ConfigKeys.newConfigKey(Boolean.class, "skip_initial_run", "If set, skips 
running when added; runs only after the period or on a subsequent trigger");
     public static final ConfigKey<DslPredicates.DslPredicate> CONDITION = 
ConfigKeys.newConfigKey(DslPredicates.DslPredicate.class, "condition", 
"Optional condition required for this sensor feed to run");
 
     public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = 
ConfigKeys.newBooleanConfigKey("onlyIfServiceUp", "Whether to run only if 
service is up.", null);
@@ -203,6 +204,7 @@ public abstract class AbstractAddTriggerableSensor<T> 
extends AbstractAddSensorF
                 .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
                 .logWarningGraceTime(logWarningGraceTime)
                 .period(getPeriod(entity, initParams()))
+                .skipInitialRun(initParam(SKIP_INITIAL_RUN))
                 .otherTriggers(getTriggersMaybe(entity, configBag).orNull())
                 .condition(new ConditionSupplierFromConfigBag(configBag, 
entity));
 
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java
index 02043513b0..2baefe05e1 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java
@@ -28,6 +28,7 @@ import org.apache.brooklyn.core.feed.PollConfig;
 import org.apache.brooklyn.core.feed.PollHandler;
 import org.apache.brooklyn.core.feed.Poller;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.predicates.DslPredicates;
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -50,9 +51,9 @@ public class WorkflowPolicy<T> extends AbstractPolicy 
implements WorkflowCommonC
 
     private static final Logger LOG = 
LoggerFactory.getLogger(WorkflowPolicy.class);
 
-    public static final ConfigKey<Duration> POLICY_PERIOD = 
ConfigKeys.newConfigKey(Duration.class, "period", "Period, including units e.g. 
1m or 5s or 200ms", null);
-    public static final ConfigKey<Object> POLICY_TRIGGERS_SENSORS = 
ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers",
-            "Sensors which should trigger this policy, supplied with list of 
maps containing sensor (name or sensor instance) and entity (ID or entity 
instance), or just sensor names or just one sensor");
+    public static final ConfigKey<Duration> POLICY_PERIOD = 
AbstractAddTriggerableSensor.SENSOR_PERIOD;
+    public static final ConfigKey<Object> POLICY_TRIGGERS_SENSORS =  
AbstractAddTriggerableSensor.SENSOR_TRIGGERS;
+    public static final ConfigKey<Boolean> SKIP_INITIAL_RUN = 
AbstractAddTriggerableSensor.SKIP_INITIAL_RUN;
 
     public static final ConfigKey<DslPredicates.DslPredicate> CONDITION = 
ConfigKeys.newConfigKey(DslPredicates.DslPredicate.class, "condition", 
"Optional condition required for this sensor feed to run");
 
@@ -137,6 +138,7 @@ public class WorkflowPolicy<T> extends AbstractPolicy 
implements WorkflowCommonC
 
         PollConfig pc = new PollConfig( (AttributeSensor) null )
                 .period(getConfig(POLICY_PERIOD))
+                .skipInitialRun(getConfig(SKIP_INITIAL_RUN))
                 .otherTriggers(getConfig(POLICY_TRIGGERS_SENSORS))
                 .condition(new ConditionSupplierFromAdjunct());
 
diff --git 
a/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
 
b/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
index f17e527c86..2846625e1a 100644
--- 
a/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
+++ 
b/software/base/src/test/java/org/apache/brooklyn/entity/brooklynnode/SelectMasterEffectorTest.java
@@ -79,8 +79,7 @@ public class SelectMasterEffectorTest extends 
BrooklynAppUnitTestSupport {
                 }
             },
             new 
DelegatingPollHandler<Void>(Collections.<AttributePollHandler<? super 
Void>>emptyList()),
-            Duration.millis(20),
-            null);
+            Duration.millis(20));
         poller.start();
     }
 

Reply via email to