This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 16f868a9b918c46e32fb689f6abc7c6c8c9ff501 Author: Alex Heneveld <a...@cloudsoft.io> AuthorDate: Mon Apr 1 11:30:51 2024 +0100 allow soft retention to be specified, globally or per workflow/step --- .../core/workflow/WorkflowExecutionContext.java | 7 +- .../store/WorkflowRetentionAndExpiration.java | 184 ++++++++++++++++----- .../store/WorkflowStateActiveInMemory.java | 18 +- .../store/WorkflowStatePersistenceViaSensors.java | 71 +++----- .../workflow/utils/WorkflowRetentionParser.java | 105 ++++++++---- .../workflow/WorkflowPersistReplayErrorsTest.java | 22 ++- .../SoftwareProcessRebindNotRunningEntityTest.java | 8 +- 7 files changed, 275 insertions(+), 140 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java index 8cffdcbcc3..f026023198 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java @@ -633,17 +633,12 @@ public class WorkflowExecutionContext { return ((EntityInternal)getEntity()).getManagementContext(); } - @JsonIgnore - protected WorkflowStatePersistenceViaSensors getPersister() { - return new WorkflowStatePersistenceViaSensors(getManagementContext()); - } - public void persist() { if (isInErrorHandlerSubWorkflow()) { // currently don't persist error handler sub-workflows return; } - getPersister().checkpoint(this); + WorkflowRetentionAndExpiration.checkpoint(getManagementContext(), this); } /** Get the value of the input. Supports Brooklyn DSL resolution but NOT Freemarker resolution. 
*/ diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java index 4c2c667d59..852c75d11f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowRetentionAndExpiration.java @@ -18,9 +18,20 @@ */ package org.apache.brooklyn.core.workflow.store; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; @@ -28,6 +39,7 @@ import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.workflow.WorkflowExecutionContext; +import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors.PersistenceWithQueuedTasks; import org.apache.brooklyn.core.workflow.utils.WorkflowRetentionParser; import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.task.BasicExecutionManager; @@ -36,15 +48,51 @@ import org.apache.brooklyn.util.text.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.stream.Collectors; - public class WorkflowRetentionAndExpiration { private static final Logger log = 
LoggerFactory.getLogger(WorkflowRetentionAndExpiration.class); public static final ConfigKey<String> WORKFLOW_RETENTION_DEFAULT = ConfigKeys.newStringConfigKey("workflow.retention.default", - "Default retention for workflows", "3"); + "Default retention for workflows (persisted)", "3"); + public static final ConfigKey<String> WORKFLOW_RETENTION_DEFAULT_SOFT = ConfigKeys.newStringConfigKey("workflow.retention.default.soft", + "Default soft retention for workflows (in-memory)", "3"); + + public static void checkpoint(ManagementContext mgmt, WorkflowExecutionContext context) { + Entity entity = context.getEntity(); + if (Entities.isUnmanagingOrNoLongerManaged(entity)) { + log.debug("Skipping persistence of "+context+" as entity is no longer active here"); + return; + } + + doGlobalUpdateIfNeededOnDiskAndInMemory(mgmt); + + new WorkflowStatePersistenceViaSensors(mgmt).checkpoint(context, PersistenceWithQueuedTasks.WARN); + + // keep active workflows in memory, even if disabled + WorkflowStateActiveInMemory.get(context.getManagementContext()).checkpoint(context); + } + + static final long GLOBAL_UPDATE_FREQUENCY = 5*60*1000; // every 5m wipe out old workflows + + static void doGlobalUpdateIfNeededOnDiskAndInMemory(ManagementContext mgmt) { + WorkflowStateActiveInMemory inMem = WorkflowStateActiveInMemory.get(mgmt); + + if (inMem.lastGlobalClear + GLOBAL_UPDATE_FREQUENCY > System.currentTimeMillis()) return; + inMem.lastGlobalClear = System.currentTimeMillis(); + + AtomicInteger total = new AtomicInteger(0); + Collection<Entity> entities = mgmt.getEntityManager().getEntities(); + entities.forEach(entity -> { + // on disk + int change = new WorkflowStatePersistenceViaSensors(mgmt).expireOldWorkflowsOnDisk(entity, null); + if (change!=0) log.debug("Global entity workflow persistence update, removed "+(-change)+" workflows from "+entity); + total.addAndGet(change); + + // in memory + inMem.recomputeExpiration(entity, null); + }); + if (total.get()!=0) log.debug("Global 
entity workflow persistence update, removed "+(-total.get())+" workflows across all "+entities.size()+" entities"); + } @JsonInclude(JsonInclude.Include.NON_EMPTY) public static class WorkflowRetentionSettings { @@ -60,35 +108,64 @@ public class WorkflowRetentionAndExpiration { @JsonIgnore private transient WorkflowRetentionParser.WorkflowRetentionFilter expiryFn; + @JsonIgnore + private transient WorkflowRetentionParser.WorkflowRetentionFilter softExpiryFn; public WorkflowRetentionParser.WorkflowRetentionFilter getExpiryFn(WorkflowExecutionContext w) { return init(w).expiryFn; } + public WorkflowRetentionParser.WorkflowRetentionFilter getSoftExpiryFn(WorkflowExecutionContext w) { + return init(w).softExpiryFn; + } public WorkflowRetentionSettings init(WorkflowExecutionContext w) { if (w.getParent()!=null && Boolean.TRUE.equals(w.getParent().getRetentionSettings().disabled)) { disabled = true; - } else if (expiryFn==null) { - expiryResolved = expiryResolved!=null ? expiryResolved : expiry; - expiryFn = new WorkflowRetentionParser(expiryResolved).parse(); - if (w != null) { - Set<String> set = INIT_REENTRANT.get(); - if (set==null) { - set = MutableSet.of(); - INIT_REENTRANT.set(set); - } - if (!set.add(w.getWorkflowId()+":"+expiryResolved)) { - // double-check we don't cause endless loops; see KeepContext notes - throw new IllegalStateException("Invalid workflow retention '"+expiryResolved+"' as it refers to itself"); + } else { + if (expiryFn == null) { + expiryResolved = expiryResolved != null ? 
expiryResolved : expiry; + expiryFn = new WorkflowRetentionParser(expiryResolved).parse(); + if (w != null) { + Set<String> set = INIT_REENTRANT.get(); + if (set == null) { + set = MutableSet.of(); + INIT_REENTRANT.set(set); + } + if (!set.add(w.getWorkflowId() + ":" + expiryResolved)) { + // double-check we don't cause endless loops; see KeepContext notes + throw new IllegalStateException("Invalid workflow retention '" + expiryResolved + "' as it refers to itself"); + } + try { + expiryFn = expiryFn.init(w); + } finally { + set.remove(w.getWorkflowId() + ":" + expiryResolved); + if (set.isEmpty()) INIT_REENTRANT.remove(); + } } - try { - expiryFn = expiryFn.init(w); - } finally { - set.remove(w.getWorkflowId()+":"+expiryResolved); - if (set.isEmpty()) INIT_REENTRANT.remove(); + expiryResolved = expiryFn.toString(); // remove any references to `context` that might trigger an infinite loop + } + if (softExpiryFn == null) { + softExpiryResolved = softExpiryResolved != null ? softExpiryResolved : softExpiry; + softExpiryFn = new WorkflowRetentionParser(softExpiryResolved).soft().parse(); + if (w != null) { + Set<String> set = INIT_REENTRANT.get(); + if (set == null) { + set = MutableSet.of(); + INIT_REENTRANT.set(set); + } + if (!set.add(w.getWorkflowId() + ":" + softExpiryResolved)) { + // double-check we don't cause endless loops; see KeepContext notes + throw new IllegalStateException("Invalid workflow retention '" + softExpiryResolved + "' as it refers to itself"); + } + try { + softExpiryFn = softExpiryFn.init(w); + } finally { + set.remove(w.getWorkflowId() + ":" + softExpiryResolved); + if (set.isEmpty()) INIT_REENTRANT.remove(); + } } + softExpiryResolved = softExpiryFn.toString(); // remove any references to `context` that might trigger an infinite loop } - expiryResolved = expiryFn.toString(); // remove any references to `context` that might trigger an infinite loop } return this; } @@ -101,42 +178,67 @@ public class WorkflowRetentionAndExpiration { 
this.expiryFn = r2.expiryFn; this.expiryResolved = r2.expiryResolved; } + if (Strings.isNonEmpty(r2.softExpiry)) { + this.softExpiry = r2.softExpiry; + this.softExpiryFn = r2.softExpiryFn; + this.softExpiryResolved = r2.softExpiryResolved; + } } } static ThreadLocal<Set<String>> INIT_REENTRANT = new ThreadLocal<Set<String>>(); - static Map<String, WorkflowExecutionContext> recomputeExpiration(Map<String, WorkflowExecutionContext> v, WorkflowExecutionContext optionalContext, boolean useSoftlyKeptExpiry) { + static Map<String, WorkflowExecutionContext> recomputeExpiration(Map<String, WorkflowExecutionContext> v, @Nullable WorkflowExecutionContext optionalContext, boolean useSoftlyKeptExpiry) { Set<String> workflowHashesToUpdate = optionalContext!=null ? MutableSet.of(Strings.firstNonBlank(optionalContext.getRetentionHash(), "empty-expiry-hash")) //should always be set : v.values().stream().map(WorkflowExecutionContext::getRetentionHash).collect(Collectors.toSet()); workflowHashesToUpdate.forEach(k -> { + // if optional context supplied, perhaps only recompute for that hash + if (optionalContext!=null && !k.equals(optionalContext.getRetentionHash())) { + if (!isExpirable(optionalContext)) { + return; + } else { + // no-op -- if it is expirable, do a full recompute for the entity, to ensure sub-workflows are no longer retained + // (cross-entity subworkflows will not be cleaned up; they will get collected when another workflow runs there, + // or when there is a global cleanup event) + } + } + List<WorkflowExecutionContext> finishedTwins = v.values().stream() .filter(c -> k.equals(c.getRetentionHash())) .filter(c -> isExpirable(c)) - .filter(c -> !c.equals(optionalContext)) .collect(Collectors.toList()); if (finishedTwins.isEmpty()) return; - Optional<WorkflowExecutionContext> existingRetentionExpiry = finishedTwins.stream().filter(w -> w.getRetentionSettings().expiry != null).findAny(); + Function<WorkflowRetentionSettings,String> expiryAccessor = useSoftlyKeptExpiry 
? wrs -> wrs.softExpiry : wrs -> wrs.expiry; + Optional<WorkflowExecutionContext> existingRetentionExpiry; + if (optionalContext!=null && k.equals(optionalContext.getRetentionHash()) && expiryAccessor.apply(optionalContext.getRetentionSettings())!=null) + existingRetentionExpiry = Optional.of(optionalContext); + else + existingRetentionExpiry = finishedTwins.stream().filter(w -> expiryAccessor.apply(w.getRetentionSettings()) != null).findAny(); + WorkflowRetentionParser.WorkflowRetentionFilter expiry; - if (useSoftlyKeptExpiry) { - expiry = WorkflowRetentionParser.newDefaultSoftFilter().init(finishedTwins.iterator().next()); - } else if (existingRetentionExpiry.isPresent()) { + + if (existingRetentionExpiry.isPresent()) { // log if expiry fn differs for the same hash // (but note if it refers to parents, invocations from different places could result in different expiry functions) - if (optionalContext!=null && optionalContext.getRetentionHash().equals(k)) { - if (optionalContext.getRetentionSettings().expiry != null) { - if (!optionalContext.getRetentionSettings().expiry.equals(existingRetentionExpiry.get().getRetentionSettings().expiry)) { - log.warn("Retention specification for " + optionalContext + " '" + optionalContext.getRetentionSettings().expiry + "' is different for same hash. 
" + - "Expiry should be constant within a hash but " + existingRetentionExpiry.get() + " has '" + existingRetentionExpiry.get().getRetentionSettings().expiry + "'"); + // (no such warning for the soft side of it) + if (useSoftlyKeptExpiry) { + expiry = existingRetentionExpiry.get().getRetentionSettings().getSoftExpiryFn(existingRetentionExpiry.get()); + } else { + if (optionalContext != null && optionalContext.getRetentionHash().equals(k)) { + if (optionalContext.getRetentionSettings().expiry != null) { + if (!optionalContext.getRetentionSettings().expiry.equals(existingRetentionExpiry.get().getRetentionSettings().expiry)) { + log.warn("Retention specification for " + optionalContext + " '" + optionalContext.getRetentionSettings().expiry + "' is different for same hash. " + + "Expiry should be constant within a hash but " + existingRetentionExpiry.get() + " has '" + existingRetentionExpiry.get().getRetentionSettings().expiry + "'"); + } } } + expiry = existingRetentionExpiry.get().getRetentionSettings().getExpiryFn(existingRetentionExpiry.get()); } - expiry = existingRetentionExpiry.get().getRetentionSettings().getExpiryFn(existingRetentionExpiry.get()); } else { - expiry = WorkflowRetentionParser.newDefaultFilter().init(finishedTwins.iterator().next()); + expiry = WorkflowRetentionParser.newDefaultFilter(useSoftlyKeptExpiry).init(finishedTwins.iterator().next()); } Collection<WorkflowExecutionContext> retainedFinishedTwins = expiry.apply(finishedTwins); @@ -169,12 +271,16 @@ public class WorkflowRetentionAndExpiration { private static boolean isExpirable(WorkflowExecutionContext c) { if (c.getStatus() == null || !c.getStatus().expirable) return false; + + // should we expire of completed children workflows even if an ancestor workflow is not expirable? 
+ // this would prevent silly retention of workflows whose parents are about to end, where the retention check runs on the child just before the parent finishes; + // however it would also limit the ability to inspect children workflows e.g. in a foreach block, as only e.g. 3 of the children would be kept ever. + // on balance, do NOT expire those; wait for another event to trigger their clean-up. + // (if the child workflow is marked disabled, it is not persisted, but takes effect in all other cases.) if (c.getParent()!=null) { - // XXX for size reasons, should skip this - fow now, don't expire children workflows unless parents are also expirable if (!isExpirable(c.getParent())) return false; - - // we could weaken this if we have lots of children workflows, but that is more work; left as an enhancement } + return true; } @@ -192,6 +298,6 @@ public class WorkflowRetentionAndExpiration { public static void expireOldWorkflows(Entity entity) { - new WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMaps(entity, true, true, null, null); + new WorkflowStatePersistenceViaSensors(((EntityInternal)entity).getManagementContext()).updateMaps(entity, null, true, true, true, null, null); } } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java index 88fe2319f2..0083b167dc 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStateActiveInMemory.java @@ -21,6 +21,7 @@ package org.apache.brooklyn.core.workflow.store; import java.util.Map; import java.util.Set; import java.util.function.Function; +import javax.annotation.Nullable; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -30,7 +31,6 @@ import org.apache.brooklyn.config.ConfigKey; 
import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.workflow.WorkflowExecutionContext; -import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; import org.slf4j.Logger; @@ -42,7 +42,10 @@ public class WorkflowStateActiveInMemory { public static final ConfigKey<WorkflowStateActiveInMemory> IN_MEMORY_WORKFLOWS = ConfigKeys.newConfigKey(WorkflowStateActiveInMemory.class, "internals.brooklyn.workflow.in_memory"); - private static final long GLOBAL_UPDATE_FREQUENCY = 5*60*1000; // every 5m wipe out workflows from old entities + long lastInMemEntitiesClear = System.currentTimeMillis(); + + // this applies to both sensors and active, but is stored here as this instance is kept on the mgmt context + long lastGlobalClear = System.currentTimeMillis(); public static WorkflowStateActiveInMemory get(ManagementContext mgmt) { WorkflowStateActiveInMemory localActiveWorkflows = mgmt.getScratchpad().get(IN_MEMORY_WORKFLOWS); @@ -69,10 +72,8 @@ public class WorkflowStateActiveInMemory { this.mgmt = mgmt; } - long lastInMemClear = System.currentTimeMillis(); - public void expireAbsentEntities() { - lastInMemClear = System.currentTimeMillis(); + lastInMemEntitiesClear = System.currentTimeMillis(); Set<String> copy; synchronized (active) { copy = MutableSet.copyOf(active.keySet()); } synchronized (completedSoftlyKept) { copy.addAll(completedSoftlyKept.keySet()); } @@ -89,11 +90,12 @@ public class WorkflowStateActiveInMemory { if (context.getStatus().expirable) { withActiveForEntity(context.getEntity().getId(), false, wfm -> wfm.remove(context.getWorkflowId())); withSoftlyKeptForEntity(context.getEntity().getId(), true, wfm -> { wfm.put(context.getWorkflowId(), context); return null; }); + recomputeExpiration(context.getEntity(), context); } else { // keep active workflows in memory, even if 
disabled withActiveForEntity(context.getEntity().getId(), true, wfm -> wfm.put(context.getWorkflowId(), context)); } - if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY < System.currentTimeMillis()) { + if (lastInMemEntitiesClear + WorkflowRetentionAndExpiration.GLOBAL_UPDATE_FREQUENCY < System.currentTimeMillis()) { // poor man's cleanup, every minute, but good enough expireAbsentEntities(); } @@ -160,9 +162,9 @@ public class WorkflowStateActiveInMemory { return result; } - public void recomputeExpiration(Entity entity) { + public void recomputeExpiration(Entity entity, @Nullable WorkflowExecutionContext optionalContext) { withSoftlyKeptForEntity(entity.getId(), false, wfm -> { - WorkflowRetentionAndExpiration.recomputeExpiration(wfm.asMap(), null, true); + WorkflowRetentionAndExpiration.recomputeExpiration(wfm.asMap(), optionalContext, true); return null; }); } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java index 97c37fc1a3..ffd1698c91 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java @@ -18,6 +18,14 @@ */ package org.apache.brooklyn.core.workflow.store; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + import com.google.common.reflect.TypeToken; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ManagementContext; @@ -25,7 +33,6 @@ import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.config.ConfigKey; import 
org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.workflow.WorkflowExecutionContext; @@ -35,15 +42,6 @@ import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.stream.Collectors; - public class WorkflowStatePersistenceViaSensors { private static final Logger log = LoggerFactory.getLogger(WorkflowStatePersistenceViaSensors.class); @@ -52,8 +50,6 @@ public class WorkflowStatePersistenceViaSensors { public static final AttributeSensor<Map<String,WorkflowExecutionContext>> INTERNAL_WORKFLOWS = Sensors.newSensor(new TypeToken<Map<String, WorkflowExecutionContext>>() {}, "internals.brooklyn.workflow"); - private static final long GLOBAL_UPDATE_FREQUENCY = 5*60*1000; // every 5m wipe out old workflows - public static WorkflowStatePersistenceViaSensors get(ManagementContext mgmt) { WorkflowStatePersistenceViaSensors sharedInstance = mgmt.getScratchpad().get(SENSOR_WORKFLOW_PERSISTER); if (sharedInstance==null) { @@ -76,27 +72,11 @@ public class WorkflowStatePersistenceViaSensors { enum PersistenceWithQueuedTasks { ALLOW, WARN, FAIL } - long lastInMemClear = System.currentTimeMillis(); - - public void checkpoint(WorkflowExecutionContext context) { - checkpoint(context, PersistenceWithQueuedTasks.WARN); - } - public void checkpoint(WorkflowExecutionContext context, PersistenceWithQueuedTasks expectQueuedTasks) { - doGlobalUpdateIfNeeded(); - - Entity entity = context.getEntity(); - if (Entities.isUnmanagingOrNoLongerManaged(entity)) { - log.debug("Skipping persistence of "+context+" as 
entity is no longer active here"); - return; - } - - // keep active workflows in memory, even if disabled - WorkflowStateActiveInMemory.get(context.getManagementContext()).checkpoint(context); - + protected void checkpoint(WorkflowExecutionContext context, PersistenceWithQueuedTasks expectQueuedTasks) { if (Boolean.TRUE.equals(context.getRetentionSettings().disabled)) { if (getFromTag(BrooklynTaskTags.tagForWorkflow(context), false, false)!=null) { // need to clear - updateMap(entity, false, true, v -> v.remove(context.getWorkflowId(), context)); + updateMap(context.getEntity(), context, false, true, v -> v.remove(context.getWorkflowId(), context)); } return; } @@ -110,28 +90,15 @@ public class WorkflowStatePersistenceViaSensors { } } - expireOldWorkflows(entity, context); - } - - private void doGlobalUpdateIfNeeded() { - if (lastInMemClear + GLOBAL_UPDATE_FREQUENCY > System.currentTimeMillis()) return; - lastInMemClear = System.currentTimeMillis(); - AtomicInteger total = new AtomicInteger(0); - Collection<Entity> entities = mgmt.getEntityManager().getEntities(); - entities.forEach(entity -> { - int change = expireOldWorkflows(entity, null); - if (change!=0) log.debug("Global entity workflow persistence update, removed "+(-change)+" workflows from "+entity); - total.addAndGet(change); - }); - if (total.get()!=0) log.debug("Global entity workflow persistence update, removed "+(-total.get())+" workflows across all "+entities.size()+" entities"); + expireOldWorkflowsOnDisk(context.getEntity(), context); } - public int expireOldWorkflows(Entity entity, @Nullable WorkflowExecutionContext context) { + int expireOldWorkflowsOnDisk(Entity entity, @Nullable WorkflowExecutionContext context) { // clear interrupt status so we can persist e.g. if we are interrupted or shutdown boolean interrupted = Thread.interrupted(); boolean doExpiry = WorkflowRetentionAndExpiration.isExpirationCheckNeeded(entity); try { - return updateMaps(entity, doExpiry, true, context==null ? 
null : v -> v.put(context.getWorkflowId(), context), null); + return updateMaps(entity, null, doExpiry, false, true, context==null ? null : v -> v.put(context.getWorkflowId(), context), null); } finally { if (interrupted) Thread.currentThread().interrupt(); @@ -142,7 +109,7 @@ public class WorkflowStatePersistenceViaSensors { if (w.getStatus()==null || w.getStatus().expirable || w.getStatus()== WorkflowExecutionContext.WorkflowStatus.STAGED) { log.debug("Explicit request to delete workflow "+w); AtomicBoolean result = new AtomicBoolean(false); - updateMaps(w.getEntity(), false, true, map -> { + updateMaps(w.getEntity(), w, false, false, true, map -> { boolean removed = WorkflowRetentionAndExpiration.deleteWorkflowFromMap(map, w, true, true); if (removed) result.set(true); }, w); @@ -153,26 +120,26 @@ public class WorkflowStatePersistenceViaSensors { } } - int updateMaps(Entity entity, boolean doExpiry, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action, WorkflowExecutionContext contextToRemoveFromSoftMemory) { - int result = updateMap(entity, doExpiry, persist, action); + int updateMaps(Entity entity, @Nullable WorkflowExecutionContext optionalContext, boolean doExpiryForSensor, boolean doExpiryInMemory, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action, WorkflowExecutionContext contextToRemoveFromSoftMemory) { + int result = updateMap(entity, optionalContext, doExpiryForSensor, persist, action); // and update softly kept WorkflowStateActiveInMemory activeInMemory = WorkflowStateActiveInMemory.get(mgmt); if (contextToRemoveFromSoftMemory!=null) { activeInMemory.deleteWorkflow(contextToRemoveFromSoftMemory); } - if (doExpiry) activeInMemory.recomputeExpiration(entity); + if (doExpiryInMemory) activeInMemory.recomputeExpiration(entity, optionalContext); return result; } - int updateMap(Entity entity, boolean doExpiry, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action) { + int updateMap(Entity entity, 
@Nullable WorkflowExecutionContext optionalContext, boolean doExpiry, boolean persist, Consumer<Map<String,WorkflowExecutionContext>> action) { AtomicInteger delta = new AtomicInteger(0); entity.sensors().modify(INTERNAL_WORKFLOWS, vo -> { Map<String, WorkflowExecutionContext> v = MutableMap.copyOf(vo); delta.set(-v.size()); if (action!=null) action.accept(v); - if (doExpiry) v = WorkflowRetentionAndExpiration.recomputeExpiration(v, null, false); + if (doExpiry) v = WorkflowRetentionAndExpiration.recomputeExpiration(v, optionalContext, false); delta.getAndAdd(v.size()); return Maybe.of(v); }); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java index a5dbe86b0f..db5a2bdf3f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/utils/WorkflowRetentionParser.java @@ -18,6 +18,18 @@ */ package org.apache.brooklyn.core.workflow.utils; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + import org.apache.brooklyn.core.workflow.WorkflowExecutionContext; import org.apache.brooklyn.core.workflow.WorkflowExpressionResolution; import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration; @@ -27,13 +39,7 @@ import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; - -import javax.annotation.Nullable; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.*; -import 
java.util.function.Function; -import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; public class WorkflowRetentionParser { @@ -82,17 +88,48 @@ also allows `hard` at start or end, or `soft [limit]` at end continue; } - // TODO soft/hard keyword; take whichever occurs last - if (retentionExpression.contains(" hash ")) { - if (result.hash != null) - throw new IllegalArgumentException("Cannot set multiple 'hash' in retention expression"); - int i = retentionExpression.indexOf(" hash "); - result.hash = Strings.removeFromStart(retentionExpression.substring(i).trim(), "hash").trim(); - retentionExpression = retentionExpression.substring(0, i).trim(); - continue; + List<Pair<String,Integer>> specialTerms = MutableList.of(); + for (String term: MutableList.of("hash", "soft", "hard")) + specialTerms.add(Pair.of(term, retentionExpression.indexOf(" "+term+" "))); + Collections.sort(specialTerms, (x,y) -> -Integer.compare(x.getRight(), y.getRight())); + Pair<String, Integer> last = specialTerms.iterator().next(); + if (last.getRight()>=0) { + if ("hash".equals(last.getLeft())) { + if (result.hash != null) + throw new IllegalArgumentException("Cannot set multiple 'hash' in retention expression"); + result.hash = Strings.removeFromStart(retentionExpression.substring(last.getRight()).trim(), last.getLeft()).trim(); + retentionExpression = retentionExpression.substring(0, last.getRight()).trim(); + continue; + } + if ("hard".equals(last.getLeft())) { + if (result.softExpiry != null) + throw new IllegalArgumentException("Cannot set multiple 'hard' or 'soft' in retention expression"); + result.softExpiry = "0"; + String hardTrailing = Strings.removeFromStart(retentionExpression.substring(last.getRight()).trim(), last.getLeft()).trim(); + if (Strings.isNonBlank(hardTrailing)) { + if (last.getRight() == 0) retentionExpression = hardTrailing; + else throw new IllegalArgumentException("Cannot have retention definition both before and after 'hard' keyword"); + 
} else { + retentionExpression = retentionExpression.substring(0, last.getRight()).trim(); + } + continue; + } + if ("soft".equals(last.getLeft())) { + if (result.softExpiry != null) + throw new IllegalArgumentException("Cannot set multiple 'hard' or 'soft' in retention expression"); + String softTrailing = Strings.removeFromStart(retentionExpression.substring(last.getRight()).trim(), last.getLeft()).trim(); + if (Strings.isNonBlank(softTrailing)) { + result.softExpiry = softTrailing; + new WorkflowRetentionParser(result.softExpiry).soft().parse(); + retentionExpression = retentionExpression.substring(0, last.getRight()).trim(); + } else { + throw new IllegalArgumentException("Specification for 'soft' retention must provide retention expression after the keyword"); + } + continue; + } + } break; - } while (false); + } while (true); if (retentionExpression.equals("disabled")) { result.disabled = true; @@ -201,6 +238,8 @@ also allows `hard` at start or end, or `soft [limit]` at end static abstract class KeepDelegate implements WorkflowRetentionFilter { WorkflowRetentionFilter delegate; + final boolean soft; + KeepDelegate(boolean soft) { this.soft = soft; } @Override public Collection<WorkflowExecutionContext> apply(Collection<WorkflowExecutionContext> workflowExecutionContexts) { if (delegate==null) throw new IllegalStateException("Not initialized"); @@ -214,30 +253,31 @@ also allows `hard` at start or end, or `soft [limit]` at end protected abstract WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow); } static class KeepSystem extends KeepDelegate { + KeepSystem(boolean soft) { super(soft); } @Override public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) { if (workflow==null) throw new IllegalStateException("Retention 'system' cannot be used here"); - return new WorkflowRetentionParser(workflow.getManagementContext().getConfig().getConfig(WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT)).parse().init(null); + 
return new WorkflowRetentionParser(workflow.getManagementContext().getConfig().getConfig( + soft ? WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT_SOFT : WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT)) + .soft(soft).parse().init(null); } @Override public String toString() { return "system"; } } - public static WorkflowRetentionFilter newDefaultFilter() { - return new KeepParent(); - } - public static WorkflowRetentionFilter newDefaultSoftFilter() { - return new KeepSystem(); + public static WorkflowRetentionFilter newDefaultFilter(boolean soft) { + return new KeepParent(soft); } static class KeepParent extends KeepDelegate { + KeepParent(boolean soft) { super(soft); } @Override public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) { if (workflow == null) throw new IllegalStateException("Retention 'parent' cannot be used here"); else if (workflow.getParent()!=null) { - return workflow.getParent().getRetentionSettings().getExpiryFn(workflow.getParent()); + return soft ? workflow.getParent().getRetentionSettings().getSoftExpiryFn(workflow.getParent()) : workflow.getParent().getRetentionSettings().getExpiryFn(workflow.getParent()); } else { - return new KeepSystem().init(workflow); + return new KeepSystem(soft).init(workflow); } } @Override @@ -246,12 +286,13 @@ also allows `hard` at start or end, or `soft [limit]` at end } } static class KeepContext extends KeepDelegate { + KeepContext(boolean soft) { super(soft); } @Override public WorkflowRetentionFilter findDelegate(WorkflowExecutionContext workflow) { if (workflow == null) throw new IllegalStateException("Retention 'context' cannot be used here"); // expands to string to something that doesn't reference context so that this does not infinitely recurse - return workflow.getRetentionSettings().getExpiryFn(workflow); + return soft ? 
workflow.getRetentionSettings().getSoftExpiryFn(workflow) : workflow.getRetentionSettings().getExpiryFn(workflow); } @Override public String toString() { @@ -261,17 +302,21 @@ also allows `hard` at start or end, or `soft [limit]` at end String fullExpression; String rest; + boolean soft = false; public WorkflowRetentionParser(String fullExpression) { this.fullExpression = fullExpression; } + public WorkflowRetentionParser soft() { return soft(true); } + public WorkflowRetentionParser soft(boolean soft) { this.soft = soft; return this; } + public WorkflowRetentionFilter parse() { - if (Strings.isBlank(fullExpression)) return newDefaultFilter(); + if (Strings.isBlank(fullExpression)) return newDefaultFilter(soft); rest = Strings.trimStart(fullExpression.toLowerCase()); WorkflowRetentionFilter result = parseTerm(); - if (!Strings.isBlank(rest)) return newDefaultFilter(); + if (!Strings.isBlank(rest)) return newDefaultFilter(soft); return result; } @@ -337,9 +382,9 @@ also allows `hard` at start or end, or `soft [limit]` at end if (term.isPresent()) return term.get(); if (eatNA("all") || eatNA("forever")) return new KeepAll(); - if (eatNA("system")) return new KeepSystem(); - if (eatNA("parent")) return new KeepParent(); - if (eatNA("context")) return new KeepContext(); + if (eatNA("system")) return new KeepSystem(soft); + if (eatNA("parent")) return new KeepParent(soft); + if (eatNA("context")) return new KeepContext(soft); int i = maxPositive(rest.indexOf(","), rest.indexOf(")")); if (i==-1) i = rest.length(); diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java index f90a0a27d9..4f5cb3684c 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java @@ -20,6 +20,7 @@ package 
org.apache.brooklyn.core.workflow; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; +import com.google.common.base.Predicates; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterables; import org.apache.brooklyn.api.entity.EntityLocal; @@ -39,6 +40,7 @@ import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; import org.apache.brooklyn.core.mgmt.rebind.RebindOptions; import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixture; import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration; import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors; import org.apache.brooklyn.entity.stock.BasicApplication; import org.apache.brooklyn.test.Asserts; @@ -927,6 +929,18 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1); Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 3); + // setting soft 4 should give one more, and allow hash keyword at end + w1 = doTestRetentionDisabled(2, "min(1,context) soft 4 hash my-fixed-hash", false, false, false); + Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().softExpiryResolved, "4"); + Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1); + Asserts.assertSize(wp.get().getWorkflows(app, true).keySet(), 4); + + // and soft with min(2,...) 
allowing hash keyword before should also work + w1 = doTestRetentionDisabled(2, "min(1,context) hash my-fixed-hash soft min(2,system)", false, false, false); + Asserts.assertEquals(lastWorkflowContext.getRetentionSettings().softExpiryResolved, "min(2,system)"); + Asserts.assertSize(wp.get().getWorkflows(app, false).keySet(), 1); + Asserts.eventually(() -> wp.get().getWorkflows(app, true).keySet().size(), Predicates.equalTo(2), Duration.seconds(2)); + // invoking our test gives a new workflow hash because the effector name is different w2 = doTestRetentionDisabled(2, "1", false, false, false); Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId())); @@ -978,7 +992,7 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl // wait 5s and run something, it should cause everything else to expire Time.sleep(Duration.FIVE_SECONDS); - wp.get().expireOldWorkflows(app, null); + WorkflowRetentionAndExpiration.expireOldWorkflows(app); // should now be empty Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of()); @@ -991,17 +1005,17 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl Time.sleep(Duration.seconds(5)); w3 = doTestRetentionDisabled("hash my-fixed-hash max(1,"+longWait+")", "context", false, true, false); // should now have all 3 - wp.get().expireOldWorkflows(app, null); + WorkflowRetentionAndExpiration.expireOldWorkflows(app); Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId())); Time.sleep(Duration.seconds(5)); // now just the last 1 (only 1 in 10s) - wp.get().expireOldWorkflows(app, null); + WorkflowRetentionAndExpiration.expireOldWorkflows(app); Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w3.getWorkflowId())); Time.sleep(Duration.seconds(5)); // still have last 1 (even after 10s) - 
wp.get().expireOldWorkflows(app, null); + WorkflowRetentionAndExpiration.expireOldWorkflows(app); Asserts.assertEquals(wp.get().getWorkflows(app, false).keySet(), MutableSet.of(w3.getWorkflowId())); // run two more, that's all we should have diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java index 7d1849af97..4defc5b109 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java @@ -129,7 +129,13 @@ public class SoftwareProcessRebindNotRunningEntityTest extends RebindTestFixture } if (latch instanceof TerminableCountDownLatch) ((TerminableCountDownLatch)latch).terminate(); } - super.tearDown(Duration.millis(10)); // stops here can be blocked, don't wait on them + try { + super.tearDown(Duration.millis(50)); // stops here can be blocked, don't wait on them + } catch (Exception e) { + // we fail on this in case it is a real problem, but not believed to be, only seen occasionally, and not since timeout was increased 2024-04-01 + LOG.warn("Teardown of test encountered exception; not unknown if multiple processes attempt to destroy, as destruction is deliberately unsynchronized to minimize race errors", e); + throw Exceptions.propagateAnnotated("Concurrent teardown issue", e); + } if (executor != null) executor.shutdownNow(); } finally { latches.clear();