This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push: new 2327001553 allow checking call stacks across tasks 2327001553 is described below commit 2327001553fef6e61a5c3a2243d106bb50a08820 Author: Alex Heneveld <a...@cloudsoft.io> AuthorDate: Wed Apr 10 16:03:29 2024 +0100 allow checking call stacks across tasks so recursive checks between threads are found. and improve error messages. --- .../workflow/WorkflowExpressionResolution.java | 8 +-- .../util/core/task/BasicExecutionContext.java | 4 +- .../util/core/task/CrossTaskThreadLocalStack.java | 71 ++++++++++++++++++++++ .../brooklyn/util/core/task/ValueResolver.java | 2 +- .../brooklyn/util/core/text/TemplateProcessor.java | 3 +- .../util/collections/ThreadLocalStack.java | 56 ++++++++++------- 6 files changed, 116 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java index 658aeb1b4d..9831f2dccb 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java @@ -48,10 +48,10 @@ import org.apache.brooklyn.core.typereg.RegisteredTypes; import org.apache.brooklyn.util.collections.Jsonya; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.collections.ThreadLocalStack; import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.predicates.ResolutionFailureTreatedAsAbsent; import org.apache.brooklyn.util.core.task.DeferredSupplier; +import org.apache.brooklyn.util.core.task.CrossTaskThreadLocalStack; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.core.text.TemplateProcessor; import org.apache.brooklyn.util.exceptions.Exceptions; @@ -530,7 +530,7 @@ public class WorkflowExpressionResolution { callable); } - static ThreadLocalStack<WorkflowResolutionStackEntry> RESOLVE_STACK = new ThreadLocalStack<>(false); + static CrossTaskThreadLocalStack<WorkflowResolutionStackEntry> RESOLVE_STACK = new CrossTaskThreadLocalStack<>(false); <T> T inResolveStackEntry(String callPointUid, Object expression, Supplier<T> code) { return inResolveStackEntry(WorkflowResolutionStackEntry.of(this, callPointUid, expression), null, code); @@ -587,11 +587,11 @@ public class WorkflowExpressionResolution { public Object processTemplateExpression(Object expression, AllowBrooklynDslMode allowBrooklynDsl) { return inResolveStackEntry(WorkflowResolutionStackEntry.of(this, "process-template-expression", expression), () -> { - throw new WorkflowVariableRecursiveReference("Recursive reference: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.expression).collect(Collectors.joining("->"))); + throw new WorkflowVariableRecursiveReference("Recursive reference: " + RESOLVE_STACK.stream().map(p -> "" + p.expression).collect(Collectors.joining("->"))); }, () -> { try { if (RESOLVE_STACK.size() > 100) { - throw new WorkflowVariableRecursiveReference("Reference exceeded max depth 100: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.expression).collect(Collectors.joining("->"))); + throw new WorkflowVariableRecursiveReference("Reference exceeded max depth 100: " + RESOLVE_STACK.stream().map(p -> "" + p.expression).collect(Collectors.joining("->"))); } Object result; diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index b0757bacc8..fcbaf627ab 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import com.google.common.annotations.Beta; import com.google.common.base.Function; @@ -45,6 +46,7 @@ import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ExecutionContext; import org.apache.brooklyn.api.mgmt.ExecutionManager; import org.apache.brooklyn.api.mgmt.HasTaskChildren; +import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.TaskAdaptable; import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext; @@ -589,6 +591,6 @@ public class BasicExecutionContext extends AbstractExecutionContext { @Override public String toString() { - return super.toString()+"("+tags+")"; + return getClass().getSimpleName()+tags.stream().filter(t -> !(t instanceof ManagementContext)).collect(Collectors.toList()); } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/CrossTaskThreadLocalStack.java b/core/src/main/java/org/apache/brooklyn/util/core/task/CrossTaskThreadLocalStack.java new file mode 100644 index 0000000000..8421395d73 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/CrossTaskThreadLocalStack.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.core.task; + +import java.util.Collection; +import java.util.WeakHashMap; +import java.util.stream.Stream; + +import com.google.common.collect.Streams; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.util.collections.ThreadLocalStack; + +public class CrossTaskThreadLocalStack<T> extends ThreadLocalStack<T> { + + public CrossTaskThreadLocalStack(boolean acceptDuplicates) { + super(acceptDuplicates); + } + public CrossTaskThreadLocalStack() { super(); } + + // override since we cannot access another thread's thread local + final WeakHashMap<Thread,Collection<T>> backingOverride = new WeakHashMap<>(); + + @Override protected Collection<T> get() { + return get(Thread.currentThread()); + } + protected Collection<T> get(Thread t) { + synchronized (backingOverride) { return backingOverride.get(t); } + } + @Override protected void set(Collection<T> value) { + synchronized (backingOverride) { backingOverride.put(Thread.currentThread(), value); } + } + @Override protected void remove() { + synchronized (backingOverride) { backingOverride.remove(Thread.currentThread()); } + } + @Override protected Collection<T> getCopyReversed() { + return getCopyReversed(Thread.currentThread()); + } + protected Collection<T> getCopyReversed(Thread t) { + synchronized (backingOverride) { return copyReversed(get(t)); } + } + + public Stream<T> stream() { + return concatSubmitterTaskThreadStacks(getCopyReversed().stream(), Tasks.current()); + } + + protected Stream<T> concatSubmitterTaskThreadStacks(Stream<T> stream, Task current) { + if (current==null) return stream; + Task submitter = current.getSubmittedByTask(); + if (submitter==null) return stream; + Collection<T> ss = getCopyReversed(submitter.getThread()); + if (ss!=null && !ss.isEmpty()) stream = Streams.concat(stream, ss.stream()); + return concatSubmitterTaskThreadStacks(stream, submitter); + } + +} diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java index 03e9ede5cb..446f8d70b6 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java @@ -688,7 +688,7 @@ public class ValueResolver<T> implements DeferredSupplier<T>, Iterable<Maybe<Obj } catch (Exception e) { Exceptions.propagateIfFatal(e); - String msg = "Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec; + String msg = (description!=null ? "Error in resolution: "+description+"," : "Error resolving value") + " at "+exec; String eTxt = Exceptions.collapseText(e); IllegalArgumentException problem = eTxt.startsWith(msg) ? new IllegalArgumentException(e) : new IllegalArgumentException(msg+": "+eTxt, e); if (swallowExceptions) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java index af00cbce16..3777c322f4 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java @@ -49,6 +49,7 @@ import org.apache.brooklyn.core.workflow.WorkflowExpressionResolution; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.ThreadLocalStack; +import org.apache.brooklyn.util.core.task.CrossTaskThreadLocalStack; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.guava.Maybe; @@ -95,7 +96,7 @@ public class TemplateProcessor { public static void closeLocalTemplateModelCache() { TEMPLATE_MODEL_UNWRAP_CACHE.pop(); } static ThreadLocalStack<String> TEMPLATE_FILE_WANTING_LEGACY_SYNTAX = new ThreadLocalStack<>(); - static ThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new ThreadLocalStack<>(); + static CrossTaskThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new CrossTaskThreadLocalStack<>(); public interface UnwrappableTemplateModel { Maybe<Object> unwrap(); diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java index f9a3f90479..4170dbcf8d 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java @@ -24,6 +24,7 @@ import org.apache.brooklyn.util.guava.Maybe; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Objects; import java.util.stream.Stream; @@ -37,28 +38,38 @@ public class ThreadLocalStack<T> implements Iterable<T> { } public ThreadLocalStack() { this.acceptDuplicates = true; } - final ThreadLocal<Collection<T>> set = new ThreadLocal<>(); + protected final ThreadLocal<Collection<T>> backing = new ThreadLocal<>(); - public Collection<T> getAll(boolean forceInitialized) { - Collection<T> result = set.get(); - if (forceInitialized && result==null) { + protected Collection<T> get() { + return backing.get(); + } + protected void set(Collection<T> value) { + backing.set(value); + } + protected void remove() { + backing.remove(); + } + + protected Collection<T> upsert() { + Collection<T> result = get(); + if (result==null) { result = acceptDuplicates ? MutableList.of() : MutableSet.of(); - set.set(result); + set(result); } return result; } public T pop() { - Collection<T> resultS = getAll(true); + Collection<T> resultS = upsert(); T last = Iterables.getLast(resultS); resultS.remove(last); - if (resultS.isEmpty()) set.remove(); + if (resultS.isEmpty()) remove(); return last; } /** returns true unless duplicates are not accepted, in which case it returns false iff the object supplied is equal to one already present */ public boolean push(T object) { - return getAll(true).add(object); + return upsert().add(object); } /** top of stack first */ @@ -69,23 +80,28 @@ public class ThreadLocalStack<T> implements Iterable<T> { /** top of stack first */ public Stream<T> stream() { - MutableList<T> l = MutableList.copyOf(getAll(false)); + return getCopyReversed().stream(); + } + protected Collection<T> getCopyReversed() { + return copyReversed(get()); + } + protected Collection<T> copyReversed(Collection<T> c1) { + List<T> l = MutableList.copyOf(c1); Collections.reverse(l); - return l.stream(); + return l; } public Maybe<T> peek() { - Collection<T> resultS = getAll(false); - if (resultS==null || resultS.isEmpty()) return Maybe.absent("Nothing in local stack"); - return Maybe.of( Iterables.getLast(resultS) ); + Iterator<T> si = stream().iterator(); + if (!si.hasNext()) return Maybe.absent("Nothing in local stack"); + return Maybe.of( si.next() ); } public Maybe<T> peekPenultimate() { - Collection<T> resultS = getAll(false); - if (resultS==null) return Maybe.absent(); - int size = resultS.size(); - if (size<=1) return Maybe.absent(); - return Maybe.of( Iterables.get(resultS, size-2) ); + Iterator<T> si = stream().iterator(); + if (si.hasNext()) si.next(); + if (!si.hasNext()) return Maybe.absent(); + return Maybe.of( si.next() ); } public void pop(T entry) { @@ -96,8 +112,6 @@ public class ThreadLocalStack<T> implements Iterable<T> { } public int size() { - Collection<T> v = getAll(false); - if (v==null) return 0; - return v.size(); + return (int) stream().count(); } }