This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git


The following commit(s) were added to refs/heads/master by this push:
     new 2327001553 allow checking call stacks across tasks
2327001553 is described below

commit 2327001553fef6e61a5c3a2243d106bb50a08820
Author: Alex Heneveld <a...@cloudsoft.io>
AuthorDate: Wed Apr 10 16:03:29 2024 +0100

    allow checking call stacks across tasks
    
    so recursive checks between threads are found.
    and improve error messages.
---
 .../workflow/WorkflowExpressionResolution.java     |  8 +--
 .../util/core/task/BasicExecutionContext.java      |  4 +-
 .../util/core/task/CrossTaskThreadLocalStack.java  | 71 ++++++++++++++++++++++
 .../brooklyn/util/core/task/ValueResolver.java     |  2 +-
 .../brooklyn/util/core/text/TemplateProcessor.java |  3 +-
 .../util/collections/ThreadLocalStack.java         | 56 ++++++++++-------
 6 files changed, 116 insertions(+), 28 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
index 658aeb1b4d..9831f2dccb 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
@@ -48,10 +48,10 @@ import org.apache.brooklyn.core.typereg.RegisteredTypes;
 import org.apache.brooklyn.util.collections.Jsonya;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.collections.ThreadLocalStack;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import 
org.apache.brooklyn.util.core.predicates.ResolutionFailureTreatedAsAbsent;
 import org.apache.brooklyn.util.core.task.DeferredSupplier;
+import org.apache.brooklyn.util.core.task.CrossTaskThreadLocalStack;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.core.text.TemplateProcessor;
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -530,7 +530,7 @@ public class WorkflowExpressionResolution {
             callable);
     }
 
-    static ThreadLocalStack<WorkflowResolutionStackEntry> RESOLVE_STACK = new 
ThreadLocalStack<>(false);
+    static CrossTaskThreadLocalStack<WorkflowResolutionStackEntry> 
RESOLVE_STACK = new CrossTaskThreadLocalStack<>(false);
 
     <T> T inResolveStackEntry(String callPointUid, Object expression, 
Supplier<T> code) {
         return inResolveStackEntry(WorkflowResolutionStackEntry.of(this, 
callPointUid, expression), null, code);
@@ -587,11 +587,11 @@ public class WorkflowExpressionResolution {
 
     public Object processTemplateExpression(Object expression, 
AllowBrooklynDslMode allowBrooklynDsl) {
         return inResolveStackEntry(WorkflowResolutionStackEntry.of(this, 
"process-template-expression", expression), () -> {
-            throw new WorkflowVariableRecursiveReference("Recursive reference: 
" + RESOLVE_STACK.getAll(false).stream().map(p -> "" + 
p.expression).collect(Collectors.joining("->")));
+            throw new WorkflowVariableRecursiveReference("Recursive reference: 
" + RESOLVE_STACK.stream().map(p -> "" + 
p.expression).collect(Collectors.joining("->")));
         }, () -> {
             try {
                 if (RESOLVE_STACK.size() > 100) {
-                    throw new WorkflowVariableRecursiveReference("Reference 
exceeded max depth 100: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + 
p.expression).collect(Collectors.joining("->")));
+                    throw new WorkflowVariableRecursiveReference("Reference 
exceeded max depth 100: " + RESOLVE_STACK.stream().map(p -> "" + 
p.expression).collect(Collectors.joining("->")));
                 }
 
                 Object result;
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index b0757bacc8..fcbaf627ab 100644
--- 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Function;
@@ -45,6 +46,7 @@ import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.mgmt.ExecutionManager;
 import org.apache.brooklyn.api.mgmt.HasTaskChildren;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.TaskAdaptable;
 import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext;
@@ -589,6 +591,6 @@ public class BasicExecutionContext extends 
AbstractExecutionContext {
 
     @Override
     public String toString() {
-        return super.toString()+"("+tags+")";
+        return getClass().getSimpleName()+tags.stream().filter(t -> !(t 
instanceof ManagementContext)).collect(Collectors.toList());
     }
 }
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/CrossTaskThreadLocalStack.java
 
b/core/src/main/java/org/apache/brooklyn/util/core/task/CrossTaskThreadLocalStack.java
new file mode 100644
index 0000000000..8421395d73
--- /dev/null
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/task/CrossTaskThreadLocalStack.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.util.core.task;
+
+import java.util.Collection;
+import java.util.WeakHashMap;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Streams;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.util.collections.ThreadLocalStack;
+
+public class CrossTaskThreadLocalStack<T> extends ThreadLocalStack<T> {
+
+    public CrossTaskThreadLocalStack(boolean acceptDuplicates) {
+        super(acceptDuplicates);
+    }
+    public CrossTaskThreadLocalStack() { super(); }
+
+    // override since we cannot access another thread's thread local
+    final WeakHashMap<Thread,Collection<T>> backingOverride = new 
WeakHashMap<>();
+
+    @Override protected Collection<T> get() {
+        return get(Thread.currentThread());
+    }
+    protected Collection<T> get(Thread t) {
+        synchronized (backingOverride) { return backingOverride.get(t); }
+    }
+    @Override protected void set(Collection<T> value) {
+        synchronized (backingOverride) { 
backingOverride.put(Thread.currentThread(), value); }
+    }
+    @Override protected void remove() {
+        synchronized (backingOverride) { 
backingOverride.remove(Thread.currentThread()); }
+    }
+    @Override protected Collection<T> getCopyReversed() {
+        return getCopyReversed(Thread.currentThread());
+    }
+    protected Collection<T> getCopyReversed(Thread t) {
+        synchronized (backingOverride) { return copyReversed(get(t)); }
+    }
+
+    public Stream<T> stream() {
+        return concatSubmitterTaskThreadStacks(getCopyReversed().stream(), 
Tasks.current());
+    }
+
+    protected Stream<T> concatSubmitterTaskThreadStacks(Stream<T> stream, Task 
current) {
+        if (current==null) return stream;
+        Task submitter = current.getSubmittedByTask();
+        if (submitter==null) return stream;
+        Collection<T> ss = getCopyReversed(submitter.getThread());
+        if (ss!=null && !ss.isEmpty()) stream = Streams.concat(stream, 
ss.stream());
+        return concatSubmitterTaskThreadStacks(stream, submitter);
+    }
+
+}
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java 
b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index 03e9ede5cb..446f8d70b6 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -688,7 +688,7 @@ public class ValueResolver<T> implements 
DeferredSupplier<T>, Iterable<Maybe<Obj
         } catch (Exception e) {
             Exceptions.propagateIfFatal(e);
 
-            String msg = "Error resolving "+(description!=null ? 
description+", " : "")+v+", in "+exec;
+            String msg = (description!=null ? "Error in resolution: 
"+description+"," : "Error resolving value") + " at "+exec;
             String eTxt = Exceptions.collapseText(e);
             IllegalArgumentException problem = eTxt.startsWith(msg) ? new 
IllegalArgumentException(e) : new IllegalArgumentException(msg+": "+eTxt, e);
             if (swallowExceptions) {
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java 
b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java
index af00cbce16..3777c322f4 100644
--- 
a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java
@@ -49,6 +49,7 @@ import 
org.apache.brooklyn.core.workflow.WorkflowExpressionResolution;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.ThreadLocalStack;
+import org.apache.brooklyn.util.core.task.CrossTaskThreadLocalStack;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.guava.Maybe;
@@ -95,7 +96,7 @@ public class TemplateProcessor {
     public static void closeLocalTemplateModelCache() { 
TEMPLATE_MODEL_UNWRAP_CACHE.pop(); }
 
     static ThreadLocalStack<String> TEMPLATE_FILE_WANTING_LEGACY_SYNTAX = new 
ThreadLocalStack<>();
-    static ThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new 
ThreadLocalStack<>();
+    static CrossTaskThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new 
CrossTaskThreadLocalStack<>();
 
     public interface UnwrappableTemplateModel {
         Maybe<Object> unwrap();
diff --git 
a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
 
b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
index f9a3f90479..4170dbcf8d 100644
--- 
a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
+++ 
b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
@@ -24,6 +24,7 @@ import org.apache.brooklyn.util.guava.Maybe;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
 import java.util.stream.Stream;
 
@@ -37,28 +38,38 @@ public class ThreadLocalStack<T> implements Iterable<T> {
     }
     public ThreadLocalStack() { this.acceptDuplicates = true; }
 
-    final ThreadLocal<Collection<T>> set = new ThreadLocal<>();
+    protected final ThreadLocal<Collection<T>> backing = new ThreadLocal<>();
 
-    public Collection<T> getAll(boolean forceInitialized) {
-        Collection<T> result = set.get();
-        if (forceInitialized && result==null) {
+    protected Collection<T> get() {
+        return backing.get();
+    }
+    protected void set(Collection<T> value) {
+        backing.set(value);
+    }
+    protected void remove() {
+        backing.remove();
+    }
+
+    protected Collection<T> upsert() {
+        Collection<T> result = get();
+        if (result==null) {
             result = acceptDuplicates ? MutableList.of() : MutableSet.of();
-            set.set(result);
+            set(result);
         }
         return result;
     }
 
     public T pop() {
-        Collection<T> resultS = getAll(true);
+        Collection<T> resultS = upsert();
         T last = Iterables.getLast(resultS);
         resultS.remove(last);
-        if (resultS.isEmpty()) set.remove();
+        if (resultS.isEmpty()) remove();
         return last;
     }
 
     /** returns true unless duplicates are not accepted, in which case it 
returns false iff the object supplied is equal to one already present */
     public boolean push(T object) {
-        return getAll(true).add(object);
+        return upsert().add(object);
     }
 
     /** top of stack first */
@@ -69,23 +80,28 @@ public class ThreadLocalStack<T> implements Iterable<T> {
 
     /** top of stack first */
     public Stream<T> stream() {
-        MutableList<T> l = MutableList.copyOf(getAll(false));
+        return getCopyReversed().stream();
+    }
+    protected Collection<T> getCopyReversed() {
+        return copyReversed(get());
+    }
+    protected Collection<T> copyReversed(Collection<T> c1) {
+        List<T> l = MutableList.copyOf(c1);
         Collections.reverse(l);
-        return l.stream();
+        return l;
     }
 
     public Maybe<T> peek() {
-        Collection<T> resultS = getAll(false);
-        if (resultS==null || resultS.isEmpty()) return Maybe.absent("Nothing 
in local stack");
-        return Maybe.of( Iterables.getLast(resultS) );
+        Iterator<T> si = stream().iterator();
+        if (!si.hasNext()) return Maybe.absent("Nothing in local stack");
+        return Maybe.of( si.next() );
     }
 
     public Maybe<T> peekPenultimate() {
-        Collection<T> resultS = getAll(false);
-        if (resultS==null) return Maybe.absent();
-        int size = resultS.size();
-        if (size<=1) return Maybe.absent();
-        return Maybe.of( Iterables.get(resultS, size-2) );
+        Iterator<T> si = stream().iterator();
+        if (si.hasNext()) si.next();
+        if (!si.hasNext()) return Maybe.absent();
+        return Maybe.of( si.next() );
     }
 
     public void pop(T entry) {
@@ -96,8 +112,6 @@ public class ThreadLocalStack<T> implements Iterable<T> {
     }
 
     public int size() {
-        Collection<T> v = getAll(false);
-        if (v==null) return 0;
-        return v.size();
+        return (int) stream().count();
     }
 }

Reply via email to