This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 9ef5dcc7a40add81414786fa2be84e19084f71f3
Author: Alex Heneveld <a...@cloudsoft.io>
AuthorDate: Mon Apr 1 15:29:15 2024 +0100

    pause most tasks submitted during entity startup, until all entities are 
managed
    
    can affect new tasks, and especially rebind, if an adjunct is started and
    tries to access another entity which hasn't yet started up; this is
    particularly an issue if there is a workflow which will be checkpointed,
    and another persisted workflow references some other entity as a parent
    workflow
---
 .../brooklyn/api/mgmt/ManagementContext.java       |  9 ++--
 .../camp/brooklyn/SshCommandSensorYamlTest.java    | 19 +++++---
 .../mgmt/internal/EntityManagementSupport.java     | 34 ++++++++-----
 .../core/mgmt/internal/LocalManagementContext.java |  3 +-
 .../util/core/task/BasicExecutionContext.java      | 55 +++++++++++++++++-----
 .../brooklyn/core/mgmt/ha/HotStandbyTest.java      |  2 +-
 .../system_service/SystemServiceEnricher.java      |  3 +-
 7 files changed, 86 insertions(+), 39 deletions(-)

diff --git 
a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java 
b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index 356533e089..8052af3961 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -156,12 +156,9 @@ public interface ManagementContext {
      */
     SubscriptionManager getSubscriptionManager();
 
-    //TODO (Alex) I'm not sure the following two getXxxContext methods are 
needed on the interface;
-    //I expect they will only be called once, in AbstractEntity, and fully 
capable
-    //there of generating the respective contexts from the managers
-    //(Litmus test will be whether there is anything in 
FederatedManagementContext
-    //which requires a custom FederatedExecutionContext -- or whether BasicEC 
-    //works with FederatedExecutionManager)
+    // not sure the following two getXxxContext methods are desired on this 
interface;
+    // almost everyone should use the entity.getExecutionContext() which is a 
shared instance with better blocking;
+    // this should just be called to initialize and for special cases where we 
want to bypass that startup blocking
     /**
      * Returns an {@link ExecutionContext} instance representing tasks 
      * (from the {@link ExecutionManager}) associated with this entity, and 
capable 
diff --git 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java
 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java
index 644909b555..9bb4efa49c 100644
--- 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java
+++ 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java
@@ -31,6 +31,7 @@ import 
org.apache.brooklyn.core.entity.RecordingSensorEventListener;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.entity.TestApplication;
 import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess;
+import org.apache.brooklyn.entity.stock.BasicApplication;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool;
@@ -38,6 +39,7 @@ import 
org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool.CustomRespons
 import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool.ExecParams;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.BeforeMethod;
@@ -232,10 +234,9 @@ public class SshCommandSensorYamlTest extends 
AbstractYamlTest {
 
     @Test(groups="Integration") // because slow
     public void 
testSshCommandSensorPeriodicFeedServiceUpFalseDoesNotRunAtStartup() throws 
Exception {
-        RecordingSshTool.setCustomResponse(".*myCommand.*", new 
RecordingSshTool.CustomResponse(0, "myResponse", null));
+        RecordingSshTool.setCustomResponse(".*myCommand.*", new 
RecordingSshTool.CustomResponse(0, "myResponse0", null));
 
-        Stopwatch sw = Stopwatch.createStarted();
-        Entity app = createAndStartApplication(
+        BasicApplication app = (BasicApplication) createAndStartApplication(
                 "location:",
                 "  localhost:",
                 "    sshToolClass: "+RecordingSshTool.class.getName(),
@@ -248,13 +249,19 @@ public class SshCommandSensorYamlTest extends 
AbstractYamlTest {
                 "    brooklyn.config:",
                 "      name: mySensor",
                 "      command: myCommand",
-                "      period: 5s",
+                "      period: 2s",
                 "      onlyIfServiceUp: true");
         waitForApplicationTasks(app);
 
         VanillaSoftwareProcess entity = (VanillaSoftwareProcess) 
Iterables.getOnlyElement(app.getChildren());
-        EntityAsserts.assertAttributeEqualsEventually(entity, 
Sensors.newStringSensor("mySensor"), "myResponse");
-        Asserts.assertThat(Duration.of(sw), d -> 
d.isLongerThan(Duration.seconds(4)));
+        EntityAsserts.assertAttributeEqualsEventually(entity, 
Sensors.newStringSensor("mySensor"), "myResponse0");
+
+        // once run once, it shouldn't run again for 2s (plus or minus 1s 
tolerance here)
+        Stopwatch sw = Stopwatch.createStarted();
+        RecordingSshTool.setCustomResponse(".*myCommand.*", new 
RecordingSshTool.CustomResponse(0, "myResponse1", null));
+        EntityAsserts.assertAttributeEqualsEventually(entity, 
Sensors.newStringSensor("mySensor"), "myResponse1");
+        Asserts.assertThat(Duration.of(sw), d -> 
d.isLongerThan(Duration.seconds(1)));
+        Asserts.assertThat(Duration.of(sw), d -> 
d.isShorterThan(Duration.seconds(3)));
     }
 
     @Test
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
index c3ea682c3e..366b9ab05d 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import org.apache.brooklyn.api.effector.Effector;
@@ -52,6 +53,7 @@ import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
 import org.apache.brooklyn.core.workflow.DanglingWorkflowException;
 import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
 import 
org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors;
+import org.apache.brooklyn.util.core.task.BasicExecutionContext;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -231,10 +233,11 @@ public class EntityManagementSupport {
                         this.managementContext = info.getManagementContext();
                         
nonDeploymentManagementContext.setMode(NonDeploymentManagementContextMode.MANAGEMENT_STARTING);
 
-                        if (!isReadOnly()) {
-                            
nonDeploymentManagementContext.getSubscriptionManager().setDelegate((AbstractSubscriptionManager)
 managementContext.getSubscriptionManager());
-                            
nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForSubscribing();
-                        }
+                        // defer this until mgmt context started, so all other 
entities will be known, in case they are accessed in the tasks
+//                        if (!isReadOnly()) {
+//                            
nonDeploymentManagementContext.getSubscriptionManager().setDelegate((AbstractSubscriptionManager)
 managementContext.getSubscriptionManager());
+//                            
nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForSubscribing();
+//                        }
 
                         managementContextUsable.set(true);
                         currentlyDeployed.set(true);
@@ -280,9 +283,13 @@ public class EntityManagementSupport {
                         entity.onManagementStarting();
 
                         // start those policies etc which are labelled as 
auto-start
-                        entity.policies().forEach(adj -> { if (adj instanceof 
EntityAdjunct.AutoStartEntityAdjunct) 
((EntityAdjunct.AutoStartEntityAdjunct)adj).start(); });
-                        entity.enrichers().forEach(adj -> { if (adj instanceof 
EntityAdjunct.AutoStartEntityAdjunct) 
((EntityAdjunct.AutoStartEntityAdjunct)adj).start(); });
-                        entity.feeds().forEach(f -> { if (!f.isActivated()) 
f.start(); });
+                        BiConsumer<String,Runnable> queueTask = (name, r) -> 
entity.getExecutionContext().submit(name, r);
+                        entity.policies().forEach(adj -> { if (adj instanceof 
EntityAdjunct.AutoStartEntityAdjunct)
+                            queueTask.accept("Start policy "+adj, 
((EntityAdjunct.AutoStartEntityAdjunct)adj)::start); });
+                        entity.enrichers().forEach(adj -> { if (adj instanceof 
EntityAdjunct.AutoStartEntityAdjunct)
+                            queueTask.accept("Start enricher "+adj, 
((EntityAdjunct.AutoStartEntityAdjunct)adj)::start); });
+                        entity.feeds().forEach(f -> { if (!f.isActivated())
+                            queueTask.accept("Start feed "+f, f::start); });
 
                         if (AUTO_FAIL_AND_RESUME_WORKFLOWS) {
                             // resume any workflows that were dangling due to 
shutdown
@@ -293,7 +300,7 @@ public class EntityManagementSupport {
                                     .collect(Collectors.toList());
                             if (!shutdownInterruptedWorkflows.isEmpty()) {
                                 log.debug("Discovered workflows noted as 
'interrupted' on startup at "+entity+", will resume as dangling: 
"+shutdownInterruptedWorkflows);
-                                
entity.getExecutionContext().submit(DynamicTasks.of("Resuming with failure " + 
shutdownInterruptedWorkflows.size() + " interrupted workflow" + 
(shutdownInterruptedWorkflows.size() != 1 ? "s" : ""), () -> {
+                                
getManagementContext().getExecutionContext(entity).submit(DynamicTasks.of("Resuming
 with failure " + shutdownInterruptedWorkflows.size() + " interrupted workflow" 
+ (shutdownInterruptedWorkflows.size() != 1 ? "s" : ""), () -> {
                                     shutdownInterruptedWorkflows.forEach(w -> {
                                         // these are backgrounded because they 
are expected to fail
                                         // we also have to wait until mgmt is 
complete
@@ -346,9 +353,7 @@ public class EntityManagementSupport {
             /* on start, we want to:
              * - set derived/inherited config values (not needed, the specs 
should have taken care of that?)
              * - publish all queued sensors (done below)
-             * - start all queued executions 
-             *   (e.g. subscription delivery - done below? are there others 
and if so how are they unlocked?
-             *   curious where the "start queued tasks" logic is; must be 
somewhere as it all seems to have been working fine (Aug 2016)) 
+             * - start all queued executions (unpause entity's execution 
context, subscription delivery)
              * [in exactly this order, at each entity]
              * then subsequent sensor events and executions occur directly (no 
queueing)
              * 
@@ -357,7 +362,10 @@ public class EntityManagementSupport {
              */                
             
             if (!isReadOnly()) {
+                
nonDeploymentManagementContext.getSubscriptionManager().setDelegate((AbstractSubscriptionManager)
 managementContext.getSubscriptionManager());
                 
nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForPublishing();
+                
nonDeploymentManagementContext.getSubscriptionManager().startDelegatingForSubscribing();
+                ((BasicExecutionContext)getExecutionContext()).unpause();
             }
             
             if (!isReadOnly()) {
@@ -520,7 +528,9 @@ public class EntityManagementSupport {
         if (managementContextUsable.get()) {
             synchronized (this) {
                 if (executionContext!=null) return executionContext;
-                executionContext = 
managementContext.getExecutionContext(entity);
+                ExecutionContext newExecutionContext = 
managementContext.getExecutionContext(entity);
+                ((BasicExecutionContext)newExecutionContext).pause(); // start 
paused, so things don't run until mgmt is started, and all entities known
+                executionContext = newExecutionContext;
                 return executionContext;
             }
         }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
index db284614f1..4fb32e5230 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
@@ -47,6 +47,7 @@ import org.apache.brooklyn.api.mgmt.TaskAdaptable;
 import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
 import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.entity.drivers.downloads.BasicDownloadsManager;
 import org.apache.brooklyn.core.internal.BrooklynInitialization;
 import org.apache.brooklyn.core.internal.BrooklynProperties;
@@ -360,7 +361,7 @@ public class LocalManagementContext extends 
AbstractManagementContext {
     }
 
     protected <T> Task<T> runAtEntity(Entity entity, TaskAdaptable<T> task) {
-        getExecutionContext(entity).submit(task);
+        ((EntityInternal)entity).getExecutionContext().submit(task);
         if (DynamicTasks.getTaskQueuingContext()!=null) {
             // put it in the queueing context so it appears in the GUI
             // mark it inessential as this is being invoked from code,
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index dc5dfad97f..b0757bacc8 100644
--- 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -18,10 +18,6 @@
  */
 package org.apache.brooklyn.util.core.task;
 
-import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
 import java.lang.reflect.Proxy;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -29,6 +25,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -39,6 +36,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.mgmt.ExecutionManager;
@@ -51,6 +53,7 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedItem;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import 
org.apache.brooklyn.util.core.task.BasicExecutionManager.BrooklynTaskLoggingMdc;
@@ -60,6 +63,7 @@ import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.javalang.Threads;
 import org.apache.brooklyn.util.time.CountdownTimer;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -404,17 +408,44 @@ public class BasicExecutionContext extends 
AbstractExecutionContext {
             });
         }
 
-        if (task instanceof Task) {
-            return executionManager.submit(properties, (Task)task);
-        } else if (task instanceof Callable) {
-            return executionManager.submit(properties, (Callable)task);
-        } else if (task instanceof Runnable) {
-            return (Task<T>) executionManager.submit(properties, 
(Runnable)task);
-        } else {
-            throw new IllegalArgumentException("Unhandled task type: 
task="+task+"; type="+(task!=null ? task.getClass() : "null"));
+        return submitViaExecutionManagerOrHold(task, properties);
+    }
+
+    boolean paused = false;
+    List<Pair<Task,Map>> tasksQueuedWhilePaused = MutableList.of();
+    public void pause() {
+        this.paused = true;
+    }
+    public void unpause() {
+        synchronized (tasksQueuedWhilePaused) {
+            tasksQueuedWhilePaused.forEach(pair -> 
submitWithoutCheckingPaused(pair.getLeft(), pair.getRight()));
+            tasksQueuedWhilePaused.clear();
+            this.paused = false;
         }
     }
 
+    private <T> Task submitViaExecutionManagerOrHold(Object task, Map 
properties) {
+        Task taskT = null;
+        if (task instanceof Task) taskT = (Task) task;
+        else if (task instanceof TaskAdaptable) taskT = ((TaskAdaptable) 
task).asTask();
+        else if (task instanceof Callable) taskT = new BasicTask(properties, 
(Callable)task);
+        else if (task instanceof Runnable) taskT = new BasicTask(properties, 
(Runnable)task);
+        else throw new IllegalArgumentException("Unhandled task type: task="+ 
task +"; type="+(task !=null ? task.getClass() : "null"));
+
+        if (paused) {
+            synchronized (tasksQueuedWhilePaused) {
+                if (paused) {
+                    tasksQueuedWhilePaused.add(Pair.of(taskT, properties));
+                    return taskT;
+                }
+            }
+        }
+        return submitWithoutCheckingPaused(taskT, properties);
+    }
+    private <T> Task submitWithoutCheckingPaused(Task task, Map properties) {
+        return executionManager.submit(properties, task);
+    }
+
     private String idStack(Entity target) {
         Deque<String> ids = new ArrayDeque<>();
         Entity e = target;
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java 
b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java
index f1e7a41b0b..87570bc983 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/ha/HotStandbyTest.java
@@ -626,7 +626,7 @@ public class HotStandbyTest {
         forcePersistNow(n1);
         Assert.assertTrue(entity.feeds().getFeeds().size() > 0, "Feeds: 
"+entity.feeds().getFeeds());
         for (Feed feed : entity.feeds().getFeeds()) {
-            assertTrue(feed.isRunning(), "Feed expected running, but it is 
non-running");
+            Asserts.eventually(() -> feed, Feed::isRunning, 
Duration.seconds(2), Duration.millis(10), "Feed expected running, but it is 
non-running");
         }
 
         HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
diff --git 
a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
 
b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
index ac9b4122ae..cfcf407460 100644
--- 
a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
+++ 
b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
@@ -29,6 +29,7 @@ import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.effector.EffectorTasks;
 import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedStream;
 import org.apache.brooklyn.entity.software.base.SoftwareProcess;
@@ -128,6 +129,6 @@ public class SystemServiceEnricher extends AbstractEnricher 
implements Enricher
     }
 
     ExecutionContext getEntityExecutionContext() {
-        return getManagementContext().getExecutionContext(entity);
+        return ((EntityInternal)entity).getExecutionContext();
     }
 }

Reply via email to