This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 51ad80bf693adc11085d219842bf529ad04257b9 Author: Alex Heneveld <a...@cloudsoft.io> AuthorDate: Fri Mar 29 23:54:50 2024 +0000 allow keeping soft references to previous workflows. This means we can set 'retention: disabled' for persistence purposes but still - usually - have access to them in memory. This does not yet apply any limits to that cache; the thinking is to follow the system default for now, and perhaps to allow configuration in the future. --- .../camp/brooklyn/WorkflowYamlRebindTest.java | 18 ++-- .../store/WorkflowRetentionAndExpiration.java | 7 +- .../store/WorkflowStateActiveInMemory.java | 106 ++++++++++++--------- .../store/WorkflowStatePersistenceViaSensors.java | 13 ++- .../workflow/utils/WorkflowRetentionParser.java | 34 +++++-- .../workflow/WorkflowPersistReplayErrorsTest.java | 2 +- 6 files changed, 111 insertions(+), 69 deletions(-) diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java index b4b1145185..697428ccec 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java @@ -18,7 +18,10 @@ */ package org.apache.brooklyn.camp.brooklyn; -import com.google.common.base.Stopwatch; +import java.io.File; +import java.util.Map; +import java.util.Set; + import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -28,20 +31,20 @@ import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.location.LocationSpec; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.objs.EntityAdjunct; -import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.api.sensor.Feed; import 
org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon; import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.*; +import org.apache.brooklyn.core.entity.Dumper; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext; -import org.apache.brooklyn.core.mgmt.rebind.RebindOptions; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.entity.TestApplication; import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.core.workflow.WorkflowBasicTest; import org.apache.brooklyn.core.workflow.WorkflowEffector; -import org.apache.brooklyn.core.workflow.WorkflowExecutionContext; import org.apache.brooklyn.entity.software.base.EmptySoftwareProcess; import org.apache.brooklyn.entity.stock.BasicApplication; import org.apache.brooklyn.entity.stock.BasicEntity; @@ -59,11 +62,6 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.io.File; -import java.util.Map; -import java.util.Set; -import java.util.function.Predicate; - public class WorkflowYamlRebindTest extends AbstractYamlRebindTest { private static final Logger log = LoggerFactory.getLogger(WorkflowYamlRebindTest.class); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java index cc0adc5e9b..c6c8a16d73 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java @@ -50,9 +50,14 @@ public class WorkflowRetentionAndExpiration { 
public static class WorkflowRetentionSettings { public Boolean disabled; public String hash; + public String expiry; public String expiryResolved; + // soft expiry refers to what is kept in memory + public String softExpiry; + public String softExpiryResolved; + @JsonIgnore private transient WorkflowRetentionParser.WorkflowRetentionFilter expiryFn; @@ -163,7 +168,7 @@ public class WorkflowRetentionAndExpiration { private static boolean isExpirable(WorkflowExecutionContext c) { if (c.getStatus() == null || !c.getStatus().expirable) return false; if (c.getParent()!=null) { - // fow now, don't expire children workflows unless parents are also expirable + // XXX for size reasons, should skip this - fow now, don't expire children workflows unless parents are also expirable if (!isExpirable(c.getParent())) return false; // we could weaken this if we have lots of children workflows, but that is more work; left as an enhancement diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java index d27a161a16..59f5780a87 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java @@ -20,13 +20,17 @@ package org.apache.brooklyn.core.workflow.store; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.workflow.WorkflowExecutionContext; +import org.apache.brooklyn.util.collections.MutableList; import 
org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; import org.slf4j.Logger; @@ -55,7 +59,10 @@ public class WorkflowStateActiveInMemory { } private final ManagementContext mgmt; - final Map<String,Map<String,WorkflowExecutionContext>> data = MutableMap.of(); + // active workflows by entity then workflow id + final Map<String,Map<String,WorkflowExecutionContext>> active = MutableMap.of(); + // cached remembered workflows by entity then workflow id + final Map<String, Cache<String, WorkflowExecutionContext>> completedSoftlyKept = MutableMap.of(); // created and managed by mgmt context scratchpad protected WorkflowStateActiveInMemory(ManagementContext mgmt) { @@ -67,40 +74,25 @@ public class WorkflowStateActiveInMemory { public void expireAbsentEntities() { lastInMemClear = System.currentTimeMillis(); Set<String> copy; - synchronized (data) { - copy = MutableSet.copyOf(data.keySet()); - } + synchronized (active) { copy = MutableSet.copyOf(active.keySet()); } + synchronized (completedSoftlyKept) { copy.addAll(completedSoftlyKept.keySet()); } + copy.forEach(entityId -> { if (mgmt.getEntityManager().getEntity(entityId) == null) { - data.remove(entityId); + synchronized (active) { active.remove(entityId); } + synchronized (completedSoftlyKept) { completedSoftlyKept.remove(entityId); } } }); } public void checkpoint(WorkflowExecutionContext context) { - // keep active workflows in memory, even if disabled - Map<String, WorkflowExecutionContext> entityActiveWorkflows = getForWorkflowIdWithLockButResultNeedsSynch(context.getEntity().getId()); if (context.getStatus().expirable) { - if (entityActiveWorkflows!=null) { - synchronized (entityActiveWorkflows) { - entityActiveWorkflows.remove(context.getWorkflowId()); - } - } + withActiveForEntity(context.getEntity().getId(), false, wfm -> wfm.remove(context.getWorkflowId())); + withSoftlyKeptForEntity(context.getEntity().getId(), true, wfm -> { wfm.put(context.getWorkflowId(), 
context); return null; }); } else { - if (entityActiveWorkflows==null) { - synchronized (data) { - entityActiveWorkflows = data.get(context.getEntity().getId()); - if (entityActiveWorkflows==null) { - entityActiveWorkflows = MutableMap.of(); - data.put(context.getEntity().getId(), entityActiveWorkflows); - } - } - } - synchronized (entityActiveWorkflows) { - entityActiveWorkflows.put(context.getWorkflowId(), context); - } + // keep active workflows in memory, even if disabled + withActiveForEntity(context.getEntity().getId(), true, wfm -> wfm.put(context.getWorkflowId(), context)); } - if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY < System.currentTimeMillis()) { // poor man's cleanup, every minute, but good enough expireAbsentEntities(); @@ -112,37 +104,59 @@ public class WorkflowStateActiveInMemory { return getWorkflowsCopy(entity); } public MutableMap<String,WorkflowExecutionContext> getWorkflowsCopy(Entity entity) { - Map<String, WorkflowExecutionContext> entityActiveWorkflows = getForWorkflowIdWithLockButResultNeedsSynch(entity.getId()); - if (entityActiveWorkflows == null) return MutableMap.of(); - synchronized (entityActiveWorkflows) { - return MutableMap.copyOf(entityActiveWorkflows); - } + return getWorkflowsCopy(entity, true); + } + public MutableMap<String,WorkflowExecutionContext> getWorkflowsCopy(Entity entity, boolean includeSoftlyKeptCompleted) { + MutableMap<String,WorkflowExecutionContext> result = MutableMap.of(); + withActiveForEntity(entity.getId(), false, wfm -> { result.putAll(wfm); return null; }); + if (includeSoftlyKeptCompleted) withSoftlyKeptForEntity(entity.getId(), false, wfm -> { result.putAll(wfm.asMap()); return null; }); + return result; } boolean deleteWorkflow(WorkflowExecutionContext context) { - Map<String, WorkflowExecutionContext> entityActiveWorkflows = getForWorkflowIdWithLockButResultNeedsSynch(context.getEntity().getId()); - if (entityActiveWorkflows!=null) { - synchronized (entityActiveWorkflows) { - return 
entityActiveWorkflows.remove(context.getWorkflowId()) != null; - } + boolean result = false; + result = Boolean.TRUE.equals(withActiveForEntity(context.getEntity().getId(), false, wfm -> wfm.remove(context.getWorkflowId())!=null)) || result; + result = Boolean.TRUE.equals(withSoftlyKeptForEntity(context.getEntity().getId(), false, wfm -> { + WorkflowExecutionContext soft = wfm.getIfPresent(context.getWorkflowId()); + wfm.invalidate(context.getWorkflowId()); + return (soft!=null); + })) || result; + + return result; + } + + private <T> T withActiveForEntity(String entityId, boolean upsert, Function<Map<String, WorkflowExecutionContext>,T> fn) { + if (entityId==null) return null; + Map<String, WorkflowExecutionContext> result; + synchronized (active) { + result = active.computeIfAbsent(entityId, _key -> upsert ? MutableMap.of() : null); + } + if (result==null) return null; + synchronized (result) { + return fn.apply(result); } - return false; } - // note: callers should subsequently sync on the returned map - private Map<String, WorkflowExecutionContext> getForWorkflowIdWithLockButResultNeedsSynch(String entityId) { - synchronized (data) { - return data.get(entityId); + private <T> T withSoftlyKeptForEntity(String entityId, boolean upsert, Function<Cache<String, WorkflowExecutionContext>,T> fn) { + if (entityId==null) return null; + Cache<String, WorkflowExecutionContext> result; + synchronized (completedSoftlyKept) { + result = completedSoftlyKept.computeIfAbsent(entityId, _key -> upsert ? 
CacheBuilder.newBuilder().softValues().build() : null); + } + if (result==null) return null; + synchronized (result) { + return fn.apply(result); } } public WorkflowExecutionContext getFromTag(BrooklynTaskTags.WorkflowTaskTag tag) { - Map<String, WorkflowExecutionContext> activeForEntity = getForWorkflowIdWithLockButResultNeedsSynch(tag.getEntityId()); - if (activeForEntity!=null) { - synchronized (activeForEntity) { - return activeForEntity.get(tag.getWorkflowId()); - } + return getFromTag(tag, true); + } + public WorkflowExecutionContext getFromTag(BrooklynTaskTags.WorkflowTaskTag tag, boolean includeCompletedSoftlyKept) { + WorkflowExecutionContext result = withActiveForEntity(tag.getEntityId(), false, wfm -> wfm.get(tag.getWorkflowId())); + if (includeCompletedSoftlyKept && result==null) { + result = withSoftlyKeptForEntity(tag.getEntityId(), false, wfm -> wfm.getIfPresent(tag.getWorkflowId())); } - return null; + return result; } } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java index 63170c539b..82ef7b0c30 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java @@ -94,7 +94,7 @@ public class WorkflowStatePersistenceViaSensors { WorkflowStateActiveInMemory.get(context.getManagementContext()).checkpoint(context); if (Boolean.TRUE.equals(context.getRetentionSettings().disabled)) { - if (getFromTag(BrooklynTaskTags.tagForWorkflow(context), false)!=null) { + if (getFromTag(BrooklynTaskTags.tagForWorkflow(context), false, false)!=null) { // need to clear updateMap(entity, false, true, v -> v.remove(context.getWorkflowId(), context)); } @@ -168,7 +168,7 @@ public class WorkflowStatePersistenceViaSensors { } public 
Map<String,WorkflowExecutionContext> getWorkflows(Entity entity) { - MutableMap<String, WorkflowExecutionContext> result = WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity); + MutableMap<String, WorkflowExecutionContext> result = WorkflowStateActiveInMemory.get(mgmt).getWorkflowsCopy(entity, true); result.add(entity.sensors().get(INTERNAL_WORKFLOWS)); return result; } @@ -185,17 +185,20 @@ public class WorkflowStatePersistenceViaSensors { } public Maybe<WorkflowExecutionContext> getFromTag(BrooklynTaskTags.WorkflowTaskTag tag) { - return getFromTag(tag, true); + return getFromTag(tag, true, true); + } + public Maybe<WorkflowExecutionContext> getFromTag(BrooklynTaskTags.WorkflowTaskTag tag, boolean includeSoftlyKeptInMemory) { + return getFromTag(tag, true, includeSoftlyKeptInMemory); } - private Maybe<WorkflowExecutionContext> getFromTag(BrooklynTaskTags.WorkflowTaskTag tag, boolean allowInMemory) { + private Maybe<WorkflowExecutionContext> getFromTag(BrooklynTaskTags.WorkflowTaskTag tag, boolean allowActiveInMemory, boolean allowActiveAndSoftlyKeptInMemory) { Entity targetEntity = mgmt.lookup(tag.getEntityId(), Entity.class); if (targetEntity==null) { return Maybe.absent("Entity "+tag.getWorkflowId()+" not found"); } else { WorkflowExecutionContext w = null; - if (allowInMemory) w = WorkflowStateActiveInMemory.get(mgmt).getFromTag(tag); + if (allowActiveInMemory || allowActiveAndSoftlyKeptInMemory) w = WorkflowStateActiveInMemory.get(mgmt).getFromTag(tag, allowActiveAndSoftlyKeptInMemory); if (w==null) { w = new WorkflowStatePersistenceViaSensors(mgmt).getWorkflows(targetEntity).get(tag.getWorkflowId()); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java index 18ea10d7b0..d2488457bc 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java +++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java @@ -55,6 +55,8 @@ the semantics of `min` and `max` are * `max` means completed workflow instances must be retained if they meet any of the constraints implied by the `<value>` arguments, i.e. `max(2, 3, 1h, 2h)` means to keep the 3 most recent instances irrespective of when they run, and to keep all instances for up to two hours also allows a `hash <value>` to be set at the start or the end + +also allows `hard` at start or end, or `soft [limit]` at end */ public static WorkflowRetentionSettings parse(String retentionExpression, @Nullable WorkflowExecutionContext context) { @@ -63,20 +65,40 @@ also allows a `hash <value>` to be set at the start or the end if (Strings.isBlank(retentionExpression)) return result; retentionExpression = retentionExpression.trim().toLowerCase(); - if (retentionExpression.equals("disabled")) { - result.disabled = true; - - } else { - + do { if (retentionExpression.startsWith("hash ")) { + if (result.hash != null) + throw new IllegalArgumentException("Cannot set multiple 'hash' in retention expression"); retentionExpression = Strings.removeFromStart(retentionExpression, "hash").trim(); result.hash = Strings.getFirstWord(retentionExpression); retentionExpression = retentionExpression.substring(result.hash.length()).trim(); - } else if (retentionExpression.contains(" hash ")) { + continue; + } + if (retentionExpression.startsWith("hard ")) { + if (result.softExpiry != null) + throw new IllegalArgumentException("Cannot set multiple 'hard' or 'soft' in retention expression"); + retentionExpression = Strings.removeFromStart(retentionExpression, "hard").trim(); + result.softExpiry = "0"; + continue; + } + + // TODO soft/hard keyword; take whichever occurs last + if (retentionExpression.contains(" hash ")) { + if (result.hash != null) + throw new IllegalArgumentException("Cannot set multiple 'hash' in retention expression"); int i = 
retentionExpression.indexOf(" hash "); result.hash = Strings.removeFromStart(retentionExpression.substring(i).trim(), "hash").trim(); retentionExpression = retentionExpression.substring(0, i).trim(); + continue; } + break; + } while (false); + + if (retentionExpression.equals("disabled")) { + result.disabled = true; + + } else { + if (Strings.isNonBlank(result.hash) && context!=null) { result.hash = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, result.hash, String.class); } diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java index 32a0328970..adc6fd2702 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java @@ -1036,7 +1036,7 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl } app.sensors().set(b, 1); - Maybe<WorkflowExecutionContext> wf = new WorkflowStatePersistenceViaSensors(mgmt()).getFromTag(wt); + Maybe<WorkflowExecutionContext> wf = new WorkflowStatePersistenceViaSensors(mgmt()).getFromTag(wt, false); if (shouldBeDisabled) { Time.sleep(Duration.millis(500));