This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 931caf0bf03e306d7cd15e296a7ffdcb6ef04b5a Author: Alex Heneveld <a...@cloudsoft.io> AuthorDate: Fri Mar 29 16:27:06 2024 +0000 record the workflow resolution context better allows better tracing, and the ability to use that context when coercing --- .../workflow/WorkflowExpressionResolution.java | 247 ++++++++++++--------- .../brooklyn/util/core/text/TemplateProcessor.java | 6 +- .../util/collections/ThreadLocalStack.java | 23 +- 3 files changed, 163 insertions(+), 113 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java index c8e32abad5..658aeb1b4d 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java @@ -20,7 +20,6 @@ package org.apache.brooklyn.core.workflow; import java.time.Instant; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -29,6 +28,9 @@ import java.util.Set; import java.util.function.BiFunction; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.annotation.Nullable; import com.google.common.annotations.Beta; import com.google.common.reflect.TypeToken; @@ -188,7 +190,7 @@ public class WorkflowExpressionResolution { } } catch (Throwable t) { Exceptions.propagateIfFatal(t); - if (stage==WorkflowExpressionStage.STEP_INPUT && WorkflowVariableResolutionStackEntry.isStackForSettingVariable(RESOLVE_STACK.getAll(true), key) && Exceptions.getFirstThrowableOfType(t, WorkflowVariableRecursiveReference.class)!=null) { + if (stage==WorkflowExpressionStage.STEP_INPUT && isSettingVariable(key) && Exceptions.getFirstThrowableOfType(t, WorkflowVariableRecursiveReference.class)!=null) { // input evaluation can look at local input, and will gracefully handle some recursive references. // this is needed so we can handle things like env:=${env} in input, and also {message:="Hi ${name}", name:="Bob"}. @@ -401,94 +403,119 @@ public class WorkflowExpressionResolution { /** does not use templates */ public <T> T resolveCoercingOnly(Object expression, TypeToken<T> type) { if (expression==null) return null; - boolean triedCoercion = false; - List<Exception> exceptions = MutableList.of(); - if (expression instanceof String) { - try { - // prefer simple coercion if it's a string coming in - return TypeCoercions.coerce(expression, type); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - exceptions.add(e); - triedCoercion = true; + return inResolveStackEntry("resolve-coercing", expression, () -> { + boolean triedCoercion = false; + List<Exception> exceptions = MutableList.of(); + if (expression instanceof String) { + try { + // prefer simple coercion if it's a string coming in + return TypeCoercions.coerce(expression, type); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + exceptions.add(e); + triedCoercion = true; + } } - } - if (Jsonya.isJsonPrimitiveDeep(expression) && !(expression instanceof Set)) { - try { - // next try yaml coercion for anything complex, as values are normally set from yaml and will be raw at this stage (but not if they are from a DSL) - return BeanWithTypeUtils.convert(context.getManagementContext(), expression, type, true, - RegisteredTypes.getClassLoadingContext(context.getEntity()), true /* needed for wrapped resolved holders */); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - exceptions.add(e); + if (Jsonya.isJsonPrimitiveDeep(expression) && !(expression instanceof Set)) { + try { + // next try yaml coercion for anything complex, as values are normally set from yaml and will be raw at this stage (but not if they are from a DSL) + return BeanWithTypeUtils.convert(context.getManagementContext(), expression, type, true, + RegisteredTypes.getClassLoadingContext(context.getEntity()), true /* needed for wrapped resolved holders */); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + exceptions.add(e); + } } - } - if (!triedCoercion) { - try { - // fallback to simple coercion - return TypeCoercions.coerce(expression, type); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - exceptions.add(e); - triedCoercion = true; + if (!triedCoercion) { + try { + // fallback to simple coercion + return TypeCoercions.coerce(expression, type); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + exceptions.add(e); + triedCoercion = true; + } } - } - throw Exceptions.propagate(exceptions.iterator().next()); + throw Exceptions.propagate(exceptions.iterator().next()); + }); } - static class WorkflowVariableResolutionStackEntry { + public static class WorkflowResolutionStackEntry { + // resolver is null if caller has indicated evaluation before resolution + @Nullable WorkflowExpressionResolution resolver; WorkflowExecutionContext context; WorkflowExpressionStage stage; - Object object; + String callPointUid; + Object expression; String settingVariable; - public static WorkflowVariableResolutionStackEntry of(WorkflowExecutionContext context, WorkflowExpressionStage stage, Object expression) { - WorkflowVariableResolutionStackEntry result = new WorkflowVariableResolutionStackEntry(); + public static WorkflowResolutionStackEntry of(WorkflowExpressionResolution resolver, String callPointUid, Object expression) { + WorkflowResolutionStackEntry result = of(resolver == null ? null : resolver.context, resolver.stage, callPointUid, expression); + result.resolver = resolver; + return result; + } + public static WorkflowResolutionStackEntry of(WorkflowExecutionContext context, WorkflowExpressionStage stage, String callPointUid, Object expression) { + WorkflowResolutionStackEntry result = new WorkflowResolutionStackEntry(); result.context = context; + result.callPointUid = callPointUid; result.stage = stage; - result.object = expression; + result.expression = expression; return result; } - public static WorkflowVariableResolutionStackEntry setting(WorkflowExecutionContext context, WorkflowExpressionStage stage, String settingVariable) { - WorkflowVariableResolutionStackEntry result = new WorkflowVariableResolutionStackEntry(); - result.context = context; - result.stage = stage; + public static WorkflowResolutionStackEntry settingVariable(WorkflowExecutionContext context, WorkflowExpressionStage stage, String settingVariable) { + WorkflowResolutionStackEntry result = of(context, stage, "setting-variable", null); result.settingVariable = settingVariable; return result; } - public static boolean isStackForSettingVariable(Collection<WorkflowVariableResolutionStackEntry> stack, String key) { + public static boolean isStackForSettingVariable(Stream<WorkflowResolutionStackEntry> stack, String key) { if (stack==null) return true; - MutableList<WorkflowVariableResolutionStackEntry> s2 = MutableList.copyOf(stack); - Collections.reverse(s2); - Optional<WorkflowVariableResolutionStackEntry> s = s2.stream().filter(si -> si.settingVariable != null).findFirst(); + Optional<WorkflowResolutionStackEntry> s = stack.filter(si -> si.settingVariable != null).findFirst(); if (!s.isPresent()) return false; return s.get().settingVariable.equals(key); } + public String getWorkflowId() { + WorkflowExecutionContext ctx = getWorkflowExecutionContext(); + return ctx == null ? null : ctx.getWorkflowId(); + } + public WorkflowExpressionResolution getWorkflowExpressionResolution() { + return resolver; + } + public WorkflowExecutionContext getWorkflowExecutionContext() { + return context; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - WorkflowVariableResolutionStackEntry that = (WorkflowVariableResolutionStackEntry) o; + WorkflowResolutionStackEntry that = (WorkflowResolutionStackEntry) o; - if (context != null && that.context != null ? !Objects.equals(context.getWorkflowId(), that.context.getWorkflowId()) : !Objects.equals(context, that.context)) return false; + String wid = getWorkflowId(); + String tid = that.getWorkflowId(); + // might have different contexts with same ID; but if ID not set for some reason then use context + boolean checkIdNotContext = wid!=null && tid!=null; + if (checkIdNotContext && tid!=null && !Objects.equals(wid, tid)) return false; + if (!checkIdNotContext && !Objects.equals(getWorkflowExecutionContext(), that.getWorkflowExecutionContext())) return false; if (stage != that.stage) return false; - if (object != null ? !object.equals(that.object) : that.object != null) return false; + if (!Objects.equals(callPointUid, that.callPointUid)) return false; + if (expression != null ? !expression.equals(that.expression) : that.expression != null) return false; if (settingVariable != null ? !settingVariable.equals(that.settingVariable) : that.settingVariable != null) return false; return true; } @Override public int hashCode() { - int result = context != null && context.getWorkflowId()!=null ? context.getWorkflowId().hashCode() : 0; + int result = getWorkflowId() != null ? getWorkflowId().hashCode() : 0; result = 31 * result + (stage != null ? stage.hashCode() : 0); - result = 31 * result + (object != null ? object.hashCode() : 0); + result = 31 * result + (callPointUid != null ? callPointUid.hashCode() : 0); + result = 31 * result + (expression != null ? expression.hashCode() : 0); result = 31 * result + (settingVariable != null ? settingVariable.hashCode() : 0); return result; } @@ -497,27 +524,39 @@ public class WorkflowExpressionResolution { /** method which can be used to indicate that a reference to the variable, if it is recursive, is recoverable, because we are in the process of setting that variable. * see discussion on usages of WorkflowVariableResolutionStackEntry.isStackForSettingVariable */ public static <T> T allowingRecursionWhenSetting(WorkflowExecutionContext context, WorkflowExpressionStage stage, String variable, Supplier<T> callable) { - WorkflowVariableResolutionStackEntry entry = null; - try { - entry = WorkflowVariableResolutionStackEntry.setting(context, stage, variable); - if (!RESOLVE_STACK.push(entry)) { - entry = null; - throw new WorkflowVariableRecursiveReference("Recursive or missing reference setting "+variable+": "+RESOLVE_STACK.getAll(false).stream().map(p -> p.object!=null ? p.object.toString() : p.settingVariable).collect(Collectors.joining("->"))); - } + return inResolveStackEntry(WorkflowResolutionStackEntry.settingVariable(context, stage, variable), () -> { + throw new WorkflowVariableRecursiveReference("Recursive or missing reference setting "+variable+": "+RESOLVE_STACK.stream().map(p -> p.expression !=null ? p.expression.toString() : p.settingVariable).filter(x -> x!=null).collect(Collectors.joining("->"))); + }, + callable); + } - return callable.get(); + static ThreadLocalStack<WorkflowResolutionStackEntry> RESOLVE_STACK = new ThreadLocalStack<>(false); + <T> T inResolveStackEntry(String callPointUid, Object expression, Supplier<T> code) { + return inResolveStackEntry(WorkflowResolutionStackEntry.of(this, callPointUid, expression), null, code); + } + static <T> T inResolveStackEntry(WorkflowResolutionStackEntry entry, Runnable errorIfDuplicate, Supplier<T> code) { + boolean added = RESOLVE_STACK.push(entry); + if (!added && errorIfDuplicate!=null) errorIfDuplicate.run(); + try { + return code.get(); + } catch (Exception e) { + throw Exceptions.propagate(e); } finally { - if (entry!=null) { - RESOLVE_STACK.pop(entry); - } + if (added) RESOLVE_STACK.pop(entry); } } - static ThreadLocalStack<WorkflowVariableResolutionStackEntry> RESOLVE_STACK = new ThreadLocalStack<>(false); - WorkflowExpressionStage previousStage() { - return RESOLVE_STACK.peekPenultimate().map(s -> s.stage).orNull(); + return RESOLVE_STACK.stream().skip(1).map(s -> s.stage).filter(s -> s!=null).findFirst().orElse(null); + } + + public static boolean isSettingVariable(String key) { + return WorkflowResolutionStackEntry.isStackForSettingVariable(RESOLVE_STACK.stream(), key); + } + + public static WorkflowExpressionResolution getCurrentWorkflowExpressionResolution() { + return RESOLVE_STACK.stream().map(WorkflowResolutionStackEntry::getWorkflowExpressionResolution).filter(x -> x!=null).findFirst().orElse(null); } public static class WorkflowVariableRecursiveReference extends IllegalArgumentException { @@ -547,50 +586,48 @@ public class WorkflowExpressionResolution { } public Object processTemplateExpression(Object expression, AllowBrooklynDslMode allowBrooklynDsl) { - WorkflowVariableResolutionStackEntry entry = null; - try { - entry = WorkflowVariableResolutionStackEntry.of(context, stage, expression); - if (!RESOLVE_STACK.push(entry)) { - entry = null; - throw new WorkflowVariableRecursiveReference("Recursive reference: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.object).collect(Collectors.joining("->"))); - } - if (RESOLVE_STACK.size() > 100) { - throw new WorkflowVariableRecursiveReference("Reference exceeded max depth 100: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.object).collect(Collectors.joining("->"))); - } + return inResolveStackEntry(WorkflowResolutionStackEntry.of(this, "process-template-expression", expression), () -> { + throw new WorkflowVariableRecursiveReference("Recursive reference: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.expression).collect(Collectors.joining("->"))); + }, () -> { + try { + if (RESOLVE_STACK.size() > 100) { + throw new WorkflowVariableRecursiveReference("Reference exceeded max depth 100: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.expression).collect(Collectors.joining("->"))); + } - Object result; - if (expression instanceof String) result = processTemplateExpressionString((String) expression, allowBrooklynDsl); - else if (expression instanceof Map) result = processTemplateExpressionMap((Map) expression, allowBrooklynDsl); - else if (expression instanceof Collection) - result = processTemplateExpressionCollection((Collection) expression, allowBrooklynDsl); - else if (expression == null || Boxing.isPrimitiveOrBoxedObject(expression)) result = expression; - else { - // otherwise resolve DSL - result = allowBrooklynDsl.isAllowedHere() ? resolveDsl(expression) : expression; - if (wrappingMode.wrapResolvedValues && !Objects.equals(result, expression) && !(result instanceof DeferredSupplier)) { - result = WrappedResolvedExpression.ifNonDeferred(expression, result); + Object result; + if (expression instanceof String) + result = processTemplateExpressionString((String) expression, allowBrooklynDsl); + else if (expression instanceof Map) + result = processTemplateExpressionMap((Map) expression, allowBrooklynDsl); + else if (expression instanceof Collection) + result = processTemplateExpressionCollection((Collection) expression, allowBrooklynDsl); + else if (expression == null || Boxing.isPrimitiveOrBoxedObject(expression)) result = expression; + else { + // otherwise resolve DSL + result = allowBrooklynDsl.isAllowedHere() ? resolveDsl(expression) : expression; + if (wrappingMode.wrapResolvedValues && !Objects.equals(result, expression) && !(result instanceof DeferredSupplier)) { + result = WrappedResolvedExpression.ifNonDeferred(expression, result); + } } - } - return result; + return result; - } catch (Exception e) { - Exception e2 = e; - if (wrappingMode.deferAndRetryErroneousExpressions) { - return WrappedUnresolvedExpression.ofExpression(expression, this, allowBrooklynDsl); - } - if (!allowWaiting && Exceptions.isCausedByInterruptInAnyThread(e)) { - e2 = new IllegalArgumentException("Expression value '"+expression+"' unavailable and not permitted to wait: "+ Exceptions.collapseText(e), e); - } - if (wrappingMode.deferThrowingError) { - // in wrapped value mode, errors don't throw until accessed, and when used in conditions they can be tested as absent - return WrappedResolvedExpression.ofError(expression, new ResolutionFailureTreatedAsAbsent.ResolutionFailureTreatedAsAbsentDefaultException(e2)); - } else { - throw Exceptions.propagate(e2); + } catch (Exception e) { + Exception e2 = e; + if (wrappingMode.deferAndRetryErroneousExpressions) { + return WrappedUnresolvedExpression.ofExpression(expression, this, allowBrooklynDsl); + } + if (!allowWaiting && Exceptions.isCausedByInterruptInAnyThread(e)) { + e2 = new IllegalArgumentException("Expression value '" + expression + "' unavailable and not permitted to wait: " + Exceptions.collapseText(e), e); + } + if (wrappingMode.deferThrowingError) { + // in wrapped value mode, errors don't throw until accessed, and when used in conditions they can be tested as absent + return WrappedResolvedExpression.ofError(expression, new ResolutionFailureTreatedAsAbsent.ResolutionFailureTreatedAsAbsentDefaultException(e2)); + } else { + throw Exceptions.propagate(e2); + } } - } finally { - if (entry != null) RESOLVE_STACK.pop(entry); - } + }); } private Object resolveDsl(Object expression) { @@ -631,7 +668,7 @@ public class WorkflowExpressionResolution { return errorMode; } - public Object processTemplateExpressionString(String expression, AllowBrooklynDslMode allowBrooklynDsl) { + protected Object processTemplateExpressionString(String expression, AllowBrooklynDslMode allowBrooklynDsl) { Object result; boolean ourWait = false; try { @@ -698,7 +735,7 @@ public class WorkflowExpressionResolution { interruptSetIfNeededToPreventWaiting.remove(); } - public Object processTemplateExpressionMap(Map<?,?> object, AllowBrooklynDslMode allowBrooklynDsl) { + protected Object processTemplateExpressionMap(Map<?,?> object, AllowBrooklynDslMode allowBrooklynDsl) { if (allowBrooklynDsl.isAllowedHere() && object.size()==1) { Object key = object.keySet().iterator().next(); if (key instanceof String && ((String)key).startsWith("$brooklyn:")) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java index 8a51d7c1b4..af00cbce16 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java @@ -86,7 +86,7 @@ public class TemplateProcessor { public static TemplateModel wrapAsTemplateModel(Object o) throws TemplateModelException { return BROOKLYN_WRAPPER.wrap(o); } public static Maybe<Object> unwrapTemplateModelMaybe(TemplateModel templateModel) { return BROOKLYN_WRAPPER.unwrapMaybe(templateModel); } - static ThreadLocalStack<Map<TemplateModel,Object>> TEMPLATE_MODEL_UNWRAP_CACHE = new ThreadLocalStack<>(true); + static ThreadLocalStack<Map<TemplateModel,Object>> TEMPLATE_MODEL_UNWRAP_CACHE = new ThreadLocalStack<>(); /** A cache is used to be able to retrieve the object from which a TemplateModel was created, if needed, * because Freemarker doesn't support that except on selected UnwrappableTemplateModel subclasses. * Use wrap and unwrap methods above to access. @@ -94,8 +94,8 @@ public class TemplateProcessor { public static void openLocalTemplateModelCache() { TEMPLATE_MODEL_UNWRAP_CACHE.push(MutableMap.of()); } public static void closeLocalTemplateModelCache() { TEMPLATE_MODEL_UNWRAP_CACHE.pop(); } - static ThreadLocalStack<String> TEMPLATE_FILE_WANTING_LEGACY_SYNTAX = new ThreadLocalStack<>(true); - static ThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new ThreadLocalStack<>(true); + static ThreadLocalStack<String> TEMPLATE_FILE_WANTING_LEGACY_SYNTAX = new ThreadLocalStack<>(); + static ThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new ThreadLocalStack<>(); public interface UnwrappableTemplateModel { Maybe<Object> unwrap(); diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java index d225e1770f..f9a3f90479 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java @@ -22,23 +22,27 @@ import com.google.common.collect.Iterables; import org.apache.brooklyn.util.guava.Maybe; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.Objects; +import java.util.stream.Stream; public class ThreadLocalStack<T> implements Iterable<T> { - private final boolean allowDuplicates; + private final boolean acceptDuplicates; - public ThreadLocalStack(boolean allowsDuplicates) { - this.allowDuplicates = allowsDuplicates; + /** if duplicates not accepted, the call to push will return false */ + public ThreadLocalStack(boolean acceptDuplicates) { + this.acceptDuplicates = acceptDuplicates; } + public ThreadLocalStack() { this.acceptDuplicates = true; } final ThreadLocal<Collection<T>> set = new ThreadLocal<>(); public Collection<T> getAll(boolean forceInitialized) { Collection<T> result = set.get(); if (forceInitialized && result==null) { - result = allowDuplicates ? MutableList.of() : MutableSet.of(); + result = acceptDuplicates ? MutableList.of() : MutableSet.of(); set.set(result); } return result; @@ -52,13 +56,22 @@ public class ThreadLocalStack<T> implements Iterable<T> { return last; } + /** returns true unless duplicates are not accepted, in which case it returns false iff the object supplied is equal to one already present */ public boolean push(T object) { return getAll(true).add(object); } + /** top of stack first */ @Override public Iterator<T> iterator() { - return null; + return stream().iterator(); + } + + /** top of stack first */ + public Stream<T> stream() { + MutableList<T> l = MutableList.copyOf(getAll(false)); + Collections.reverse(l); + return l.stream(); } public Maybe<T> peek() {