Don't try to re-submit DST secondary tasks if already executed

Effectors invoked against an entity will be submitted twice - once in the 
entity's context and again in the TaskQueueingContext of the caller (see 
brooklyn.management.internal.LocalManagementContext.runAtEntity(Entity, 
TaskAdaptable<T>)). Usually that's not a problem because the ExecutionContext 
will notice the task is already submitted and ignore it. But if the entity has 
been unmanaged in the meantime, then the call to get the entity's 
ExecutionContext will fail. To avoid this, DynamicSequentialTask (DST) now 
skips submission of any secondary task that has already been submitted, even 
if it has already completed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/160b3ca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/160b3ca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/160b3ca0

Branch: refs/heads/master
Commit: 160b3ca0fc6fefd9ef5b6e6929c7d3594436cf09
Parents: 807e6d1
Author: Svetoslav Neykov <[email protected]>
Authored: Tue Jul 7 17:24:24 2015 +0300
Committer: Svetoslav Neykov <[email protected]>
Committed: Tue Jul 7 17:29:16 2015 +0300

----------------------------------------------------------------------
 .../entity/group/DynamicClusterImpl.java        |  6 +-
 .../util/task/DynamicSequentialTask.java        | 10 ++-
 .../entity/effector/EffectorTaskTest.java       | 79 ++++++++++++++++++++
 3 files changed, 88 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/160b3ca0/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java 
b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
index 447a4bb..0e2f164 100644
--- a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
@@ -874,11 +874,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl 
implements DynamicClus
         try {
             if (member instanceof Startable) {
                 Task<?> task = member.invoke(Startable.STOP, 
Collections.<String,Object>emptyMap());
-                try {
-                    task.get();
-                } catch (Exception e) {
-                    throw Exceptions.propagate(e);
-                }
+                task.getUnchecked();
             }
         } finally {
             Entities.unmanage(member);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/160b3ca0/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java 
b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
index 967c831..7756388 100644
--- a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
+++ b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
@@ -209,12 +209,18 @@ public class DynamicSequentialTask<T> extends 
BasicTask<T> implements HasTaskChi
             throw new IllegalStateException(message);
         }
         synchronized (task) {
-            if (task.isSubmitted() && !task.isDone()) {
+            if (task.isSubmitted()) {
                 if (log.isTraceEnabled()) {
                     log.trace("DST "+this+" skipping submission of child 
"+task+" because it is already submitted");
                 }
             } else {
-                ec.submit(task);
+                try {
+                    ec.submit(task);
+                } catch (Exception e) {
+                    Exceptions.propagateIfFatal(e);
+                    // Give some context when the submit fails (happens when 
the target is already unmanaged)
+                    throw new IllegalStateException("Failure submitting task 
"+task+" in "+this+": "+e.getMessage(), e);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/160b3ca0/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java 
b/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java
index e9584db..bc7677c 100644
--- a/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java
+++ b/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java
@@ -19,6 +19,7 @@
 package brooklyn.entity.effector;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -28,6 +29,7 @@ import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.AbstractEntity;
 import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityInternal;
 import brooklyn.entity.effector.EffectorTasks.EffectorTaskFactory;
 import brooklyn.entity.proxying.EntitySpec;
@@ -37,12 +39,14 @@ import brooklyn.management.Task;
 import brooklyn.test.entity.TestEntity;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.config.ConfigBag;
+import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.task.DynamicSequentialTask;
 import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.TaskBuilder;
 import brooklyn.util.task.Tasks;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
 public class EffectorTaskTest extends BrooklynAppUnitTestSupport {
@@ -53,6 +57,7 @@ public class EffectorTaskTest extends 
BrooklynAppUnitTestSupport {
             .description("doubles the given number")
             .parameter(Integer.class, "numberToDouble")
             .impl(new EffectorBody<Integer>() {
+                @Override
                 public Integer call(ConfigBag parameters) {
                     // do a sanity check
                     Assert.assertNotNull(entity());
@@ -352,4 +357,78 @@ public class EffectorTaskTest extends 
BrooklynAppUnitTestSupport {
         
         Assert.assertEquals(doubler.invoke(DoublingEntity.DOUBLE, 
MutableMap.of("numberToDouble", 3, "numberToStartWith", 3)).get(), (Integer)7);
     }
+    
+    public static final Effector<Void> DUMMY = Effectors.effector(Void.class, 
"dummy")
+            .impl(new EffectorBody<Void>() {
+                @Override
+                public Void call(ConfigBag parameters) {
+                    return null;
+                }
+            })
+            .build();
+    
+    public static final Effector<Void> STALL = Effectors.effector(Void.class, 
"stall")
+            .parameter(AtomicBoolean.class, "lock")
+            .impl(new EffectorBody<Void>() {
+                @Override
+                public Void call(ConfigBag parameters) {
+                    AtomicBoolean lock = 
(AtomicBoolean)parameters.getStringKey("lock");
+                    synchronized(lock) {
+                        if (!lock.get()) {
+                            try {
+                                lock.wait();
+                            } catch (InterruptedException e) {
+                                Exceptions.propagate(e);
+                            }
+                        }
+                    }
+                    return null;
+                }
+            })
+            .build();
+
+    public static final Effector<Void> CONTEXT = 
Effectors.effector(Void.class, "stall_caller")
+            .parameter(AtomicBoolean.class, "lock")
+            .impl(new EffectorBody<Void>() {
+                @Override
+                public Void call(ConfigBag parameters) {
+                    Entity child = 
Iterables.getOnlyElement(entity().getChildren());
+                    AtomicBoolean lock = new AtomicBoolean();
+                    Task<Void> dummyTask = null;
+
+                    try {
+                        // Queue a (DST secondary) task which waits until 
notified, so that tasks queued later will get blocked
+                        queue(Effectors.invocation(entity(), STALL, 
ImmutableMap.of("lock", lock)));
+    
+                        // Start a new task - submitted directly to child's 
ExecutionContext, as well as added as a
+                        // DST secondary of the current effector.
+                        dummyTask = child.invoke(DUMMY, ImmutableMap.<String, 
Object>of());
+                        dummyTask.getUnchecked();
+
+                        // Execution completed in the child's 
ExecutionContext, but still queued as a secondary.
+                        // Destroy the child entity so that no subsequent 
tasks can be executed in its context.
+                        Entities.destroy(child);
+                    } finally {
+                        // Let STALL complete
+                        synchronized(lock) {
+                            lock.set(true);
+                            lock.notifyAll();
+                        }
+                        // At this point DUMMY will be unblocked and the DST 
will try to execute it as a secondary.
+                        // Submission will be ignored because DUMMY already 
executed.
+                        // If it's not ignored then submission will fail 
because entity is already unmanaged.
+                    }
+                    return null;
+                }
+            })
+            .build();
+    
+
+    @Test
+    public void testNestedEffectorExecutedAsSecondaryTask() throws Exception {
+        app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        Task<Void> effTask = app.invoke(CONTEXT, ImmutableMap.<String, 
Object>of());
+        effTask.get();
+    }
+
 }

Reply via email to