Move subscriptions() from Entity to BrooklynObject - Add support for subscriptions to Location (and test) - For CatalogItem.subscriptions(), throw UnsupportedOperationException - Deprecate methods on AbstractEntityAdjunct, in favour of AbstractEntityAdjunct.subscriptions().*
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/4ad6cc96 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/4ad6cc96 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/4ad6cc96 Branch: refs/heads/master Commit: 4ad6cc967730001715ac3a97661106b5ed1b6c67 Parents: 2b29795 Author: Aled Sage <[email protected]> Authored: Mon Sep 21 10:05:11 2015 +0100 Committer: Aled Sage <[email protected]> Committed: Wed Sep 23 10:33:50 2015 +0100 ---------------------------------------------------------------------- .../brooklyn/api/catalog/CatalogItem.java | 12 + .../org/apache/brooklyn/api/entity/Entity.java | 51 ---- .../brooklyn/api/mgmt/ManagementContext.java | 9 + .../brooklyn/api/objs/BrooklynObject.java | 63 ++++- .../core/catalog/internal/CatalogItemDo.java | 11 +- .../internal/CatalogItemDtoAbstract.java | 14 +- .../brooklyn/core/entity/EntityInternal.java | 8 - .../core/location/AbstractLocation.java | 79 +++++- .../access/PortForwardManagerClient.java | 5 + .../internal/AbstractManagementContext.java | 11 +- .../mgmt/internal/BasicSubscriptionContext.java | 3 +- .../NonDeploymentManagementContext.java | 7 + .../core/objs/AbstractEntityAdjunct.java | 164 ++++++++++--- .../core/objs/BrooklynObjectInternal.java | 9 + .../core/location/LocationSubscriptionTest.java | 241 +++++++++++++++++++ 15 files changed, 580 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java b/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java index 3758d08..bf806aa 100644 --- 
a/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java +++ b/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java @@ -48,6 +48,18 @@ public interface CatalogItem<T,SpecT> extends BrooklynObject, Rebindable { public boolean isNamed(); } + /** + * @throws UnsupportedOperationException; config not supported for catalog items + */ + @Override + ConfigurationSupport config(); + + /** + * @throws UnsupportedOperationException; subscriptions are not supported for catalog items + */ + @Override + SubscriptionSupport subscriptions(); + @Deprecated public static interface CatalogItemLibraries { Collection<String> getBundles(); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java b/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java index 795218c..dd141f0 100644 --- a/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java +++ b/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java @@ -301,8 +301,6 @@ public interface Entity extends BrooklynObject { SensorSupport sensors(); - SubscriptionSupport subscriptions(); - PolicySupport policies(); EnricherSupport enrichers(); @@ -354,55 +352,6 @@ public interface Entity extends BrooklynObject { } @Beta - public interface SubscriptionSupport { - /** - * Allow us to subscribe to data from a {@link Sensor} on another entity. - * - * @return a subscription id which can be used to unsubscribe - * - * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) - */ - // FIXME remove from interface? - @Beta - <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener); - - /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */ - // FIXME remove from interface? 
- @Beta - <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener); - - /** @see SubscriptionManager#subscribeToMembers(Group, Sensor, SensorEventListener) */ - // FIXME remove from interface? - @Beta - <T> SubscriptionHandle subscribeToMembers(Group group, Sensor<T> sensor, SensorEventListener<? super T> listener); - - /** - * Unsubscribes from the given producer. - * - * @see SubscriptionContext#unsubscribe(SubscriptionHandle) - */ - @Beta - boolean unsubscribe(Entity producer); - - /** - * Unsubscribes the given handle. - * - * @see SubscriptionContext#unsubscribe(SubscriptionHandle) - */ - @Beta - boolean unsubscribe(Entity producer, SubscriptionHandle handle); - - /** - * Unsubscribes the given handle. - * - * It is (currently) more efficient to also pass in the producer - - * see {@link BasicSubscriptionSupport#unsubscribe(Entity, SubscriptionHandle)} - */ - @Beta - boolean unsubscribe(SubscriptionHandle handle); - } - - @Beta public interface PolicySupport { /** * @return an immutable thread-safe view of the policies. 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java index ac4924e..f809fb2 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java @@ -28,6 +28,7 @@ import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.drivers.DriverDependentEntity; import org.apache.brooklyn.api.entity.drivers.EntityDriverManager; import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager; +import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.location.LocationRegistry; import org.apache.brooklyn.api.mgmt.entitlement.EntitlementManager; import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager; @@ -156,6 +157,14 @@ public interface ManagementContext { */ SubscriptionContext getSubscriptionContext(Entity entity); + /** + * Returns a {@link SubscriptionContext} instance representing subscriptions + * (from the {@link SubscriptionManager}) associated with this location, and capable + * of conveniently subscribing on behalf of that location + */ + @Beta + SubscriptionContext getSubscriptionContext(Location location); + @Beta // method may move to an internal interface; brooklyn users should not need to call this directly RebindManager getRebindManager(); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java b/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java 
index c932f02..094586f 100644 --- a/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java +++ b/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java @@ -18,10 +18,20 @@ */ package org.apache.brooklyn.api.objs; +import java.util.Map; import java.util.Set; import javax.annotation.Nonnull; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.mgmt.SubscriptionContext; +import org.apache.brooklyn.api.mgmt.SubscriptionHandle; +import org.apache.brooklyn.api.mgmt.SubscriptionManager; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEventListener; + +import com.google.common.annotations.Beta; import com.google.common.collect.ImmutableMap; /** @@ -53,7 +63,13 @@ public interface BrooklynObject extends Identifiable, Configurable { * and they should be amenable to our persistence (on-disk serialization) and our JSON serialization in the REST API. */ TagSupport tags(); - + + /** + * Subscriptions are the mechanism for receiving notifications of sensor-events (e.g. attribute-changed) from + * other entities. + */ + SubscriptionSupport subscriptions(); + public interface TagSupport { /** * @return An immutable copy of the set of tags on this entity. @@ -70,4 +86,49 @@ public interface BrooklynObject extends Identifiable, Configurable { boolean removeTag(@Nonnull Object tag); } + + @Beta + public interface SubscriptionSupport { + /** + * Allow us to subscribe to data from a {@link Sensor} on another entity. + * + * @return a subscription id which can be used to unsubscribe + * + * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) + */ + @Beta + <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? 
super T> listener); + + /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */ + @Beta + <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener); + + /** @see SubscriptionManager#subscribeToMembers(Group, Sensor, SensorEventListener) */ + @Beta + <T> SubscriptionHandle subscribeToMembers(Group group, Sensor<T> sensor, SensorEventListener<? super T> listener); + + /** + * Unsubscribes from the given producer. + * + * @see SubscriptionContext#unsubscribe(SubscriptionHandle) + */ + @Beta + boolean unsubscribe(Entity producer); + + /** + * Unsubscribes the given handle. + * + * @see SubscriptionContext#unsubscribe(SubscriptionHandle) + */ + @Beta + boolean unsubscribe(Entity producer, SubscriptionHandle handle); + + /** + * Unsubscribes the given handle. + * + * It is (currently) more efficient to also pass in the producer - + * see {@link SubscriptionSupport#unsubscribe(Entity, SubscriptionHandle)} + */ + boolean unsubscribe(SubscriptionHandle handle); + } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java index 5029d8d..0545a06 100644 --- a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java +++ b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java @@ -25,6 +25,7 @@ import javax.annotation.Nullable; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.objs.BrooklynObjectInternal; +import org.apache.brooklyn.core.objs.BrooklynObjectInternal.SubscriptionSupportInternal; import org.apache.brooklyn.api.catalog.CatalogItem; import 
org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.rebind.RebindSupport; @@ -49,13 +50,21 @@ public class CatalogItemDo<T,SpecT> implements CatalogItem<T,SpecT>, BrooklynObj } /** - * Config not supported for catalog item. See {@link #getPlanYaml()}. + * @throws UnsupportedOperationException; Config not supported for catalog item. See {@link #getPlanYaml()}. */ @Override public ConfigurationSupportInternal config() { throw new UnsupportedOperationException(); } + /** + * @throws UnsupportedOperationException; subscriptions are not supported for catalog items + */ + @Override + public SubscriptionSupportInternal subscriptions() { + throw new UnsupportedOperationException(); + } + @Override public <U> U setConfig(ConfigKey<U> key, U val) { return config().set(key, val); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java index b281941..c950b7b 100644 --- a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java +++ b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java @@ -26,8 +26,6 @@ import java.util.Set; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.brooklyn.api.catalog.CatalogItem; import org.apache.brooklyn.api.mgmt.rebind.RebindSupport; import org.apache.brooklyn.api.mgmt.rebind.mementos.CatalogItemMemento; @@ -37,6 +35,8 @@ import org.apache.brooklyn.core.objs.AbstractBrooklynObject; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.core.flags.FlagUtils; 
import org.apache.brooklyn.util.core.flags.SetFromFlag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; @@ -66,13 +66,21 @@ public abstract class CatalogItemDtoAbstract<T, SpecT> extends AbstractBrooklynO private @SetFromFlag boolean disabled; /** - * Config not supported for catalog item. See {@link #getPlanYaml()}. + * @throws UnsupportedOperationException; Config not supported for catalog item. See {@link #getPlanYaml()}. */ @Override public ConfigurationSupportInternal config() { throw new UnsupportedOperationException(); } + /** + * @throws UnsupportedOperationException; subscriptions are not supported for catalog items + */ + @Override + public SubscriptionSupportInternal subscriptions() { + throw new UnsupportedOperationException(); + } + @Override public <U> U setConfig(ConfigKey<U> key, U val) { return config().set(key, val); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java b/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java index 4fa9c67..49dfa00 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java @@ -184,9 +184,6 @@ public interface EntityInternal extends BrooklynObjectInternal, EntityLocal, Reb SensorSupportInternal sensors(); @Override - SubscriptionSupportInternal subscriptions(); - - @Override PolicySupportInternal policies(); @Override @@ -230,11 +227,6 @@ public interface EntityInternal extends BrooklynObjectInternal, EntityLocal, Reb } @Beta - public interface SubscriptionSupportInternal extends Entity.SubscriptionSupport { - public void unsubscribeAll(); - } - - @Beta public 
interface PolicySupportInternal extends Entity.PolicySupport { /** * Removes all policy from this entity. http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java b/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java index 507e7f5..b8859d6 100644 --- a/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java +++ b/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java @@ -29,12 +29,18 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.mgmt.SubscriptionContext; +import org.apache.brooklyn.api.mgmt.SubscriptionHandle; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.rebind.RebindSupport; import org.apache.brooklyn.api.mgmt.rebind.mementos.LocationMemento; import org.apache.brooklyn.api.objs.Configurable; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.config.ConfigInheritance; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.config.ConfigKey.HasConfigKey; @@ -50,17 +56,19 @@ import org.apache.brooklyn.core.location.internal.LocationDynamicType; import org.apache.brooklyn.core.location.internal.LocationInternal; import org.apache.brooklyn.core.mgmt.internal.LocalLocationManager; import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; +import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker; import 
org.apache.brooklyn.core.mgmt.rebind.BasicLocationRebindSupport; import org.apache.brooklyn.core.objs.AbstractBrooklynObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.brooklyn.util.collections.SetFromLiveMap; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.FlagUtils; import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.stream.Streams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.annotations.Beta; import com.google.common.base.Objects; import com.google.common.base.Objects.ToStringHelper; import com.google.common.base.Splitter; @@ -110,8 +118,13 @@ public abstract class AbstractLocation extends AbstractBrooklynObject implements private BasicConfigurationSupport config = new BasicConfigurationSupport(); + private BasicSubscriptionSupport subscriptions = new BasicSubscriptionSupport(); + private ConfigBag configBag = new ConfigBag(); + /** not for direct access; refer to as 'subscriptionTracker' via getter so that it is initialized */ + protected transient SubscriptionTracker _subscriptionTracker; + private volatile boolean managed; private boolean inConstruction; @@ -354,7 +367,16 @@ public abstract class AbstractLocation extends AbstractBrooklynObject implements @Override public ConfigurationSupportInternal config() { - return config ; + return config; + } + + // the concrete type rather than an interface is returned because Groovy subclasses + // complain (incorrectly) if we return SubscriptionSupportInternal + // TODO revert to SubscriptionSupportInternal when groovy subclasses work without this (eg new groovy version) + @Override + @Beta + public BasicSubscriptionSupport subscriptions() { + return subscriptions; } private class BasicConfigurationSupport implements ConfigurationSupportInternal { @@ -478,6 +500,57 @@ public abstract class AbstractLocation 
extends AbstractBrooklynObject implements } } + public class BasicSubscriptionSupport implements SubscriptionSupportInternal { + @Override + public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { + return getSubscriptionTracker().subscribe(producer, sensor, listener); + } + + @Override + public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) { + return getSubscriptionTracker().subscribeToMembers(producerGroup, sensor, listener); + } + + @Override + public <T> SubscriptionHandle subscribeToChildren(Entity producerParent, Sensor<T> sensor, SensorEventListener<? super T> listener) { + return getSubscriptionTracker().subscribeToChildren(producerParent, sensor, listener); + } + + @Override + public boolean unsubscribe(Entity producer) { + return getSubscriptionTracker().unsubscribe(producer); + } + + @Override + public boolean unsubscribe(Entity producer, SubscriptionHandle handle) { + return getSubscriptionTracker().unsubscribe(producer, handle); + } + + @Override + public boolean unsubscribe(SubscriptionHandle handle) { + return getSubscriptionTracker().unsubscribe(handle); + } + + @Override + public void unsubscribeAll() { + getSubscriptionTracker().unsubscribeAll(); + } + + protected SubscriptionTracker getSubscriptionTracker() { + synchronized (AbstractLocation.this) { + if (_subscriptionTracker!=null) return _subscriptionTracker; + _subscriptionTracker = new SubscriptionTracker(newSubscriptionContext()); + return _subscriptionTracker; + } + } + + private SubscriptionContext newSubscriptionContext() { + synchronized (AbstractLocation.this) { + return getManagementContext().getSubscriptionContext(AbstractLocation.this); + } + } + } + @Override public <T> T getConfig(HasConfigKey<T> key) { return config().get(key); 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java b/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java index 12e15aa..d0199a0 100644 --- a/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java +++ b/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java @@ -402,4 +402,9 @@ public class PortForwardManagerClient implements PortForwardManager { public ConfigurationSupport config() { return getDelegate().config(); } + + @Override + public SubscriptionSupport subscriptions() { + return getDelegate().subscriptions(); + } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java index 76871cd..343528d 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java @@ -32,14 +32,13 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.brooklyn.api.catalog.BrooklynCatalog; import org.apache.brooklyn.api.catalog.CatalogItem; import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.drivers.EntityDriverManager; 
import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager; +import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.location.LocationRegistry; import org.apache.brooklyn.api.mgmt.ExecutionContext; import org.apache.brooklyn.api.mgmt.ManagementContext; @@ -78,6 +77,8 @@ import org.apache.brooklyn.util.core.task.BasicExecutionContext; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.groovy.GroovyJavaMethods; import org.apache.brooklyn.util.guava.Maybe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.base.Objects; @@ -260,6 +261,12 @@ public abstract class AbstractManagementContext implements ManagementContextInte } @Override + public SubscriptionContext getSubscriptionContext(Location loc) { + // BSC is a thin wrapper around SM so fine to create a new one here + return new BasicSubscriptionContext(getSubscriptionManager(), loc); + } + + @Override public EntityDriverManager getEntityDriverManager() { return entityDriverManager; } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java index d821c4e..57d4712 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java @@ -19,7 +19,6 @@ package org.apache.brooklyn.core.mgmt.internal; import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf; -import groovy.lang.Closure; import java.util.Collection; import java.util.Collections; @@ -42,6 +41,8 @@ import 
com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; +import groovy.lang.Closure; + /** * A {@link SubscriptionContext} for an entity or other user of a {@link SubscriptionManager}. */ http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java index 87e8f84..1f62add 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java @@ -241,6 +241,13 @@ public class NonDeploymentManagementContext implements ManagementContextInternal } @Override + public synchronized SubscriptionContext getSubscriptionContext(Location loc) { + // Should never be called; the NonDeploymentManagementContext is associated with a particular entity, whereas + // the #getSubscriptionContext(loc) should only be called in the context of a location. 
+ throw new UnsupportedOperationException(); + } + + @Override public ExecutionContext getExecutionContext(Entity entity) { if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity); if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED) http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java index fb71901..814923a 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java @@ -32,17 +32,17 @@ import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.entity.Group; import org.apache.brooklyn.api.mgmt.ExecutionContext; -import org.apache.brooklyn.api.mgmt.SubscriptionContext; import org.apache.brooklyn.api.mgmt.SubscriptionHandle; import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.api.objs.Configurable; import org.apache.brooklyn.api.objs.EntityAdjunct; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.config.ConfigMap; import org.apache.brooklyn.config.ConfigKey.HasConfigKey; +import org.apache.brooklyn.config.ConfigMap; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.Entities; @@ -58,7 +58,6 @@ 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; @@ -82,7 +81,9 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple protected transient ExecutionContext execution; private final BasicConfigurationSupport config = new BasicConfigurationSupport(); - + + private final BasicSubscriptionSupport subscriptions = new BasicSubscriptionSupport(); + /** * The config values of this entity. Updating this map should be done * via {@link #config()}. @@ -199,6 +200,89 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple return config; } + @Override + public BasicSubscriptionSupport subscriptions() { + return subscriptions; + } + + public class BasicSubscriptionSupport implements SubscriptionSupportInternal { + @Override + public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { + if (!checkCanSubscribe()) return null; + return getSubscriptionTracker().subscribe(producer, sensor, listener); + } + + @Override + public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) { + if (!checkCanSubscribe(producerGroup)) return null; + return getSubscriptionTracker().subscribeToMembers(producerGroup, sensor, listener); + } + + @Override + public <T> SubscriptionHandle subscribeToChildren(Entity producerParent, Sensor<T> sensor, SensorEventListener<? 
super T> listener) { + if (!checkCanSubscribe(producerParent)) return null; + return getSubscriptionTracker().subscribeToChildren(producerParent, sensor, listener); + } + + @Override + public boolean unsubscribe(Entity producer) { + if (destroyed.get()) return false; + return getSubscriptionTracker().unsubscribe(producer); + } + + @Override + public boolean unsubscribe(Entity producer, SubscriptionHandle handle) { + if (destroyed.get()) return false; + return getSubscriptionTracker().unsubscribe(producer, handle); + } + + @Override + public boolean unsubscribe(SubscriptionHandle handle) { + if (destroyed.get()) return false; + return getSubscriptionTracker().unsubscribe(handle); + } + + @Override + public void unsubscribeAll() { + if (destroyed.get()) return; + getSubscriptionTracker().unsubscribeAll(); + } + + protected SubscriptionTracker getSubscriptionTracker() { + synchronized (AbstractEntityAdjunct.this) { + if (_subscriptionTracker!=null) return _subscriptionTracker; + if (entity==null) return null; + _subscriptionTracker = new SubscriptionTracker(((EntityInternal)entity).getManagementSupport().getSubscriptionContext()); + return _subscriptionTracker; + } + } + + /** returns false if deleted, throws exception if invalid state, otherwise true. + * okay if entity is not yet managed (but not if entity is no longer managed). 
*/ + protected boolean checkCanSubscribe(Entity producer) { + if (destroyed.get()) return false; + if (producer==null) throw new IllegalStateException(this+" given a null target for subscription"); + if (entity==null) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because it is not associated to an entity"); + if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because the associated entity "+entity+" is no longer managed"); + return true; + } + + protected boolean checkCanSubscribe() { + if (destroyed.get()) return false; + if (entity==null) throw new IllegalStateException(this+" cannot subscribe because it is not associated to an entity"); + if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe because the associated entity "+entity+" is no longer managed"); + return true; + } + + /** + * @return a list of all subscription handles + */ + protected Collection<SubscriptionHandle> getAllSubscriptions() { + SubscriptionTracker tracker = getSubscriptionTracker(); + return (tracker != null) ? 
tracker.getAllSubscriptions() : Collections.<SubscriptionHandle>emptyList(); + } + } + private class BasicConfigurationSupport implements ConfigurationSupportInternal { @Override @@ -377,6 +461,10 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple } } + /** + * @deprecated since 0.9.0; for internal use only + */ + @Deprecated protected synchronized SubscriptionTracker getSubscriptionTracker() { if (_subscriptionTracker!=null) return _subscriptionTracker; if (entity==null) return null; @@ -384,14 +472,15 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple return _subscriptionTracker; } - @VisibleForTesting //intended as protected, meant for subclasses - /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */ + /** + * @deprecated since 0.9.0; see {@link SubscriptionSupport#subscribe(Entity, Sensor, SensorEventListener)} and {@link BrooklynObject#subscriptions()} + */ + @Deprecated public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { if (!checkCanSubscribe()) return null; return getSubscriptionTracker().subscribe(producer, sensor, listener); } - @VisibleForTesting //intended as protected, meant for subclasses @Beta /** @see SubscriptionContext#subscribe(Map, Entity, Sensor, SensorEventListener) */ public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? 
super T> listener) { @@ -399,67 +488,68 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple return getSubscriptionTracker().subscribe(flags, producer, sensor, listener); } - @VisibleForTesting //intended as protected, meant for subclasses - /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */ + /** + * @deprecated since 0.9.0; see {@link SubscriptionSupport#subscribeToMembers(Group, Sensor, SensorEventListener)} and {@link BrooklynObject#subscriptions()} + */ + @Deprecated public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) { if (!checkCanSubscribe(producerGroup)) return null; return getSubscriptionTracker().subscribeToMembers(producerGroup, sensor, listener); } - @VisibleForTesting //intended as protected, meant for subclasses - /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */ + /** + * @deprecated since 0.9.0; see {@link SubscriptionSupport#subscribeToChildren(Entity, Sensor, SensorEventListener)} and {@link BrooklynObject#subscriptions()} + */ + @Deprecated public <T> SubscriptionHandle subscribeToChildren(Entity producerParent, Sensor<T> sensor, SensorEventListener<? super T> listener) { if (!checkCanSubscribe(producerParent)) return null; return getSubscriptionTracker().subscribeToChildren(producerParent, sensor, listener); } - /** @deprecated since 0.7.0 use {@link #checkCanSubscribe(Entity)} */ + /** + * @deprecated since 0.7.0 use {@link BasicSubscriptionSupport#checkCanSubscribe(Entity)} + */ @Deprecated protected boolean check(Entity requiredEntity) { return checkCanSubscribe(requiredEntity); } - /** returns false if deleted, throws exception if invalid state, otherwise true. - * okay if entity is not yet managed (but not if entity is no longer managed). 
*/ + + /** + * @deprecated since 0.9.0; for internal use only + */ + @Deprecated protected boolean checkCanSubscribe(Entity producer) { - if (destroyed.get()) return false; - if (producer==null) throw new IllegalStateException(this+" given a null target for subscription"); - if (entity==null) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because it is not associated to an entity"); - if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because the associated entity "+entity+" is no longer managed"); - return true; + return subscriptions().checkCanSubscribe(producer); } + + /** + * @deprecated since 0.9.0; for internal use only + */ + @Deprecated protected boolean checkCanSubscribe() { - if (destroyed.get()) return false; - if (entity==null) throw new IllegalStateException(this+" cannot subscribe because it is not associated to an entity"); - if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe because the associated entity "+entity+" is no longer managed"); - return true; + return subscriptions().checkCanSubscribe(); } /** - * Unsubscribes the given producer. - * - * @see SubscriptionContext#unsubscribe(SubscriptionHandle) + * @deprecated since 0.9.0; see {@link SubscriptionSupport#unsubscribe(Entity)} and {@link BrooklynObject#subscriptions()} */ - @VisibleForTesting //intended as protected, meant for subclasses + @Deprecated public boolean unsubscribe(Entity producer) { - if (destroyed.get()) return false; - return getSubscriptionTracker().unsubscribe(producer); + return subscriptions().unsubscribe(producer); } /** - * Unsubscribes the given producer. 
- * - * @see SubscriptionContext#unsubscribe(SubscriptionHandle) + * @deprecated since 0.9.0; see {@link SubscriptionSupport#unsubscribe(Entity, SubscriptionHandle)} and {@link BrooklynObject#subscriptions()} */ - @VisibleForTesting //intended as protected, meant for subclasses + @Deprecated public boolean unsubscribe(Entity producer, SubscriptionHandle handle) { - if (destroyed.get()) return false; - return getSubscriptionTracker().unsubscribe(producer, handle); + return subscriptions().unsubscribe(producer, handle); } /** - * @return a list of all subscription handles + * @deprecated since 0.9.0; for internal use only */ - @VisibleForTesting //intended as protected, meant for subclasses + @Deprecated protected Collection<SubscriptionHandle> getAllSubscriptions() { SubscriptionTracker tracker = getSubscriptionTracker(); return (tracker != null) ? tracker.getAllSubscriptions() : Collections.<SubscriptionHandle>emptyList(); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java index ad2cca1..d076f4e 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java @@ -38,8 +38,12 @@ public interface BrooklynObjectInternal extends BrooklynObject, Rebindable { @SuppressWarnings("rawtypes") // subclasses typically apply stronger typing RebindSupport getRebindSupport(); + @Override ConfigurationSupportInternal config(); + @Override + SubscriptionSupportInternal subscriptions(); + @Beta public interface ConfigurationSupportInternal extends Configurable.ConfigurationSupport { @@ -100,4 +104,9 @@ public interface BrooklynObjectInternal 
extends BrooklynObject, Rebindable { @Beta void refreshInheritedConfigOfChildren(); } + + @Beta + public interface SubscriptionSupportInternal extends BrooklynObject.SubscriptionSupport { + public void unsubscribeAll(); + } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java new file mode 100644 index 0000000..30352c7 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.location; + +import static org.testng.Assert.assertEquals; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.mgmt.SubscriptionHandle; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.RecordingSensorEventListener; +import org.apache.brooklyn.core.location.SimulatedLocation; +import org.apache.brooklyn.core.sensor.BasicSensorEvent; +import org.apache.brooklyn.core.test.entity.TestApplication; +import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.entity.group.BasicGroup; +import org.apache.brooklyn.test.Asserts; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +public class LocationSubscriptionTest { + + // TODO Duplication between this and PolicySubscriptionTest + + private SimulatedLocation loc; + private TestApplication app; + private TestEntity observedEntity; + private BasicGroup observedGroup; + private TestEntity observedChildEntity; + private TestEntity observedMemberEntity; + private TestEntity otherEntity; + private RecordingSensorEventListener<Object> listener; + + @BeforeMethod(alwaysRun=true) + public void setUp() { + app = TestApplication.Factory.newManagedInstanceForTests(); + loc = app.newSimulatedLocation(); + observedEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + observedChildEntity = observedEntity.createAndManageChild(EntitySpec.create(TestEntity.class)); + + observedGroup = app.createAndManageChild(EntitySpec.create(BasicGroup.class)); + observedMemberEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + observedGroup.addMember(observedMemberEntity); + + otherEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + + 
listener = new RecordingSensorEventListener<>(); + + app.start(ImmutableList.of(loc)); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() { + if (app != null) Entities.destroyAll(app.getManagementContext()); + } + + @Test + public void testSubscriptionReceivesEvents() { + loc.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener); + loc.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener); + loc.subscriptions().subscribe(observedEntity, TestEntity.MY_NOTIF, listener); + + otherEntity.sensors().set(TestEntity.SEQUENCE, 123); + observedEntity.sensors().set(TestEntity.SEQUENCE, 123); + observedEntity.sensors().set(TestEntity.NAME, "myname"); + observedEntity.sensors().emit(TestEntity.MY_NOTIF, 456); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123), + new BasicSensorEvent<String>(TestEntity.NAME, observedEntity, "myname"), + new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, observedEntity, 456))); + }}); + } + + @Test + public void testSubscriptionToAllReceivesEvents() { + loc.subscriptions().subscribe(null, TestEntity.SEQUENCE, listener); + + observedEntity.sensors().set(TestEntity.SEQUENCE, 123); + otherEntity.sensors().set(TestEntity.SEQUENCE, 456); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123), + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456))); + }}); + } + + @Test + public void testSubscribeToChildrenReceivesEvents() { + loc.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener); + + observedChildEntity.sensors().set(TestEntity.SEQUENCE, 123); + observedEntity.sensors().set(TestEntity.SEQUENCE, 456); + + Asserts.succeedsEventually(new Runnable() { + 
@Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedChildEntity, 123))); + }}); + } + + @Test + public void testSubscribeToChildrenReceivesEventsForDynamicallyAddedChildren() { + loc.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener); + + final TestEntity observedChildEntity2 = observedEntity.createAndManageChild(EntitySpec.create(TestEntity.class)); + observedChildEntity2.sensors().set(TestEntity.SEQUENCE, 123); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedChildEntity2, 123))); + }}); + } + + @Test + public void testSubscribeToMembersReceivesEvents() { + loc.subscriptions().subscribeToMembers(observedGroup, TestEntity.SEQUENCE, listener); + + observedMemberEntity.sensors().set(TestEntity.SEQUENCE, 123); + observedGroup.sensors().set(TestEntity.SEQUENCE, 456); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedMemberEntity, 123))); + }}); + } + + @Test + public void testSubscribeToMembersReceivesEventsForDynamicallyAddedMembers() { + loc.subscriptions().subscribeToMembers(observedGroup, TestEntity.SEQUENCE, listener); + + final TestEntity observedMemberEntity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + observedGroup.addMember(observedMemberEntity2); + observedMemberEntity2.sensors().set(TestEntity.SEQUENCE, 123); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedMemberEntity2, 123))); + }}); + } + + @Test(groups="Integration") + public void 
testSubscribeToMembersIgnoresEventsForDynamicallyRemovedMembers() { + loc.subscriptions().subscribeToMembers(observedGroup, TestEntity.SEQUENCE, listener); + + observedGroup.removeMember(observedMemberEntity); + + observedMemberEntity.sensors().set(TestEntity.SEQUENCE, 123); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of()); + }}); + } + + @Test + public void testUnsubscribeRemovesAllSubscriptionsForThatEntity() { + loc.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener); + loc.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener); + loc.subscriptions().subscribe(observedEntity, TestEntity.MY_NOTIF, listener); + loc.subscriptions().subscribe(otherEntity, TestEntity.SEQUENCE, listener); + loc.subscriptions().unsubscribe(observedEntity); + + observedEntity.sensors().set(TestEntity.SEQUENCE, 123); + observedEntity.sensors().set(TestEntity.NAME, "myname"); + observedEntity.sensors().emit(TestEntity.MY_NOTIF, 123); + otherEntity.sensors().set(TestEntity.SEQUENCE, 456); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456))); + }}); + } + + @Test + public void testUnsubscribeUsingHandleStopsEvents() { + SubscriptionHandle handle1 = loc.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener); + SubscriptionHandle handle2 = loc.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener); + SubscriptionHandle handle3 = loc.subscriptions().subscribe(otherEntity, TestEntity.SEQUENCE, listener); + + loc.subscriptions().unsubscribe(observedEntity, handle2); + + observedEntity.sensors().set(TestEntity.SEQUENCE, 123); + observedEntity.sensors().set(TestEntity.NAME, "myname"); + otherEntity.sensors().set(TestEntity.SEQUENCE, 456); + + Asserts.succeedsEventually(new Runnable() { + 
@Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123), + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456))); + }}); + } + + @Test + public void testSubscriptionReceivesEventsInOrder() { + final int NUM_EVENTS = 100; + loc.subscriptions().subscribe(observedEntity, TestEntity.MY_NOTIF, listener); + + for (int i = 0; i < NUM_EVENTS; i++) { + observedEntity.sensors().emit(TestEntity.MY_NOTIF, i); + } + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(Iterables.size(listener.getEvents()), NUM_EVENTS); + for (int i = 0; i < NUM_EVENTS; i++) { + assertEquals(Iterables.get(listener.getEvents(), i).getValue(), i); + } + }}); + } + +}
