This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit a09ab7e1aadccc6844ac2cf78b0040983ed9430a Author: Alex Heneveld <a...@cloudsoft.io> AuthorDate: Thu Feb 22 09:09:21 2024 +0000 better workflow step resolution, using workflow/entity/mgmt context --- .../brooklyn/core/entity/EntityAdjuncts.java | 1 + .../brooklyn/core/objs/BrooklynObjectInternal.java | 5 + .../core/workflow/WorkflowErrorHandling.java | 2 +- .../core/workflow/WorkflowExecutionContext.java | 4 +- .../core/workflow/WorkflowStepResolution.java | 107 ++++++++++++++++----- .../core/workflow/steps/CustomWorkflowStep.java | 4 +- .../core/workflow/steps/flow/SubWorkflowStep.java | 7 ++ .../workflow/steps/flow/SwitchWorkflowStep.java | 4 +- .../brooklyn/core/workflow/WorkflowBasicTest.java | 4 +- .../rest/resources/ApplicationResourceTest.java | 1 + 10 files changed, 107 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/EntityAdjuncts.java b/core/src/main/java/org/apache/brooklyn/core/entity/EntityAdjuncts.java index 3bdf12b3cc..fcc6032683 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/EntityAdjuncts.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/EntityAdjuncts.java @@ -210,6 +210,7 @@ public class EntityAdjuncts { /** supported by nearly all EntityAdjuncts, but a few in the wild might that don't extend the standard AbstractEntityAdjunct might not implement this; see {@link #getEntity()} */ public interface EntityAdjunctProxyable extends EntityAdjunct { Entity getEntity(); + ManagementContext getManagementContext(); } } diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java index 56128442d8..953f106d7a 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java @@ -36,6 +36,11 @@ import org.apache.brooklyn.util.guava.Maybe; import com.google.common.annotations.Beta; +/** + * Super-interface for entity and adjunct actual instances. + * Note that delegates, esp for adjuncts, do not necessarily implement this. + * Consider using {@link org.apache.brooklyn.core.entity.EntityAdjuncts.EntityAdjunctProxyable} for adjuncts. + */ public interface BrooklynObjectInternal extends BrooklynObject, Rebindable { void setCatalogItemId(String id); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java index 8d5a21fac6..cae5956060 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java @@ -111,7 +111,7 @@ public class WorkflowErrorHandling implements Callable<WorkflowErrorHandling.Wor public WorkflowErrorHandling(Object errorOptionsO, WorkflowExecutionContext context, Integer stepIndexIfStepErrorHandler, Task<?> failedTask, Throwable error) { List<Object> errorOptions = wrappedInListIfNecessaryOrNullIfEmpty(errorOptionsO); - this.errorOptions = WorkflowStepResolution.resolveSubSteps(context.getManagementContext(), "error handling", errorOptions); + this.errorOptions = new WorkflowStepResolution(context).resolveSubSteps("error handling", errorOptions); this.context = context; this.stepIndexIfStepErrorHandler = stepIndexIfStepErrorHandler; this.failedTask = failedTask; diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java index 302ef6144e..72b5477b04 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java @@ -326,7 +326,7 @@ public class WorkflowExecutionContext { w.timeout = paramsDefiningWorkflow.get(WorkflowCommonConfig.TIMEOUT); w.onError = paramsDefiningWorkflow.get(WorkflowCommonConfig.ON_ERROR); // fail fast if error steps not resolveable - WorkflowStepResolution.resolveSubSteps(w.getManagementContext(), "error handling", WorkflowErrorHandling.wrappedInListIfNecessaryOrNullIfEmpty(w.onError)); + new WorkflowStepResolution(w).resolveSubSteps("error handling", WorkflowErrorHandling.wrappedInListIfNecessaryOrNullIfEmpty(w.onError)); // some fields need to be resolved at setting time, in the context of the workflow w.setCondition(paramsDefiningWorkflow.getStringKey(WorkflowCommonConfig.CONDITION.getName())); @@ -937,7 +937,7 @@ public class WorkflowExecutionContext { @JsonIgnore List<WorkflowStepDefinition> getStepsResolved() { if (stepsResolved ==null) { - stepsResolved = MutableList.copyOf(WorkflowStepResolution.resolveSteps(getManagementContext(), WorkflowExecutionContext.this.stepsDefinition, outputDefinition)); + stepsResolved = MutableList.copyOf(new WorkflowStepResolution(this).resolveSteps(WorkflowExecutionContext.this.stepsDefinition, outputDefinition)); } return stepsResolved; } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java index 8526483ed8..c08b1d2c36 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java @@ -18,31 +18,29 @@ */ package org.apache.brooklyn.core.workflow; -import com.google.common.base.Predicates; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + import com.google.common.collect.Iterables; import com.google.common.reflect.TypeToken; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext; import org.apache.brooklyn.api.objs.BrooklynObject; -import org.apache.brooklyn.core.entity.BrooklynConfigKeys; -import org.apache.brooklyn.core.entity.EntityPredicates; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityAdjuncts.EntityAdjunctProxyable; import org.apache.brooklyn.core.mgmt.internal.AbstractManagementContext; -import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser; import org.apache.brooklyn.core.objs.BrooklynObjectInternal; import org.apache.brooklyn.core.resolve.jackson.BeanWithTypeUtils; import org.apache.brooklyn.core.typereg.RegisteredTypes; -import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep; import org.apache.brooklyn.core.workflow.steps.flow.SubWorkflowStep; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Maybe; -import org.apache.brooklyn.util.javalang.Boxing; - -import java.util.List; -import java.util.Map; public class WorkflowStepResolution { @@ -50,6 +48,51 @@ public class WorkflowStepResolution { return resolveSteps(mgmt, steps, null); } public static List<WorkflowStepDefinition> resolveSteps(ManagementContext mgmt, List<Object> steps, Object outputDefinition) { + return new WorkflowStepResolution(mgmt, null, null).resolveSteps(steps, outputDefinition); + } + public static List<WorkflowStepDefinition> resolveSubSteps(ManagementContext mgmt, String scope, List<Object> subSteps) { + return new WorkflowStepResolution(mgmt, null, null).resolveSubSteps(scope, subSteps); + } + + private final ManagementContext _mgmt; + private final BrooklynObject _broolynObject; + private final WorkflowExecutionContext _workflow; + + public WorkflowStepResolution(WorkflowExecutionContext context) { + this(null, null, context); + } + public WorkflowStepResolution(BrooklynObject bo) { + this(null, bo, null); + } + public WorkflowStepResolution(ManagementContext mgmt, BrooklynObject bo, WorkflowExecutionContext workflow) { + this._mgmt = mgmt; + this._broolynObject = bo; + this._workflow = workflow; + } + + public ManagementContext mgmt() { + if (_mgmt!=null) return _mgmt; + if (_workflow!=null) return _workflow.getManagementContext(); + BrooklynObject bo = brooklynObject(); + if (bo instanceof BrooklynObjectInternal) return ((BrooklynObjectInternal)bo).getManagementContext(); + if (bo instanceof EntityAdjunctProxyable) return ((EntityAdjunctProxyable)bo).getManagementContext(); + return null; + } + + public BrooklynObject brooklynObject() { + if (_broolynObject!=null) return _broolynObject; + if (_workflow!=null) return _workflow.getEntityOrAdjunctWhereRunning(); + return null; + } + + public Entity entity() { + BrooklynObject bo = brooklynObject(); + if (bo==null || (bo instanceof Entity)) return (Entity) bo; + if (bo instanceof EntityAdjunctProxyable) return ((EntityAdjunctProxyable)bo).getEntity(); + return null; + } + + public List<WorkflowStepDefinition> resolveSteps(List<Object> steps, Object outputDefinition) { List<WorkflowStepDefinition> result = MutableList.of(); if (steps==null || steps.isEmpty()) { if (outputDefinition==null) throw new IllegalStateException("No steps defined in workflow and no output set"); @@ -58,34 +101,42 @@ public class WorkflowStepResolution { } for (int i=0; i<steps.size(); i++) { try { - result.add(resolveStep(mgmt, steps.get(i))); + result.add(resolveStep(steps.get(i))); } catch (Exception e) { throw Exceptions.propagateAnnotated("Error in definition of step "+(i+1)+" ("+steps.get(i)+")", e); } } - WorkflowExecutionContext.validateSteps(mgmt, result, true); + WorkflowExecutionContext.validateSteps(mgmt(), result, true); return result; } - public static List<WorkflowStepDefinition> resolveSubSteps(ManagementContext mgmt, String scope, List<Object> subSteps) { + public List<WorkflowStepDefinition> resolveSubSteps(String scope, List<Object> subSteps) { List<WorkflowStepDefinition> result = MutableList.of(); if (subSteps!=null) { + // it's useful to allow subworkflows + // XXX also useful to compress if a long-winded syntax was used +// if (subSteps.size()==1) { +// WorkflowStepDefinition subStepResolved = resolveStep(mgmt, Iterables.getOnlyElement(subSteps)); +// if (subStepResolved instanceof SubWorkflowStep && ((SubWorkflowStep)subStepResolved).isSimpleListOfStepsOnly()) { +// return resolveSubSteps(mgmt, scope, ((SubWorkflowStep)subStepResolved).peekSteps()); +// } +// } subSteps.forEach(subStep -> { - WorkflowStepDefinition subStepResolved = resolveStep(mgmt, subStep); + WorkflowStepDefinition subStepResolved = resolveStep(subStep); if (subStepResolved.getId() != null) throw new IllegalArgumentException("Sub steps for "+scope+" are not permitted to have IDs: " + subStep); - if (subStepResolved instanceof CustomWorkflowStep && ((CustomWorkflowStep)subStepResolved).peekSteps()!=null) - throw new IllegalArgumentException("Sub steps for "+scope+" are not permitted to run sub-workflows: " + subStep); +// if (subStepResolved instanceof CustomWorkflowStep && ((CustomWorkflowStep)subStepResolved).peekSteps()!=null) +// throw new IllegalArgumentException("Sub steps for "+scope+" are not permitted to run sub-workflows: " + subStep); result.add(subStepResolved); }); } return result; } - static WorkflowStepDefinition resolveStep(ManagementContext mgmt, Object def) { + public WorkflowStepDefinition resolveStep(Object def) { if (def instanceof WorkflowStepDefinition) return (WorkflowStepDefinition) def; - BrooklynClassLoadingContext loader = RegisteredTypes.getCurrentClassLoadingContextOrManagement(mgmt); + BrooklynClassLoadingContext loader = RegisteredTypes.getClassLoadingContextMaybe(brooklynObject()).or(() -> RegisteredTypes.getCurrentClassLoadingContextOrManagement(mgmt())); String shorthand = null; Map defM = null; @@ -126,7 +177,7 @@ public class WorkflowStepResolution { } if (s instanceof Map && defM.size()==1) { // allow shorthand to contain a nested map if the shorthand is the only thing in the map, eg { step: { step: "xxx" } } - return resolveStep(mgmt, s); + return resolveStep(s); } if (!(s instanceof String)) { throw new IllegalArgumentException("step shorthand must be a string"); @@ -153,10 +204,20 @@ public class WorkflowStepResolution { try { Object def0 = defM !=null ? defM : def; - def = BeanWithTypeUtils.convert(mgmt, def0, TypeToken.of(WorkflowStepDefinition.class), true, loader, true); + + // if it's unable to convert a complex type via the above, the original type will be returned; the above doesn't fail. + // this is checked below so it's not a serious error, but the reason for it might be obscured. + Callable<Object> converter = () -> BeanWithTypeUtils.convert(mgmt(), def0, TypeToken.of(WorkflowStepDefinition.class), true, loader, true); + Entity entity = entity(); + if (entity==null) { + def = converter.call(); + } else { + // run in a task context if we can, to facilitate conversion and type lookup + def = Entities.submit(entity, Tasks.create("convert steps", converter)).getUnchecked(); + } if (def instanceof WorkflowStepDefinition.WorkflowStepDefinitionWithSpecialDeserialization) { - def = ((WorkflowStepDefinition.WorkflowStepDefinitionWithSpecialDeserialization)def).applySpecialDefinition(mgmt, def0, typeBestGuess, (WorkflowStepDefinition.WorkflowStepDefinitionWithSpecialDeserialization) def); + def = ((WorkflowStepDefinition.WorkflowStepDefinitionWithSpecialDeserialization)def).applySpecialDefinition(mgmt(), def0, typeBestGuess, (WorkflowStepDefinition.WorkflowStepDefinitionWithSpecialDeserialization) def); } } catch (Exception e) { throw Exceptions.propagateAnnotated("Unable to resolve step '"+def+"'", e); @@ -177,10 +238,10 @@ public class WorkflowStepResolution { List<Object> onError = WorkflowErrorHandling.wrappedInListIfNecessaryOrNullIfEmpty(defW.getOnError()); if (onError!=null && !onError.isEmpty()) { - defW.onError = resolveSubSteps(mgmt, "error handling", onError); + defW.onError = resolveSubSteps("error handling", onError); } - defW.validateStep(mgmt, null); + defW.validateStep(mgmt(), null); return defW; } else { @@ -197,7 +258,7 @@ public class WorkflowStepResolution { if (!hasCondition && entityOrAdjunctWhereRunningIfKnown!=null) { // ideally try to resolve the steps at entity init time; except if a condition is required we skip that so you can have steps that only resolve late, // and if entity isn't available then we don't need that either - WorkflowStepResolution.resolveSteps( ((BrooklynObjectInternal)entityOrAdjunctWhereRunningIfKnown).getManagementContext(), steps, params.containsKey(WorkflowCommonConfig.OUTPUT.getName()) ? "has_output" : null); + new WorkflowStepResolution(entityOrAdjunctWhereRunningIfKnown).resolveSteps(steps, params.containsKey(WorkflowCommonConfig.OUTPUT.getName()) ? "has_output" : null); } } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java index 55681a7386..a7c766f339 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java @@ -180,8 +180,8 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl super.validateStep(mgmt, workflow); if (steps instanceof List) { - if (((List)steps).isEmpty()) throw new IllegalArgumentException("Workflow `steps` must be supplied for a custom or nested workflow"); - WorkflowStepResolution.resolveSteps(mgmt, (List<Object>) steps, null); + if (steps.isEmpty()) throw new IllegalArgumentException("Workflow `steps` must be supplied for a custom or nested workflow"); + new WorkflowStepResolution(mgmt, null, workflow).resolveSteps(steps, null); } else if (steps!=null) throw new IllegalArgumentException("Workflow `steps` must be a list"); else if (target!=null) throw new IllegalArgumentException("Workflow cannot take a `target` without `steps`"); } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java index 0e097852c7..416853aa23 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SubWorkflowStep.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import com.google.common.base.MoreObjects; import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.workflow.WorkflowCommonConfig; @@ -66,6 +67,12 @@ public class SubWorkflowStep extends CustomWorkflowStep { }); } + public WorkflowStepDefinition applySpecialDefinition(ManagementContext mgmt, Object definition, String typeBestGuess, WorkflowStepDefinitionWithSpecialDeserialization firstParse) { + // allow null guesses and other types to instantiate this + if (typeBestGuess==null || !(definition instanceof Map)) return this; + return super.applySpecialDefinition(mgmt, definition, typeBestGuess, firstParse); + } + @Override protected Map initializeReducingVariables(WorkflowStepInstanceExecutionContext context, Map<String, Object> reducing) { return super.initializeReducingVariables(context, context.getWorkflowExectionContext().getWorkflowScratchVariables()); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java index 107bff89f6..d1da619394 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java @@ -60,7 +60,7 @@ public class SwitchWorkflowStep extends WorkflowStepDefinition implements Workfl public void validateStep(@Nullable ManagementContext mgmt, @Nullable WorkflowExecutionContext workflow) { super.validateStep(mgmt, workflow); if (cases==null) throw new IllegalStateException("No cases defined for "+Strings.firstNonBlank(getName(), "switch")); - List<WorkflowStepDefinition> stepsResolved = WorkflowStepResolution.resolveSubSteps(mgmt, Strings.firstNonBlank(getName(), "switch"), cases); + List<WorkflowStepDefinition> stepsResolved = new WorkflowStepResolution(mgmt, null, workflow).resolveSubSteps(Strings.firstNonBlank(getName(), "switch"), cases); if (stepsResolved.size()>1) { for (int i = 0; i < stepsResolved.size()-1; i++) { if (stepsResolved.get(i).getConditionRaw() == null) { @@ -96,7 +96,7 @@ public class SwitchWorkflowStep extends WorkflowStepDefinition implements Workfl @Override protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) { - List<WorkflowStepDefinition> stepsResolved = WorkflowStepResolution.resolveSubSteps(context.getManagementContext(), getName(), cases); + List<WorkflowStepDefinition> stepsResolved = new WorkflowStepResolution(context.getWorkflowExectionContext()).resolveSubSteps(getName(), cases); Object valueResolved = context.getInput(VALUE); for (int i = 0; i<stepsResolved.size(); i++) { diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java index dba25bf967..10124ada75 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java @@ -227,7 +227,7 @@ public class WorkflowBasicTest extends BrooklynMgmtUnitTestSupport { Asserts.assertInstanceOf(s, NoOpWorkflowStep.class); // util - s = WorkflowStepResolution.resolveStep(mgmt, input); + s = new WorkflowStepResolution(mgmt, null, null).resolveStep(input); Asserts.assertInstanceOf(s, NoOpWorkflowStep.class); String output1 = BrooklynObjectsJsonMapper.newDslToStringSerializingMapper(mgmt).writeValueAsString(s); @@ -243,7 +243,7 @@ public class WorkflowBasicTest extends BrooklynMgmtUnitTestSupport { String input = "sleep 1s"; // jackson doesn't handle shorthand; our custom method does that - WorkflowStepDefinition s = WorkflowStepResolution.resolveStep(mgmt, input); + WorkflowStepDefinition s = new WorkflowStepResolution(mgmt, null, null).resolveStep(input); Asserts.assertInstanceOf(s, SleepWorkflowStep.class); Asserts.assertEquals( Duration.of(s.getInput().get(SleepWorkflowStep.DURATION.getName())), Duration.ONE_SECOND); diff --git a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/ApplicationResourceTest.java b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/ApplicationResourceTest.java index 1882fdf42e..48d037e262 100644 --- a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/ApplicationResourceTest.java +++ b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/ApplicationResourceTest.java @@ -205,6 +205,7 @@ public class ApplicationResourceTest extends BrooklynRestResourceTest { } @Test(dependsOnMethods = { "testDeployApplication", "testLocatedLocation" }) + // one intermittent failure observed 2024-02-22; possibly a local mvn jar update conflict but flagging in case something else public void testDeployApplicationFromInterface() throws Exception { ApplicationSpec spec = ApplicationSpec.builder() .type(BasicApplication.class.getCanonicalName())