This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 51ad80bf693adc11085d219842bf529ad04257b9
Author: Alex Heneveld <a...@cloudsoft.io>
AuthorDate: Fri Mar 29 23:54:50 2024 +0000

    allow keeping soft references to previous workflows
    
    This means we can set 'retention: disabled' for persistence purposes but
    still - usually - have access to previous workflows in memory.
    It does not yet apply any limits to that cache; the thinking is to follow
    the system default for now, and perhaps to allow configuration in future.
---
 .../camp/brooklyn/WorkflowYamlRebindTest.java      |  18 ++--
 .../store/WorkflowRetentionAndExpiration.java      |   7 +-
 .../store/WorkflowStateActiveInMemory.java         | 106 ++++++++++++---------
 .../store/WorkflowStatePersistenceViaSensors.java  |  13 ++-
 .../workflow/utils/WorkflowRetentionParser.java    |  34 +++++--
 .../workflow/WorkflowPersistReplayErrorsTest.java  |   2 +-
 6 files changed, 111 insertions(+), 69 deletions(-)

diff --git 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java
 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java
index b4b1145185..697428ccec 100644
--- 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java
+++ 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java
@@ -18,7 +18,10 @@
  */
 package org.apache.brooklyn.camp.brooklyn;
 
-import com.google.common.base.Stopwatch;
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -28,20 +31,20 @@ import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.location.LocationSpec;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.objs.EntityAdjunct;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.Feed;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.*;
+import org.apache.brooklyn.core.entity.Dumper;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
-import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.entity.TestApplication;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.core.workflow.WorkflowBasicTest;
 import org.apache.brooklyn.core.workflow.WorkflowEffector;
-import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
 import org.apache.brooklyn.entity.software.base.EmptySoftwareProcess;
 import org.apache.brooklyn.entity.stock.BasicApplication;
 import org.apache.brooklyn.entity.stock.BasicEntity;
@@ -59,11 +62,6 @@ import org.slf4j.LoggerFactory;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.io.File;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Predicate;
-
 public class WorkflowYamlRebindTest extends AbstractYamlRebindTest {
 
     private static final Logger log = 
LoggerFactory.getLogger(WorkflowYamlRebindTest.class);
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
index cc0adc5e9b..c6c8a16d73 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
@@ -50,9 +50,14 @@ public class WorkflowRetentionAndExpiration {
     public static class WorkflowRetentionSettings {
         public Boolean disabled;
         public String hash;
+
         public String expiry;
         public String expiryResolved;
 
+        // soft expiry refers to what is kept in memory
+        public String softExpiry;
+        public String softExpiryResolved;
+
         @JsonIgnore
         private transient WorkflowRetentionParser.WorkflowRetentionFilter 
expiryFn;
 
@@ -163,7 +168,7 @@ public class WorkflowRetentionAndExpiration {
     private static boolean isExpirable(WorkflowExecutionContext c) {
         if (c.getStatus() == null || !c.getStatus().expirable) return false;
         if (c.getParent()!=null) {
-            // fow now, don't expire children workflows unless parents are 
also expirable
+            // XXX for size reasons, should skip this - fow now, don't expire 
children workflows unless parents are also expirable
             if (!isExpirable(c.getParent())) return false;
 
             // we could weaken this if we have lots of children workflows, but 
that is more work; left as an enhancement
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
index d27a161a16..59f5780a87 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
@@ -20,13 +20,17 @@ package org.apache.brooklyn.core.workflow.store;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ManagementContext;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.slf4j.Logger;
@@ -55,7 +59,10 @@ public class WorkflowStateActiveInMemory {
     }
 
     private final ManagementContext mgmt;
-    final Map<String,Map<String,WorkflowExecutionContext>> data = 
MutableMap.of();
+    // active workflows by entity then workflow id
+    final Map<String,Map<String,WorkflowExecutionContext>> active = 
MutableMap.of();
+    // cached remembered workflows by entity then workflow id
+    final Map<String, Cache<String, WorkflowExecutionContext>> 
completedSoftlyKept = MutableMap.of();
 
     // created and managed by mgmt context scratchpad
     protected WorkflowStateActiveInMemory(ManagementContext mgmt) {
@@ -67,40 +74,25 @@ public class WorkflowStateActiveInMemory {
     public void expireAbsentEntities() {
         lastInMemClear = System.currentTimeMillis();
         Set<String> copy;
-        synchronized (data) {
-            copy = MutableSet.copyOf(data.keySet());
-        }
+        synchronized (active) { copy = MutableSet.copyOf(active.keySet()); }
+        synchronized (completedSoftlyKept) { 
copy.addAll(completedSoftlyKept.keySet()); }
+
         copy.forEach(entityId -> {
             if (mgmt.getEntityManager().getEntity(entityId) == null) {
-                data.remove(entityId);
+                synchronized (active) { active.remove(entityId); }
+                synchronized (completedSoftlyKept) { 
completedSoftlyKept.remove(entityId); }
             }
         });
     }
 
     public void checkpoint(WorkflowExecutionContext context) {
-        // keep active workflows in memory, even if disabled
-        Map<String, WorkflowExecutionContext> entityActiveWorkflows = 
getForWorkflowIdWithLockButResultNeedsSynch(context.getEntity().getId());
         if (context.getStatus().expirable) {
-            if (entityActiveWorkflows!=null) {
-                synchronized (entityActiveWorkflows) {
-                    entityActiveWorkflows.remove(context.getWorkflowId());
-                }
-            }
+            withActiveForEntity(context.getEntity().getId(), false, wfm -> 
wfm.remove(context.getWorkflowId()));
+            withSoftlyKeptForEntity(context.getEntity().getId(), true, wfm -> 
{ wfm.put(context.getWorkflowId(), context); return null; });
         } else {
-            if (entityActiveWorkflows==null) {
-                synchronized (data) {
-                    entityActiveWorkflows = 
data.get(context.getEntity().getId());
-                    if (entityActiveWorkflows==null) {
-                        entityActiveWorkflows = MutableMap.of();
-                        data.put(context.getEntity().getId(), 
entityActiveWorkflows);
-                    }
-                }
-            }
-            synchronized (entityActiveWorkflows) {
-                entityActiveWorkflows.put(context.getWorkflowId(), context);
-            }
+            // keep active workflows in memory, even if disabled
+            withActiveForEntity(context.getEntity().getId(), true, wfm -> 
wfm.put(context.getWorkflowId(), context));
         }
-
         if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY < 
System.currentTimeMillis()) {
             // poor man's cleanup, every minute, but good enough
             expireAbsentEntities();
@@ -112,37 +104,59 @@ public class WorkflowStateActiveInMemory {
         return getWorkflowsCopy(entity);
     }
     public MutableMap<String,WorkflowExecutionContext> getWorkflowsCopy(Entity 
entity) {
-        Map<String, WorkflowExecutionContext> entityActiveWorkflows = 
getForWorkflowIdWithLockButResultNeedsSynch(entity.getId());
-        if (entityActiveWorkflows == null) return MutableMap.of();
-        synchronized (entityActiveWorkflows) {
-            return MutableMap.copyOf(entityActiveWorkflows);
-        }
+        return getWorkflowsCopy(entity, true);
+    }
+    public MutableMap<String,WorkflowExecutionContext> getWorkflowsCopy(Entity 
entity, boolean includeSoftlyKeptCompleted) {
+        MutableMap<String,WorkflowExecutionContext> result = MutableMap.of();
+        withActiveForEntity(entity.getId(), false, wfm -> { 
result.putAll(wfm); return null; });
+        if (includeSoftlyKeptCompleted) 
withSoftlyKeptForEntity(entity.getId(), false, wfm -> { 
result.putAll(wfm.asMap()); return null; });
+        return result;
     }
 
     boolean deleteWorkflow(WorkflowExecutionContext context) {
-        Map<String, WorkflowExecutionContext> entityActiveWorkflows = 
getForWorkflowIdWithLockButResultNeedsSynch(context.getEntity().getId());
-        if (entityActiveWorkflows!=null) {
-            synchronized (entityActiveWorkflows) {
-                return entityActiveWorkflows.remove(context.getWorkflowId()) 
!= null;
-            }
+        boolean result = false;
+        result = 
Boolean.TRUE.equals(withActiveForEntity(context.getEntity().getId(), false, wfm 
-> wfm.remove(context.getWorkflowId())!=null)) || result;
+        result = 
Boolean.TRUE.equals(withSoftlyKeptForEntity(context.getEntity().getId(), false, 
wfm -> {
+            WorkflowExecutionContext soft = 
wfm.getIfPresent(context.getWorkflowId());
+            wfm.invalidate(context.getWorkflowId());
+            return (soft!=null);
+        })) || result;
+
+        return result;
+    }
+
+    private <T> T withActiveForEntity(String entityId, boolean upsert, 
Function<Map<String, WorkflowExecutionContext>,T> fn) {
+        if (entityId==null) return null;
+        Map<String, WorkflowExecutionContext> result;
+        synchronized (active) {
+            result = active.computeIfAbsent(entityId, _key -> upsert ? 
MutableMap.of() : null);
+        }
+        if (result==null) return null;
+        synchronized (result) {
+            return fn.apply(result);
         }
-        return false;
     }
 
-    // note: callers should subsequently sync on the returned map
-    private Map<String, WorkflowExecutionContext> 
getForWorkflowIdWithLockButResultNeedsSynch(String entityId) {
-        synchronized (data) {
-            return data.get(entityId);
+    private <T> T withSoftlyKeptForEntity(String entityId, boolean upsert, 
Function<Cache<String, WorkflowExecutionContext>,T> fn) {
+        if (entityId==null) return null;
+        Cache<String, WorkflowExecutionContext> result;
+        synchronized (completedSoftlyKept) {
+            result = completedSoftlyKept.computeIfAbsent(entityId, _key -> 
upsert ? CacheBuilder.newBuilder().softValues().build() : null);
+        }
+        if (result==null) return null;
+        synchronized (result) {
+            return fn.apply(result);
         }
     }
 
     public WorkflowExecutionContext 
getFromTag(BrooklynTaskTags.WorkflowTaskTag tag) {
-        Map<String, WorkflowExecutionContext> activeForEntity = 
getForWorkflowIdWithLockButResultNeedsSynch(tag.getEntityId());
-        if (activeForEntity!=null) {
-            synchronized (activeForEntity) {
-                return activeForEntity.get(tag.getWorkflowId());
-            }
+        return getFromTag(tag, true);
+    }
+    public WorkflowExecutionContext 
getFromTag(BrooklynTaskTags.WorkflowTaskTag tag, boolean 
includeCompletedSoftlyKept) {
+        WorkflowExecutionContext result = 
withActiveForEntity(tag.getEntityId(), false, wfm -> 
wfm.get(tag.getWorkflowId()));
+        if (includeCompletedSoftlyKept && result==null) {
+            result = withSoftlyKeptForEntity(tag.getEntityId(), false, wfm -> 
wfm.getIfPresent(tag.getWorkflowId()));
         }
-        return null;
+        return result;
     }
 }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
index 63170c539b..82ef7b0c30 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
@@ -94,7 +94,7 @@ public class WorkflowStatePersistenceViaSensors {
         
WorkflowStateActiveInMemory.get(context.getManagementContext()).checkpoint(context);
 
         if (Boolean.TRUE.equals(context.getRetentionSettings().disabled)) {
-            if (getFromTag(BrooklynTaskTags.tagForWorkflow(context), 
false)!=null) {
+            if (getFromTag(BrooklynTaskTags.tagForWorkflow(context), false, 
false)!=null) {
                 // need to clear
                 updateMap(entity, false, true, v -> 
v.remove(context.getWorkflowId(), context));
             }
@@ -168,7 +168,7 @@ public class WorkflowStatePersistenceViaSensors {
     }
 
     public Map<String,WorkflowExecutionContext> getWorkflows(Entity entity) {
-        MutableMap<String, WorkflowExecutionContext> result = 
WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity);
+        MutableMap<String, WorkflowExecutionContext> result = 
WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity, true);
         result.add(entity.sensors().get(INTERNAL_WORKFLOWS));
         return result;
     }
@@ -185,17 +185,20 @@ public class WorkflowStatePersistenceViaSensors {
     }
 
     public Maybe<WorkflowExecutionContext> 
getFromTag(BrooklynTaskTags.WorkflowTaskTag tag) {
-        return getFromTag(tag, true);
+        return getFromTag(tag, true, true);
+    }
+    public Maybe<WorkflowExecutionContext> 
getFromTag(BrooklynTaskTags.WorkflowTaskTag tag, boolean 
includeSoftlyKeptInMemory) {
+        return getFromTag(tag, true, includeSoftlyKeptInMemory);
     }
 
-    private Maybe<WorkflowExecutionContext> 
getFromTag(BrooklynTaskTags.WorkflowTaskTag tag, boolean allowInMemory) {
+    private Maybe<WorkflowExecutionContext> 
getFromTag(BrooklynTaskTags.WorkflowTaskTag tag, boolean allowActiveInMemory, 
boolean allowActiveAndSoftlyKeptInMemory) {
         Entity targetEntity = mgmt.lookup(tag.getEntityId(), Entity.class);
         if (targetEntity==null) {
             return Maybe.absent("Entity "+tag.getWorkflowId()+" not found");
         } else {
             WorkflowExecutionContext w = null;
 
-            if (allowInMemory) w = 
WorkflowStateActiveInMemory.get(mgmt).getFromTag(tag);
+            if (allowActiveInMemory || allowActiveAndSoftlyKeptInMemory) w = 
WorkflowStateActiveInMemory.get(mgmt).getFromTag(tag, 
allowActiveAndSoftlyKeptInMemory);
 
             if (w==null) {
                 w = new 
WorkflowStatePersistenceViaSensors(mgmt).getWorkflows(targetEntity).get(tag.getWorkflowId());
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
index 18ea10d7b0..d2488457bc 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
@@ -55,6 +55,8 @@ the semantics of `min` and `max` are
 * `max` means completed workflow instances must be retained if they meet any 
of the constraints implied by the `<value>` arguments, i.e. `max(2, 3, 1h, 2h)` 
means to keep the 3 most recent instances irrespective of when they run, and to 
keep all instances for up to two hours
 
 also allows a `hash <value>` to be set at the start or the end
+
+also allows `hard` at start or end, or `soft [limit]` at end
      */
 
     public static WorkflowRetentionSettings parse(String retentionExpression, 
@Nullable WorkflowExecutionContext context) {
@@ -63,20 +65,40 @@ also allows a `hash <value>` to be set at the start or the 
end
         if (Strings.isBlank(retentionExpression)) return result;
         retentionExpression = retentionExpression.trim().toLowerCase();
 
-        if (retentionExpression.equals("disabled")) {
-            result.disabled = true;
-
-        } else {
-
+        do {
             if (retentionExpression.startsWith("hash ")) {
+                if (result.hash != null)
+                    throw new IllegalArgumentException("Cannot set multiple 
'hash' in retention expression");
                 retentionExpression = 
Strings.removeFromStart(retentionExpression, "hash").trim();
                 result.hash = Strings.getFirstWord(retentionExpression);
                 retentionExpression = 
retentionExpression.substring(result.hash.length()).trim();
-            } else if (retentionExpression.contains(" hash ")) {
+                continue;
+            }
+            if (retentionExpression.startsWith("hard ")) {
+                if (result.softExpiry != null)
+                    throw new IllegalArgumentException("Cannot set multiple 
'hard' or 'soft' in retention expression");
+                retentionExpression = 
Strings.removeFromStart(retentionExpression, "hard").trim();
+                result.softExpiry = "0";
+                continue;
+            }
+
+            // TODO soft/hard keyword; take whichever occurs last
+            if (retentionExpression.contains(" hash ")) {
+                if (result.hash != null)
+                    throw new IllegalArgumentException("Cannot set multiple 
'hash' in retention expression");
                 int i = retentionExpression.indexOf(" hash ");
                 result.hash = 
Strings.removeFromStart(retentionExpression.substring(i).trim(), "hash").trim();
                 retentionExpression = retentionExpression.substring(0, 
i).trim();
+                continue;
             }
+            break;
+        } while (false);
+
+        if (retentionExpression.equals("disabled")) {
+            result.disabled = true;
+
+        } else {
+
             if (Strings.isNonBlank(result.hash) && context!=null) {
                 result.hash = 
context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING,
 result.hash, String.class);
             }
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
index 32a0328970..adc6fd2702 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
@@ -1036,7 +1036,7 @@ public class WorkflowPersistReplayErrorsTest extends 
RebindTestFixture<BasicAppl
         }
         app.sensors().set(b, 1);
 
-        Maybe<WorkflowExecutionContext> wf = new 
WorkflowStatePersistenceViaSensors(mgmt()).getFromTag(wt);
+        Maybe<WorkflowExecutionContext> wf = new 
WorkflowStatePersistenceViaSensors(mgmt()).getFromTag(wt, false);
 
         if (shouldBeDisabled) {
             Time.sleep(Duration.millis(500));

Reply via email to