This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 16f868a9b918c46e32fb689f6abc7c6c8c9ff501
Author: Alex Heneveld <a...@cloudsoft.io>
AuthorDate: Mon Apr 1 11:30:51 2024 +0100

    Allow soft (in-memory) retention to be specified, globally or per workflow/step
---
 .../core/workflow/WorkflowExecutionContext.java    |   7 +-
 .../store/WorkflowRetentionAndExpiration.java      | 184 ++++++++++++++++-----
 .../store/WorkflowStateActiveInMemory.java         |  18 +-
 .../store/WorkflowStatePersistenceViaSensors.java  |  71 +++-----
 .../workflow/utils/WorkflowRetentionParser.java    | 105 ++++++++----
 .../workflow/WorkflowPersistReplayErrorsTest.java  |  22 ++-
 .../SoftwareProcessRebindNotRunningEntityTest.java |   8 +-
 7 files changed, 275 insertions(+), 140 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 8cffdcbcc3..f026023198 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -633,17 +633,12 @@ public class WorkflowExecutionContext {
         return ((EntityInternal)getEntity()).getManagementContext();
     }
 
-    @JsonIgnore
-    protected WorkflowStatePersistenceViaSensors getPersister() {
-        return new WorkflowStatePersistenceViaSensors(getManagementContext());
-    }
-
     public void persist() {
         if (isInErrorHandlerSubWorkflow()) {
             // currently don't persist error handler sub-workflows
             return;
         }
-        getPersister().checkpoint(this);
+        WorkflowRetentionAndExpiration.checkpoint(getManagementContext(), 
this);
     }
 
     /** Get the value of the input. Supports Brooklyn DSL resolution but NOT 
Freemarker resolution. */
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
index 4c2c667d59..852c75d11f 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
@@ -18,9 +18,20 @@
  */
 package org.apache.brooklyn.core.workflow.store;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
@@ -28,6 +39,7 @@ import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
+import 
org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors.PersistenceWithQueuedTasks;
 import org.apache.brooklyn.core.workflow.utils.WorkflowRetentionParser;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager;
@@ -36,15 +48,51 @@ import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.stream.Collectors;
-
 public class WorkflowRetentionAndExpiration {
 
     private static final Logger log = 
LoggerFactory.getLogger(WorkflowRetentionAndExpiration.class);
 
     public static final ConfigKey<String> WORKFLOW_RETENTION_DEFAULT = 
ConfigKeys.newStringConfigKey("workflow.retention.default",
-            "Default retention for workflows", "3");
+            "Default retention for workflows (persisted)", "3");
+    public static final ConfigKey<String> WORKFLOW_RETENTION_DEFAULT_SOFT = 
ConfigKeys.newStringConfigKey("workflow.retention.default.soft",
+            "Default soft retention for workflows (in-memory)", "3");
+
+    public static void checkpoint(ManagementContext mgmt, 
WorkflowExecutionContext context) {
+        Entity entity = context.getEntity();
+        if (Entities.isUnmanagingOrNoLongerManaged(entity)) {
+            log.debug("Skipping persistence of "+context+" as entity is no 
longer active here");
+            return;
+        }
+
+        doGlobalUpdateIfNeededOnDiskAndInMemory(mgmt);
+
+        new WorkflowStatePersistenceViaSensors(mgmt).checkpoint(context, 
PersistenceWithQueuedTasks.WARN);
+
+        // keep active workflows in memory, even if disabled
+        
WorkflowStateActiveInMemory.get(context.getManagementContext()).checkpoint(context);
+    }
+
+    static final long GLOBAL_UPDATE_FREQUENCY = 5*60*1000;  // every 5m wipe 
out old workflows
+
+    static void doGlobalUpdateIfNeededOnDiskAndInMemory(ManagementContext 
mgmt) {
+        WorkflowStateActiveInMemory inMem = 
WorkflowStateActiveInMemory.get(mgmt);
+
+        if (inMem.lastGlobalClear + GLOBAL_UPDATE_FREQUENCY > 
System.currentTimeMillis()) return;
+        inMem.lastGlobalClear = System.currentTimeMillis();
+
+        AtomicInteger total = new AtomicInteger(0);
+        Collection<Entity> entities = mgmt.getEntityManager().getEntities();
+        entities.forEach(entity -> {
+            // on disk
+            int change = new 
WorkflowStatePersistenceViaSensors(mgmt).expireOldWorkflowsOnDisk(entity, null);
+            if (change!=0) log.debug("Global entity workflow persistence 
update, removed "+(-change)+" workflows from "+entity);
+            total.addAndGet(change);
+
+            // in memory
+            inMem.recomputeExpiration(entity, null);
+        });
+        if (total.get()!=0) log.debug("Global entity workflow persistence 
update, removed "+(-total.get())+" workflows across all "+entities.size()+" 
entities");
+    }
 
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     public static class WorkflowRetentionSettings {
@@ -60,35 +108,64 @@ public class WorkflowRetentionAndExpiration {
 
         @JsonIgnore
         private transient WorkflowRetentionParser.WorkflowRetentionFilter 
expiryFn;
+        @JsonIgnore
+        private transient WorkflowRetentionParser.WorkflowRetentionFilter 
softExpiryFn;
 
         public WorkflowRetentionParser.WorkflowRetentionFilter 
getExpiryFn(WorkflowExecutionContext w) {
             return init(w).expiryFn;
         }
+        public WorkflowRetentionParser.WorkflowRetentionFilter 
getSoftExpiryFn(WorkflowExecutionContext w) {
+            return init(w).softExpiryFn;
+        }
         public WorkflowRetentionSettings init(WorkflowExecutionContext w) {
             if (w.getParent()!=null && 
Boolean.TRUE.equals(w.getParent().getRetentionSettings().disabled)) {
                 disabled = true;
 
-            } else if (expiryFn==null) {
-                expiryResolved = expiryResolved!=null ? expiryResolved : 
expiry;
-                expiryFn = new WorkflowRetentionParser(expiryResolved).parse();
-                if (w != null) {
-                    Set<String> set = INIT_REENTRANT.get();
-                    if (set==null) {
-                        set = MutableSet.of();
-                        INIT_REENTRANT.set(set);
-                    }
-                    if (!set.add(w.getWorkflowId()+":"+expiryResolved)) {
-                        // double-check we don't cause endless loops; see 
KeepContext notes
-                        throw new IllegalStateException("Invalid workflow 
retention '"+expiryResolved+"' as it refers to itself");
+            } else {
+                if (expiryFn == null) {
+                    expiryResolved = expiryResolved != null ? expiryResolved : 
expiry;
+                    expiryFn = new 
WorkflowRetentionParser(expiryResolved).parse();
+                    if (w != null) {
+                        Set<String> set = INIT_REENTRANT.get();
+                        if (set == null) {
+                            set = MutableSet.of();
+                            INIT_REENTRANT.set(set);
+                        }
+                        if (!set.add(w.getWorkflowId() + ":" + 
expiryResolved)) {
+                            // double-check we don't cause endless loops; see 
KeepContext notes
+                            throw new IllegalStateException("Invalid workflow 
retention '" + expiryResolved + "' as it refers to itself");
+                        }
+                        try {
+                            expiryFn = expiryFn.init(w);
+                        } finally {
+                            set.remove(w.getWorkflowId() + ":" + 
expiryResolved);
+                            if (set.isEmpty()) INIT_REENTRANT.remove();
+                        }
                     }
-                    try {
-                        expiryFn = expiryFn.init(w);
-                    } finally {
-                        set.remove(w.getWorkflowId()+":"+expiryResolved);
-                        if (set.isEmpty()) INIT_REENTRANT.remove();
+                    expiryResolved = expiryFn.toString();  // remove any 
references to `context` that might trigger an infinite loop
+                }
+                if (softExpiryFn == null) {
+                    softExpiryResolved = softExpiryResolved != null ? 
softExpiryResolved : softExpiry;
+                    softExpiryFn = new 
WorkflowRetentionParser(softExpiryResolved).soft().parse();
+                    if (w != null) {
+                        Set<String> set = INIT_REENTRANT.get();
+                        if (set == null) {
+                            set = MutableSet.of();
+                            INIT_REENTRANT.set(set);
+                        }
+                        if (!set.add(w.getWorkflowId() + ":" + 
softExpiryResolved)) {
+                            // double-check we don't cause endless loops; see 
KeepContext notes
+                            throw new IllegalStateException("Invalid workflow 
retention '" + softExpiryResolved + "' as it refers to itself");
+                        }
+                        try {
+                            softExpiryFn = softExpiryFn.init(w);
+                        } finally {
+                            set.remove(w.getWorkflowId() + ":" + 
softExpiryResolved);
+                            if (set.isEmpty()) INIT_REENTRANT.remove();
+                        }
                     }
+                    softExpiryResolved = softExpiryFn.toString();  // remove 
any references to `context` that might trigger an infinite loop
                 }
-                expiryResolved = expiryFn.toString();  // remove any 
references to `context` that might trigger an infinite loop
             }
             return this;
         }
@@ -101,42 +178,67 @@ public class WorkflowRetentionAndExpiration {
                 this.expiryFn = r2.expiryFn;
                 this.expiryResolved = r2.expiryResolved;
             }
+            if (Strings.isNonEmpty(r2.softExpiry)) {
+                this.softExpiry = r2.softExpiry;
+                this.softExpiryFn = r2.softExpiryFn;
+                this.softExpiryResolved = r2.softExpiryResolved;
+            }
         }
     }
 
     static ThreadLocal<Set<String>> INIT_REENTRANT = new 
ThreadLocal<Set<String>>();
 
-    static Map<String, WorkflowExecutionContext> 
recomputeExpiration(Map<String, WorkflowExecutionContext> v, 
WorkflowExecutionContext optionalContext, boolean useSoftlyKeptExpiry) {
+    static Map<String, WorkflowExecutionContext> 
recomputeExpiration(Map<String, WorkflowExecutionContext> v, @Nullable 
WorkflowExecutionContext optionalContext, boolean useSoftlyKeptExpiry) {
         Set<String> workflowHashesToUpdate = optionalContext!=null ? 
MutableSet.of(Strings.firstNonBlank(optionalContext.getRetentionHash(), 
"empty-expiry-hash"))  //should always be set
             : 
v.values().stream().map(WorkflowExecutionContext::getRetentionHash).collect(Collectors.toSet());
 
         workflowHashesToUpdate.forEach(k -> {
+            // if optional context supplied, perhaps only recompute for that 
hash
+            if (optionalContext!=null && 
!k.equals(optionalContext.getRetentionHash())) {
+                if (!isExpirable(optionalContext)) {
+                    return;
+                } else {
+                    // no-op -- if it is expirable, do a full recompute for 
the entity, to ensure sub-workflows are no longer retained
+                    // (cross-entity subworkflows will not be cleaned up; they 
will get collected when another workflow runs there,
+                    // or when there is a global cleanup event)
+                }
+            }
+
             List<WorkflowExecutionContext> finishedTwins = v.values().stream()
                     .filter(c -> k.equals(c.getRetentionHash()))
                     .filter(c -> isExpirable(c))
-                    .filter(c -> !c.equals(optionalContext))
                     .collect(Collectors.toList());
 
             if (finishedTwins.isEmpty()) return;
 
-            Optional<WorkflowExecutionContext> existingRetentionExpiry = 
finishedTwins.stream().filter(w -> w.getRetentionSettings().expiry != 
null).findAny();
+            Function<WorkflowRetentionSettings,String> expiryAccessor = 
useSoftlyKeptExpiry ? wrs -> wrs.softExpiry : wrs -> wrs.expiry;
+            Optional<WorkflowExecutionContext> existingRetentionExpiry;
+            if (optionalContext!=null && 
k.equals(optionalContext.getRetentionHash()) && 
expiryAccessor.apply(optionalContext.getRetentionSettings())!=null)
+                existingRetentionExpiry = Optional.of(optionalContext);
+            else
+                existingRetentionExpiry = finishedTwins.stream().filter(w -> 
expiryAccessor.apply(w.getRetentionSettings()) != null).findAny();
+
             WorkflowRetentionParser.WorkflowRetentionFilter expiry;
-            if (useSoftlyKeptExpiry) {
-                expiry = 
WorkflowRetentionParser.newDefaultSoftFilter().init(finishedTwins.iterator().next());
-            } else if (existingRetentionExpiry.isPresent()) {
+
+            if (existingRetentionExpiry.isPresent()) {
                 // log if expiry fn differs for the same hash
                 // (but note if it refers to parents, invocations from 
different places could result in different expiry functions)
-                if (optionalContext!=null && 
optionalContext.getRetentionHash().equals(k)) {
-                    if (optionalContext.getRetentionSettings().expiry != null) 
{
-                        if 
(!optionalContext.getRetentionSettings().expiry.equals(existingRetentionExpiry.get().getRetentionSettings().expiry))
 {
-                            log.warn("Retention specification for " + 
optionalContext + " '" + optionalContext.getRetentionSettings().expiry + "' is 
different for same hash. " +
-                                    "Expiry should be constant within a hash 
but " + existingRetentionExpiry.get() + " has '" + 
existingRetentionExpiry.get().getRetentionSettings().expiry + "'");
+                // (no such warning is given for soft expiry)
+                if (useSoftlyKeptExpiry) {
+                    expiry = 
existingRetentionExpiry.get().getRetentionSettings().getSoftExpiryFn(existingRetentionExpiry.get());
+                } else {
+                    if (optionalContext != null && 
optionalContext.getRetentionHash().equals(k)) {
+                        if (optionalContext.getRetentionSettings().expiry != 
null) {
+                            if 
(!optionalContext.getRetentionSettings().expiry.equals(existingRetentionExpiry.get().getRetentionSettings().expiry))
 {
+                                log.warn("Retention specification for " + 
optionalContext + " '" + optionalContext.getRetentionSettings().expiry + "' is 
different for same hash. " +
+                                        "Expiry should be constant within a 
hash but " + existingRetentionExpiry.get() + " has '" + 
existingRetentionExpiry.get().getRetentionSettings().expiry + "'");
+                            }
                         }
                     }
+                    expiry = 
existingRetentionExpiry.get().getRetentionSettings().getExpiryFn(existingRetentionExpiry.get());
                 }
-                expiry = 
existingRetentionExpiry.get().getRetentionSettings().getExpiryFn(existingRetentionExpiry.get());
             } else {
-                expiry = 
WorkflowRetentionParser.newDefaultFilter().init(finishedTwins.iterator().next());
+                expiry = 
WorkflowRetentionParser.newDefaultFilter(useSoftlyKeptExpiry).init(finishedTwins.iterator().next());
             }
 
             Collection<WorkflowExecutionContext> retainedFinishedTwins = 
expiry.apply(finishedTwins);
@@ -169,12 +271,16 @@ public class WorkflowRetentionAndExpiration {
 
     private static boolean isExpirable(WorkflowExecutionContext c) {
         if (c.getStatus() == null || !c.getStatus().expirable) return false;
+
+        // should we expire completed children workflows even if an 
ancestor workflow is not expirable?
+        // doing so would prevent needless retention of workflows whose parents are 
about to end, where the retention check runs on the child just before the 
parent finishes;
+        // however it would also limit the ability to inspect children 
workflows, e.g. in a foreach block, as only a few of the children (e.g. 3) would 
ever be kept.
+        // on balance, do NOT expire those; wait for another event to trigger 
their clean-up.
+        // (a child workflow marked disabled is not persisted at all; in all 
other cases this rule applies.)
         if (c.getParent()!=null) {
-            // XXX for size reasons, should skip this - fow now, don't expire 
children workflows unless parents are also expirable
             if (!isExpirable(c.getParent())) return false;
-
-            // we could weaken this if we have lots of children workflows, but 
that is more work; left as an enhancement
         }
+
         return true;
     }
 
@@ -192,6 +298,6 @@ public class WorkflowRetentionAndExpiration {
 
 
     public static void expireOldWorkflows(Entity entity) {
-        new 
WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMaps(entity,
 true, true, null, null);
+        new 
WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMaps(entity,
 null, true, true, true, null, null);
     }
 }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
index 88fe2319f2..0083b167dc 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
@@ -21,6 +21,7 @@ package org.apache.brooklyn.core.workflow.store;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import javax.annotation.Nullable;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -30,7 +31,6 @@ import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
-import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.slf4j.Logger;
@@ -42,7 +42,10 @@ public class WorkflowStateActiveInMemory {
 
     public static final ConfigKey<WorkflowStateActiveInMemory> 
IN_MEMORY_WORKFLOWS = 
ConfigKeys.newConfigKey(WorkflowStateActiveInMemory.class, 
"internals.brooklyn.workflow.in_memory");
 
-    private static final long GLOBAL_UPDATE_FREQUENCY = 5*60*1000;  // every 
5m wipe out workflows from old entities
+    long lastInMemEntitiesClear = System.currentTimeMillis();
+
+    // this timestamp governs the global clean-up of both persisted (sensor) and in-memory 
workflows; it is stored here because this instance is kept on the mgmt context
+    long lastGlobalClear = System.currentTimeMillis();
 
     public static WorkflowStateActiveInMemory get(ManagementContext mgmt) {
         WorkflowStateActiveInMemory localActiveWorkflows = 
mgmt.getScratchpad().get(IN_MEMORY_WORKFLOWS);
@@ -69,10 +72,8 @@ public class WorkflowStateActiveInMemory {
         this.mgmt = mgmt;
     }
 
-    long lastInMemClear = System.currentTimeMillis();
-
     public void expireAbsentEntities() {
-        lastInMemClear = System.currentTimeMillis();
+        lastInMemEntitiesClear = System.currentTimeMillis();
         Set<String> copy;
         synchronized (active) { copy = MutableSet.copyOf(active.keySet()); }
         synchronized (completedSoftlyKept) { 
copy.addAll(completedSoftlyKept.keySet()); }
@@ -89,11 +90,12 @@ public class WorkflowStateActiveInMemory {
         if (context.getStatus().expirable) {
             withActiveForEntity(context.getEntity().getId(), false, wfm -> 
wfm.remove(context.getWorkflowId()));
             withSoftlyKeptForEntity(context.getEntity().getId(), true, wfm -> 
{ wfm.put(context.getWorkflowId(), context); return null; });
+            recomputeExpiration(context.getEntity(), context);
         } else {
             // keep active workflows in memory, even if disabled
             withActiveForEntity(context.getEntity().getId(), true, wfm -> 
wfm.put(context.getWorkflowId(), context));
         }
-        if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY < 
System.currentTimeMillis()) {
+        if (lastInMemEntitiesClear + 
WorkflowRetentionAndExpiration.GLOBAL_UPDATE_FREQUENCY < 
System.currentTimeMillis()) {
             // poor man's cleanup, every minute, but good enough
             expireAbsentEntities();
         }
@@ -160,9 +162,9 @@ public class WorkflowStateActiveInMemory {
         return result;
     }
 
-    public void recomputeExpiration(Entity entity) {
+    public void recomputeExpiration(Entity entity, @Nullable 
WorkflowExecutionContext optionalContext) {
         withSoftlyKeptForEntity(entity.getId(), false, wfm -> {
-            WorkflowRetentionAndExpiration.recomputeExpiration(wfm.asMap(), 
null, true);
+            WorkflowRetentionAndExpiration.recomputeExpiration(wfm.asMap(), 
optionalContext, true);
             return null;
         });
     }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
index 97c37fc1a3..ffd1698c91 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
@@ -18,6 +18,14 @@
  */
 package org.apache.brooklyn.core.workflow.store;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ManagementContext;
@@ -25,7 +33,6 @@ import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
@@ -35,15 +42,6 @@ import org.apache.brooklyn.util.guava.Maybe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
 public class WorkflowStatePersistenceViaSensors {
 
     private static final Logger log = 
LoggerFactory.getLogger(WorkflowStatePersistenceViaSensors.class);
@@ -52,8 +50,6 @@ public class WorkflowStatePersistenceViaSensors {
 
     public static final AttributeSensor<Map<String,WorkflowExecutionContext>> 
INTERNAL_WORKFLOWS = Sensors.newSensor(new TypeToken<Map<String, 
WorkflowExecutionContext>>() {}, "internals.brooklyn.workflow");
 
-    private static final long GLOBAL_UPDATE_FREQUENCY = 5*60*1000;  // every 
5m wipe out old workflows
-
     public static WorkflowStatePersistenceViaSensors get(ManagementContext 
mgmt) {
         WorkflowStatePersistenceViaSensors sharedInstance = 
mgmt.getScratchpad().get(SENSOR_WORKFLOW_PERSISTER);
         if (sharedInstance==null) {
@@ -76,27 +72,11 @@ public class WorkflowStatePersistenceViaSensors {
 
     enum PersistenceWithQueuedTasks { ALLOW, WARN, FAIL }
 
-    long lastInMemClear = System.currentTimeMillis();
-
-    public void checkpoint(WorkflowExecutionContext context) {
-        checkpoint(context, PersistenceWithQueuedTasks.WARN);
-    }
-    public void checkpoint(WorkflowExecutionContext context, 
PersistenceWithQueuedTasks expectQueuedTasks) {
-        doGlobalUpdateIfNeeded();
-
-        Entity entity = context.getEntity();
-        if (Entities.isUnmanagingOrNoLongerManaged(entity)) {
-            log.debug("Skipping persistence of "+context+" as entity is no 
longer active here");
-            return;
-        }
-
-        // keep active workflows in memory, even if disabled
-        
WorkflowStateActiveInMemory.get(context.getManagementContext()).checkpoint(context);
-
+    protected void checkpoint(WorkflowExecutionContext context, 
PersistenceWithQueuedTasks expectQueuedTasks) {
         if (Boolean.TRUE.equals(context.getRetentionSettings().disabled)) {
             if (getFromTag(BrooklynTaskTags.tagForWorkflow(context), false, 
false)!=null) {
                 // need to clear
-                updateMap(entity, false, true, v -> 
v.remove(context.getWorkflowId(), context));
+                updateMap(context.getEntity(), context, false, true, v -> 
v.remove(context.getWorkflowId(), context));
             }
             return;
         }
@@ -110,28 +90,15 @@ public class WorkflowStatePersistenceViaSensors {
             }
         }
 
-        expireOldWorkflows(entity, context);
-    }
-
-    private void doGlobalUpdateIfNeeded() {
-        if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY > 
System.currentTimeMillis()) return;
-        lastInMemClear = System.currentTimeMillis();
-        AtomicInteger total = new AtomicInteger(0);
-        Collection<Entity> entities = mgmt.getEntityManager().getEntities();
-        entities.forEach(entity -> {
-            int change = expireOldWorkflows(entity, null);
-            if (change!=0) log.debug("Global entity workflow persistence 
update, removed "+(-change)+" workflows from "+entity);
-            total.addAndGet(change);
-        });
-        if (total.get()!=0) log.debug("Global entity workflow persistence 
update, removed "+(-total.get())+" workflows across all "+entities.size()+" 
entities");
+        expireOldWorkflowsOnDisk(context.getEntity(), context);
     }
 
-    public int expireOldWorkflows(Entity entity, @Nullable 
WorkflowExecutionContext context) {
+    int expireOldWorkflowsOnDisk(Entity entity, @Nullable 
WorkflowExecutionContext context) {
         // clear interrupt status so we can persist e.g. if we are interrupted 
or shutdown
         boolean interrupted = Thread.interrupted();
         boolean doExpiry = 
WorkflowRetentionAndExpiration.isExpirationCheckNeeded(entity);
         try {
-            return updateMaps(entity, doExpiry, true, context==null ? null : v 
-> v.put(context.getWorkflowId(), context), null);
+            return updateMaps(entity, null, doExpiry, false, true, 
context==null ? null : v -> v.put(context.getWorkflowId(), context), null);
 
         } finally {
             if (interrupted) Thread.currentThread().interrupt();
@@ -142,7 +109,7 @@ public class WorkflowStatePersistenceViaSensors {
         if (w.getStatus()==null || w.getStatus().expirable || w.getStatus()== 
WorkflowExecutionContext.WorkflowStatus.STAGED) {
             log.debug("Explicit request to delete workflow "+w);
             AtomicBoolean result = new AtomicBoolean(false);
-            updateMaps(w.getEntity(), false, true, map -> {
+            updateMaps(w.getEntity(), w, false, false, true, map -> {
                 boolean removed = 
WorkflowRetentionAndExpiration.deleteWorkflowFromMap(map, w, true, true);
                 if (removed) result.set(true);
             }, w);
@@ -153,26 +120,26 @@ public class WorkflowStatePersistenceViaSensors {
         }
     }
 
-    int updateMaps(Entity entity, boolean doExpiry, boolean persist, 
Consumer<Map<String,WorkflowExecutionContext>> action, WorkflowExecutionContext 
contextToRemoveFromSoftMemory) {
-        int result = updateMap(entity, doExpiry, persist, action);
+    int updateMaps(Entity entity, @Nullable WorkflowExecutionContext 
optionalContext, boolean doExpiryForSensor, boolean doExpiryInMemory, boolean 
persist, Consumer<Map<String,WorkflowExecutionContext>> action, 
WorkflowExecutionContext contextToRemoveFromSoftMemory) {
+        int result = updateMap(entity, optionalContext, doExpiryForSensor, 
persist, action);
 
         // and update softly kept
         WorkflowStateActiveInMemory activeInMemory = 
WorkflowStateActiveInMemory.get(mgmt);
         if (contextToRemoveFromSoftMemory!=null) {
             activeInMemory.deleteWorkflow(contextToRemoveFromSoftMemory);
         }
-        if (doExpiry) activeInMemory.recomputeExpiration(entity);
+        if (doExpiryInMemory) activeInMemory.recomputeExpiration(entity, 
optionalContext);
 
         return result;
     }
 
-    int updateMap(Entity entity, boolean doExpiry, boolean persist, 
Consumer<Map<String,WorkflowExecutionContext>> action) {
+    int updateMap(Entity entity, @Nullable WorkflowExecutionContext 
optionalContext, boolean doExpiry, boolean persist, 
Consumer<Map<String,WorkflowExecutionContext>> action) {
         AtomicInteger delta = new AtomicInteger(0);
         entity.sensors().modify(INTERNAL_WORKFLOWS, vo -> {
             Map<String, WorkflowExecutionContext> v = MutableMap.copyOf(vo);
             delta.set(-v.size());
             if (action!=null) action.accept(v);
-            if (doExpiry) v = 
WorkflowRetentionAndExpiration.recomputeExpiration(v, null, false);
+            if (doExpiry) v = 
WorkflowRetentionAndExpiration.recomputeExpiration(v, optionalContext, false);
             delta.getAndAdd(v.size());
             return Maybe.of(v);
         });
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
index a5dbe86b0f..db5a2bdf3f 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
@@ -18,6 +18,18 @@
  */
 package org.apache.brooklyn.core.workflow.utils;
 
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
 import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
 import org.apache.brooklyn.core.workflow.WorkflowExpressionResolution;
 import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration;
@@ -27,13 +39,7 @@ import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
-
-import javax.annotation.Nullable;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.*;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 
 public class WorkflowRetentionParser {
 
@@ -82,17 +88,48 @@ also allows `hard` at start or end, or `soft [limit]` at end
                 continue;
             }
 
-            // TODO soft/hard keyword; take whichever occurs last
-            if (retentionExpression.contains(" hash ")) {
-                if (result.hash != null)
-                    throw new IllegalArgumentException("Cannot set multiple 
'hash' in retention expression");
-                int i = retentionExpression.indexOf(" hash ");
-                result.hash = 
Strings.removeFromStart(retentionExpression.substring(i).trim(), "hash").trim();
-                retentionExpression = retentionExpression.substring(0, 
i).trim();
-                continue;
+            List<Pair<String,Integer>> specialTerms = MutableList.of();
+            for (String term: MutableList.of("hash", "soft", "hard"))
+                specialTerms.add(Pair.of(term, retentionExpression.indexOf(" 
"+term+" ")));
+            Collections.sort(specialTerms, (x,y) -> 
-Integer.compare(x.getRight(), y.getRight()));
+            Pair<String, Integer> last = specialTerms.iterator().next();
+            if (last.getRight()>=0) {
+                if ("hash".equals(last.getLeft())) {
+                    if (result.hash != null)
+                        throw new IllegalArgumentException("Cannot set 
multiple 'hash' in retention expression");
+                    result.hash = 
Strings.removeFromStart(retentionExpression.substring(last.getRight()).trim(), 
last.getLeft()).trim();
+                    retentionExpression = retentionExpression.substring(0, 
last.getRight()).trim();
+                    continue;
+                }
+                if ("hard".equals(last.getLeft())) {
+                    if (result.softExpiry != null)
+                        throw new IllegalArgumentException("Cannot set 
multiple 'hard' or 'soft' in retention expression");
+                    result.softExpiry = "0";
+                    String hardTrailing = 
Strings.removeFromStart(retentionExpression.substring(last.getRight()).trim(), 
last.getLeft()).trim();
+                    if (Strings.isNonBlank(hardTrailing)) {
+                        if (last.getRight() == 0) retentionExpression = 
hardTrailing;
+                        else throw new IllegalArgumentException("Cannot have 
retention definition both before and after 'hard' keyword");
+                    } else {
+                        retentionExpression = retentionExpression.substring(0, 
last.getRight()).trim();
+                    }
+                    continue;
+                }
+                if ("soft".equals(last.getLeft())) {
+                    if (result.softExpiry != null)
+                        throw new IllegalArgumentException("Cannot set 
multiple 'hard' or 'soft' in retention expression");
+                    String softTrailing = 
Strings.removeFromStart(retentionExpression.substring(last.getRight()).trim(), 
last.getLeft()).trim();
+                    if (Strings.isNonBlank(softTrailing)) {
+                        result.softExpiry = softTrailing;
+                        new 
WorkflowRetentionParser(result.softExpiry).soft().parse();
+                        retentionExpression = retentionExpression.substring(0, 
last.getRight()).trim();
+                    } else {
+                        throw new IllegalArgumentException("Specification for 
'soft' retention must provide retention expression after the keyword");
+                    }
+                    continue;
+                }
             }
             break;
-        } while (false);
+        } while (true);
 
         if (retentionExpression.equals("disabled")) {
             result.disabled = true;
@@ -201,6 +238,8 @@ also allows `hard` at start or end, or `soft [limit]` at end
 
     static abstract class KeepDelegate implements WorkflowRetentionFilter {
         WorkflowRetentionFilter delegate;
+        final boolean soft;
+        KeepDelegate(boolean soft) { this.soft = soft; }
         @Override
         public Collection<WorkflowExecutionContext> 
apply(Collection<WorkflowExecutionContext> workflowExecutionContexts) {
             if (delegate==null) throw new IllegalStateException("Not 
initialized");
@@ -214,30 +253,31 @@ also allows `hard` at start or end, or `soft [limit]` at 
end
         protected abstract WorkflowRetentionFilter 
findDelegate(WorkflowExecutionContext workflow);
     }
     static class KeepSystem extends KeepDelegate {
+        KeepSystem(boolean soft) { super(soft); }
         @Override
         public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext 
workflow) {
             if (workflow==null) throw new IllegalStateException("Retention 
'system' cannot be used here");
-            return new 
WorkflowRetentionParser(workflow.getManagementContext().getConfig().getConfig(WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT)).parse().init(null);
+            return new 
WorkflowRetentionParser(workflow.getManagementContext().getConfig().getConfig(
+                    soft ? 
WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT_SOFT : 
WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT))
+                    .soft(soft).parse().init(null);
         }
         @Override
         public String toString() {
             return "system";
         }
     }
-    public static WorkflowRetentionFilter newDefaultFilter() {
-        return new KeepParent();
-    }
-    public static WorkflowRetentionFilter newDefaultSoftFilter() {
-        return new KeepSystem();
+    public static WorkflowRetentionFilter newDefaultFilter(boolean soft) {
+        return new KeepParent(soft);
     }
     static class KeepParent extends KeepDelegate {
+        KeepParent(boolean soft) { super(soft); }
         @Override
         public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext 
workflow) {
             if (workflow == null) throw new IllegalStateException("Retention 
'parent' cannot be used here");
             else if (workflow.getParent()!=null) {
-                return 
workflow.getParent().getRetentionSettings().getExpiryFn(workflow.getParent());
+                return soft ? 
workflow.getParent().getRetentionSettings().getSoftExpiryFn(workflow.getParent())
 : 
workflow.getParent().getRetentionSettings().getExpiryFn(workflow.getParent());
             } else {
-                return new KeepSystem().init(workflow);
+                return new KeepSystem(soft).init(workflow);
             }
         }
         @Override
@@ -246,12 +286,13 @@ also allows `hard` at start or end, or `soft [limit]` at 
end
         }
     }
     static class KeepContext extends KeepDelegate {
+        KeepContext(boolean soft) { super(soft); }
         @Override
         public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext 
workflow) {
             if (workflow == null) throw new IllegalStateException("Retention 
'context' cannot be used here");
 
             // expands to string to something that doesn't reference context 
so that this does not infinitely recurse
-            return workflow.getRetentionSettings().getExpiryFn(workflow);
+            return soft ? 
workflow.getRetentionSettings().getSoftExpiryFn(workflow) : 
workflow.getRetentionSettings().getExpiryFn(workflow);
         }
         @Override
         public String toString() {
@@ -261,17 +302,21 @@ also allows `hard` at start or end, or `soft [limit]` at 
end
 
     String fullExpression;
     String rest;
+    boolean soft = false;
 
     public WorkflowRetentionParser(String fullExpression) {
         this.fullExpression = fullExpression;
     }
 
+    public WorkflowRetentionParser soft() { return soft(true); }
+    public WorkflowRetentionParser soft(boolean soft) { this.soft = soft; 
return this; }
+
     public WorkflowRetentionFilter parse() {
-        if (Strings.isBlank(fullExpression)) return newDefaultFilter();
+        if (Strings.isBlank(fullExpression)) return newDefaultFilter(soft);
 
         rest = Strings.trimStart(fullExpression.toLowerCase());
         WorkflowRetentionFilter result = parseTerm();
-        if (!Strings.isBlank(rest)) return newDefaultFilter();
+        if (!Strings.isBlank(rest)) return newDefaultFilter(soft);
         return result;
     }
 
@@ -337,9 +382,9 @@ also allows `hard` at start or end, or `soft [limit]` at end
         if (term.isPresent()) return term.get();
 
         if (eatNA("all") || eatNA("forever")) return new KeepAll();
-        if (eatNA("system")) return new KeepSystem();
-        if (eatNA("parent")) return new KeepParent();
-        if (eatNA("context")) return new KeepContext();
+        if (eatNA("system")) return new KeepSystem(soft);
+        if (eatNA("parent")) return new KeepParent(soft);
+        if (eatNA("context")) return new KeepContext(soft);
 
         int i = maxPositive(rest.indexOf(","), rest.indexOf(")"));
         if (i==-1) i = rest.length();
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
index f90a0a27d9..4f5cb3684c 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
@@ -20,6 +20,7 @@ package org.apache.brooklyn.core.workflow;
 
 import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
+import com.google.common.base.Predicates;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import org.apache.brooklyn.api.entity.EntityLocal;
@@ -39,6 +40,7 @@ import 
org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
 import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
 import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixture;
 import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration;
 import 
org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors;
 import org.apache.brooklyn.entity.stock.BasicApplication;
 import org.apache.brooklyn.test.Asserts;
@@ -927,6 +929,18 @@ public class WorkflowPersistReplayErrorsTest extends 
RebindTestFixture<BasicAppl
         Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1);
         Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 3);
 
+        // setting soft 4 should give one more, and allow hash keyword at end
+        w1 = doTestRetentionDisabled(2, "min(1,context) soft 4 hash 
my-fixed-hash", false, false, false);
+        
Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().softExpiryResolved,
 "4");
+        Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1);
+        Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 4);
+
+        // and soft with min(2,...) allowing hash keyword before should also 
work
+        w1 = doTestRetentionDisabled(2, "min(1,context) hash my-fixed-hash 
soft min(2,system)", false, false, false);
+        
Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().softExpiryResolved,
 "min(2,system)");
+        Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1);
+        Asserts.eventually(() -> wp.get().getWorkflows(app, 
true).keySet().size(), Predicates.equalTo(2), Duration.seconds(2));
+
         // invoking our test gives a new workflow hash because the effector 
name is different
         w2 = doTestRetentionDisabled(2, "1", false, false, false);
         Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
@@ -978,7 +992,7 @@ public class WorkflowPersistReplayErrorsTest extends 
RebindTestFixture<BasicAppl
 
         // wait 5s and run something, it should cause everything else to expire
         Time.sleep(Duration.FIVE_SECONDS);
-        wp.get().expireOldWorkflows(app, null);
+        WorkflowRetentionAndExpiration.expireOldWorkflows(app);
         // should now be empty
         Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of());
 
@@ -991,17 +1005,17 @@ public class WorkflowPersistReplayErrorsTest extends 
RebindTestFixture<BasicAppl
         Time.sleep(Duration.seconds(5));
         w3 = doTestRetentionDisabled("hash my-fixed-hash max(1,"+longWait+")", 
"context", false, true, false);
         // should now have all 3
-        wp.get().expireOldWorkflows(app, null);
+        WorkflowRetentionAndExpiration.expireOldWorkflows(app);
         Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId()));
 
         Time.sleep(Duration.seconds(5));
         // now just the last 1 (only 1 in 10s)
-        wp.get().expireOldWorkflows(app, null);
+        WorkflowRetentionAndExpiration.expireOldWorkflows(app);
         Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w3.getWorkflowId()));
 
         Time.sleep(Duration.seconds(5));
         // still have last 1 (even after 10s)
-        wp.get().expireOldWorkflows(app, null);
+        WorkflowRetentionAndExpiration.expireOldWorkflows(app);
         Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w3.getWorkflowId()));
 
         // run two more, that's all we should have
diff --git 
a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java
 
b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java
index 7d1849af97..4defc5b109 100644
--- 
a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java
+++ 
b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java
@@ -129,7 +129,13 @@ public class SoftwareProcessRebindNotRunningEntityTest 
extends RebindTestFixture
                 }
                 if (latch instanceof TerminableCountDownLatch) 
((TerminableCountDownLatch)latch).terminate();
             }
-            super.tearDown(Duration.millis(10));   // stops here can be 
blocked, don't wait on them
+            try {
+                super.tearDown(Duration.millis(50));   // stops here can be 
blocked, don't wait on them
+            } catch (Exception e) {
+                // rethrow in case this indicates a real problem; it is not believed to, as it has only been seen occasionally, and not at all since the timeout was increased (2024-04-01)
+                LOG.warn("Teardown of test encountered exception; not unknown 
if multiple processes attempt to destroy, as destruction is deliberately 
unsynchronized to minimize race errors", e);
+                throw Exceptions.propagateAnnotated("Concurrent teardown 
issue", e);
+            }
             if (executor != null) executor.shutdownNow();
         } finally {
             latches.clear();


Reply via email to