This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit fb7c01507a1bcb4bf01b48f33923bf161d168642 Author: Alex Heneveld <a...@cloudsoft.io> AuthorDate: Sat Mar 30 01:22:34 2024 +0000 expire appropriate softly-kept in memory completed workflows, using system default --- .../brooklyn/camp/brooklyn/WorkflowYamlTest.java | 2 +- .../store/WorkflowRetentionAndExpiration.java | 14 ++++---- .../store/WorkflowStateActiveInMemory.java | 7 ++++ .../store/WorkflowStatePersistenceViaSensors.java | 28 ++++++++++++---- .../workflow/utils/WorkflowRetentionParser.java | 3 ++ .../workflow/WorkflowPersistReplayErrorsTest.java | 37 ++++++++++++++-------- 6 files changed, 65 insertions(+), 26 deletions(-) diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java index b7ba499072..a4dc8d3bc1 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java @@ -362,7 +362,7 @@ public class WorkflowYamlTest extends AbstractYamlTest { defs.forEach(def -> Asserts.assertThat(def, d -> !(d instanceof WorkflowStepDefinition))); } else { EntityAsserts.assertAttributeEqualsContinually(entity, MY_WORKFLOW_SENSOR, null); - Asserts.assertThat(new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(entity).values(), Collection::isEmpty); + Asserts.assertThat(new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(entity, false).values(), Collection::isEmpty); } return entity; } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java index c6c8a16d73..4c2c667d59 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java @@ -106,7 +106,7 @@ public class WorkflowRetentionAndExpiration { static ThreadLocal<Set<String>> INIT_REENTRANT = new ThreadLocal<Set<String>>(); - static Map<String, WorkflowExecutionContext> recomputeExpiration(Map<String, WorkflowExecutionContext> v, WorkflowExecutionContext optionalContext) { + static Map<String, WorkflowExecutionContext> recomputeExpiration(Map<String, WorkflowExecutionContext> v, WorkflowExecutionContext optionalContext, boolean useSoftlyKeptExpiry) { Set<String> workflowHashesToUpdate = optionalContext!=null ? MutableSet.of(Strings.firstNonBlank(optionalContext.getRetentionHash(), "empty-expiry-hash")) //should always be set : v.values().stream().map(WorkflowExecutionContext::getRetentionHash).collect(Collectors.toSet()); @@ -121,7 +121,9 @@ public class WorkflowRetentionAndExpiration { Optional<WorkflowExecutionContext> existingRetentionExpiry = finishedTwins.stream().filter(w -> w.getRetentionSettings().expiry != null).findAny(); WorkflowRetentionParser.WorkflowRetentionFilter expiry; - if (existingRetentionExpiry.isPresent()) { + if (useSoftlyKeptExpiry) { + expiry = WorkflowRetentionParser.newDefaultSoftFilter().init(finishedTwins.iterator().next()); + } else if (existingRetentionExpiry.isPresent()) { // log if expiry fn differs for the same hash // (but note if it refers to parents, invocations from different places could result in different expiry functions) if (optionalContext!=null && optionalContext.getRetentionHash().equals(k)) { @@ -144,7 +146,7 @@ public class WorkflowRetentionAndExpiration { toRemove.removeAll(retainedFinishedTwins); toRemove.forEach(w -> { log.debug("Expiring old workflow " + w + " as there are "+retainedFinishedTwins.size()+" more recent ones also completed"); - deleteWorkflowFromMap(v, w, true); + deleteWorkflowFromMap(v, w, true, false); }); } }); @@ -152,9 +154,9 @@ public class WorkflowRetentionAndExpiration { return v; } - static boolean deleteWorkflowFromMap(Map<String, WorkflowExecutionContext> v, WorkflowExecutionContext w, boolean andAllReplayTasks) { + static boolean deleteWorkflowFromMap(Map<String, WorkflowExecutionContext> v, WorkflowExecutionContext w, boolean andAllReplayTasks, boolean andSoftlyKept) { boolean removed = v.remove(w.getWorkflowId()) != null; - removed |= WorkflowStateActiveInMemory.get(w.getManagementContext()).deleteWorkflow(w); + if (andSoftlyKept) removed = WorkflowStateActiveInMemory.get(w.getManagementContext()).deleteWorkflow(w) || removed; if (andAllReplayTasks) { BasicExecutionManager em = ((BasicExecutionManager) w.getManagementContext().getExecutionManager()); w.getReplays().forEach(wr -> { @@ -190,6 +192,6 @@ public class WorkflowRetentionAndExpiration { public static void expireOldWorkflows(Entity entity) { - new WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMap(entity, true, true, null); + new WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMaps(entity, true, true, null, null); } } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java index 59f5780a87..88fe2319f2 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java @@ -159,4 +159,11 @@ public class WorkflowStateActiveInMemory { } return result; } + + public void recomputeExpiration(Entity entity) { + withSoftlyKeptForEntity(entity.getId(), false, wfm -> { + WorkflowRetentionAndExpiration.recomputeExpiration(wfm.asMap(), null, true); + return null; + }); + } } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java index 82ef7b0c30..97c37fc1a3 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java @@ -131,7 +131,7 @@ public class WorkflowStatePersistenceViaSensors { boolean interrupted = Thread.interrupted(); boolean doExpiry = WorkflowRetentionAndExpiration.isExpirationCheckNeeded(entity); try { - return updateMap(entity, doExpiry, true, context==null ? null : v -> v.put(context.getWorkflowId(), context)); + return updateMaps(entity, doExpiry, true, context==null ? null : v -> v.put(context.getWorkflowId(), context), null); } finally { if (interrupted) Thread.currentThread().interrupt(); @@ -142,10 +142,10 @@ public class WorkflowStatePersistenceViaSensors { if (w.getStatus()==null || w.getStatus().expirable || w.getStatus()== WorkflowExecutionContext.WorkflowStatus.STAGED) { log.debug("Explicit request to delete workflow "+w); AtomicBoolean result = new AtomicBoolean(false); - updateMap(w.getEntity(), false, true, map -> { - boolean removed = WorkflowRetentionAndExpiration.deleteWorkflowFromMap(map, w, true); + updateMaps(w.getEntity(), false, true, map -> { + boolean removed = WorkflowRetentionAndExpiration.deleteWorkflowFromMap(map, w, true, true); if (removed) result.set(true); - }); + }, w); return result.get(); } else { log.warn("Explicit request to delete non-expirable workflow "+w+"; ignoring"); @@ -153,13 +153,26 @@ public class WorkflowStatePersistenceViaSensors { } } + int updateMaps(Entity entity, boolean doExpiry, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action, WorkflowExecutionContext contextToRemoveFromSoftMemory) { + int result = updateMap(entity, doExpiry, persist, action); + + // and update softly kept + WorkflowStateActiveInMemory activeInMemory = WorkflowStateActiveInMemory.get(mgmt); + if (contextToRemoveFromSoftMemory!=null) { + activeInMemory.deleteWorkflow(contextToRemoveFromSoftMemory); + } + if (doExpiry) activeInMemory.recomputeExpiration(entity); + + return result; + } + int updateMap(Entity entity, boolean doExpiry, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action) { AtomicInteger delta = new AtomicInteger(0); entity.sensors().modify(INTERNAL_WORKFLOWS, vo -> { Map<String, WorkflowExecutionContext> v = MutableMap.copyOf(vo); delta.set(-v.size()); if (action!=null) action.accept(v); - if (doExpiry) v = WorkflowRetentionAndExpiration.recomputeExpiration(v, null); + if (doExpiry) v = WorkflowRetentionAndExpiration.recomputeExpiration(v, null, false); delta.getAndAdd(v.size()); return Maybe.of(v); }); @@ -168,7 +181,10 @@ public class WorkflowStatePersistenceViaSensors { } public Map<String,WorkflowExecutionContext> getWorkflows(Entity entity) { - MutableMap<String, WorkflowExecutionContext> result = WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity, true); + return getWorkflows(entity, true); + } + public Map<String,WorkflowExecutionContext> getWorkflows(Entity entity, boolean includeSoftlyKeptCompleted) { + MutableMap<String, WorkflowExecutionContext> result = WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity, includeSoftlyKeptCompleted); result.add(entity.sensors().get(INTERNAL_WORKFLOWS)); return result; } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java index d2488457bc..a5dbe86b0f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java @@ -227,6 +227,9 @@ also allows `hard` at start or end, or `soft [limit]` at end public static WorkflowRetentionFilter newDefaultFilter() { return new KeepParent(); } + public static WorkflowRetentionFilter newDefaultSoftFilter() { + return new KeepSystem(); + } static class KeepParent extends KeepDelegate { @Override public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) { diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java index adc6fd2702..f90a0a27d9 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java @@ -909,29 +909,40 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl w1 = doTestRetentionDisabled("context", "min(1,2) hash my-fixed-hash", false, false, false); Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().expiryResolved, "min(1,2)"); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId())); w1 = doTestRetentionDisabled(2, "hash my-fixed-hash min(1,context)", false, false, false); Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().expiryResolved, "min(1,2)"); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId())); + + // softly in-memory should also have both + Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 2); + + // do it 3 more times, we should have 1 persisted, but 3 in memory (not 4) + w1 = doTestRetentionDisabled(2, "hash my-fixed-hash min(1,context)", false, false, false); + w1 = doTestRetentionDisabled(2, "hash my-fixed-hash min(1,context)", false, false, false); + w1 = doTestRetentionDisabled(2, "hash my-fixed-hash min(1,context)", false, false, false); + + Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1); + Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 3); // invoking our test gives a new workflow hash because the effector name is different w2 = doTestRetentionDisabled(2, "1", false, false, false); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); // reinvoking that effector still gives 2 Task<?> t = app.invoke(app.getEntityType().getEffectorByName("myWorkflow" + effNameCount).get(), null); t.blockUntilEnded(); w2 = BrooklynTaskTags.getWorkflowTaskTag(t, false); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); // hash accepts variables app.config().set(ConfigKeys.newStringConfigKey("hash"), "my-fixed-hash"); // this hash replaces old w1 w1 = doTestRetentionDisabled("context", "min(1,2) hash ${entity.config.hash}", false, false, false); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); // should replace the one above + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); // should replace the one above // workflow.id as the hash variable means each invocations has its own retention w3 = doTestRetentionDisabled("context", "1 hash ${workflow.id}", false, false, false); @@ -940,7 +951,7 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl t.blockUntilEnded(); w4 = BrooklynTaskTags.getWorkflowTaskTag(t, false); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId(), w4.getWorkflowId())); // should replace the one above + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId(), w4.getWorkflowId())); // should replace the one above } @Test(groups="Integration") // very slow @@ -957,19 +968,19 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl w1 = doTestRetentionDisabled("1", "min(1,5s)", true, false, false); // only w1 should be persisted - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId())); // run something else within 5s, should now be persisting 2 w2 = doTestRetentionDisabled("1", "min(1,5s)", true, false, false); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); // wait 5s and run something, it should cause everything else to expire Time.sleep(Duration.FIVE_SECONDS); wp.get().expireOldWorkflows(app, null); // should now be empty - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of()); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of()); String longWait = "10s"; @@ -981,22 +992,22 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl w3 = doTestRetentionDisabled("hash my-fixed-hash max(1,"+longWait+")", "context", false, true, false); // should now have all 3 wp.get().expireOldWorkflows(app, null); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId())); Time.sleep(Duration.seconds(5)); // now just the last 1 (only 1 in 10s) wp.get().expireOldWorkflows(app, null); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w3.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w3.getWorkflowId())); Time.sleep(Duration.seconds(5)); // still have last 1 (even after 10s) wp.get().expireOldWorkflows(app, null); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w3.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w3.getWorkflowId())); // run two more, that's all we should have w1 = doTestRetentionDisabled("1", "hash my-fixed-hash", false, true, false); w2 = doTestRetentionDisabled("1", "context", false, true, false); - Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); + Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); } int effNameCount = 0;