This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit fb7c01507a1bcb4bf01b48f33923bf161d168642
Author: Alex Heneveld <a...@cloudsoft.io>
AuthorDate: Sat Mar 30 01:22:34 2024 +0000

    expire appropriate softly-kept in memory completed workflows, using system 
default
---
 .../brooklyn/camp/brooklyn/WorkflowYamlTest.java   |  2 +-
 .../store/WorkflowRetentionAndExpiration.java      | 14 ++++----
 .../store/WorkflowStateActiveInMemory.java         |  7 ++++
 .../store/WorkflowStatePersistenceViaSensors.java  | 28 ++++++++++++----
 .../workflow/utils/WorkflowRetentionParser.java    |  3 ++
 .../workflow/WorkflowPersistReplayErrorsTest.java  | 37 ++++++++++++++--------
 6 files changed, 65 insertions(+), 26 deletions(-)

diff --git 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
index b7ba499072..a4dc8d3bc1 100644
--- 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
+++ 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
@@ -362,7 +362,7 @@ public class WorkflowYamlTest extends AbstractYamlTest {
             defs.forEach(def -> Asserts.assertThat(def, d -> !(d instanceof 
WorkflowStepDefinition)));
         } else {
             EntityAsserts.assertAttributeEqualsContinually(entity, 
MY_WORKFLOW_SENSOR, null);
-            Asserts.assertThat(new 
WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(entity).values(), 
Collection::isEmpty);
+            Asserts.assertThat(new 
WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(entity, 
false).values(), Collection::isEmpty);
         }
         return entity;
     }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
index c6c8a16d73..4c2c667d59 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java
@@ -106,7 +106,7 @@ public class WorkflowRetentionAndExpiration {
 
     static ThreadLocal<Set<String>> INIT_REENTRANT = new 
ThreadLocal<Set<String>>();
 
-    static Map<String, WorkflowExecutionContext> 
recomputeExpiration(Map<String, WorkflowExecutionContext> v, 
WorkflowExecutionContext optionalContext) {
+    static Map<String, WorkflowExecutionContext> 
recomputeExpiration(Map<String, WorkflowExecutionContext> v, 
WorkflowExecutionContext optionalContext, boolean useSoftlyKeptExpiry) {
         Set<String> workflowHashesToUpdate = optionalContext!=null ? 
MutableSet.of(Strings.firstNonBlank(optionalContext.getRetentionHash(), 
"empty-expiry-hash"))  //should always be set
             : 
v.values().stream().map(WorkflowExecutionContext::getRetentionHash).collect(Collectors.toSet());
 
@@ -121,7 +121,9 @@ public class WorkflowRetentionAndExpiration {
 
             Optional<WorkflowExecutionContext> existingRetentionExpiry = 
finishedTwins.stream().filter(w -> w.getRetentionSettings().expiry != 
null).findAny();
             WorkflowRetentionParser.WorkflowRetentionFilter expiry;
-            if (existingRetentionExpiry.isPresent()) {
+            if (useSoftlyKeptExpiry) {
+                expiry = 
WorkflowRetentionParser.newDefaultSoftFilter().init(finishedTwins.iterator().next());
+            } else if (existingRetentionExpiry.isPresent()) {
                 // log if expiry fn differs for the same hash
                 // (but note if it refers to parents, invocations from 
different places could result in different expiry functions)
                 if (optionalContext!=null && 
optionalContext.getRetentionHash().equals(k)) {
@@ -144,7 +146,7 @@ public class WorkflowRetentionAndExpiration {
                 toRemove.removeAll(retainedFinishedTwins);
                 toRemove.forEach(w -> {
                     log.debug("Expiring old workflow " + w + " as there are 
"+retainedFinishedTwins.size()+" more recent ones also completed");
-                    deleteWorkflowFromMap(v, w, true);
+                    deleteWorkflowFromMap(v, w, true, false);
                 });
             }
         });
@@ -152,9 +154,9 @@ public class WorkflowRetentionAndExpiration {
         return v;
     }
 
-    static boolean deleteWorkflowFromMap(Map<String, WorkflowExecutionContext> 
v, WorkflowExecutionContext w, boolean andAllReplayTasks) {
+    static boolean deleteWorkflowFromMap(Map<String, WorkflowExecutionContext> 
v, WorkflowExecutionContext w, boolean andAllReplayTasks, boolean 
andSoftlyKept) {
         boolean removed = v.remove(w.getWorkflowId()) != null;
-        removed |= 
WorkflowStateActiveInMemory.get(w.getManagementContext()).deleteWorkflow(w);
+        if (andSoftlyKept) removed = 
WorkflowStateActiveInMemory.get(w.getManagementContext()).deleteWorkflow(w) || 
removed;
         if (andAllReplayTasks) {
             BasicExecutionManager em = ((BasicExecutionManager) 
w.getManagementContext().getExecutionManager());
             w.getReplays().forEach(wr -> {
@@ -190,6 +192,6 @@ public class WorkflowRetentionAndExpiration {
 
 
     public static void expireOldWorkflows(Entity entity) {
-        new 
WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMap(entity,
 true, true, null);
+        new 
WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMaps(entity,
 true, true, null, null);
     }
 }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
index 59f5780a87..88fe2319f2 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java
@@ -159,4 +159,11 @@ public class WorkflowStateActiveInMemory {
         }
         return result;
     }
+
+    public void recomputeExpiration(Entity entity) {
+        withSoftlyKeptForEntity(entity.getId(), false, wfm -> {
+            WorkflowRetentionAndExpiration.recomputeExpiration(wfm.asMap(), 
null, true);
+            return null;
+        });
+    }
 }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
index 82ef7b0c30..97c37fc1a3 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
@@ -131,7 +131,7 @@ public class WorkflowStatePersistenceViaSensors {
         boolean interrupted = Thread.interrupted();
         boolean doExpiry = 
WorkflowRetentionAndExpiration.isExpirationCheckNeeded(entity);
         try {
-            return updateMap(entity, doExpiry, true, context==null ? null : v 
-> v.put(context.getWorkflowId(), context));
+            return updateMaps(entity, doExpiry, true, context==null ? null : v 
-> v.put(context.getWorkflowId(), context), null);
 
         } finally {
             if (interrupted) Thread.currentThread().interrupt();
@@ -142,10 +142,10 @@ public class WorkflowStatePersistenceViaSensors {
         if (w.getStatus()==null || w.getStatus().expirable || w.getStatus()== 
WorkflowExecutionContext.WorkflowStatus.STAGED) {
             log.debug("Explicit request to delete workflow "+w);
             AtomicBoolean result = new AtomicBoolean(false);
-            updateMap(w.getEntity(), false, true, map -> {
-                boolean removed = 
WorkflowRetentionAndExpiration.deleteWorkflowFromMap(map, w, true);
+            updateMaps(w.getEntity(), false, true, map -> {
+                boolean removed = 
WorkflowRetentionAndExpiration.deleteWorkflowFromMap(map, w, true, true);
                 if (removed) result.set(true);
-            });
+            }, w);
             return result.get();
         } else {
             log.warn("Explicit request to delete non-expirable workflow "+w+"; 
ignoring");
@@ -153,13 +153,26 @@ public class WorkflowStatePersistenceViaSensors {
         }
     }
 
+    int updateMaps(Entity entity, boolean doExpiry, boolean persist, 
Consumer<Map<String,WorkflowExecutionContext>> action, WorkflowExecutionContext 
contextToRemoveFromSoftMemory) {
+        int result = updateMap(entity, doExpiry, persist, action);
+
+        // and update softly kept
+        WorkflowStateActiveInMemory activeInMemory = 
WorkflowStateActiveInMemory.get(mgmt);
+        if (contextToRemoveFromSoftMemory!=null) {
+            activeInMemory.deleteWorkflow(contextToRemoveFromSoftMemory);
+        }
+        if (doExpiry) activeInMemory.recomputeExpiration(entity);
+
+        return result;
+    }
+
     int updateMap(Entity entity, boolean doExpiry, boolean persist, 
Consumer<Map<String,WorkflowExecutionContext>> action) {
         AtomicInteger delta = new AtomicInteger(0);
         entity.sensors().modify(INTERNAL_WORKFLOWS, vo -> {
             Map<String, WorkflowExecutionContext> v = MutableMap.copyOf(vo);
             delta.set(-v.size());
             if (action!=null) action.accept(v);
-            if (doExpiry) v = 
WorkflowRetentionAndExpiration.recomputeExpiration(v, null);
+            if (doExpiry) v = 
WorkflowRetentionAndExpiration.recomputeExpiration(v, null, false);
             delta.getAndAdd(v.size());
             return Maybe.of(v);
         });
@@ -168,7 +181,10 @@ public class WorkflowStatePersistenceViaSensors {
     }
 
     public Map<String,WorkflowExecutionContext> getWorkflows(Entity entity) {
-        MutableMap<String, WorkflowExecutionContext> result = 
WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity, true);
+        return getWorkflows(entity, true);
+    }
+    public Map<String,WorkflowExecutionContext> getWorkflows(Entity entity, 
boolean includeSoftlyKeptCompleted) {
+        MutableMap<String, WorkflowExecutionContext> result = 
WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity, 
includeSoftlyKeptCompleted);
         result.add(entity.sensors().get(INTERNAL_WORKFLOWS));
         return result;
     }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
index d2488457bc..a5dbe86b0f 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java
@@ -227,6 +227,9 @@ also allows `hard` at start or end, or `soft [limit]` at end
     public static WorkflowRetentionFilter newDefaultFilter() {
         return new KeepParent();
     }
+    public static WorkflowRetentionFilter newDefaultSoftFilter() {
+        return new KeepSystem();
+    }
     static class KeepParent extends KeepDelegate {
         @Override
         public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext 
workflow) {
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
index adc6fd2702..f90a0a27d9 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
@@ -909,29 +909,40 @@ public class WorkflowPersistReplayErrorsTest extends 
RebindTestFixture<BasicAppl
 
         w1 = doTestRetentionDisabled("context", "min(1,2) hash my-fixed-hash", 
false, false, false);
         
Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().expiryResolved, 
"min(1,2)");
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId()));
 
         w1 = doTestRetentionDisabled(2, "hash my-fixed-hash min(1,context)", 
false, false, false);
         
Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().expiryResolved, 
"min(1,2)");
 
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId()));
+
+        // softly in-memory should also have both
+        Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 2);
+
+        // do it 3 more times, we should have 1 persisted, but 3 in memory 
(not 4)
+        w1 = doTestRetentionDisabled(2, "hash my-fixed-hash min(1,context)", 
false, false, false);
+        w1 = doTestRetentionDisabled(2, "hash my-fixed-hash min(1,context)", 
false, false, false);
+        w1 = doTestRetentionDisabled(2, "hash my-fixed-hash min(1,context)", 
false, false, false);
+
+        Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1);
+        Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 3);
 
         // invoking our test gives a new workflow hash because the effector 
name is different
         w2 = doTestRetentionDisabled(2, "1", false, false, false);
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
 
         // reinvoking that effector still gives 2
         Task<?> t = 
app.invoke(app.getEntityType().getEffectorByName("myWorkflow" + 
effNameCount).get(), null);
         t.blockUntilEnded();
         w2 = BrooklynTaskTags.getWorkflowTaskTag(t, false);
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
 
         // hash accepts variables
         app.config().set(ConfigKeys.newStringConfigKey("hash"), 
"my-fixed-hash");
 
         // this hash replaces old w1
         w1 = doTestRetentionDisabled("context", "min(1,2) hash 
${entity.config.hash}", false, false, false);
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));  // should replace the 
one above
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));  // should replace the 
one above
 
         // workflow.id as the hash variable means each invocations has its own 
retention
         w3 = doTestRetentionDisabled("context", "1 hash ${workflow.id}", 
false, false, false);
@@ -940,7 +951,7 @@ public class WorkflowPersistReplayErrorsTest extends 
RebindTestFixture<BasicAppl
         t.blockUntilEnded();
         w4 = BrooklynTaskTags.getWorkflowTaskTag(t, false);
 
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId(), 
w4.getWorkflowId()));  // should replace the one above
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId(), 
w4.getWorkflowId()));  // should replace the one above
     }
 
     @Test(groups="Integration")  // very slow
@@ -957,19 +968,19 @@ public class WorkflowPersistReplayErrorsTest extends 
RebindTestFixture<BasicAppl
         w1 = doTestRetentionDisabled("1", "min(1,5s)", true, false, false);
 
         // only w1 should be persisted
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId()));
 
         // run something else within 5s, should now be persisting 2
 
         w2 = doTestRetentionDisabled("1", "min(1,5s)", true, false, false);
 
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
 
         // wait 5s and run something, it should cause everything else to expire
         Time.sleep(Duration.FIVE_SECONDS);
         wp.get().expireOldWorkflows(app, null);
         // should now be empty
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of());
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of());
 
         String longWait = "10s";
 
@@ -981,22 +992,22 @@ public class WorkflowPersistReplayErrorsTest extends 
RebindTestFixture<BasicAppl
         w3 = doTestRetentionDisabled("hash my-fixed-hash max(1,"+longWait+")", 
"context", false, true, false);
         // should now have all 3
         wp.get().expireOldWorkflows(app, null);
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId()));
 
         Time.sleep(Duration.seconds(5));
         // now just the last 1 (only 1 in 10s)
         wp.get().expireOldWorkflows(app, null);
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w3.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w3.getWorkflowId()));
 
         Time.sleep(Duration.seconds(5));
         // still have last 1 (even after 10s)
         wp.get().expireOldWorkflows(app, null);
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w3.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w3.getWorkflowId()));
 
         // run two more, that's all we should have
         w1 = doTestRetentionDisabled("1", "hash my-fixed-hash", false, true, 
false);
         w2 = doTestRetentionDisabled("1", "context", false, true, false);
-        Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
+        Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), 
MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId()));
     }
 
     int effNameCount = 0;

Reply via email to