Rename o.a.b.sensor.enricher to o.a.b.core.enricher - and o.a.b.enricher.stock for actual enricher impls
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/6f15e8a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/6f15e8a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/6f15e8a6 Branch: refs/heads/master Commit: 6f15e8a6d61c2e648547cf7faba03fbc06716421 Parents: daf4091 Author: Aled Sage <[email protected]> Authored: Wed Aug 19 23:07:28 2015 +0100 Committer: Aled Sage <[email protected]> Committed: Wed Aug 19 23:07:28 2015 +0100 ---------------------------------------------------------------------- .../core/enricher/AbstractEnricher.java | 115 +++ .../core/enricher/EnricherDynamicType.java | 43 + .../core/enricher/EnricherTypeSnapshot.java | 39 + .../brooklyn/core/entity/AbstractEntity.java | 2 +- .../entity/lifecycle/ServiceStateLogic.java | 8 +- .../mgmt/rebind/BasicEnricherRebindSupport.java | 2 +- .../mgmt/rebind/BasicEntityRebindSupport.java | 2 +- .../core/mgmt/rebind/RebindIteration.java | 2 +- .../core/mgmt/rebind/RebindManagerImpl.java | 2 +- .../mgmt/rebind/dto/MementosGenerators.java | 2 +- .../core/objs/AbstractEntityAdjunct.java | 2 +- .../brooklyn/core/objs/BrooklynTypes.java | 2 +- .../core/objs/proxy/InternalPolicyFactory.java | 2 +- .../brooklyn/core/sensor/StaticSensor.java | 2 +- .../stock/AbstractAggregatingEnricher.java | 174 ++++ .../enricher/stock/AbstractAggregator.java | 238 ++++++ .../stock/AbstractMultipleSensorAggregator.java | 169 ++++ .../enricher/stock/AbstractTransformer.java | 101 +++ .../stock/AbstractTransformingEnricher.java | 38 + .../stock/AbstractTypeTransformingEnricher.java | 68 ++ .../brooklyn/enricher/stock/AddingEnricher.java | 107 +++ .../brooklyn/enricher/stock/Aggregator.java | 222 +++++ .../brooklyn/enricher/stock/Combiner.java | 138 ++++ .../stock/CustomAggregatingEnricher.java | 320 +++++++ .../brooklyn/enricher/stock/Enrichers.java | 825 +++++++++++++++++++ .../apache/brooklyn/enricher/stock/Joiner.java | 127 +++ .../brooklyn/enricher/stock/Propagator.java | 201 +++++ .../stock/SensorPropagatingEnricher.java | 181 ++++ .../stock/SensorTransformingEnricher.java | 106 +++ .../brooklyn/enricher/stock/Transformer.java | 103 +++ .../brooklyn/enricher/stock/UpdatingMap.java | 159 ++++ .../YamlRollingTimeWindowMeanEnricher.java | 178 ++++ .../stock/YamlTimeWeightedDeltaEnricher.java | 83 ++ .../entity/group/DynamicFabricImpl.java | 2 +- .../entity/stock/DelegateEntityImpl.java | 2 +- .../enricher/AbstractAggregatingEnricher.java | 173 ---- .../sensor/enricher/AbstractAggregator.java | 237 ------ .../sensor/enricher/AbstractEnricher.java | 115 --- .../AbstractMultipleSensorAggregator.java | 169 ---- .../sensor/enricher/AbstractTransformer.java | 100 --- .../enricher/AbstractTransformingEnricher.java | 38 - .../AbstractTypeTransformingEnricher.java | 67 -- .../sensor/enricher/AddingEnricher.java | 106 --- .../brooklyn/sensor/enricher/Aggregator.java | 221 ----- .../brooklyn/sensor/enricher/Combiner.java | 137 --- .../enricher/CustomAggregatingEnricher.java | 320 ------- .../sensor/enricher/EnricherDynamicType.java | 43 - .../sensor/enricher/EnricherTypeSnapshot.java | 39 - .../brooklyn/sensor/enricher/Enrichers.java | 824 ------------------ .../apache/brooklyn/sensor/enricher/Joiner.java | 126 --- .../brooklyn/sensor/enricher/Propagator.java | 200 ----- .../enricher/SensorPropagatingEnricher.java | 180 ---- .../enricher/SensorTransformingEnricher.java | 106 --- .../brooklyn/sensor/enricher/Transformer.java | 103 --- .../brooklyn/sensor/enricher/UpdatingMap.java | 158 ---- .../YamlRollingTimeWindowMeanEnricher.java | 178 ---- .../enricher/YamlTimeWeightedDeltaEnricher.java | 83 -- .../core/enricher/BasicEnricherTest.java | 119 +++ .../core/enricher/EnricherConfigTest.java | 147 ++++ .../brooklyn/core/entity/EntitySpecTest.java | 2 +- .../BrooklynMementoPersisterTestFixture.java | 2 +- .../core/mgmt/rebind/RebindEnricherTest.java | 4 +- .../core/mgmt/rebind/RebindFailuresTest.java | 2 +- .../core/mgmt/rebind/RebindPolicyTest.java | 2 +- .../core/policy/basic/EnricherTypeTest.java | 2 +- .../brooklyn/core/test/policy/TestEnricher.java | 2 +- ...stomAggregatingEnricherDeprecatedTest.groovy | 368 +++++++++ .../stock/CustomAggregatingEnricherTest.java | 556 +++++++++++++ .../brooklyn/enricher/stock/EnrichersTest.java | 501 +++++++++++ ...SensorPropagatingEnricherDeprecatedTest.java | 108 +++ .../stock/SensorPropagatingEnricherTest.java | 218 +++++ .../TransformingEnricherDeprecatedTest.groovy | 83 ++ .../stock/TransformingEnricherTest.java | 71 ++ .../YamlRollingTimeWindowMeanEnricherTest.java | 179 ++++ .../YamlTimeWeightedDeltaEnricherTest.java | 107 +++ .../sensor/enricher/BasicEnricherTest.java | 119 --- ...stomAggregatingEnricherDeprecatedTest.groovy | 367 --------- .../enricher/CustomAggregatingEnricherTest.java | 556 ------------- .../sensor/enricher/EnricherConfigTest.java | 147 ---- .../brooklyn/sensor/enricher/EnrichersTest.java | 501 ----------- ...SensorPropagatingEnricherDeprecatedTest.java | 108 --- .../enricher/SensorPropagatingEnricherTest.java | 218 ----- .../TransformingEnricherDeprecatedTest.groovy | 82 -- .../enricher/TransformingEnricherTest.java | 71 -- .../YamlRollingTimeWindowMeanEnricherTest.java | 179 ---- .../YamlTimeWeightedDeltaEnricherTest.java | 107 --- .../brooklyn/demo/ResilientMongoDbApp.java | 2 +- .../demo/WebClusterDatabaseExample.java | 2 +- .../demo/WebClusterDatabaseExampleApp.java | 2 +- ...lusterDatabaseExampleAppIntegrationTest.java | 2 +- .../brooklyn/policy/enricher/DeltaEnricher.java | 2 +- .../policy/enricher/HttpLatencyDetector.java | 2 +- .../policy/enricher/RollingMeanEnricher.java | 2 +- .../enricher/RollingTimeWindowMeanEnricher.java | 4 +- .../enricher/TimeFractionDeltaEnricher.java | 2 +- .../enricher/TimeWeightedDeltaEnricher.java | 4 +- .../brooklynnode/BrooklynClusterImpl.java | 2 +- .../entity/brooklynnode/BrooklynNodeImpl.java | 2 +- .../software/base/SoftwareProcessImpl.java | 2 +- .../system_service/SystemServiceEnricher.java | 2 +- .../entity/database/crate/CrateNodeImpl.java | 2 +- .../entity/database/mysql/MySqlClusterImpl.java | 2 +- .../messaging/kafka/KafkaClusterImpl.java | 2 +- .../messaging/storm/StormDeploymentImpl.java | 2 +- .../bind/BindDnsServerIntegrationTest.java | 2 +- .../network/bind/PrefixAndIdEnricher.java | 2 +- .../cassandra/CassandraDatacenterImpl.java | 2 +- .../nosql/couchbase/CouchbaseClusterImpl.java | 2 +- .../nosql/mongodb/MongoDBReplicaSetImpl.java | 2 +- .../sharding/CoLocatedMongoDBRouterImpl.java | 2 +- .../sharding/MongoDBShardedDeploymentImpl.java | 2 +- .../entity/nosql/redis/RedisClusterImpl.java | 2 +- .../entity/nosql/riak/RiakClusterImpl.java | 2 +- .../entity/nosql/riak/RiakNodeImpl.java | 2 +- .../entity/proxy/nginx/NginxControllerImpl.java | 2 +- .../ControlledDynamicWebAppClusterImpl.java | 2 +- .../entity/webapp/DynamicWebAppClusterImpl.java | 2 +- .../entity/webapp/DynamicWebAppFabricImpl.java | 2 +- .../entity/webapp/jboss/JBoss6ServerImpl.java | 2 +- .../entity/webapp/jboss/JBoss7ServerImpl.java | 2 +- .../entity/webapp/jetty/Jetty6ServerImpl.java | 2 +- .../app/ClusterWebServerDatabaseSample.java | 4 +- .../camp/brooklyn/EnrichersYamlTest.java | 2 +- .../camp/brooklyn/TestReferencingEnricher.java | 2 +- ...est-app-with-enrichers-slightly-simpler.yaml | 12 +- .../test-webapp-with-averaging-enricher.yaml | 4 +- .../apache/brooklyn/cli/lister/ClassFinder.java | 2 +- .../qa/load/SimulatedJBoss7ServerImpl.java | 2 +- .../brooklyn/qa/load/SimulatedTheeTierApp.java | 2 +- .../webcluster/SinusoidalLoadGenerator.java | 2 +- .../qa/longevity/webcluster/WebClusterApp.java | 2 +- .../rest/util/BrooklynRestResourceUtils.java | 2 +- 132 files changed, 6271 insertions(+), 6257 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java b/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java new file mode 100644 index 0000000..0dc36f6 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.enricher; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.Map; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.rebind.RebindSupport; +import org.apache.brooklyn.api.mgmt.rebind.mementos.EnricherMemento; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Enricher; +import org.apache.brooklyn.api.sensor.EnricherType; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.mgmt.rebind.BasicEnricherRebindSupport; +import org.apache.brooklyn.core.objs.AbstractEntityAdjunct; +import org.apache.brooklyn.util.core.flags.TypeCoercions; + +import com.google.common.base.Objects; +import com.google.common.collect.Maps; + +/** +* Base {@link Enricher} implementation; all enrichers should extend this or its children +*/ +public abstract class AbstractEnricher extends AbstractEntityAdjunct implements Enricher { + + public static final ConfigKey<Boolean> SUPPRESS_DUPLICATES = ConfigKeys.newBooleanConfigKey("enricher.suppressDuplicates", + "Whether duplicate values published by this enricher should be suppressed"); + + private final EnricherDynamicType enricherType; + protected Boolean suppressDuplicates; + + public AbstractEnricher() { + this(Maps.newLinkedHashMap()); + } + + public AbstractEnricher(Map<?,?> flags) { + super(flags); + + enricherType = new EnricherDynamicType(this); + + if (isLegacyConstruction() && !isLegacyNoConstructionInit()) { + init(); + } + } + + @Override + public RebindSupport<EnricherMemento> getRebindSupport() { + return new BasicEnricherRebindSupport(this); + } + + @Override + public EnricherType getEnricherType() { + return enricherType.getSnapshot(); + } + + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + Boolean suppressDuplicates = getConfig(SUPPRESS_DUPLICATES); + if (suppressDuplicates!=null) + this.suppressDuplicates = suppressDuplicates; + } + + @Override + protected void onChanged() { + requestPersist(); + } + + @Override + protected <T> void emit(Sensor<T> sensor, Object val) { + checkState(entity != null, "entity must first be set"); + if (val == Entities.UNCHANGED) { + return; + } + if (val == Entities.REMOVE) { + ((EntityInternal)entity).removeAttribute((AttributeSensor<T>) sensor); + return; + } + + T newVal = TypeCoercions.coerce(val, sensor.getTypeToken()); + if (sensor instanceof AttributeSensor) { + if (Boolean.TRUE.equals(suppressDuplicates)) { + T oldValue = entity.getAttribute((AttributeSensor<T>)sensor); + if (Objects.equal(oldValue, newVal)) + return; + } + entity.setAttribute((AttributeSensor<T>)sensor, newVal); + } else { + entity.emit(sensor, newVal); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherDynamicType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherDynamicType.java b/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherDynamicType.java new file mode 100644 index 0000000..b6a0b23 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherDynamicType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.enricher; + +import org.apache.brooklyn.api.sensor.Enricher; +import org.apache.brooklyn.api.sensor.EnricherType; +import org.apache.brooklyn.core.objs.BrooklynDynamicType; + +public class EnricherDynamicType extends BrooklynDynamicType<Enricher, AbstractEnricher> { + + public EnricherDynamicType(Class<? extends Enricher> type) { + super(type); + } + + public EnricherDynamicType(AbstractEnricher enricher) { + super(enricher); + } + + public EnricherType getSnapshot() { + return (EnricherType) super.getSnapshot(); + } + + @Override + protected EnricherTypeSnapshot newSnapshot() { + return new EnricherTypeSnapshot(name, value(configKeys)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherTypeSnapshot.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherTypeSnapshot.java b/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherTypeSnapshot.java new file mode 100644 index 0000000..240d884 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherTypeSnapshot.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.enricher; + +import java.util.Map; + +import org.apache.brooklyn.api.sensor.EnricherType; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.objs.BrooklynTypeSnapshot; + +public class EnricherTypeSnapshot extends BrooklynTypeSnapshot implements EnricherType { + private static final long serialVersionUID = 4670930188951106009L; + + EnricherTypeSnapshot(String name, Map<String, ConfigKey<?>> configKeys) { + super(name, configKeys); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + return (obj instanceof EnricherTypeSnapshot) && super.equals(obj); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java index 0ec5903..fb8f2d0 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java @@ -59,6 +59,7 @@ import org.apache.brooklyn.core.BrooklynFeatureEnablement; import org.apache.brooklyn.core.BrooklynLogging; import org.apache.brooklyn.core.catalog.internal.CatalogUtils; import org.apache.brooklyn.core.config.render.RendererHints; +import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.internal.EntityConfigMap; import org.apache.brooklyn.core.entity.lifecycle.PolicyDescriptor; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; @@ -83,7 +84,6 @@ import org.apache.brooklyn.core.sensor.AttributeMap; import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey; import org.apache.brooklyn.core.sensor.BasicNotificationSensor; import org.apache.brooklyn.core.sensor.Sensors; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java b/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java index 654662f..c2606c1 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java @@ -42,16 +42,16 @@ import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.BrooklynLogging; import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityAdjuncts; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.entity.EntityPredicates; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; -import org.apache.brooklyn.sensor.enricher.AbstractMultipleSensorAggregator; -import org.apache.brooklyn.sensor.enricher.Enrichers; -import org.apache.brooklyn.sensor.enricher.UpdatingMap; +import org.apache.brooklyn.enricher.stock.AbstractMultipleSensorAggregator; +import org.apache.brooklyn.enricher.stock.Enrichers; +import org.apache.brooklyn.enricher.stock.UpdatingMap; import org.apache.brooklyn.util.collections.CollectionFunctionals; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java index 89e11e2..3903655 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java @@ -20,7 +20,7 @@ package org.apache.brooklyn.core.mgmt.rebind; import org.apache.brooklyn.api.mgmt.rebind.RebindContext; import org.apache.brooklyn.api.mgmt.rebind.mementos.EnricherMemento; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; +import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.FlagUtils; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java index 2a5e92e..0d80698 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java @@ -32,13 +32,13 @@ import org.apache.brooklyn.api.mgmt.rebind.mementos.EntityMemento; import org.apache.brooklyn.api.objs.BrooklynObjectType; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.feed.AbstractFeed; import org.apache.brooklyn.core.mgmt.rebind.dto.MementosGenerators; import org.apache.brooklyn.core.policy.AbstractPolicy; import org.apache.brooklyn.entity.group.AbstractGroupImpl; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; import org.apache.brooklyn.util.exceptions.Exceptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java index e9478ef..3f468ba 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java @@ -64,6 +64,7 @@ import org.apache.brooklyn.core.BrooklynLogging; import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel; import org.apache.brooklyn.core.catalog.internal.CatalogInitialization; import org.apache.brooklyn.core.catalog.internal.CatalogUtils; +import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.AbstractApplication; import org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.entity.EntityInternal; @@ -86,7 +87,6 @@ import org.apache.brooklyn.core.objs.proxy.InternalFactory; import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory; import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory; import org.apache.brooklyn.core.policy.AbstractPolicy; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.flags.FlagUtils; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java index fdce617..52b984a 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java @@ -46,6 +46,7 @@ import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.BrooklynFeatureEnablement; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl; import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; @@ -55,7 +56,6 @@ import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics; import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils.CreateBackupMode; import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformer; import org.apache.brooklyn.core.server.BrooklynServerConfig; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.QuorumCheck; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java index 929b63c..36daf49 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java @@ -49,6 +49,7 @@ import org.apache.brooklyn.api.sensor.Feed; import org.apache.brooklyn.api.sensor.AttributeSensor.SensorPersistenceMode; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.catalog.internal.CatalogItemDo; +import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.EntityDynamicType; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.feed.AbstractFeed; @@ -58,7 +59,6 @@ import org.apache.brooklyn.core.mgmt.rebind.AbstractBrooklynObjectRebindSupport; import org.apache.brooklyn.core.mgmt.rebind.TreeUtils; import org.apache.brooklyn.core.objs.BrooklynTypes; import org.apache.brooklyn.core.policy.AbstractPolicy; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.FlagUtils; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java index efd89d1..e85cc73 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java @@ -44,10 +44,10 @@ import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.config.ConfigMap; import org.apache.brooklyn.config.ConfigKey.HasConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.FlagUtils; import org.apache.brooklyn.util.core.flags.SetFromFlag; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java index b6e68ff..4170613 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java @@ -27,9 +27,9 @@ import org.apache.brooklyn.api.policy.Policy; import org.apache.brooklyn.api.sensor.Enricher; import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.enricher.EnricherDynamicType; import org.apache.brooklyn.core.entity.EntityDynamicType; import org.apache.brooklyn.core.policy.PolicyDynamicType; -import org.apache.brooklyn.sensor.enricher.EnricherDynamicType; import org.apache.brooklyn.util.exceptions.Exceptions; import com.google.common.collect.Maps; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java index 4e45580..aaee778 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java @@ -27,10 +27,10 @@ import org.apache.brooklyn.api.sensor.Enricher; import org.apache.brooklyn.api.sensor.EnricherSpec; import org.apache.brooklyn.api.sensor.Feed; import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; import org.apache.brooklyn.core.policy.AbstractPolicy; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.exceptions.Exceptions; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java index 4a7b1d4..b017315 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java @@ -23,7 +23,7 @@ import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.effector.AddSensor; -import org.apache.brooklyn.sensor.enricher.Propagator; +import org.apache.brooklyn.enricher.stock.Propagator; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.core.task.ValueResolver; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregatingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregatingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregatingEnricher.java new file mode 100644 index 0000000..2d25a75 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregatingEnricher.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.core.enricher.AbstractEnricher; +import org.apache.brooklyn.core.entity.trait.Changeable; +import org.apache.brooklyn.util.groovy.GroovyJavaMethods; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; + + +/** + * AggregatingEnrichers implicitly subscribes to the same sensor<S> on all entities inside an + * {@link Group} and should emit an aggregate<T> on the target sensor + * + * @deprecated since 0.7.0; use {@link Enrichers.builder()} + * @see Aggregator if need to sub-class + */ +public abstract class AbstractAggregatingEnricher<S,T> extends AbstractEnricher implements SensorEventListener<S> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractAggregatingEnricher.class); + + AttributeSensor<? extends S> source; + protected AttributeSensor<T> target; + protected S defaultValue; + + Set<Entity> producers; + List<Entity> hardCodedProducers; + boolean allMembers; + Predicate<Entity> filter; + + /** + * Users of values should either on it synchronize when iterating over its entries or use + * copyOfValues to obtain an immutable copy of the map. + */ + // We use a synchronizedMap over a ConcurrentHashMap for entities that store null values. + protected final Map<Entity, S> values = Collections.synchronizedMap(new LinkedHashMap<Entity, S>()); + + public AbstractAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target) { + this(flags, source, target, null); + } + + @SuppressWarnings("unchecked") + public AbstractAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target, S defaultValue) { + super(flags); + this.source = source; + this.target = target; + this.defaultValue = defaultValue; + hardCodedProducers = (List<Entity>) (flags.containsKey("producers") ? flags.get("producers") : Collections.emptyList()); + allMembers = (Boolean) (flags.containsKey("allMembers") ? flags.get("allMembers") : false); + filter = flags.containsKey("filter") ? GroovyJavaMethods.<Entity>castToPredicate(flags.get("filter")) : Predicates.<Entity>alwaysTrue(); + } + + public void addProducer(Entity producer) { + if (LOG.isDebugEnabled()) LOG.debug("{} linked ({}, {}) to {}", new Object[] {this, producer, source, target}); + subscribe(producer, source, this); + synchronized (values) { + S vo = values.get(producer); + if (vo==null) { + S initialVal = ((EntityLocal)producer).getAttribute(source); + values.put(producer, initialVal != null ? initialVal : defaultValue); + //we might skip in onEvent in the short window while !values.containsKey(producer) + //but that's okay because the put which would have been done there is done here now + } else { + //vo will be null unless some weird race with addProducer+removeProducer is occuring + //(and that's something we can tolerate i think) + if (LOG.isDebugEnabled()) LOG.debug("{} already had value ({}) for producer ({}); but that producer has just been added", new Object[] {this, vo, producer}); + } + } + onUpdated(); + } + + // TODO If producer removed but then get (queued) event from it after this method returns, + public S removeProducer(Entity producer) { + if (LOG.isDebugEnabled()) LOG.debug("{} unlinked ({}, {}) from {}", new Object[] {this, producer, source, target}); + unsubscribe(producer); + S removed = values.remove(producer); + onUpdated(); + return removed; + } + + @Override + public void onEvent(SensorEvent<S> event) { + Entity e = event.getSource(); + synchronized (values) { + if (values.containsKey(e)) { + values.put(e, event.getValue()); + } else { + if (LOG.isDebugEnabled()) LOG.debug("{} received event for unknown producer ({}); presumably that producer has recently been removed", this, e); + } + } + onUpdated(); + } + + /** + * Called whenever the values for the set of producers changes (e.g. on an event, or on a member added/removed). + * Defaults to no-op + */ + // TODO should this be abstract? + protected void onUpdated() { + // no-op + } + + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + + for (Entity producer : hardCodedProducers) { + if (filter.apply(producer)) { + addProducer(producer); + } + } + + if (allMembers) { + subscribe(entity, Changeable.MEMBER_ADDED, new SensorEventListener<Entity>() { + @Override public void onEvent(SensorEvent<Entity> it) { + if (filter.apply(it.getValue())) addProducer(it.getValue()); + } + }); + subscribe(entity, Changeable.MEMBER_REMOVED, new SensorEventListener<Entity>() { + @Override public void onEvent(SensorEvent<Entity> it) { + removeProducer(it.getValue()); + } + }); + + if (entity instanceof Group) { + for (Entity member : ((Group)entity).getMembers()) { + if (filter.apply(member)) { + addProducer(member); + } + } + } + } + } + + protected Map<Entity, S> copyOfValues() { + synchronized (values) { + return ImmutableMap.copyOf(values); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java new file mode 100644 index 0000000..926b769 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.Set; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.enricher.AbstractEnricher; +import org.apache.brooklyn.core.entity.AbstractEntity; +import org.apache.brooklyn.core.entity.trait.Changeable; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.guava.Maybe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; +import com.google.common.reflect.TypeToken; + +/** Abstract superclass for enrichers which aggregate from children and/or members */ +@SuppressWarnings("serial") +public abstract class AbstractAggregator<T,U> extends AbstractEnricher implements SensorEventListener<T> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class); + + public static final ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer", "The entity whose children/members will be aggregated"); + + public static final ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); + + // FIXME this is not just for "members" i think -Alex + public static final ConfigKey<?> DEFAULT_MEMBER_VALUE = ConfigKeys.newConfigKey(Object.class, "enricher.defaultMemberValue"); + + public static final ConfigKey<Set<? extends Entity>> FROM_HARDCODED_PRODUCERS = ConfigKeys.newConfigKey(new TypeToken<Set<? extends Entity>>() {}, "enricher.aggregating.fromHardcodedProducers"); + + public static final ConfigKey<Boolean> FROM_MEMBERS = ConfigKeys.newBooleanConfigKey("enricher.aggregating.fromMembers", + "Whether this enricher looks at members; only supported if a Group producer is supplier; defaults to true for Group entities"); + + public static final ConfigKey<Boolean> FROM_CHILDREN = ConfigKeys.newBooleanConfigKey("enricher.aggregating.fromChildren", + "Whether this enricher looks at children; this is the default for non-Group producers"); + + public static final ConfigKey<Predicate<? super Entity>> ENTITY_FILTER = ConfigKeys.newConfigKey(new TypeToken<Predicate<? super Entity>>() {}, "enricher.aggregating.entityFilter"); + + public static final ConfigKey<Predicate<?>> VALUE_FILTER = ConfigKeys.newConfigKey(new TypeToken<Predicate<?>>() {}, "enricher.aggregating.valueFilter"); + + protected Entity producer; + protected Sensor<U> targetSensor; + protected T defaultMemberValue; + protected Set<? extends Entity> fromHardcodedProducers; + protected Boolean fromMembers; + protected Boolean fromChildren; + protected Predicate<? super Entity> entityFilter; + protected Predicate<? super T> valueFilter; + + public AbstractAggregator() {} + + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + setEntityLoadingConfig(); + + if (fromHardcodedProducers == null && producer == null) producer = entity; + checkState(fromHardcodedProducers != null ^ producer != null, "must specify one of %s (%s) or %s (%s)", + PRODUCER.getName(), producer, FROM_HARDCODED_PRODUCERS.getName(), fromHardcodedProducers); + + if (fromHardcodedProducers != null) { + for (Entity producer : Iterables.filter(fromHardcodedProducers, entityFilter)) { + addProducerHardcoded(producer); + } + } + + if (isAggregatingMembers()) { + setEntityBeforeSubscribingProducerMemberEvents(entity); + setEntitySubscribeProducerMemberEvents(); + setEntityAfterSubscribingProducerMemberEvents(); + } + + if (isAggregatingChildren()) { + setEntityBeforeSubscribingProducerChildrenEvents(); + setEntitySubscribingProducerChildrenEvents(); + setEntityAfterSubscribingProducerChildrenEvents(); + } + + onUpdated(); + } + + @SuppressWarnings({ "unchecked" }) + protected void setEntityLoadingConfig() { + this.producer = getConfig(PRODUCER); + this.fromHardcodedProducers= getConfig(FROM_HARDCODED_PRODUCERS); + this.defaultMemberValue = (T) getConfig(DEFAULT_MEMBER_VALUE); + this.fromMembers = Maybe.fromNullable(getConfig(FROM_MEMBERS)).or(fromMembers); + this.fromChildren = Maybe.fromNullable(getConfig(FROM_CHILDREN)).or(fromChildren); + this.entityFilter = (Predicate<? super Entity>) (getConfig(ENTITY_FILTER) == null ? Predicates.alwaysTrue() : getConfig(ENTITY_FILTER)); + this.valueFilter = (Predicate<? super T>) (getConfig(VALUE_FILTER) == null ? getDefaultValueFilter() : getConfig(VALUE_FILTER)); + + setEntityLoadingTargetConfig(); + } + + protected Predicate<?> getDefaultValueFilter() { + return Predicates.alwaysTrue(); + } + + @SuppressWarnings({ "unchecked" }) + protected void setEntityLoadingTargetConfig() { + this.targetSensor = (Sensor<U>) getRequiredConfig(TARGET_SENSOR); + } + + protected void setEntityBeforeSubscribingProducerMemberEvents(EntityLocal entity) { + checkState(producer instanceof Group, "Producer must be a group when fromMembers true: producer=%s; entity=%s; " + + "hardcodedProducers=%s", getConfig(PRODUCER), entity, fromHardcodedProducers); + } + + protected void setEntitySubscribeProducerMemberEvents() { + subscribe(producer, Changeable.MEMBER_ADDED, new SensorEventListener<Entity>() { + @Override public void onEvent(SensorEvent<Entity> event) { + if (entityFilter.apply(event.getValue())) { + addProducerMember(event.getValue()); + onUpdated(); + } + } + }); + subscribe(producer, Changeable.MEMBER_REMOVED, new SensorEventListener<Entity>() { + @Override public void onEvent(SensorEvent<Entity> event) { + removeProducer(event.getValue()); + onUpdated(); + } + }); + } + + protected void setEntityAfterSubscribingProducerMemberEvents() { + if (producer instanceof Group) { + for (Entity member : Iterables.filter(((Group)producer).getMembers(), entityFilter)) { + addProducerMember(member); + } + } + } + + protected void setEntityBeforeSubscribingProducerChildrenEvents() { + } + + protected void setEntitySubscribingProducerChildrenEvents() { + subscribe(producer, AbstractEntity.CHILD_REMOVED, new SensorEventListener<Entity>() { + @Override public void onEvent(SensorEvent<Entity> event) { + removeProducer(event.getValue()); + onUpdated(); + } + }); + subscribe(producer, AbstractEntity.CHILD_ADDED, new SensorEventListener<Entity>() { + @Override public void onEvent(SensorEvent<Entity> event) { + if (entityFilter.apply(event.getValue())) { + addProducerChild(event.getValue()); + onUpdated(); + } + } + }); + } + + protected void setEntityAfterSubscribingProducerChildrenEvents() { + for (Entity child : Iterables.filter(producer.getChildren(), entityFilter)) { + addProducerChild(child); + } + } + + /** true if this should aggregate members */ + protected boolean isAggregatingMembers() { + if (Boolean.TRUE.equals(fromMembers)) return true; + if (Boolean.TRUE.equals(fromChildren)) return false; + if (fromHardcodedProducers!=null) return false; + if (producer instanceof Group) return true; + return false; + } + + /** true if this should aggregate members */ + protected boolean isAggregatingChildren() { + if (Boolean.TRUE.equals(fromChildren)) return true; + if (Boolean.TRUE.equals(fromMembers)) return false; + if (fromHardcodedProducers!=null) return false; + if (producer instanceof Group) return false; + return true; + } + + protected abstract void addProducerHardcoded(Entity producer); + protected abstract void addProducerMember(Entity producer); + protected abstract void addProducerChild(Entity producer); + + // TODO If producer removed but then get (queued) event from it after this method returns, + protected void removeProducer(Entity producer) { + if (LOG.isDebugEnabled()) LOG.debug("{} stopped listening to {}", new Object[] {this, producer }); + unsubscribe(producer); + onProducerRemoved(producer); + } + + protected abstract void onProducerAdded(Entity producer); + + protected abstract void onProducerRemoved(Entity producer); + + + /** + * Called whenever the values for the set of producers changes (e.g. on an event, or on a member added/removed). + */ + protected void onUpdated() { + try { + emit(targetSensor, compute()); + } catch (Throwable t) { + LOG.warn("Error calculating and setting aggregate for enricher "+this, t); + throw Exceptions.propagate(t); + } + } + + protected abstract Object compute(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java new file mode 100644 index 0000000..1d76168 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.core.BrooklynLogging; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.flags.TypeCoercions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +/** Building on {@link AbstractAggregator} for a single source sensor (on multiple children and/or members) */ +public abstract class AbstractMultipleSensorAggregator<U> extends AbstractAggregator<Object,U> implements SensorEventListener<Object> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractMultipleSensorAggregator.class); + + + /** access via {@link #getValues(Sensor)} */ + private final Map<String, Map<Entity,Object>> values = Collections.synchronizedMap(new LinkedHashMap<String, Map<Entity,Object>>()); + + public AbstractMultipleSensorAggregator() {} + + protected abstract Collection<Sensor<?>> getSourceSensors(); + + @Override + protected void setEntityLoadingConfig() { + super.setEntityLoadingConfig(); + Preconditions.checkNotNull(getSourceSensors(), "sourceSensors must be set"); + } + + @Override + protected void setEntityBeforeSubscribingProducerChildrenEvents() { + BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer), + "{} subscribing to children of {}", this, producer); + for (Sensor<?> sourceSensor: getSourceSensors()) { + subscribeToChildren(producer, sourceSensor, this); + } + } + + @Override + protected void addProducerHardcoded(Entity producer) { + for (Sensor<?> sourceSensor: getSourceSensors()) { + subscribe(producer, sourceSensor, this); + } + onProducerAdded(producer); + } + + @Override + protected void addProducerChild(Entity producer) { + // no `subscribe` call needed here, due to previous subscribeToChildren call + onProducerAdded(producer); + } + + @Override + protected void addProducerMember(Entity producer) { + addProducerHardcoded(producer); + } + + @Override + protected void onProducerAdded(Entity producer) { + BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer), + "{} listening to {}", this, producer); + synchronized (values) { + for (Sensor<?> sensor: getSourceSensors()) { + Map<Entity,Object> vs = values.get(sensor.getName()); + if (vs==null) { + vs = new LinkedHashMap<Entity,Object>(); + values.put(sensor.getName(), vs); + } + + Object vo = vs.get(producer); + if (vo==null) { + Object initialVal; + if (sensor instanceof AttributeSensor) { + initialVal = producer.getAttribute((AttributeSensor<?>)sensor); + } else { + initialVal = null; + } + vs.put(producer, initialVal != null ? initialVal : defaultMemberValue); + // NB: see notes on possible race, in Aggregator#onProducerAdded + } + + } + } + } + + @Override + protected void onProducerRemoved(Entity producer) { + synchronized (values) { + for (Sensor<?> sensor: getSourceSensors()) { + Map<Entity,Object> vs = values.get(sensor.getName()); + if (vs!=null) + vs.remove(producer); + } + } + onUpdated(); + } + + @Override + public void onEvent(SensorEvent<Object> event) { + Entity e = event.getSource(); + synchronized (values) { + Map<Entity,Object> vs = values.get(event.getSensor().getName()); + if (vs==null) { + LOG.debug(this+" received event when no entry for sensor ("+event+"); likely just added or removed, and will initialize subsequently if needed"); + } else { + vs.put(e, event.getValue()); + } + } + onUpdated(); + } + + public <T> Map<Entity,T> getValues(Sensor<T> sensor) { + Map<Entity, T> valuesCopy = copyValues(sensor); + return coerceValues(valuesCopy, sensor.getType()); + } + + private <T> Map<Entity, T> coerceValues(Map<Entity, T> values, Class<? super T> type) { + Map<Entity, T> typedValues = MutableMap.of(); + for (Entry<Entity, T> entry : values.entrySet()) { + @SuppressWarnings("unchecked") + T typedValue = (T) TypeCoercions.coerce(entry.getValue(), type); + typedValues.put(entry.getKey(), typedValue); + } + return typedValues; + } + + private <T> Map<Entity, T> copyValues(Sensor<T> sensor) { + synchronized (values) { + @SuppressWarnings("unchecked") + Map<Entity, T> sv = (Map<Entity, T>) values.get(sensor.getName()); + //use MutableMap because of potentially null values + return MutableMap.copyOf(sv).asUnmodifiable(); + } + } + + @Override + protected abstract Object compute(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java new file mode 100644 index 0000000..ab41c1a --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.enricher.AbstractEnricher; +import org.apache.brooklyn.core.sensor.BasicSensorEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.reflect.TypeToken; + +@SuppressWarnings("serial") +public abstract class AbstractTransformer<T,U> extends AbstractEnricher implements SensorEventListener<T> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractTransformer.class); + + public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer"); + + public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor"); + + public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); + + protected Entity producer; + protected Sensor<T> sourceSensor; + protected Sensor<U> targetSensor; + + public AbstractTransformer() { + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + + Function<SensorEvent<T>, U> transformation = getTransformation(); + this.producer = getConfig(PRODUCER) == null ? entity: getConfig(PRODUCER); + this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR); + Sensor<?> targetSensorSpecified = getConfig(TARGET_SENSOR); + this.targetSensor = targetSensorSpecified!=null ? (Sensor<U>) targetSensorSpecified : (Sensor<U>) this.sourceSensor; + if (producer.equals(entity) && targetSensorSpecified==null) { + LOG.error("Refusing to add an enricher which reads and publishes on the same sensor: "+ + producer+"."+sourceSensor+" (computing "+transformation+")"); + // we don't throw because this error may manifest itself after a lengthy deployment, + // and failing it at that point simply because of an enricher is not very pleasant + // (at least not until we have good re-run support across the board) + return; + } + + subscribe(producer, sourceSensor, this); + + if (sourceSensor instanceof AttributeSensor) { + Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor); + // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce" + if (value!=null) { + onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1)); + } + } + } + + /** returns a function for transformation, for immediate use only (not for caching, as it may change) */ + protected abstract Function<SensorEvent<T>, U> getTransformation(); + + @Override + public void onEvent(SensorEvent<T> event) { + emit(targetSensor, compute(event)); + } + + protected Object compute(SensorEvent<T> event) { + // transformation is not going to change, but this design makes it easier to support changing config in future. + // if it's an efficiency hole we can switch to populate the transformation at start. + U result = getTransformation().apply(event); + if (LOG.isTraceEnabled()) + LOG.trace("Enricher "+this+" computed "+result+" from "+event); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformingEnricher.java new file mode 100644 index 0000000..a29cc7b --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformingEnricher.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.sensor.Sensor; + +/** + * Convenience base for transforming a single sensor into a single new sensor of the same type + * + * @deprecated since 0.7.0; use {@link Enrichers.builder()} + */ +public abstract class AbstractTransformingEnricher<T> extends AbstractTypeTransformingEnricher<T,T> { + + public AbstractTransformingEnricher() { // for rebinding + } + + public AbstractTransformingEnricher(Entity producer, Sensor<T> source, Sensor<T> target) { + super(producer, source, target); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java new file mode 100644 index 0000000..1469829 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.core.enricher.AbstractEnricher; +import org.apache.brooklyn.core.sensor.BasicSensorEvent; +import org.apache.brooklyn.util.core.flags.SetFromFlag; + +/** + * Convenience base for transforming a single sensor into a single new sensor of the same type + * + * @deprecated since 0.7.0; use {@link Enrichers.builder()} + */ +public abstract class AbstractTypeTransformingEnricher<T,U> extends AbstractEnricher implements SensorEventListener<T> { + + @SetFromFlag + private Entity producer; + + @SetFromFlag + private Sensor<T> source; + + @SetFromFlag + protected Sensor<U> target; + + public AbstractTypeTransformingEnricher() { // for rebind + } + + public AbstractTypeTransformingEnricher(Entity producer, Sensor<T> source, Sensor<U> target) { + this.producer = producer; + this.source = source; + this.target = target; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + if (producer==null) producer = entity; + subscribe(producer, source, this); + + if (source instanceof AttributeSensor) { + Object value = producer.getAttribute((AttributeSensor)source); + // TODO Aled didn't you write a convenience to "subscribeAndRunIfSet" ? (-Alex) + if (value!=null) + onEvent(new BasicSensorEvent(source, producer, value, -1)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AddingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AddingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AddingEnricher.java new file mode 100644 index 0000000..941d745 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AddingEnricher.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.core.enricher.AbstractEnricher; +import org.apache.brooklyn.core.sensor.BasicSensorEvent; + +/** + * enricher which adds multiple sensors on an entity to produce a new sensor + * + * Instead, consider calling: + * <pre> + * {@code + * addEnricher(Enrichers.builder() + * .combining(sources) + * .publishing(target) + * .computeSum() + * .build()); + * } + * </pre> + * <p> + * + * @deprecated since 0.7.0; use {@link Enrichers.builder()} + * @see Combiner if need to sub-class + */ +public class AddingEnricher extends AbstractEnricher implements SensorEventListener { + + private Sensor[] sources; + private Sensor<? extends Number> target; + + public AddingEnricher(Sensor sources[], Sensor<? extends Number> target) { + this.sources = sources; + this.target = target; + } + + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + + for (Sensor source: sources) { + subscribe(entity, source, this); + if (source instanceof AttributeSensor) { + Object value = entity.getAttribute((AttributeSensor)source); + if (value!=null) + onEvent(new BasicSensorEvent(source, entity, value, -1)); + } + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void onEvent(SensorEvent event) { + Number value = recompute(); + Number typedValue = cast(value, (Class<? extends Number>)target.getType()); + if (target instanceof AttributeSensor) { + entity.setAttribute((AttributeSensor)target, typedValue); + } else if (typedValue!=null) + entity.emit((Sensor)target, typedValue); + } + + @SuppressWarnings("unchecked") + public static <V> V cast(Number value, Class<V> type) { + if (value==null) return null; + if (type.isInstance(value)) return (V)value; + + if (type==Integer.class) return (V) (Integer) (int)Math.round(value.doubleValue()); + if (type==Long.class) return (V) (Long) Math.round(value.doubleValue()); + if (type==Double.class) return (V) (Double) value.doubleValue(); + if (type==Float.class) return (V) (Float) value.floatValue(); + if (type==Byte.class) return (V) (Byte) (byte)Math.round(value.doubleValue()); + if (type==Short.class) return (V) (Short) (short)Math.round(value.doubleValue()); + + throw new UnsupportedOperationException("conversion of mathematical operation to "+type+" not supported"); + } + + protected Number recompute() { + if (sources.length==0) return null; + Double result = 0d; + for (Sensor source: sources) { + Object value = entity.getAttribute((AttributeSensor) source); + if (value==null) return null; + result += ((Number)value).doubleValue(); + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java new file mode 100644 index 0000000..e42d2cb --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.BrooklynLogging; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.enricher.stock.Enrichers.ComputingAverage; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.flags.SetFromFlag; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.text.StringPredicates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; +import com.google.common.reflect.TypeToken; + +/** Building on {@link AbstractAggregator} for a single source sensor (on multiple children and/or members) */ +@SuppressWarnings("serial") +//@Catalog(name="Aggregator", description="Aggregates attributes from multiple entities into a single attribute value; see Enrichers.builder().aggregating(...)") +public class Aggregator<T,U> extends AbstractAggregator<T,U> implements SensorEventListener<T> { + + private static final Logger LOG = LoggerFactory.getLogger(Aggregator.class); + + public static final ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor"); + + @SetFromFlag("transformation") + public static final ConfigKey<Object> TRANSFORMATION_UNTYPED = ConfigKeys.newConfigKey(Object.class, "enricher.transformation.untyped", + "Specifies a transformation, as a function from a collection to the value, or as a string matching a pre-defined named transformation, " + + "such as 'average' (for numbers), 'sum' (for numbers), or 'list' (the default, putting any collection of items into a list)"); + public static final ConfigKey<Function<? super Collection<?>, ?>> TRANSFORMATION = ConfigKeys.newConfigKey(new TypeToken<Function<? super Collection<?>, ?>>() {}, "enricher.transformation"); + + public static final ConfigKey<Boolean> EXCLUDE_BLANK = ConfigKeys.newBooleanConfigKey("enricher.aggregator.excludeBlank", "Whether explicit nulls or blank strings should be excluded (default false); this only applies if no value filter set", false); + + protected Sensor<T> sourceSensor; + protected Function<? super Collection<T>, ? extends U> transformation; + + /** + * Users of values should either on it synchronize when iterating over its entries or use + * copyOfValues to obtain an immutable copy of the map. + */ + // We use a synchronizedMap over a ConcurrentHashMap for entities that store null values. + protected final Map<Entity, T> values = Collections.synchronizedMap(new LinkedHashMap<Entity, T>()); + + public Aggregator() {} + + @SuppressWarnings("unchecked") + protected void setEntityLoadingConfig() { + super.setEntityLoadingConfig(); + this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR); + + this.transformation = (Function<? super Collection<T>, ? extends U>) config().get(TRANSFORMATION); + + Object t1 = config().get(TRANSFORMATION_UNTYPED); + Function<? super Collection<?>, ?> t2 = null; + if (t1 instanceof String) { + t2 = lookupTransformation((String)t1); + if (t2==null) { + LOG.warn("Unknown transformation '"+t1+"' for "+this+"; will use default transformation"); + } + } + + if (this.transformation==null) { + this.transformation = (Function<? super Collection<T>, ? extends U>) t2; + } else if (t1!=null && !Objects.equals(t2, this.transformation)) { + throw new IllegalStateException("Cannot supply both "+TRANSFORMATION_UNTYPED+" and "+TRANSFORMATION+" unless they are equal."); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected Function<? super Collection<?>, ?> lookupTransformation(String t1) { + if ("average".equalsIgnoreCase(t1)) return new Enrichers.ComputingAverage(null, null, targetSensor.getTypeToken()); + if ("sum".equalsIgnoreCase(t1)) return new Enrichers.ComputingAverage(null, null, targetSensor.getTypeToken()); + if ("list".equalsIgnoreCase(t1)) return new ComputingList(); + return null; + } + + private class ComputingList<TT> implements Function<Collection<TT>, List<TT>> { + @Override + public List<TT> apply(Collection<TT> input) { + if (input==null) return null; + return MutableList.copyOf(input).asUnmodifiable(); + } + + } + + @Override + protected void setEntityBeforeSubscribingProducerChildrenEvents() { + BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer), + "{} subscribing to children of {}", this, producer); + subscribeToChildren(producer, sourceSensor, this); + } + + @Override + protected void addProducerHardcoded(Entity producer) { + subscribe(producer, sourceSensor, this); + onProducerAdded(producer); + } + + @Override + protected void addProducerChild(Entity producer) { + // no subscription needed here, due to the subscribeToChildren call + onProducerAdded(producer); + } + + @Override + protected void addProducerMember(Entity producer) { + subscribe(producer, sourceSensor, this); + onProducerAdded(producer); + } + + @Override + protected void onProducerAdded(Entity producer) { + BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer), + "{} listening to {}", this, producer); + synchronized (values) { + T vo = values.get(producer); + if (vo==null) { + T initialVal; + if (sourceSensor instanceof AttributeSensor) { + initialVal = producer.getAttribute((AttributeSensor<T>)sourceSensor); + } else { + initialVal = null; + } + values.put(producer, initialVal != null ? initialVal : defaultMemberValue); + //we might skip in onEvent in the short window while !values.containsKey(producer) + //but that's okay because the put which would have been done there is done here now + } else { + //vo will be null unless some weird race with addProducer+removeProducer is occuring + //(and that's something we can tolerate i think) + if (LOG.isDebugEnabled()) LOG.debug("{} already had value ({}) for producer ({}); but that producer has just been added", new Object[] {this, vo, producer}); + } + } + } + + @Override + protected Predicate<?> getDefaultValueFilter() { + if (getConfig(EXCLUDE_BLANK)) + return StringPredicates.isNonBlank(); + else + return Predicates.alwaysTrue(); + } + + @Override + protected void onProducerRemoved(Entity producer) { + values.remove(producer); + onUpdated(); + } + + @Override + public void onEvent(SensorEvent<T> event) { + Entity e = event.getSource(); + synchronized (values) { + if (values.containsKey(e)) { + values.put(e, event.getValue()); + } else { + if (LOG.isDebugEnabled()) LOG.debug("{} received event for unknown producer ({}); presumably that producer has recently been removed", this, e); + } + } + onUpdated(); + } + + protected void onUpdated() { + try { + emit(targetSensor, compute()); + } catch (Throwable t) { + LOG.warn("Error calculating and setting aggregate for enricher "+this, t); + throw Exceptions.propagate(t); + } + } + + @Override + protected Object compute() { + synchronized (values) { + // TODO Could avoid copying when filter not needed + List<T> vs = MutableList.copyOf(Iterables.filter(values.values(), valueFilter)); + if (transformation==null) return vs; + return transformation.apply(vs); + } + } + + protected Map<Entity, T> copyOfValues() { + // Don't use ImmutableMap, as can contain null values + synchronized (values) { + return Collections.unmodifiableMap(MutableMap.copyOf(values)); + } + } + +}
