This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push: new e48b67e6dc allow on_update_child, and accept numbers as entity id's e48b67e6dc is described below commit e48b67e6dc31462c05042f75949f0dd1dce1e0f2 Author: Alex Heneveld <a...@cloudsoft.io> AuthorDate: Thu Mar 21 16:06:25 2024 +0000 allow on_update_child, and accept numbers as entity id's --- .../core/workflow/steps/CustomWorkflowStep.java | 20 ++++++- .../steps/appmodel/UpdateChildrenWorkflowStep.java | 65 +++++++++++++++------- .../workflow/steps/flow/ForeachWorkflowStep.java | 2 + .../workflow/WorkflowUpdateChildrenStepTest.java | 36 ++++++++++++ 4 files changed, 102 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java index d61cf7bd8a..237472eec8 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java @@ -160,6 +160,9 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl protected Object target_var_name; protected Object target_index_var_name; + // hard-coded key if the entity where it should run should be extracted from a map - should be enhanced in future to be a var expression, + // but often the vars people would want to use are not exposed + protected String target_entity_key; // see WorkflowCommonConfig.LOCK protected Object lock; @@ -606,14 +609,27 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl String name = getSubworkflowName(target, targetIndexOrNullIfNotList); + BrooklynObject targetEntity = null; + // would be nice for this to be more configurable, but would want to access foreach vars, which aren't set until 'initializeSubworkflow' below; + // so for now use a map + if (Strings.isNonBlank(target_entity_key)) { + if (target instanceof Map) { + Object targetEntity0 = ((Map)target).get(target_entity_key); + if (targetEntity0==null) throw new IllegalArgumentException("Cannot find entity key '"+target_entity_key+"' in "+target); + targetEntity = WorkflowStepResolution.findEntity(context, targetEntity0).get(); + } else { + throw new IllegalArgumentException("Cannot specify entity key '"+target_entity_key+"' with non-map target entry "+target); + } + } + if (targetEntity==null) targetEntity = target instanceof BrooklynObject ? (BrooklynObject) target : context.getEntity(); + WorkflowExecutionContext nestedWorkflowContext = WorkflowExecutionContext.newInstanceUnpersistedWithParent( - target instanceof BrooklynObject ? (BrooklynObject) target : context.getEntity(), context.getWorkflowExectionContext(), + targetEntity, context.getWorkflowExectionContext(), WorkflowExecutionContext.WorkflowContextType.NESTED_WORKFLOW, name, getConfigForSubWorkflow(false), null, ConfigBag.newInstance(getInput()), null); - String tivn = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, target_index_var_name, String.class); if (targetIndexOrNullIfNotList!=null) nestedWorkflowContext.updateWorkflowScratchVariable(tivn == null ? TARGET_INDEX_VAR_NAME_DEFAULT : tivn, targetIndexOrNullIfNotList); initializeSubWorkflowForTarget(context, target, nestedWorkflowContext); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java index df2cd13533..08c805b490 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java @@ -30,24 +30,29 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.workflow.*; import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep; import org.apache.brooklyn.core.workflow.steps.flow.ForeachWorkflowStep; +import org.apache.brooklyn.core.workflow.steps.flow.SubWorkflowStep; import org.apache.brooklyn.core.workflow.steps.variables.SetVariableWorkflowStep; import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.core.text.TemplateProcessor; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.javalang.Boxing; import org.apache.brooklyn.util.text.StringEscapes; import org.apache.brooklyn.util.text.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -77,6 +82,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement public static final ConfigKey<CustomWorkflowStep> DELETION_CHECK_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "deletion_check").build(); public static final ConfigKey<CustomWorkflowStep> ON_CREATE_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "on_create").build(); public static final ConfigKey<CustomWorkflowStep> ON_UPDATE_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "on_update").build(); + public static final ConfigKey<CustomWorkflowStep> ON_UPDATE_CHILD_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "on_update_child").build(); public static final ConfigKey<CustomWorkflowStep> ON_DELETE_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "on_delete").build(); /* @@ -166,6 +172,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement if (stepState.creationCheck!=null && stepState.creationCheck.workflowTag!=null) return retrieveSubWorkflow(context, stepState.creationCheck.workflowTag.getWorkflowId()); if (stepState.onCreate !=null && stepState.onCreate.workflowTag!=null) return retrieveSubWorkflow(context, stepState.onCreate.workflowTag.getWorkflowId()); if (stepState.onUpdate!=null && stepState.onUpdate.workflowTag!=null) return retrieveSubWorkflow(context, stepState.onUpdate.workflowTag.getWorkflowId()); + if (stepState.onUpdateChild!=null && stepState.onUpdateChild.workflowTag!=null) return retrieveSubWorkflow(context, stepState.onUpdateChild.workflowTag.getWorkflowId()); if (stepState.deletionCheck!=null && stepState.deletionCheck.workflowTag!=null) return retrieveSubWorkflow(context, stepState.deletionCheck.workflowTag.getWorkflowId()); if (stepState.onDelete!=null && stepState.onDelete.workflowTag!=null) return retrieveSubWorkflow(context, stepState.onDelete.workflowTag.getWorkflowId()); return null; @@ -190,6 +197,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement WorkflowTagWithResult<List<Map>> creationCheck = new WorkflowTagWithResult<>(); WorkflowTagWithResult<Object> onCreate = new WorkflowTagWithResult<>(); WorkflowTagWithResult<Object> onUpdate = new WorkflowTagWithResult<>(); + WorkflowTagWithResult<Object> onUpdateChild = new WorkflowTagWithResult<>(); WorkflowTagWithResult<List> deletionCheck = new WorkflowTagWithResult<>(); WorkflowTagWithResult<Object> onDelete = new WorkflowTagWithResult<>(); } @@ -274,7 +282,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement }; - List matches = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, + List matchesReturned = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, "Matching items against children", stepState.matchCheck, MATCH_CHECK_WORKFLOW, () -> new CustomWorkflowStep(MutableList.of( "transform ${identifier_expression} | resolve_expression | set id", @@ -292,10 +300,13 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement list -> (List) list.stream().map(m -> m instanceof Entity ? new TransientEntityReference((Entity)m) : m).collect(Collectors.toList()) ); List<Map<String,Object>> stringMatchesToCreate = MutableList.of(); - for (int i=0; i<matches.size(); i++) { - Object m = matches.get(i); - if (m instanceof String) { - stringMatchesToCreate.add(MutableMap.of("match", m, "item", stepState.items.get(i), "index", i)); + Set matchesUnhandled = MutableSet.of(); + for (int i=0; i<matchesReturned.size(); i++) { + Object m = matchesReturned.get(i); + if (Boxing.isPrimitiveOrStringOrBoxedObject(m)) { + stringMatchesToCreate.add(MutableMap.of("match", m.toString(), "item", stepState.items.get(i), "index", i)); + } else if (m!=null) { + matchesUnhandled.add(m); } } List<Map> addedChildren = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, @@ -340,13 +351,17 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement list -> list.size()); List<Map<String,Object>> onUpdateTargets = MutableList.copyOf(onCreateTargets); - for (int i=0; i<matches.size(); i++) { - Object m = matches.get(i); + Iterator matchesUnhandledI = matchesUnhandled.iterator(); + for (int i=0; i<matchesUnhandled.size(); i++) { + Object m = matchesUnhandledI.next(); if (m instanceof TransientEntityReference) { m = ((TransientEntityReference)m).getEntity(mgmt); } if (m instanceof Entity) { onUpdateTargets.add(MutableMap.of("child", m, "item", stepState.items.get(i), "index", i)); + } else { + DynamicTasks.queueIfPossible(Tasks.warning("Unexpected match check result ("+m+"); ignoring", null, true)) + .orSubmitAsync(context.getEntity()); } } runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, @@ -359,13 +374,23 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement "condition", MutableMap.of("target", "${child.effector.on_update}") )) ), checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow, - // TODO run _on_ each child foreach -> { foreach.setTarget(onUpdateTargets); foreach.setTargetVarName("{child,item,index}"); }), list -> list.size()); + runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, + "Calling on_update_child on item-matched children ("+onUpdateTargets.size()+")", stepState.onUpdateChild, ON_UPDATE_CHILD_WORKFLOW, + () -> null, + checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow, + foreach -> { + foreach.setTarget(onUpdateTargets); + foreach.setTargetVarName("{child,item,index}"); + foreach.setTargetEntityKey("child"); + }), + list -> list.size()); + Map<String,Entity> oldChildren = MutableMap.of(); stepState.parent.getChildren().forEach(c -> oldChildren.put(c.getId(), c)); onUpdateTargets.forEach(c -> oldChildren.remove( ((Entity)c.get("child")).getId()) ); @@ -428,17 +453,19 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement if (checkWorkflow == null) { checkWorkflow = defaultWorkflow.get(); } - ConfigBag outerWorkflowConfig = outerWorkflowConfigFn.apply(checkWorkflow); - - WorkflowExecutionContext matchWorkflow = WorkflowExecutionContext.newInstanceUnpersistedWithParent( - context.getEntity(), context.getWorkflowExectionContext(), WorkflowExecutionContext.WorkflowContextType.NESTED_WORKFLOW, - name, - outerWorkflowConfig, null, null, null); - stepSubState.workflowTag = BrooklynTaskTags.tagForWorkflow(matchWorkflow); - WorkflowReplayUtils.addNewSubWorkflow(context, stepSubState.workflowTag); - setStepState(context, stepState); - - stepSubState.result = postprocess.apply((List) DynamicTasks.queue(matchWorkflow.getTask(true).get()).getUnchecked()); + if (checkWorkflow!=null) { + ConfigBag outerWorkflowConfig = outerWorkflowConfigFn.apply(checkWorkflow); + + WorkflowExecutionContext matchWorkflow = WorkflowExecutionContext.newInstanceUnpersistedWithParent( + context.getEntity(), context.getWorkflowExectionContext(), WorkflowExecutionContext.WorkflowContextType.NESTED_WORKFLOW, + name, + outerWorkflowConfig, null, null, null); + stepSubState.workflowTag = BrooklynTaskTags.tagForWorkflow(matchWorkflow); + WorkflowReplayUtils.addNewSubWorkflow(context, stepSubState.workflowTag); + setStepState(context, stepState); + + stepSubState.result = postprocess.apply((List) DynamicTasks.queue(matchWorkflow.getTask(true).get()).getUnchecked()); + } } else { stepSubState.result = postprocess.apply((List) WorkflowReplayUtils.replayResumingInSubWorkflow("workflow effector", context, subworkflowTargetForResuming, instructionsForResuming, (w, e)-> { diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java index 0d205be016..102ee665dc 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java @@ -40,6 +40,8 @@ public class ForeachWorkflowStep extends CustomWorkflowStep { public void setTarget(Object x) { this.target = x; } public void setTargetVarName(Object x) { this.target_var_name = x; } + public void setTargetIndexVarName(Object x) { this.target_index_var_name = x; } + public void setTargetEntityKey(String x) { this.target_entity_key = x; } @Override public void populateFromShorthand(String value) { diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java index 50604ab796..e7050ac473 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java @@ -127,6 +127,29 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport Asserts.assertEquals(childrenNames, MutableSet.of("name1", "name2")); } + @Test + public void testOnUpdateChild() { + WorkflowExecutionContext execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( + "- step: let items", + " value:", + " - x_id: one", + " x_name: name1", + " - x_id: two", + " x_name: name2", + "- step: update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}", + " on_update_child:", + " - set-entity-name ${item.x_name}-${index}", + ""), + "set entity name using on_update_child"); + execution.getTask(false).get().getUnchecked(); + + Set<String> childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); + Asserts.assertEquals(childrenIds, MutableSet.of("one", "two")); + + Set<String> childrenNames = app.getChildren().stream().map(c -> c.getDisplayName()).collect(Collectors.toSet()); + Asserts.assertEquals(childrenNames, MutableSet.of("name1-0", "name2-1")); + } + @Test public void testCustomMatch() { WorkflowExecutionContext execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( @@ -151,6 +174,19 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport Asserts.assertEquals(childrenIds, MutableSet.of("ONE", "two")); } + @Test + public void testNumericId() { + WorkflowExecutionContext execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( + "- step: let items", + " value:", + " - x_id: 1", + " - x_id: two", + "- update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}"), + "first run at children"); + execution.getTask(false).get().getUnchecked(); + Asserts.assertSize(app.getChildren(), 2); + } + @Test public void testStaticIdentifierGivesError() { WorkflowExecutionContext execution = WorkflowBasicTest.runWorkflow(app, Strings.lines(