This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit d369f4bc38e416e5368db440a03abdea7c469feb
Author: Alex Heneveld <a...@cloudsoft.io>
AuthorDate: Mon Feb 12 00:42:22 2024 +0000

    add a subworkflow step which shares vars with parents
    
    and stricter+better error messages if reducing, targets etc used on custom 
workflow steps where they shouldn't be
---
 .../core/workflow/WorkflowExecutionContext.java    |  20 ++-
 .../workflow/WorkflowExpressionResolution.java     |   5 +-
 .../WorkflowStepInstanceExecutionContext.java      |   9 ++
 .../core/workflow/steps/CustomWorkflowStep.java    |  89 +++++++++---
 .../workflow/steps/flow/ForeachWorkflowStep.java   |   4 +-
 .../core/workflow/steps/flow/SubWorkflowStep.java  |  96 +++++++++++++
 .../brooklyn/core/workflow/WorkflowBasicTest.java  |   2 +
 .../WorkflowNestedAndCustomExtensionTest.java      |  24 ++--
 .../WorkflowSubAndCustomExtensionEdgeTest.java     | 160 +++++++++++++++++++++
 karaf/init/src/main/resources/catalog.bom          |   5 +
 .../java/org/apache/brooklyn/test/Asserts.java     |  29 +++-
 11 files changed, 404 insertions(+), 39 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 87ea371b61..f154d92df3 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -148,6 +148,24 @@ public class WorkflowExecutionContext {
     @JsonIgnore  // persist as sensor but not via REST in case it has secrets 
resolved
     Map<String,Object> inputResolved = MutableMap.of();
 
+    public boolean hasInput(ConfigKey<?> key) {
+        return hasInput(key.getName());
+    }
+    public boolean hasInput(String key) {
+        return input.containsKey(key);
+    }
+    @JsonIgnore
+    public Map<String, Object> getAllInput() {
+        return input;
+    }
+    @JsonIgnore
+    public Map<String, Object> getAllInputResolved() {
+        return inputResolved;
+    }
+    public void noteInputResolved(String k, Object v) {
+        inputResolved.put(k, v);
+    }
+
     Object outputDefinition;
     /** final output of the workflow, set at end */
     Object output;
@@ -657,7 +675,7 @@ public class WorkflowExecutionContext {
         if (vm.isPresent()) {
             if (WorkflowStepInstanceExecutionContext.REMEMBER_RESOLVED_INPUT) {
                 // this will keep spending time resolving, but will resolve 
the resolved value
-                inputResolved.put(key, vm.get());
+                noteInputResolved(key, vm.get());
             }
         }
         return vm;
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
index dd1603880b..ab4836fc56 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
@@ -20,6 +20,7 @@ package org.apache.brooklyn.core.workflow;
 
 import com.google.common.annotations.Beta;
 import com.google.common.reflect.TypeToken;
+import freemarker.core.InvalidReferenceException;
 import freemarker.template.TemplateHashModel;
 import freemarker.template.TemplateModel;
 import freemarker.template.TemplateModelException;
@@ -216,8 +217,6 @@ public class WorkflowExpressionResolution {
                         // the main exception, handled here, is if we are 
setting an input
                         candidate = null;
                         errors.add(t);
-                    } else {
-                        throw Exceptions.propagate(t);
                     }
                 }
                 if (candidate != null) return 
TemplateProcessor.wrapAsTemplateModel(candidate);
@@ -495,7 +494,7 @@ public class WorkflowExpressionResolution {
             entry = WorkflowVariableResolutionStackEntry.setting(context, 
stage, variable);
             if (!RESOLVE_STACK.push(entry)) {
                 entry = null;
-                throw new WorkflowVariableRecursiveReference("Recursive 
reference setting "+variable+": "+RESOLVE_STACK.getAll(false).stream().map(p -> 
p.object!=null ? p.object.toString() : 
p.settingVariable).collect(Collectors.joining("->")));
+                throw new WorkflowVariableRecursiveReference("Recursive or 
missing reference setting "+variable+": 
"+RESOLVE_STACK.getAll(false).stream().map(p -> p.object!=null ? 
p.object.toString() : p.settingVariable).collect(Collectors.joining("->")));
             }
 
             return callable.get();
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
index 704904f692..db8543f361 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
@@ -69,6 +69,15 @@ public class WorkflowStepInstanceExecutionContext {
     @JsonIgnore  // persist as sensor but not via REST in case it has secrets 
resolved
     Map<String,Object> inputResolved = MutableMap.of();
 
+    @JsonIgnore
+    public Map<String, Object> getAllInput() {
+        return input;
+    }
+    @JsonIgnore
+    public Map<String, Object> getAllInputResolved() {
+        return inputResolved;
+    }
+
     transient WorkflowExecutionContext context;
     // replay instructions or a string explicit next step identifier
     public Object next;
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
index e56d38e00a..55681a7386 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
@@ -43,12 +43,14 @@ import 
org.apache.brooklyn.core.workflow.utils.WorkflowConcurrencyParser;
 import org.apache.brooklyn.core.workflow.utils.WorkflowRetentionParser;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.javalang.Reflections;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
@@ -56,9 +58,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -76,6 +80,17 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
 
     private static final String WORKFLOW_SETTING_SHORTHAND = "[ \"replayable\" 
${replayable...} ] [ \"retention\" ${retention...} ] ";
 
+    /* fields which are only permitted in the registered type definition */
+    protected static final Set<String> FORBIDDEN_IN_WORKFLOW_STEP = 
MutableSet.copyOf(Arrays.asList(WorkflowCommonConfig.PARAMETER_DEFS)
+            
.stream().map(ConfigKey::getName).collect(Collectors.toSet())).asUnmodifiable();
+
+    protected static final Set<String> 
FORBIDDEN_IN_WORKFLOW_STEP_IN_SUBCLASSES = 
MutableSet.copyOf(Arrays.asList(WorkflowCommonConfig.STEPS)
+            
.stream().map(ConfigKey::getName).collect(Collectors.toSet())).put("target").asUnmodifiable();
+
+    public static final boolean 
CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE = false;
+    protected static final Set<String> FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS 
=
+            (CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE ? 
MutableSet.<String>of() : MutableSet.of("reducing")).asUnmodifiable();
+
     public CustomWorkflowStep() {}
     public CustomWorkflowStep(String name, List<Object> steps) {
         this.name = name;
@@ -245,7 +260,7 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
 
         targetR = checkTarget(targetR);
 
-        Map reducingV = 
context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT,
 reducing, Map.class);
+        Map reducingV = initializeReducingVariables(context, reducing);
 
         AtomicInteger index = new AtomicInteger(0);
         ((Iterable<?>) targetR).forEach(t -> {
@@ -263,6 +278,8 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
             }
         });
 
+        initializeNestedWorkflows(context, nestedWorkflowContext);
+
         StepState state = getStepState(context);
         state.wasList = wasList;
         state.reducing = reducingV;
@@ -407,7 +424,7 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
             assert submitted.isEmpty();
             if (lastWorkflowRunBeforeReplay!=null) {
                 // if interrupted we need to explicitly take from the last step
-                reducingV = 
updateReducingWorkflowVarsFromLastStep(lastWorkflowRunBeforeReplay, reducingV);
+                reducingV = getReducingWorkflowVarsFromLastStep(context, 
lastWorkflowRunBeforeReplay, reducingV);
             }
             for (Pair<WorkflowExecutionContext, Task<?>> p : 
delayedBecauseReducing) {
                 WorkflowExecutionContext wc = p.getLeft();
@@ -418,7 +435,7 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
 
                 DynamicTasks.queue(p.getRight()).getUnchecked();
 
-                reducingV = updateReducingWorkflowVarsFromLastStep(wc, 
reducingV);
+                reducingV = getReducingWorkflowVarsFromLastStep(context, wc, 
reducingV);
             }
         }
 
@@ -433,16 +450,22 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
             returnValue = !wasList ? Iterables.getOnlyElement(result) : result;
         } else {
             context.setOutput(reducingV);
-            
context.getWorkflowExectionContext().updateWorkflowScratchVariables(reducingV);
             returnValue = reducingV;
         }
 
         return returnValue;
     }
 
-    private static MutableMap<String, Object> 
updateReducingWorkflowVarsFromLastStep(WorkflowExecutionContext lastStep, 
Map<String, Object> prevWorkflowVars) {
-        Map<String, Object> lastStepReducingWorkflowVars = 
lastStep.getWorkflowScratchVariables();
-        Object lastStepOutput = lastStep.getOutput();
+    protected Map 
initializeReducingVariables(WorkflowStepInstanceExecutionContext context, 
Map<String, Object> reducing) {
+        return 
context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT,
 reducing, Map.class);
+    }
+
+    protected void 
initializeNestedWorkflows(WorkflowStepInstanceExecutionContext outerContext, 
List<WorkflowExecutionContext> nestedWorkflowContext) {
+    }
+
+    protected Map<String, Object> 
getReducingWorkflowVarsFromLastStep(WorkflowStepInstanceExecutionContext 
outerContext, WorkflowExecutionContext lastRun, Map<String, Object> 
prevWorkflowVars) {
+        Map<String, Object> lastStepReducingWorkflowVars = 
lastRun.getWorkflowScratchVariables();
+        Object lastStepOutput = lastRun.getOutput();
         MutableMap<String, Object> result = 
MutableMap.copyOf(prevWorkflowVars);
         prevWorkflowVars.keySet().forEach(k -> {
             result.put(k, lastStepReducingWorkflowVars.get(k));
@@ -450,6 +473,11 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
                 result.put(k, ((Map) lastStepOutput).get(k));
             }
         });
+
+        // we return them as output; we also set them as scratch, which might 
not be necessary / ideal,
+        // but it's what we've done for a while, so don't break that, yet
+        
outerContext.getWorkflowExectionContext().updateWorkflowScratchVariables(result);
+
         return result;
     }
 
@@ -486,7 +514,7 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
 
     @Override
     public WorkflowStepDefinition applySpecialDefinition(ManagementContext 
mgmt, Object definition, String typeBestGuess, 
WorkflowStepDefinitionWithSpecialDeserialization firstParse) {
-        // if we've resolved a custom workflow step, we need to make sure that 
the map supplied here
+        // if we've resolved a custom workflow step, we need to make sure that 
the map supplied as part of the workflow
         // - doesn't set parameters
         // - doesn't set steps unless it is a simple `workflow` step (not a 
custom step)
         // - (also caller must not override shorthand definition, but that is 
explicitly removed by WorkflowStepResolution)
@@ -497,19 +525,8 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
         }
         CustomWorkflowStep result = (CustomWorkflowStep) firstParse;
         Map m = (Map)definition;
-        for (String forbiddenKey: new String[] { "parameters" }) {
-            if (m.containsKey(forbiddenKey)) {
-                throw new IllegalArgumentException("Not permitted to override 
'" + forbiddenKey + "' when using a workflow step");
-            }
-        }
-        if (!isPermittedToSetSteps(typeBestGuess)) {
-            // custom workflow step
-            for (String forbiddenKey : new String[]{"steps"}) {
-                if (m.containsKey(forbiddenKey)) {
-                    throw new IllegalArgumentException("Not permitted to 
override '" + forbiddenKey + "' when using a custom workflow step");
-                }
-            }
-        }
+        checkCallerSuppliedDefinition(typeBestGuess, m);
+
         if (m.containsKey("output")) {
             // need to restore the workflow output from the base definition
             try {
@@ -526,8 +543,34 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
         return result;
     }
 
-    protected boolean isPermittedToSetSteps(String typeBestGuess) {
-        return typeBestGuess==null || 
SHORTHAND_TYPE_NAME_DEFAULT.equals(typeBestGuess) || 
CustomWorkflowStep.class.getName().equals(typeBestGuess);
+    protected void checkCallerSuppliedDefinition(String typeBestGuess, Map m) {
+        // caller (workflow author) cannot set parameters, that makes no sense
+        
FORBIDDEN_IN_WORKFLOW_STEP.stream().filter(m::containsKey).forEach(forbiddenKey 
-> {
+            throw new IllegalArgumentException("Not permitted to override '" + 
forbiddenKey + "' when using a workflow step");
+        });
+
+        if (!CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE && 
!isInternalClassNotExtendedAndUserAllowedToSetMostThings(typeBestGuess)) {
+            // caller can't specify these
+            
FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS.stream().filter(m::containsKey).forEach(forbiddenKey
 -> {
+                throw new IllegalArgumentException("Not permitted to set '" + 
forbiddenKey + "' when using a custom workflow step");
+            });
+            // neither should the custom registered type itself!
+            FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS.stream().filter(k -> 
(Reflections.getFieldValueMaybe(this, 
k).isPresentAndNonNull())).forEach(forbiddenKey -> {
+                throw new IllegalArgumentException("Not permitted for a custom 
workflow step to use '" + forbiddenKey + "'");
+            });
+        }
+        if 
(!isInternalClassNotExtendedAndUserAllowedToSetMostThings(typeBestGuess)) {
+            
FORBIDDEN_IN_WORKFLOW_STEP_IN_SUBCLASSES.stream().filter(m::containsKey).forEach(forbiddenKey
 -> {
+                throw new IllegalArgumentException("Not permitted to override 
'" + forbiddenKey + "' when using a custom workflow step");
+            });
+        }
+    }
+
+    protected boolean 
isInternalClassNotExtendedAndUserAllowedToSetMostThings(String typeBestGuess) {
+        return !isRegisteredTypeExtensionToClass(CustomWorkflowStep.class, 
SHORTHAND_TYPE_NAME_DEFAULT, typeBestGuess);
+    }
+    protected boolean isRegisteredTypeExtensionToClass(Class<? extends 
CustomWorkflowStep> clazz, String shorthandDefault, String typeBestGuess) {
+        return typeBestGuess!=null && !shorthandDefault.equals(typeBestGuess) 
&& !clazz.getName().equals(typeBestGuess);
     }
 
     protected WorkflowExecutionContext 
newWorkflow(WorkflowStepInstanceExecutionContext context, Object target, 
Integer targetIndexOrNull) {
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
index 3c179bc19a..e4a511ce0f 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java
@@ -55,8 +55,8 @@ public class ForeachWorkflowStep extends CustomWorkflowStep {
         throw new IllegalArgumentException("Target of foreach must be a list 
or an expression that resolves to a list, not "+targetR);
     }
 
-    protected boolean isPermittedToSetSteps(String typeBestGuess) {
-        return typeBestGuess==null || 
SHORTHAND_TYPE_NAME_DEFAULT.equals(typeBestGuess) || 
ForeachWorkflowStep.class.getName().equals(typeBestGuess);
+    protected boolean 
isInternalClassNotExtendedAndUserAllowedToSetMostThings(String typeBestGuess) {
+        return !isRegisteredTypeExtensionToClass(ForeachWorkflowStep.class, 
SHORTHAND_TYPE_NAME_DEFAULT, typeBestGuess);
     }
 
     protected void 
initializeSubWorkflowForTarget(WorkflowStepInstanceExecutionContext context, 
Object target, WorkflowExecutionContext nestedWorkflowContext) {
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java
new file mode 100644
index 0000000000..0e097852c7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.workflow.steps.flow;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.workflow.WorkflowCommonConfig;
+import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
+import org.apache.brooklyn.core.workflow.WorkflowExpressionResolution;
+import org.apache.brooklyn.core.workflow.WorkflowStepDefinition;
+import org.apache.brooklyn.core.workflow.WorkflowStepInstanceExecutionContext;
+import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.javalang.Reflections;
+
+public class SubWorkflowStep extends CustomWorkflowStep {
+
+    public static final String SHORTHAND_TYPE_NAME_DEFAULT = "subworkflow";
+
+    protected static final Set<String> FORBIDDEN_IN_SUBWORKFLOW_STEP_ALWAYS = 
MutableSet.copyOf(FORBIDDEN_IN_REGISTERED_TYPE_EXTENSIONS)
+            .putAll(MutableSet.of("target", "concurrency")).asUnmodifiable();
+
+    public SubWorkflowStep() {}
+
+    public SubWorkflowStep(CustomWorkflowStep base) {
+        super(base);
+    }
+
+    protected boolean 
isInternalClassNotExtendedAndUserAllowedToSetMostThings(String typeBestGuess) {
+        return !isRegisteredTypeExtensionToClass(SubWorkflowStep.class, 
SHORTHAND_TYPE_NAME_DEFAULT, typeBestGuess);
+    }
+
+    protected void checkCallerSuppliedDefinition(String typeBestGuess, Map m) {
+        if 
(!isInternalClassNotExtendedAndUserAllowedToSetMostThings(typeBestGuess)) {
+            throw new IllegalArgumentException("Not permitted to define a 
custom subworkflow step");
+        }
+        // these can't be set by user or registered type for subworkflow
+        
FORBIDDEN_IN_SUBWORKFLOW_STEP_ALWAYS.stream().filter(m::containsKey).forEach(forbiddenKey
 -> {
+            throw new IllegalArgumentException("Not permitted to set '" + 
forbiddenKey + "' when using a subworkflow step");
+        });
+        FORBIDDEN_IN_SUBWORKFLOW_STEP_ALWAYS.stream().filter(k -> 
(Reflections.getFieldValueMaybe(this, 
k).isPresentAndNonNull())).forEach(forbiddenKey -> {
+            throw new IllegalArgumentException("Not permitted for a 
subworkflow step to use '" + forbiddenKey + "'");
+        });
+    }
+
+    @Override
+    protected Map 
initializeReducingVariables(WorkflowStepInstanceExecutionContext context, 
Map<String, Object> reducing) {
+        return super.initializeReducingVariables(context, 
context.getWorkflowExectionContext().getWorkflowScratchVariables());
+    }
+
+    @Override
+    protected void 
initializeNestedWorkflows(WorkflowStepInstanceExecutionContext outerContext, 
List<WorkflowExecutionContext> nestedWorkflowContext) {
+        // wouldn't work if we iterated in the sub-workflow; but it doesn't 
allow an iterable target
+        outerContext.getWorkflowExectionContext().getAllInput().forEach( (k,v) 
-> {
+            if (!outerContext.hasInput(k)) {
+                nestedWorkflowContext.forEach((c -> c.getAllInput().put(k, 
v)));
+            }
+        });
+    }
+
+    @Override
+    protected Map<String, Object> 
getReducingWorkflowVarsFromLastStep(WorkflowStepInstanceExecutionContext 
outerContext, WorkflowExecutionContext lastRun, Map<String, Object> 
prevWorkflowVars) {
+        // wouldn't work if we iterated in the sub-workflow; but it doesn't 
allow an iterable target
+        lastRun.getAllInputResolved().forEach( (k,v) -> {
+            if (outerContext.getWorkflowExectionContext().hasInput(k)) 
outerContext.getWorkflowExectionContext().noteInputResolved(k, v);
+        });
+
+        
outerContext.getWorkflowExectionContext().updateWorkflowScratchVariables(lastRun.getWorkflowScratchVariables());
+
+        // output should just be last step, not the reduced vars
+        return null;
+    }
+}
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
index 9c4bd303e6..dba25bf967 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java
@@ -77,6 +77,7 @@ import 
org.apache.brooklyn.core.workflow.steps.flow.NoOpWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.flow.RetryWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.flow.ReturnWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.flow.SleepWorkflowStep;
+import org.apache.brooklyn.core.workflow.steps.flow.SubWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.flow.SwitchWorkflowStep;
 import 
org.apache.brooklyn.core.workflow.steps.variables.ClearVariableWorkflowStep;
 import org.apache.brooklyn.core.workflow.steps.variables.LoadWorkflowStep;
@@ -155,6 +156,7 @@ public class WorkflowBasicTest extends 
BrooklynMgmtUnitTestSupport {
 
         addRegisteredTypeBean(mgmt, "retry", RetryWorkflowStep.class);
         addRegisteredTypeBean(mgmt, "workflow", CustomWorkflowStep.class);
+        addRegisteredTypeBean(mgmt, "subworkflow", SubWorkflowStep.class);
         addRegisteredTypeBean(mgmt, "foreach", ForeachWorkflowStep.class);
         addRegisteredTypeBean(mgmt, "ssh", SshWorkflowStep.class);
         addRegisteredTypeBean(mgmt, "shell", ShellWorkflowStep.class);
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
index 36667e971d..20f59d5165 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
@@ -18,6 +18,12 @@
  */
 package org.apache.brooklyn.core.workflow;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
 import com.google.common.collect.Iterables;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
@@ -61,13 +67,6 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CancellationException;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
 public class WorkflowNestedAndCustomExtensionTest extends 
RebindTestFixture<TestApplication> {
 
     private static final Logger log = 
LoggerFactory.getLogger(WorkflowNestedAndCustomExtensionTest.class);
@@ -174,8 +173,15 @@ public class WorkflowNestedAndCustomExtensionTest extends 
RebindTestFixture<Test
 
         Asserts.assertFailsWith(() -> 
invokeWorkflowStepsWithLogging(MutableList.of(
                         MutableMap.of("type", "log-hi",
-                                "steps", MutableList.of("return not allowed to 
override")))),
-                e -> Asserts.expectedFailureContainsIgnoreCase(e, "steps"));
+                                "steps", MutableList.of("return should have 
failed because not allowed to override")))),
+                Asserts.expectedFailureContainsIgnoreCase("error", "in 
definition", "step 1", "steps=", "should have failed"));
+
+        Asserts.assertFailsWith(() -> 
invokeWorkflowStepsWithLogging(MutableList.of(
+                        "log-hi")),
+                Asserts.expectedFailureContainsIgnoreCase("evaluated to null 
or missing", "name").and(
+                        
Asserts.expectedFailureDoesNotContainIgnoreCase("recursive")
+                )
+        );
     }
 
     @Test
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSubAndCustomExtensionEdgeTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSubAndCustomExtensionEdgeTest.java
new file mode 100644
index 0000000000..dc6fe61ffc
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSubAndCustomExtensionEdgeTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.workflow;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.typereg.RegisteredType;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixture;
+import org.apache.brooklyn.core.resolve.jackson.BeanWithTypePlanTransformer;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.typereg.BasicTypeImplementationPlan;
+import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep;
+import org.apache.brooklyn.core.workflow.store.WorkflowRetentionAndExpiration;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+public class WorkflowSubAndCustomExtensionEdgeTest extends 
RebindTestFixture<TestApplication> {
+
+    private static final Logger log = 
LoggerFactory.getLogger(WorkflowSubAndCustomExtensionEdgeTest.class);
+
+    @Override
+    protected LocalManagementContext 
decorateOrigOrNewManagementContext(LocalManagementContext mgmt) {
+        WorkflowBasicTest.addWorkflowStepTypes(mgmt);
+        app = null; // clear this
+        
mgmt.getBrooklynProperties().put(WorkflowRetentionAndExpiration.WORKFLOW_RETENTION_DEFAULT,
 "forever");
+        return super.decorateOrigOrNewManagementContext(mgmt);
+    }
+
+    @Override
+    protected TestApplication createApp() {
+        return null;
+    }
+
+    @Override protected TestApplication rebind() throws Exception {
+        return 
rebind(RebindOptions.create().terminateOrigManagementContext(true));
+    }
+
+    public RegisteredType addBeanWithType(String typeName, String version, 
String plan) {
+        return BrooklynAppUnitTestSupport.addRegisteredTypeBean(mgmt(), 
typeName, version,
+                new 
BasicTypeImplementationPlan(BeanWithTypePlanTransformer.FORMAT, plan));
+    }
+
+    TestApplication app;
+    Task<?> lastInvocation;
+
+    Object runWorkflow(List<Object> steps) throws Exception {
+        return runWorkflow(steps, null);
+    }
+    Object runWorkflow(List<Object> steps, ConfigBag extraEffectorConfig) 
throws Exception {
+        if (app==null) app = 
mgmt().getEntityManager().createEntity(EntitySpec.create(TestApplication.class));
+        WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance()
+                .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflow")
+                .configure(WorkflowEffector.STEPS, steps)
+                .putAll(extraEffectorConfig));
+        eff.apply((EntityLocal)app);
+
+        lastInvocation = 
app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), null);
+        return lastInvocation.getUnchecked();
+    }
+
+    @Test
+    public void testVisibilityOfReducingWhenDefiningCustomWorkflowStep() 
throws Exception {
+        // test the fixture is right
+        MutableMap<String, Serializable> basicReducingStep = MutableMap.of(
+                "type", "workflow",
+                "reducing", MutableMap.of("hello_word", "${hello_word}"),
+                "steps", MutableList.of("set-sensor hi = ${hello_word} 
world"));
+        runWorkflow(MutableList.of("let hello_word = HI", basicReducingStep));
+        EntityAsserts.assertAttributeEquals(app, 
Sensors.newSensor(String.class, "hi"), "HI world");
+
+        addBeanWithType("set-sensor-hi-reducing", "1-SNAPSHOT", Strings.lines(
+                "type: workflow",
+                "parameters:",
+                "  value: {}",
+                "reducing:",
+                "  hello_word: ${hello_word}",
+                "steps:",
+                "  - let hi_word = ${hello_word} ?? hi",
+                "  - set-sensor hi = ${hi_word} ${value}",
+                "  - let hello_word = bye"
+        ));
+
+        if 
(CustomWorkflowStep.CUSTOM_WORKFLOW_STEP_REGISTERED_TYPE_EXTENSIONS_CAN_REDUCE) 
{
+            runWorkflow(MutableList.of(
+                    "let hello_word = HELLO",
+                    MutableMap.of("type", "set-sensor-hi-reducing", "input", 
MutableMap.of("value", "bob")),
+                    "return ${hello_word}"));
+            Asserts.assertEquals(lastInvocation.getUnchecked(), "bye");
+            EntityAsserts.assertAttributeEquals(app, 
Sensors.newSensor(String.class, "hi"), "HELLO bob");
+        } else {
+            Asserts.assertFailsWith(() -> runWorkflow(MutableList.of(
+                    "let hello_word = HELLO",
+                    MutableMap.of("type", "set-sensor-hi-reducing", "input", 
MutableMap.of("value", "bob")),
+                    "return ${hello_word}")),
+                Asserts.expectedFailureContainsIgnoreCase("not permitted", 
"reducing"));
+        }
+
+        addBeanWithType("set-sensor-hi", "1-SNAPSHOT", Strings.lines(
+                "type: workflow",
+                "parameters:",
+                "  value: {}",
+                "steps:",
+                "  - set-sensor hi = hi ${value}"
+        ));
+        // value is a poor choice with set-sensor, because set-sensor tries to 
evaluate the input; but let's make sure the message is not too confusing
+        Asserts.assertFailsWith(() -> runWorkflow(MutableList.of(
+                "let hello_word = HI",
+                "set-sensor-hi")),
+                Asserts.expectedFailureContainsIgnoreCase("recursive or 
missing reference","value")
+        );
+    }
+
+    @Test
+    public void testSubWorkflowStep() throws Exception {
+        runWorkflow(MutableList.of(
+                "let v1 = V1",
+                "let v2 = V2",
+                MutableMap.of("step", "subworkflow",
+                    "steps", MutableList.of(
+                            "let v0 = ${v0}B",
+                            "let v1 = ${v1}B",
+                            "let v3 = V3B",
+                            "return done")),
+                "return ${v0}-${v1}-${v2}-${v3}-${output}"),
+                ConfigBag.newInstance().configure(WorkflowCommonConfig.INPUT, 
MutableMap.of("v0", "V0")) );
+        Asserts.assertEquals(lastInvocation.getUnchecked(), 
"V0B-V1B-V2-V3B-done");
+    }
+
+}
diff --git a/karaf/init/src/main/resources/catalog.bom 
b/karaf/init/src/main/resources/catalog.bom
index ba10136645..c606ff1914 100644
--- a/karaf/init/src/main/resources/catalog.bom
+++ b/karaf/init/src/main/resources/catalog.bom
@@ -184,6 +184,11 @@ brooklyn.catalog:
     itemType: bean
     item:
       type: org.apache.brooklyn.core.workflow.steps.flow.ForeachWorkflowStep
+  - id: subworkflow
+    format: java-type-name
+    itemType: bean
+    item:
+      type: org.apache.brooklyn.core.workflow.steps.flow.SubWorkflowStep
   - id: retry
     format: java-type-name
     itemType: bean
diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java 
b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
index fdfe11ab96..fedcd9468d 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
@@ -1239,6 +1239,18 @@ public class Asserts {
             }
         }
     }
+
+    public static void assertStringDoesNotContainIgnoreCase(String input, 
String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+        if (input==null) fail("Input is null.");
+        if (phrase1ToNotContain!=null) {
+            assertThat(input, 
Predicates.not(StringPredicates.containsLiteralIgnoreCase(phrase1ToNotContain)));
+        }
+        for (String otherPhrase: optionalOtherPhrasesToNotContain) {
+            if (otherPhrase!=null) {
+                assertThat(input, 
Predicates.not(StringPredicates.containsLiteralIgnoreCase(otherPhrase)));
+            }
+        }
+    }
     
     public static void assertStringMatchesRegex(String input, String 
regex1ToMatch, String ...optionalOtherRegexesToMatch) {
         if (input==null) fail("Input is null.");
@@ -1369,7 +1381,22 @@ public class Asserts {
         }
         return true;
     }
-    
+    public static boolean expectedFailureDoesNotContainIgnoreCase(Throwable e, 
String phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+        if (e instanceof ShouldHaveFailedPreviouslyAssertionError) throw 
(Error)e;
+        try {
+            assertStringDoesNotContainIgnoreCase(Exceptions.collapseText(e), 
phrase1ToNotContain, optionalOtherPhrasesToNotContain);
+        } catch (AssertionError ee) {
+            rethrowPreferredException(e, ee);
+        }
+        return true;
+    }
+    public static Predicate<Throwable> expectedFailureDoesNotContain( String 
phrase1ToNotContain, String ...optionalOtherPhrasesToNotContain) {
+        return e -> expectedFailureDoesNotContain(e, phrase1ToNotContain, 
optionalOtherPhrasesToNotContain);
+    }
+    public static Predicate<Throwable> 
expectedFailureDoesNotContainIgnoreCase(String phrase1ToNotContain, String 
...optionalOtherPhrasesToNotContain) {
+        return e -> expectedFailureDoesNotContainIgnoreCase(e, 
phrase1ToNotContain, optionalOtherPhrasesToNotContain);
+    }
+
     /** Implements the return behavior for {@link 
#expectedFailureOfType(Throwable, Class, Class...)} and others,
      * to log interesting earlier errors but to suppress those which are 
internal or redundant. */
     private static void rethrowPreferredException(Throwable 
earlierPreferredIfFatalElseLogged, Throwable laterPreferredOtherwise) throws 
AssertionError {


Reply via email to