http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/Combiner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Combiner.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Combiner.java new file mode 100644 index 0000000..6819a33 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Combiner.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.brooklyn.enricher.stock;

import static com.google.common.base.Preconditions.checkState;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.sensor.BasicSensorEvent;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;

/**
 * Enricher which listens to a set of source sensors on a single producer entity, remembers
 * the most recent value seen for each of them, and on every event emits on the target sensor
 * the result of applying the configured transformation to the (filtered) collection of
 * remembered values.
 *
 * @param <T> type of the values published by the source sensors
 * @param <U> type of the value published on the target sensor
 */
@SuppressWarnings("serial")
//@Catalog(name="Combiner", description="Combines attributes; see Enrichers.builder().combining(...)")
public class Combiner<T,U> extends AbstractEnricher implements SensorEventListener<T> {

    private static final Logger LOG = LoggerFactory.getLogger(Combiner.class);

    // The config keys are constants: declared final so they cannot be reassigned by accident
    // (VALUE_FILTER was already final; the others now match it).

    /** Function applied to the collection of current values to produce the target value (required). */
    public static final ConfigKey<Function<?, ?>> TRANSFORMATION = ConfigKeys.newConfigKey(new TypeToken<Function<?, ?>>() {}, "enricher.transformation");

    /** Entity whose sensors are read; when unset, the entity this enricher is attached to is used. */
    public static final ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");

    /** Sensors to subscribe to on the producer (required, must be non-empty). */
    public static final ConfigKey<Set<Sensor<?>>> SOURCE_SENSORS = ConfigKeys.newConfigKey(new TypeToken<Set<Sensor<?>>>() {}, "enricher.sourceSensors");

    /** Sensor on which the combined value is emitted (required). */
    public static final ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");

    /** Optional predicate selecting which values are passed to the transformation; when unset, all values are passed. */
    public static final ConfigKey<Predicate<?>> VALUE_FILTER = ConfigKeys.newConfigKey(new TypeToken<Predicate<?>>() {}, "enricher.aggregating.valueFilter");

    protected Function<? super Collection<T>, ? extends U> transformation;
    protected Entity producer;
    protected Set<Sensor<T>> sourceSensors;
    protected Sensor<U> targetSensor;
    protected Predicate<? super T> valueFilter;

    /**
     * Most recent value seen for each source sensor, in insertion order.
     * <p>
     * Code which iterates over this map (or over its views) must synchronize on it for the
     * duration of the iteration, or take a copy while holding that lock.
     */
    // We use a synchronizedMap over a ConcurrentHashMap for entities that store null values.
    protected final Map<Sensor<T>, T> values = Collections.synchronizedMap(new LinkedHashMap<Sensor<T>, T>());

    public Combiner() {
    }

    /**
     * Reads the configuration, subscribes to every source sensor on the producer, and then
     * replays the current non-null value of each attribute sensor so that the target is
     * populated without waiting for the next change.
     *
     * @throws IllegalStateException if the configured set of source sensors is empty
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public void setEntity(EntityLocal entity) {
        super.setEntity(entity);
        this.transformation = (Function<? super Collection<T>, ? extends U>) getRequiredConfig(TRANSFORMATION);

        // Look each optional key up once rather than once for the null-check and again for the value.
        Entity configuredProducer = getConfig(PRODUCER);
        this.producer = (configuredProducer == null) ? entity : configuredProducer;

        this.sourceSensors = (Set) getRequiredConfig(SOURCE_SENSORS);
        this.targetSensor = (Sensor<U>) getRequiredConfig(TARGET_SENSOR);

        Predicate<?> configuredFilter = getConfig(VALUE_FILTER);
        this.valueFilter = (Predicate<? super T>) (configuredFilter == null ? Predicates.alwaysTrue() : configuredFilter);

        checkState(sourceSensors.size() > 0, "must specify at least one sourceSensor");

        for (Sensor<T> sourceSensor : sourceSensors) {
            subscribe(producer, sourceSensor, this);
        }

        for (Sensor<T> sourceSensor : sourceSensors) {
            if (sourceSensor instanceof AttributeSensor) {
                Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor);
                // TODO a "subscribeAndRunIfSet" convenience would replace this manual replay; none exists yet.
                if (value != null) {
                    // NOTE(review): the trailing -1 is handed to BasicSensorEvent unchanged;
                    // presumably it marks "no timestamp" -- confirm against BasicSensorEvent.
                    onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1));
                }
            }
        }
    }

    /** Records the event's value against its sensor and then recomputes the target value. */
    @Override
    public void onEvent(SensorEvent<T> event) {
        synchronized (values) {
            values.put(event.getSensor(), event.getValue());
        }
        onUpdated();
    }

    /**
     * Called whenever the values for the set of producers changes (e.g. on an event, or on a member added/removed).
     * Emits the freshly computed value on the target sensor; any failure is logged and then rethrown.
     */
    protected void onUpdated() {
        try {
            emit(targetSensor, compute());
        } catch (Throwable t) {
            LOG.warn("Error calculating and setting combination for enricher "+this, t);
            throw Exceptions.propagate(t);
        }
    }

    /**
     * Applies the transformation to a copy of the current values which pass the value filter.
     * The copy and the transformation both run while holding the lock on {@link #values}.
     *
     * @return whatever the transformation returns for the filtered values
     */
    protected Object compute() {
        synchronized (values) {
            // TODO Could avoid copying when filter not needed
            List<T> vs = MutableList.copyOf(Iterables.filter(values.values(), valueFilter));
            return transformation.apply(vs);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/CustomAggregatingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/CustomAggregatingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/CustomAggregatingEnricher.java new file mode 100644 index 0000000..0a03dca --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/CustomAggregatingEnricher.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.brooklyn.enricher.stock;

import groovy.lang.Closure;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.reflect.TypeToken;

/**
 * Subscribes to events from producers with a sensor of type S, aggregates them with the
 * provided function (or closure) and emits the result on the target sensor of type T.
 *
 * @param <S> type of the values reported by the source sensor
 * @param <T> type of the aggregate published on the target sensor
 *
 * @deprecated since 0.7.0; use {@link Enrichers#builder()}
 */
@Deprecated
public class CustomAggregatingEnricher<S,T> extends AbstractAggregatingEnricher<S,T> implements SensorEventListener<S> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomAggregatingEnricher.class);

    /** Function which reduces the collection of per-producer values to the single published value. */
    protected final Function<Collection<S>, T> aggregator;

    /**
     * The valid keys for the flags are:
     * - producers: a collection of entities to be aggregated
     * - allMembers: indicates that should track members of the entity that the aggregator is associated with,
     *               to aggregate across all those members.
     * - filter:     a Predicate or Closure, indicating which entities to include
     *
     * @param flags
     * @param source
     * @param target
     * @param aggregator   Aggregates a collection of values, to return a single value for the target sensor
     * @param defaultIniitalValueForUnreportedSensors Default value to populate the collection given to aggregator,
     * where sensors are null or not present initially, defaults to null (note however that subsequent null reports will put an explicit null)
     */
    public CustomAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target,
            Function<Collection<S>, T> aggregator, S defaultIniitalValueForUnreportedSensors) {
        super(flags, source, target, defaultIniitalValueForUnreportedSensors);
        this.aggregator = aggregator;
    }

    /** As the five-argument {@link Function} constructor, with a null default value for unreported sensors. */
    public CustomAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target,
            Function<Collection<S>, T> aggregator) {
        this(flags, source, target, aggregator, null);
    }

    /** As the five-argument {@link Function} constructor, with no flags. */
    public CustomAggregatingEnricher(AttributeSensor<? extends S> source, AttributeSensor<T> target,
            Function<Collection<S>, T> aggregator, S defaultValue) {
        this(Collections.<String,Object>emptyMap(), source, target, aggregator, defaultValue);
    }

    /** As the five-argument {@link Function} constructor, with no flags and a null default value. */
    public CustomAggregatingEnricher(AttributeSensor<? extends S> source, AttributeSensor<T> target,
            Function<Collection<S>, T> aggregator) {
        this(Collections.<String,Object>emptyMap(), source, target, aggregator, null);
    }

    /**
     * @param flags
     * @param source
     * @param target
     * @param aggregator   Should take a collection of values and return a single, aggregate value
     * @param defaultValueForUnreportedSensors
     *
     * @see #CustomAggregatingEnricher(Map, AttributeSensor, AttributeSensor, Function, Object)
     */
    @SuppressWarnings("unchecked")
    public CustomAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target,
            Closure<?> aggregator, S defaultValueForUnreportedSensors) {
        this(flags, source, target, GroovyJavaMethods.<Collection<S>, T>functionFromClosure((Closure<T>)aggregator), defaultValueForUnreportedSensors);
    }

    /** As the five-argument {@link Closure} constructor, with a null default value for unreported sensors. */
    public CustomAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target, Closure<?> aggregator) {
        this(flags, source, target, aggregator, null);
    }

    /** As the five-argument {@link Closure} constructor, with no flags. */
    public CustomAggregatingEnricher(AttributeSensor<S> source, AttributeSensor<T> target, Closure<?> aggregator, S defaultValueForUnreportedSensors) {
        this(Collections.<String,Object>emptyMap(), source, target, aggregator, defaultValueForUnreportedSensors);
    }

    /** As the five-argument {@link Closure} constructor, with no flags and a null default value. */
    public CustomAggregatingEnricher(AttributeSensor<S> source, AttributeSensor<T> target, Closure<?> aggregator) {
        this(Collections.<String,Object>emptyMap(), source, target, aggregator, null);
    }

    /**
     * Recomputes the aggregate and sets it as the target attribute on the entity.
     * Any failure is logged and then rethrown.
     */
    @Override
    public void onUpdated() {
        try {
            entity.setAttribute(target, getAggregate());
        } catch (Throwable t) {
            LOG.warn("Error calculating and setting aggregate for enricher "+this, t);
            throw Throwables.propagate(t);
        }
    }

    /**
     * Applies the aggregator to the current values, holding the lock on {@code values} while doing so.
     *
     * @return the value returned by the aggregator
     */
    public T getAggregate() {
        synchronized (values) {
            return aggregator.apply(values.values());
        }
    }

    /**
     * Instead, consider calling:
     * <pre>
     * {@code
     * Enrichers.Builder builder = Enrichers.builder()
     *         .aggregating(source)
     *         .publishing(target)
     *         .computing(GroovyJavaMethods.<Collection<S>, T>functionFromClosure((Closure<T>)aggregator))
     *         .defaultValueForUnreportedSensors(defaultValueForUnreportedSensors);
     *
     * if (Boolean.TRUE.equals(allMembers)) builder.fromMembers();
     * if (filter != null) builder.entityFilter(filter);
     * if (hardCodedProducers != null) builder.fromHardcodedProducers(hardCodedProducers);
     *
     * addEnricher(builder.build());
     * }
     * </pre>
     *
     * @deprecated since 0.7.0; use {@link Enrichers#builder()}
     */
    @Deprecated
    public static <S,T> CustomAggregatingEnricher<S,T> newEnricher(
            Map<String,?> flags, AttributeSensor<S> source, AttributeSensor<T> target, Closure<?> aggregator, S defaultVal) {
        return new CustomAggregatingEnricher<S,T>(flags, source, target, aggregator, defaultVal);
    }
    public static <S,T> CustomAggregatingEnricher<S,T> newEnricher(
            Map<String,?> flags, AttributeSensor<S> source, AttributeSensor<T> target, Closure<?> aggregator) {
        return newEnricher(flags, source, target, aggregator, null);
    }
    public static <S,T> CustomAggregatingEnricher<S,T> newEnricher(
            AttributeSensor<S> source, AttributeSensor<T> target, Closure<?> aggregator, S defaultVal) {
        return newEnricher(Collections.<String,Object>emptyMap(), source, target, aggregator, defaultVal);
    }
    public static <S,T> CustomAggregatingEnricher<S,T> newEnricher(
            AttributeSensor<S> source, AttributeSensor<T> target, Closure<?> aggregator) {
        return newEnricher(Collections.<String,Object>emptyMap(), source, target, aggregator, null);
    }

    /**
     * Instead, consider calling:
     * <pre>
     * {@code
     * Enrichers.Builder builder = Enrichers.builder()
     *         .aggregating(source)
     *         .publishing(target)
     *         .computing(aggregator)
     *         .defaultValueForUnreportedSensors(defaultVal);
     *
     * if (Boolean.TRUE.equals(allMembers)) builder.fromMembers();
     * if (filter != null) builder.entityFilter(filter);
     * if (hardCodedProducers != null) builder.fromHardcodedProducers(hardCodedProducers);
     *
     * addEnricher(builder.build());
     * }
     * </pre>
     *
     * @deprecated since 0.7.0; use {@link Enrichers#builder()}
     */
    @Deprecated
    public static <S,T> CustomAggregatingEnricher<S,T> newEnricher(
            Map<String,?> flags, AttributeSensor<S> source, AttributeSensor<T> target, Function<Collection<S>, T> aggregator, S defaultVal) {
        return new CustomAggregatingEnricher<S,T>(flags, source, target, aggregator, defaultVal);
    }
    public static <S,T> CustomAggregatingEnricher<S,T> newEnricher(
            Map<String,?> flags, AttributeSensor<S> source, AttributeSensor<T> target, Function<Collection<S>, T> aggregator) {
        return newEnricher(flags, source, target, aggregator, null);
    }
    public static <S,T> CustomAggregatingEnricher<S,T> newEnricher(
            AttributeSensor<S> source, AttributeSensor<T> target, Function<Collection<S>, T> aggregator, S defaultVal) {
        return newEnricher(Collections.<String,Object>emptyMap(), source, target, aggregator, defaultVal);
    }
    public static <S,T> CustomAggregatingEnricher<S,T> newEnricher(
            AttributeSensor<S> source, AttributeSensor<T> target, Function<Collection<S>, T> aggregator) {
        return newEnricher(Collections.<String,Object>emptyMap(), source, target, aggregator, null);
    }

    /**
     * creates an enricher which sums over all children/members,
     * defaulting to excluding sensors which have not published anything (or published null), and null if there are no sensors;
     * this behaviour can be customised, both default value for sensors, and what to report if no sensors
     *
     * Instead, consider calling:
     * <pre>
     * {@code
     * Enrichers.Builder builder = Enrichers.builder()
     *         .aggregating(source)
     *         .publishing(target)
     *         .computingSum()
     *         .defaultValueForUnreportedSensors(defaultValueForUnreportedSensors)
     *         .valueToReportIfNoSensors(valueToReportIfNoSensors);
     *
     * if (Boolean.TRUE.equals(allMembers)) builder.fromMembers();
     * if (filter != null) builder.entityFilter(filter);
     * if (hardCodedProducers != null) builder.fromHardcodedProducers(hardCodedProducers);
     *
     * addEnricher(builder.build());
     * }
     * </pre>
     *
     * @deprecated since 0.7.0; use {@link Enrichers#builder()}
     */
    @Deprecated
    public static <N extends Number, T extends Number> CustomAggregatingEnricher<N,T> newSummingEnricher(
            Map<String,?> flags, AttributeSensor<N> source, final AttributeSensor<T> target,
            final N defaultValueForUnreportedSensors, final T valueToReportIfNoSensors) {

        Function<Collection<N>, T> aggregator = new Function<Collection<N>, T>() {
                @Override public T apply(Collection<N> vals) {
                    return sum(vals, defaultValueForUnreportedSensors, valueToReportIfNoSensors, target.getTypeToken());
                }
        };
        return new CustomAggregatingEnricher<N,T>(flags, source, target, aggregator, defaultValueForUnreportedSensors);
    }

    /** @see #newSummingEnricher(Map, AttributeSensor, AttributeSensor, Number, Number) */
    public static <N extends Number> CustomAggregatingEnricher<N,N> newSummingEnricher(
            AttributeSensor<N> source, AttributeSensor<N> target) {
        return newSummingEnricher(Collections.<String,Object>emptyMap(), source, target, null, null);
    }

    /** creates an enricher which averages over all children/members,
     * defaulting to excluding sensors which have not published anything (or published null), and null if there are no sensors;
     * this behaviour can be customised, both default value for sensors, and what to report if no sensors
     *
     * Instead, consider calling:
     * <pre>
     * {@code
     * Enrichers.Builder builder = Enrichers.builder()
     *         .aggregating(source)
     *         .publishing(target)
     *         .computingAverage()
     *         .defaultValueForUnreportedSensors(defaultValueForUnreportedSensors)
     *         .valueToReportIfNoSensors(valueToReportIfNoSensors);
     *
     * if (Boolean.TRUE.equals(allMembers)) builder.fromMembers();
     * if (filter != null) builder.entityFilter(filter);
     * if (hardCodedProducers != null) builder.fromHardcodedProducers(hardCodedProducers);
     *
     * addEnricher(builder.build());
     * }
     * </pre>
     *
     * @deprecated since 0.7.0; use {@link Enrichers#builder()}
     */
    @Deprecated
    public static <N extends Number> CustomAggregatingEnricher<N,Double> newAveragingEnricher(
            Map<String,?> flags, AttributeSensor<? extends N> source, final AttributeSensor<Double> target,
            final N defaultValueForUnreportedSensors, final Double valueToReportIfNoSensors) {
        Function<Collection<N>, Double> aggregator = new Function<Collection<N>, Double>() {
            @Override public Double apply(Collection<N> vals) {
                int count = count(vals, defaultValueForUnreportedSensors!=null);
                // Explicit branches rather than a ternary mixing Double and double: such a ternary
                // would unbox valueToReportIfNoSensors and throw NullPointerException when it is null.
                if (count==0) {
                    return valueToReportIfNoSensors;
                }
                double total = sum(vals, defaultValueForUnreportedSensors, 0, TypeToken.of(Double.class));
                return total / count;
            }
        };
        return new CustomAggregatingEnricher<N,Double>(flags, source, target, aggregator, defaultValueForUnreportedSensors);
    }

    /** @see #newAveragingEnricher(Map, AttributeSensor, AttributeSensor, Number, Double) */
    public static <N extends Number> CustomAggregatingEnricher<N,Double> newAveragingEnricher(
            AttributeSensor<N> source, AttributeSensor<Double> target) {
        return newAveragingEnricher(Collections.<String,Object>emptyMap(), source, target, null, null);
    }

    /** Converts {@code n} to the raw type of {@code numberType} via {@link TypeCoercions#castPrimitive}. */
    @SuppressWarnings("unchecked")
    private static <N extends Number> N cast(Number n, TypeToken<? extends N> numberType) {
        return (N) TypeCoercions.castPrimitive(n, numberType.getRawType());
    }

    /**
     * Adds up the values as doubles and converts the total to the requested type.
     *
     * @param vals values to add; may be null, which is treated as empty
     * @param valueIfNull substituted for each null element; when itself null, null elements are skipped
     * @param valueIfNone converted and returned when nothing was added
     * @param type type of the result
     */
    private static <N extends Number> N sum(Iterable<? extends Number> vals, Number valueIfNull, Number valueIfNone, TypeToken<N> type) {
        double result = 0d;
        int count = 0;
        if (vals!=null) {
            for (Number val : vals) {
                if (val!=null) {
                    result += val.doubleValue();
                    count++;
                } else if (valueIfNull!=null) {
                    result += valueIfNull.doubleValue();
                    count++;
                }
            }
        }
        if (count==0) return cast(valueIfNone, type);
        return cast(result, type);
    }

    /** Counts the elements of {@code vals}, skipping nulls unless {@code includeNullValues} is set; a null iterable counts as zero. */
    private static int count(Iterable<? extends Object> vals, boolean includeNullValues) {
        int result = 0;
        if (vals!=null) {
            for (Object val : vals) {
                if (val!=null || includeNullValues) result++;
            }
        }
        return result;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java new file mode 100644 index 0000000..baed4f1 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java @@ -0,0 +1,825 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.enricher.stock; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Enricher; +import org.apache.brooklyn.api.sensor.EnricherSpec; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.core.enricher.AbstractEnricher; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.collections.MutableSet; +import org.apache.brooklyn.util.core.flags.TypeCoercions; +import org.apache.brooklyn.util.text.StringPredicates; +import org.apache.brooklyn.util.text.Strings; + +import com.google.common.annotations.Beta; +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.reflect.TypeToken; + +public class Enrichers { + + private Enrichers() {} + + public static InitialBuilder builder() { + return new InitialBuilder(); + } + + public abstract static class Builder<B extends Builder<B>> { + @SuppressWarnings("unchecked") + protected B self() { + return (B) this; + } + } + + public abstract static class AbstractEnricherBuilder<B extends AbstractEnricherBuilder<B>> extends Builder<B> { + final Class<? 
extends Enricher> enricherType; + Boolean suppressDuplicates; + String uniqueTag; + Set<Object> tags = MutableSet.of(); + + public AbstractEnricherBuilder(Class<? extends Enricher> enricherType) { + this.enricherType = enricherType; + } + + public B uniqueTag(String tag) { + uniqueTag = Preconditions.checkNotNull(tag); + return self(); + } + public B addTag(Object tag) { + tags.add(Preconditions.checkNotNull(tag)); + return self(); + } + public B suppressDuplicates(Boolean suppressDuplicates) { + this.suppressDuplicates = suppressDuplicates; + return self(); + } + + protected abstract String getDefaultUniqueTag(); + + protected EnricherSpec<? extends Enricher> build() { + EnricherSpec<? extends Enricher> spec = EnricherSpec.create(enricherType); + + String uniqueTag2 = uniqueTag; + if (uniqueTag2==null) + uniqueTag2 = getDefaultUniqueTag(); + if (uniqueTag2!=null) + spec.uniqueTag(uniqueTag2); + + if (!tags.isEmpty()) spec.tags(tags); + if (suppressDuplicates!=null) + spec.configure(AbstractEnricher.SUPPRESS_DUPLICATES, suppressDuplicates); + + return spec; + } + } + + protected abstract static class AbstractInitialBuilder<B extends AbstractInitialBuilder<B>> extends Builder<B> { + public PropagatorBuilder propagating(Map<? extends Sensor<?>, ? extends Sensor<?>> vals) { + return new PropagatorBuilder(vals); + } + public PropagatorBuilder propagating(Iterable<? extends Sensor<?>> vals) { + return new PropagatorBuilder(vals); + } + public PropagatorBuilder propagating(Sensor<?>... vals) { + return new PropagatorBuilder(vals); + } + public PropagatorBuilder propagatingAll() { + return new PropagatorBuilder(true, null); + } + public PropagatorBuilder propagatingAllButUsualAnd(Sensor<?>... vals) { + return new PropagatorBuilder(true, ImmutableSet.<Sensor<?>>builder().addAll(Propagator.SENSORS_NOT_USUALLY_PROPAGATED).add(vals).build()); + } + public PropagatorBuilder propagatingAllBut(Sensor<?>... 
vals) { + return new PropagatorBuilder(true, ImmutableSet.copyOf(vals)); + } + public PropagatorBuilder propagatingAllBut(Iterable<? extends Sensor<?>> vals) { + return new PropagatorBuilder(true, vals); + } + + /** + * Builds an enricher which transforms a given sensor: + * <li> applying a (required) function ({@link TransformerBuilder#computing(Function)}, or + * {@link AbstractAggregatorBuilder#computingAverage()}/ + * {@link AbstractAggregatorBuilder#computingSum()}, mandatory); + * <li> and publishing it on the entity where the enricher is attached; + * <li> optionally taking the sensor from a different source entity ({@link TransformerBuilder#from(Entity)}); + * <li> and optionally publishing it as a different sensor ({@link TransformerBuilder#publishing(AttributeSensor)}); + * <p> You must supply at least one of the optional values, of course, otherwise the enricher may loop endlessly! + */ + public <S> TransformerBuilder<S, Object> transforming(AttributeSensor<S> val) { + return new TransformerBuilder<S, Object>(val); + } + /** as {@link #transforming(AttributeSensor)} but accepting multiple sensors, with the function acting on the set of values */ + public <S> CombinerBuilder<S, Object> combining(Collection<AttributeSensor<? extends S>> vals) { + return new CombinerBuilder<S, Object>(vals); + } + /** as {@link #combining(Collection)} */ + @SafeVarargs + public final <S> CombinerBuilder<S, Object> combining(AttributeSensor<? extends S>... 
vals) { + return new CombinerBuilder<S, Object>(vals); + } + /** as {@link #combining(Collection)} but the collection of values comes from the given sensor on multiple entities */ + public <S> AggregatorBuilder<S, Object> aggregating(AttributeSensor<S> val) { + return new AggregatorBuilder<S,Object>(val); + } + /** creates an {@link UpdatingMap} enricher: + * {@link UpdatingMapBuilder#from(AttributeSensor)} and {@link UpdatingMapBuilder#computing(Function)} are required + **/ + public <S,TKey,TVal> UpdatingMapBuilder<S, TKey, TVal> updatingMap(AttributeSensor<Map<TKey,TVal>> target) { + return new UpdatingMapBuilder<S, TKey, TVal>(target); + } + /** creates a {@link org.apache.brooklyn.enricher.stock.Joiner} enricher builder + * which joins entries in a list to produce a String + **/ + public JoinerBuilder joining(AttributeSensor<?> source) { + return new JoinerBuilder(source); + } + } + + + protected abstract static class AbstractAggregatorBuilder<S, T, B extends AbstractAggregatorBuilder<S, T, B>> extends AbstractEnricherBuilder<B> { + protected final AttributeSensor<S> aggregating; + protected AttributeSensor<T> publishing; + protected Entity fromEntity; + /** @deprecated since 0.7.0, kept for backwards compatibility for rebind, but not used otherwise */ + @Deprecated protected Function<? super Collection<S>, ? extends T> computing; + // use supplier so latest values of other fields can be used + protected Supplier<Function<? super Collection<S>, ? extends T>> computingSupplier; + protected Boolean fromMembers; + protected Boolean fromChildren; + protected Boolean excludingBlank; + protected ImmutableSet<Entity> fromHardcodedProducers; + protected Predicate<? 
super Entity> entityFilter; + protected Predicate<Object> valueFilter; + protected Object defaultValueForUnreportedSensors; + protected Object valueToReportIfNoSensors; + + public AbstractAggregatorBuilder(AttributeSensor<S> aggregating) { + super(Aggregator.class); + this.aggregating = aggregating; + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <T2 extends T> AggregatorBuilder<S,T2> publishing(AttributeSensor<? extends T2> val) { + this.publishing = (AttributeSensor) checkNotNull(val); + return (AggregatorBuilder) self(); + } + public B from(Entity val) { + this.fromEntity = checkNotNull(val); + return self(); + } + public B fromMembers() { + this.fromMembers = true; + return self(); + } + public B fromChildren() { + this.fromChildren = true; + return self(); + } + public B fromHardcodedProducers(Iterable<? extends Entity> val) { + this.fromHardcodedProducers = ImmutableSet.copyOf(val); + return self(); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public B computing(Function<? super Collection<S>, ? 
extends T> val) { + this.computingSupplier = (Supplier)Suppliers.ofInstance(checkNotNull(val)); + return self(); + } + @SuppressWarnings({ "unchecked", "rawtypes", "unused" }) + private B computingSumLegacy() { + // since 0.7.0, kept in case we need to rebind to this + Function<Collection<S>, Number> function = new Function<Collection<S>, Number>() { + @Override public Number apply(Collection<S> input) { + return sum((Collection)input, (Number)defaultValueForUnreportedSensors, (Number)valueToReportIfNoSensors, (TypeToken) publishing.getTypeToken()); + }}; + this.computing((Function)function); + return self(); + } + @SuppressWarnings({ "unchecked", "rawtypes", "unused" }) + private B computingAverageLegacy() { + // since 0.7.0, kept in case we need to rebind to this + Function<Collection<S>, Number> function = new Function<Collection<S>, Number>() { + @Override public Number apply(Collection<S> input) { + return average((Collection)input, (Number)defaultValueForUnreportedSensors, (Number)valueToReportIfNoSensors, (TypeToken) publishing.getTypeToken()); + }}; + this.computing((Function)function); + return self(); + } + public B computingSum() { + this.computingSupplier = new Supplier<Function<? super Collection<S>, ? extends T>>() { + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Function<? super Collection<S>, ? extends T> get() { + // relies on TypeCoercion of result from Number to T, and type erasure for us to get away with it! + return (Function)new ComputingSum((Number)defaultValueForUnreportedSensors, (Number)valueToReportIfNoSensors, publishing.getTypeToken()); + } + }; + return self(); + } + + public B computingAverage() { + this.computingSupplier = new Supplier<Function<? super Collection<S>, ? extends T>>() { + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Function<? super Collection<S>, ? extends T> get() { + // relies on TypeCoercion of result from Number to T, and type erasure for us to get away with it! 
+ return (Function)new ComputingAverage((Number)defaultValueForUnreportedSensors, (Number)valueToReportIfNoSensors, publishing.getTypeToken()); + } + }; + return self(); + } + public B defaultValueForUnreportedSensors(S val) { + this.defaultValueForUnreportedSensors = val; + return self(); + } + public B valueToReportIfNoSensors(Object val) { + this.valueToReportIfNoSensors = val; + return self(); + } + public B entityFilter(Predicate<? super Entity> val) { + this.entityFilter = val; + return self(); + } + public B excludingBlank() { + this.excludingBlank = true; + return self(); + } + @Override + protected String getDefaultUniqueTag() { + if (publishing==null) return null; + return "aggregator:"+publishing.getName(); + } + public EnricherSpec<?> build() { + Predicate<Object> valueFilter; + if (Boolean.TRUE.equals(excludingBlank)) { + valueFilter = new Predicate<Object>() { + @Override public boolean apply(Object input) { + return (input != null) && + ((input instanceof CharSequence) ? Strings.isNonBlank((CharSequence)input) : true); + } + }; + // above kept for deserialization; not sure necessary + valueFilter = StringPredicates.isNonBlank(); + } else { + valueFilter = null; + } + // FIXME excludingBlank; use valueFilter? exclude means ignored entirely or substituted for defaultMemberValue? 
+ return super.build().configure(MutableMap.builder() + .putIfNotNull(Aggregator.PRODUCER, fromEntity) + .put(Aggregator.TARGET_SENSOR, publishing) + .put(Aggregator.SOURCE_SENSOR, aggregating) + .putIfNotNull(Aggregator.FROM_CHILDREN, fromChildren) + .putIfNotNull(Aggregator.FROM_MEMBERS, fromMembers) + .putIfNotNull(Aggregator.TRANSFORMATION, computingSupplier.get()) + .putIfNotNull(Aggregator.FROM_HARDCODED_PRODUCERS, fromHardcodedProducers) + .putIfNotNull(Aggregator.ENTITY_FILTER, entityFilter) + .putIfNotNull(Aggregator.VALUE_FILTER, valueFilter) + .putIfNotNull(Aggregator.DEFAULT_MEMBER_VALUE, defaultValueForUnreportedSensors) + .build()); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .omitNullValues() + .add("aggregating", aggregating) + .add("publishing", publishing) + .add("fromEntity", fromEntity) + .add("computing", computingSupplier) + .add("fromMembers", fromMembers) + .add("fromChildren", fromChildren) + .add("excludingBlank", excludingBlank) + .add("fromHardcodedProducers", fromHardcodedProducers) + .add("entityFilter", entityFilter) + .add("valueFilter", valueFilter) + .add("defaultValueForUnreportedSensors", defaultValueForUnreportedSensors) + .add("valueToReportIfNoSensors", valueToReportIfNoSensors) + .toString(); + } + } + + protected abstract static class AbstractCombinerBuilder<S, T, B extends AbstractCombinerBuilder<S, T, B>> extends AbstractEnricherBuilder<B> { + protected final List<AttributeSensor<? extends S>> combining; + protected AttributeSensor<T> publishing; + protected Entity fromEntity; + protected Function<? super Collection<S>, ? extends T> computing; + protected Boolean excludingBlank; + protected Object valueToReportIfNoSensors; + protected Predicate<Object> valueFilter; + + // For summing/averaging + protected Object defaultValueForUnreportedSensors; + + @SafeVarargs + public AbstractCombinerBuilder(AttributeSensor<? extends S>... 
vals) { + this(ImmutableList.copyOf(vals)); + } + public AbstractCombinerBuilder(Collection<AttributeSensor<? extends S>> vals) { + super(Combiner.class); + checkArgument(checkNotNull(vals).size() > 0, "combining-sensors must be non-empty"); + this.combining = ImmutableList.<AttributeSensor<? extends S>>copyOf(vals); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <T2 extends T> CombinerBuilder<S,T2> publishing(AttributeSensor<? extends T2> val) { + this.publishing = (AttributeSensor) checkNotNull(val); + return (CombinerBuilder) this; + } + public B from(Entity val) { + this.fromEntity = checkNotNull(val); + return self(); + } + public B computing(Function<? super Collection<S>, ? extends T> val) { + this.computing = checkNotNull(val); + return self(); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public B computingSum() { + Function<Collection<S>, Number> function = new Function<Collection<S>, Number>() { + @Override public Number apply(Collection<S> input) { + return sum((Collection)input, (Number)defaultValueForUnreportedSensors, (Number)valueToReportIfNoSensors, (TypeToken) publishing.getTypeToken()); + }}; + this.computing((Function)function); + return self(); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public B computingAverage() { + Function<Collection<S>, Number> function = new Function<Collection<S>, Number>() { + @Override public Number apply(Collection<S> input) { + return average((Collection)input, (Number)defaultValueForUnreportedSensors, (Number)valueToReportIfNoSensors, (TypeToken) publishing.getTypeToken()); + }}; + this.computing((Function)function); + return self(); + } + public B defaultValueForUnreportedSensors(Object val) { + this.defaultValueForUnreportedSensors = val; + return self(); + } + public B valueToReportIfNoSensors(Object val) { + this.valueToReportIfNoSensors = val; + return self(); + } + public B excludingBlank() { + this.excludingBlank = true; + return self(); + } + @Override + protected String 
getDefaultUniqueTag() { + if (publishing==null) return null; + return "combiner:"+publishing.getName(); + } + public EnricherSpec<?> build() { + return super.build().configure(MutableMap.builder() + .putIfNotNull(Combiner.PRODUCER, fromEntity) + .put(Combiner.TARGET_SENSOR, publishing) + .put(Combiner.SOURCE_SENSORS, combining) + .putIfNotNull(Combiner.TRANSFORMATION, computing) + .putIfNotNull(Combiner.VALUE_FILTER, valueFilter) + .build()); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .omitNullValues() + .add("combining", combining) + .add("publishing", publishing) + .add("fromEntity", fromEntity) + .add("computing", computing) + .add("excludingBlank", excludingBlank) + .add("valueToReportIfNoSensors", valueToReportIfNoSensors) + .add("valueFilter", valueFilter) + .toString(); + } + } + + protected abstract static class AbstractTransformerBuilder<S, T, B extends AbstractTransformerBuilder<S, T, B>> extends AbstractEnricherBuilder<B> { + protected final AttributeSensor<S> transforming; + protected AttributeSensor<T> publishing; + protected Entity fromEntity; + protected Function<? super S, ?> computing; + protected Function<? super SensorEvent<S>, ?> computingFromEvent; + + public AbstractTransformerBuilder(AttributeSensor<S> val) { + super(Transformer.class); + this.transforming = checkNotNull(val); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <T2 extends T> TransformerBuilder<S,T2> publishing(AttributeSensor<? extends T2> val) { + this.publishing = (AttributeSensor) checkNotNull(val); + return (TransformerBuilder) this; + } + public B from(Entity val) { + this.fromEntity = checkNotNull(val); + return self(); + } + public B computing(Function<? super S, ? extends T> val) { + this.computing = checkNotNull(val); + return self(); + } + public B computingFromEvent(Function<? super SensorEvent<S>, ? 
extends T> val) { + this.computingFromEvent = checkNotNull(val); + return self(); + } + @Override + protected String getDefaultUniqueTag() { + if (publishing==null) return null; + return "transformer:"+publishing.getName(); + } + public EnricherSpec<?> build() { + return super.build().configure(MutableMap.builder() + .putIfNotNull(Transformer.PRODUCER, fromEntity) + .put(Transformer.TARGET_SENSOR, publishing) + .put(Transformer.SOURCE_SENSOR, transforming) + .putIfNotNull(Transformer.TRANSFORMATION_FROM_VALUE, computing) + .putIfNotNull(Transformer.TRANSFORMATION_FROM_EVENT, computingFromEvent) + .build()); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .omitNullValues() + .add("publishing", publishing) + .add("transforming", transforming) + .add("fromEntity", fromEntity) + .add("computing", computing) + .toString(); + } + } + + protected abstract static class AbstractPropagatorBuilder<B extends AbstractPropagatorBuilder<B>> extends AbstractEnricherBuilder<B> { + protected final Map<? extends Sensor<?>, ? extends Sensor<?>> propagating; + protected final Boolean propagatingAll; + protected final Iterable<? extends Sensor<?>> propagatingAllBut; + protected Entity fromEntity; + + public AbstractPropagatorBuilder(Map<? extends Sensor<?>, ? extends Sensor<?>> vals) { + super(Propagator.class); + checkArgument(checkNotNull(vals).size() > 0, "propagating-sensors must be non-empty"); + this.propagating = vals; + this.propagatingAll = null; + this.propagatingAllBut = null; + } + public AbstractPropagatorBuilder(Iterable<? extends Sensor<?>> vals) { + this(newIdentityMap(ImmutableSet.copyOf(vals))); + } + public AbstractPropagatorBuilder(Sensor<?>... vals) { + this(newIdentityMap(ImmutableSet.copyOf(vals))); + } + AbstractPropagatorBuilder(boolean propagatingAll, Iterable<? extends Sensor<?>> butVals) { + super(Propagator.class); + // Ugly constructor! 
Taking boolean to differentiate it from others; could use a static builder + // but feels like overkill having a builder for a builder, being called by a builder! + checkArgument(propagatingAll, "Not propagating all; use PropagatingAll(vals)"); + this.propagating = null; + this.propagatingAll = true; + this.propagatingAllBut = (butVals == null || Iterables.isEmpty(butVals)) ? null: butVals; + } + public B from(Entity val) { + this.fromEntity = checkNotNull(val); + return self(); + } + @Override + protected String getDefaultUniqueTag() { + List<String> summary = MutableList.of(); + if (propagating!=null) { + for (Map.Entry<? extends Sensor<?>, ? extends Sensor<?>> entry: propagating.entrySet()) { + if (entry.getKey().getName().equals(entry.getValue().getName())) + summary.add(entry.getKey().getName()); + else + summary.add(entry.getKey().getName()+"->"+entry.getValue().getName()); + } + } + if (Boolean.TRUE.equals(propagatingAll)) + summary.add("ALL"); + if (propagatingAllBut!=null && !Iterables.isEmpty(propagatingAllBut)) { + List<String> allBut = MutableList.of(); + for (Sensor<?> s: propagatingAllBut) allBut.add(s.getName()); + summary.add("ALL_BUT:"+com.google.common.base.Joiner.on(",").join(allBut)); + } + + return "propagating["+fromEntity.getId()+":"+com.google.common.base.Joiner.on(",").join(summary)+"]"; + } + public EnricherSpec<? 
extends Enricher> build() { + return super.build().configure(MutableMap.builder() + .putIfNotNull(Propagator.PRODUCER, fromEntity) + .putIfNotNull(Propagator.SENSOR_MAPPING, propagating) + .putIfNotNull(Propagator.PROPAGATING_ALL, propagatingAll) + .putIfNotNull(Propagator.PROPAGATING_ALL_BUT, propagatingAllBut) + .build()); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .omitNullValues() + .add("fromEntity", fromEntity) + .add("propagating", propagating) + .add("propagatingAll", propagatingAll) + .add("propagatingAllBut", propagatingAllBut) + .toString(); + } + } + + public abstract static class AbstractUpdatingMapBuilder<S, TKey, TVal, B extends AbstractUpdatingMapBuilder<S, TKey, TVal, B>> extends AbstractEnricherBuilder<B> { + protected AttributeSensor<Map<TKey,TVal>> targetSensor; + protected AttributeSensor<? extends S> fromSensor; + protected TKey key; + protected Function<S, ? extends TVal> computing; + protected Boolean removingIfResultIsNull; + + public AbstractUpdatingMapBuilder(AttributeSensor<Map<TKey,TVal>> target) { + super(UpdatingMap.class); + this.targetSensor = target; + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <S2 extends S> UpdatingMapBuilder<S2,TKey,TVal> from(AttributeSensor<S2> fromSensor) { + this.fromSensor = checkNotNull(fromSensor); + return (UpdatingMapBuilder) this; + } + public B computing(Function<S,? 
extends TVal> val) { + this.computing = checkNotNull(val); + return self(); + } + /** sets an explicit key to use; defaults to using the name of the source sensor specified in {@link #from(AttributeSensor)} */ + public B key(TKey key) { + this.key = key; + return self(); + } + /** sets explicit behaviour for treating <code>null</code> return values; + * default is to remove */ + public B removingIfResultIsNull(boolean val) { + this.removingIfResultIsNull = val; + return self(); + } + @Override + protected String getDefaultUniqueTag() { + if (targetSensor==null || fromSensor==null) return null; + return "updating:"+targetSensor.getName()+"<-"+fromSensor.getName(); + } + public EnricherSpec<?> build() { + return super.build().configure(MutableMap.builder() + .put(UpdatingMap.TARGET_SENSOR, targetSensor) + .put(UpdatingMap.SOURCE_SENSOR, fromSensor) + .putIfNotNull(UpdatingMap.KEY_IN_TARGET_SENSOR, key) + .put(UpdatingMap.COMPUTING, computing) + .putIfNotNull(UpdatingMap.REMOVING_IF_RESULT_IS_NULL, removingIfResultIsNull) + .build()); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .omitNullValues() + .add("publishing", targetSensor) + .add("fromSensor", fromSensor) + .add("key", key) + .add("computing", computing) + .add("removingIfResultIsNull", removingIfResultIsNull) + .toString(); + } + } + + protected abstract static class AbstractJoinerBuilder<B extends AbstractJoinerBuilder<B>> extends AbstractEnricherBuilder<B> { + protected final AttributeSensor<?> transforming; + protected AttributeSensor<String> publishing; + protected Entity fromEntity; + protected String separator; + protected Boolean quote; + protected Integer minimum; + protected Integer maximum; + + public AbstractJoinerBuilder(AttributeSensor<?> source) { + super(Joiner.class); + this.transforming = checkNotNull(source); + } + public B publishing(AttributeSensor<String> target) { + this.publishing = checkNotNull(target); + return self(); + } + public B 
separator(String separator) { + this.separator = separator; + return self(); + } + public B quote(Boolean quote) { + this.quote = quote; + return self(); + } + public B minimum(Integer minimum) { + this.minimum = minimum; + return self(); + } + public B maximum(Integer maximum) { + this.maximum = maximum; + return self(); + } + @Override + protected String getDefaultUniqueTag() { + if (transforming==null || publishing==null) return null; + return "joiner:"+transforming.getName()+"->"+publishing.getName(); + } + public EnricherSpec<?> build() { + return super.build().configure(MutableMap.builder() + .putIfNotNull(Joiner.PRODUCER, fromEntity) + .put(Joiner.TARGET_SENSOR, publishing) + .put(Joiner.SOURCE_SENSOR, transforming) + .putIfNotNull(Joiner.SEPARATOR, separator) + .putIfNotNull(Joiner.QUOTE, quote) + .putIfNotNull(Joiner.MINIMUM, minimum) + .putIfNotNull(Joiner.MAXIMUM, maximum) + .build()); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .omitNullValues() + .add("publishing", publishing) + .add("transforming", transforming) + .add("separator", separator) + .toString(); + } + } + + public static class InitialBuilder extends AbstractInitialBuilder<InitialBuilder> { + } + + public static class AggregatorBuilder<S, T> extends AbstractAggregatorBuilder<S, T, AggregatorBuilder<S, T>> { + public AggregatorBuilder(AttributeSensor<S> aggregating) { + super(aggregating); + } + } + + public static class PropagatorBuilder extends AbstractPropagatorBuilder<PropagatorBuilder> { + public PropagatorBuilder(Map<? extends Sensor<?>, ? extends Sensor<?>> vals) { + super(vals); + } + public PropagatorBuilder(Iterable<? extends Sensor<?>> vals) { + super(vals); + } + public PropagatorBuilder(Sensor<?>... vals) { + super(vals); + } + PropagatorBuilder(boolean propagatingAll, Iterable<? 
extends Sensor<?>> butVals) { + super(propagatingAll, butVals); + } + } + + public static class CombinerBuilder<S, T> extends AbstractCombinerBuilder<S, T, CombinerBuilder<S, T>> { + @SafeVarargs + public CombinerBuilder(AttributeSensor<? extends S>... vals) { + super(vals); + } + public CombinerBuilder(Collection<AttributeSensor<? extends S>> vals) { + super(vals); + } + } + + public static class TransformerBuilder<S, T> extends AbstractTransformerBuilder<S, T, TransformerBuilder<S, T>> { + public TransformerBuilder(AttributeSensor<S> val) { + super(val); + } + } + + public static class UpdatingMapBuilder<S, TKey, TVal> extends AbstractUpdatingMapBuilder<S, TKey, TVal, UpdatingMapBuilder<S, TKey, TVal>> { + public UpdatingMapBuilder(AttributeSensor<Map<TKey,TVal>> val) { + super(val); + } + } + + public static class JoinerBuilder extends AbstractJoinerBuilder<JoinerBuilder> { + public JoinerBuilder(AttributeSensor<?> source) { + super(source); + } + } + + @Beta + private abstract static class ComputingNumber<T extends Number> implements Function<Collection<T>, T> { + protected final Number defaultValueForUnreportedSensors; + protected final Number valueToReportIfNoSensors; + protected final TypeToken<T> typeToken; + @SuppressWarnings({ "rawtypes", "unchecked" }) + public ComputingNumber(Number defaultValueForUnreportedSensors, Number valueToReportIfNoSensors, TypeToken<T> typeToken) { + this.defaultValueForUnreportedSensors = defaultValueForUnreportedSensors; + this.valueToReportIfNoSensors = valueToReportIfNoSensors; + if (typeToken!=null && TypeToken.of(Number.class).isAssignableFrom(typeToken.getType())) { + this.typeToken = typeToken; + } else if (typeToken==null || typeToken.isAssignableFrom(Number.class)) { + // use double if e.g. 
Object is supplied + this.typeToken = (TypeToken)TypeToken.of(Double.class); + } else { + throw new IllegalArgumentException("Type "+typeToken+" is not valid for "+this); + } + } + @Override public abstract T apply(Collection<T> input); + } + + @Beta + public static class ComputingSum<T extends Number> extends ComputingNumber<T> { + public ComputingSum(Number defaultValueForUnreportedSensors, Number valueToReportIfNoSensors, TypeToken<T> typeToken) { + super(defaultValueForUnreportedSensors, valueToReportIfNoSensors, typeToken); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override public T apply(Collection<T> input) { + return (T) sum((Collection)input, (Number)defaultValueForUnreportedSensors, (Number)valueToReportIfNoSensors, typeToken); + } + } + + @Beta + public static class ComputingAverage<T extends Number> extends ComputingNumber<T> { + public ComputingAverage(Number defaultValueForUnreportedSensors, Number valueToReportIfNoSensors, TypeToken<T> typeToken) { + super(defaultValueForUnreportedSensors, valueToReportIfNoSensors, typeToken); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override public T apply(Collection<T> input) { + return (T) average((Collection)input, (Number)defaultValueForUnreportedSensors, (Number)valueToReportIfNoSensors, typeToken); + } + } + + protected static <T extends Number> T average(Collection<T> vals, Number defaultValueForUnreportedSensors, Number valueToReportIfNoSensors, TypeToken<T> type) { + Double doubleValueToReportIfNoSensors = (valueToReportIfNoSensors == null) ? null : valueToReportIfNoSensors.doubleValue(); + int count = count(vals, defaultValueForUnreportedSensors!=null); + Double result = (count==0) ? doubleValueToReportIfNoSensors : + (Double) ((sum(vals, defaultValueForUnreportedSensors, 0, TypeToken.of(Double.class)) / count)); + + return cast(result, type); + } + + @SuppressWarnings("unchecked") + protected static <N extends Number> N cast(Number n, TypeToken<? 
extends N> numberType) { + return (N) TypeCoercions.castPrimitive(n, numberType.getRawType()); + } + + @Beta //may be moved + public static <N extends Number> N sum(Iterable<? extends Number> vals, Number valueIfNull, Number valueIfNone, TypeToken<N> type) { + double result = 0d; + int count = 0; + if (vals!=null) { + for (Number val : vals) { + if (val!=null) { + result += val.doubleValue(); + count++; + } else if (valueIfNull!=null) { + result += valueIfNull.doubleValue(); + count++; + } + } + } + if (count==0) return cast(valueIfNone, type); + return cast(result, type); + } + + protected static int count(Iterable<? extends Object> vals, boolean includeNullValues) { + int result = 0; + if (vals != null) + for (Object val : vals) + if (val!=null || includeNullValues) result++; + return result; + } + + private static <T> Map<T,T> newIdentityMap(Set<T> keys) { + Map<T,T> result = Maps.newLinkedHashMap(); + for (T key : keys) { + result.put(key, key); + } + return result; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/Joiner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Joiner.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Joiner.java new file mode 100644 index 0000000..5cd2071 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Joiner.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import java.util.Map; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.enricher.AbstractEnricher; +import org.apache.brooklyn.core.sensor.BasicSensorEvent; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.core.flags.SetFromFlag; +import org.apache.brooklyn.util.text.StringEscapes; +import org.apache.brooklyn.util.text.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.reflect.TypeToken; + +//@Catalog(name="Transformer", description="Transforms attributes of an entity; see Enrichers.builder().transforming(...)") +@SuppressWarnings("serial") +public class Joiner<T> extends AbstractEnricher implements SensorEventListener<T> { + + private static final Logger LOG = LoggerFactory.getLogger(Joiner.class); + + public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer"); + public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor"); + public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new 
TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); + @SetFromFlag("separator") + public static ConfigKey<String> SEPARATOR = ConfigKeys.newStringConfigKey("enricher.joiner.separator", "Separator string to insert between each argument", ","); + @SetFromFlag("quote") + public static ConfigKey<Boolean> QUOTE = ConfigKeys.newBooleanConfigKey("enricher.joiner.quote", "Whether to bash-escape each parameter and wrap in double-quotes, defaulting to true", true); + @SetFromFlag("minimum") + public static ConfigKey<Integer> MINIMUM = ConfigKeys.newIntegerConfigKey("enricher.joiner.minimum", "Minimum number of elements to join; if fewer than this, sets null; default 0 (no minimum)"); + @SetFromFlag("maximum") + public static ConfigKey<Integer> MAXIMUM = ConfigKeys.newIntegerConfigKey("enricher.joiner.maximum", "Maximum number of elements to join; default null means all elements always taken"); + + protected Entity producer; + protected AttributeSensor<T> sourceSensor; + protected Sensor<String> targetSensor; + + public Joiner() { + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + + this.producer = getConfig(PRODUCER) == null ? 
entity: getConfig(PRODUCER); + this.sourceSensor = (AttributeSensor<T>) getRequiredConfig(SOURCE_SENSOR); + this.targetSensor = (Sensor<String>) getRequiredConfig(TARGET_SENSOR); + + subscribe(producer, sourceSensor, this); + + Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor); + // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce" + if (value!=null) { + onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1)); + } + } + + @Override + public void onEvent(SensorEvent<T> event) { + emit(targetSensor, compute(event)); + } + + protected Object compute(SensorEvent<T> event) { + Object v = event.getValue(); + Object result = null; + if (v!=null) { + if (v instanceof Map) { + v = ((Map<?,?>)v).values(); + } + if (!(v instanceof Iterable)) { + LOG.warn("Enricher "+this+" received a non-iterable value "+v.getClass()+" "+v+"; refusing to join"); + } else { + MutableList<Object> c1 = MutableList.of(); + Integer maximum = getConfig(MAXIMUM); + for (Object ci: (Iterable<?>)v) { + if (maximum!=null && maximum>=0) { + if (c1.size()>=maximum) break; + } + c1.appendIfNotNull(Strings.toString(ci)); + } + Integer minimum = getConfig(MINIMUM); + if (minimum!=null && c1.size() < minimum) { + // use default null return value + } else { + if (getConfig(QUOTE)) { + MutableList<Object> c2 = MutableList.of(); + for (Object ci: c1) { + c2.add(StringEscapes.BashStringEscapes.wrapBash((String)ci)); + } + c1 = c2; + } + result = Strings.join(c1, getConfig(SEPARATOR)); + } + } + } + if (LOG.isTraceEnabled()) + LOG.trace("Enricher "+this+" computed "+result+" from "+event); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java 
b/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java new file mode 100644 index 0000000..e6050fd --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.brooklyn.enricher.stock;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.core.task.ValueResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;

/**
 * Enricher which republishes sensors of a producer entity on the entity it is attached to.
 * <p>
 * The mode is decided once, in {@link #setEntity(EntityLocal)}:
 * <ul>
 * <li>explicit sensors ({@link #PROPAGATING} and/or {@link #SENSOR_MAPPING}): only those sensors are
 *     subscribed to; the mapping, where present, gives the sensor to publish under;
 * <li>all sensors ({@link #PROPAGATING_ALL} or {@link #PROPAGATING_ALL_BUT}): every sensor of the
 *     producer is subscribed to, and the exclusions are filtered out on arrival;
 * <li>nothing configured: as "all sensors", excluding {@link #SENSORS_NOT_USUALLY_PROPAGATED}.
 * </ul>
 * Combining an explicit set with either "all" option is rejected with an {@link IllegalStateException}.
 */
@SuppressWarnings("serial")
//@Catalog(name="Propagator", description="Propagates attributes from one entity to another; see Enrichers.builder().propagating(...)")
public class Propagator extends AbstractEnricher implements SensorEventListener<Object> {

    private static final Logger LOG = LoggerFactory.getLogger(Propagator.class);

    /** Exclusions applied when neither an explicit sensor set nor an "all" option is configured. */
    public static final Set<Sensor<?>> SENSORS_NOT_USUALLY_PROPAGATED = ImmutableSet.<Sensor<?>>of(
            Attributes.SERVICE_UP, Attributes.SERVICE_NOT_UP_INDICATORS,
            Attributes.SERVICE_STATE_ACTUAL, Attributes.SERVICE_STATE_EXPECTED, Attributes.SERVICE_PROBLEMS);

    /** Entity whose sensors are propagated; when unset, the entity this enricher is attached to is used. */
    @SetFromFlag("producer")
    public static final ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");

    /** Sensors to leave out; setting this (even to an empty collection) switches on propagate-all mode. */
    @SetFromFlag("propagatingAllBut")
    public static final ConfigKey<Collection<Sensor<?>>> PROPAGATING_ALL_BUT = ConfigKeys.newConfigKey(new TypeToken<Collection<Sensor<?>>>() {}, "enricher.propagating.propagatingAllBut");

    /** When {@code true}, switches on propagate-all mode. */
    @SetFromFlag("propagatingAll")
    public static final ConfigKey<Boolean> PROPAGATING_ALL = ConfigKeys.newBooleanConfigKey("enricher.propagating.propagatingAll");

    /** Explicit sensors to propagate under their own name (unless {@link #SENSOR_MAPPING} renames them). */
    @SetFromFlag("propagating")
    public static final ConfigKey<Collection<? extends Sensor<?>>> PROPAGATING = ConfigKeys.newConfigKey(new TypeToken<Collection<? extends Sensor<?>>>() {}, "enricher.propagating.inclusions");

    /** Explicit sensors to propagate, keyed by source sensor, with the sensor to publish under as value. */
    @SetFromFlag("sensorMapping")
    public static final ConfigKey<Map<? extends Sensor<?>, ? extends Sensor<?>>> SENSOR_MAPPING = ConfigKeys.newConfigKey(new TypeToken<Map<? extends Sensor<?>, ? extends Sensor<?>>>() {}, "enricher.propagating.sensorMapping");

    /** Resolved producer; assigned in {@link #setEntity(EntityLocal)}. */
    protected Entity producer;
    /** Source sensor to destination sensor; empty in propagate-all mode. */
    protected Map<? extends Sensor<?>, ? extends Sensor<?>> sensorMapping;
    /** Whether every sensor of the producer is subscribed to. */
    protected boolean propagatingAll;
    /** Effective exclusions in propagate-all mode; stays {@code null} in explicit mode. */
    protected Collection<Sensor<?>> propagatingAllBut;
    /** Decides, per incoming event, whether the source sensor is propagated. */
    protected Predicate<Sensor<?>> sensorFilter;

    public Propagator() {
    }

    /**
     * Resolves the configuration into {@link #producer}, {@link #sensorMapping} and {@link #sensorFilter},
     * subscribes to the producer, and publishes the producer's current attribute values.
     *
     * @throws IllegalStateException if an explicit sensor set is combined with a propagate-all option,
     *         or if the explicit set turns out to be empty
     */
    @Override
    public void setEntity(EntityLocal entity) {
        super.setEntity(entity);

        // Read each key once so that every decision below is taken on the same values.
        Entity configuredProducer = getConfig(PRODUCER);
        Map<? extends Sensor<?>, ? extends Sensor<?>> configuredMapping = getConfig(SENSOR_MAPPING);
        Collection<? extends Sensor<?>> configuredInclusions = getConfig(PROPAGATING);

        this.producer = configuredProducer == null ? entity : configuredProducer;
        boolean sensorMappingSet = configuredMapping != null;
        MutableMap<Sensor<?>,Sensor<?>> sensorMappingTemp = MutableMap.copyOf(configuredMapping);
        this.propagatingAll = Boolean.TRUE.equals(getConfig(PROPAGATING_ALL)) || getConfig(PROPAGATING_ALL_BUT)!=null;

        if (configuredInclusions != null) {
            if (propagatingAll) {
                throw new IllegalStateException("Propagator enricher "+this+" must not have 'propagating' set at same time as either 'propagatingAll' or 'propagatingAllBut'");
            }

            // Elements are iterated as Object and resolved individually, as the original code does;
            // presumably they may still be unresolved (deferred) values at this point -- TODO confirm.
            for (Object sensorO : configuredInclusions) {
                Sensor<?> sensor = Tasks.resolving(sensorO).as(Sensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
                // an explicit mapping for the same sensor wins over the identity mapping
                if (!sensorMappingTemp.containsKey(sensor)) {
                    sensorMappingTemp.put(sensor, sensor);
                }
            }
            this.sensorMapping = ImmutableMap.copyOf(sensorMappingTemp);
            // NB: this must remain the FIRST anonymous class in the file, so its generated name is stable.
            this.sensorFilter = new Predicate<Sensor<?>>() {
                @Override public boolean apply(Sensor<?> input) {
                    // TODO kept for deserialization of inner classes, but shouldn't be necessary, as with other inner classes (qv);
                    // NB: previously this did this check:
//                    return input != null && sensorMapping.keySet().contains(input);
                    // but those clauses seem wrong (when would input be null?) and unnecessary
                    // (we are doing an explicit subscribe in this code path)
                    return true;
                }
            };
        } else if (sensorMappingSet) {
            if (propagatingAll) {
                throw new IllegalStateException("Propagator enricher "+this+" must not have 'sensorMapping' set at same time as either 'propagatingAll' or 'propagatingAllBut'");
            }
            this.sensorMapping = ImmutableMap.copyOf(sensorMappingTemp);
            this.sensorFilter = Predicates.alwaysTrue();
        } else {
            this.sensorMapping = ImmutableMap.<Sensor<?>, Sensor<?>>of();
            if (!propagatingAll) {
                // default if nothing specified is to do all but the ones not usually propagated
                propagatingAll = true;
                // user specified nothing, so *set* the all_but to the default set
                // if desired, we could allow this to be dynamically reconfigurable, remove this field and always look up;
                // slight performance hit (always looking up), and might need to recompute subscriptions, so not supported currently
                // TODO this default is @Beta behaviour! -- maybe better to throw?
                propagatingAllBut = SENSORS_NOT_USUALLY_PROPAGATED;
            } else {
                propagatingAllBut = getConfig(PROPAGATING_ALL_BUT);
            }
            this.sensorFilter = new Predicate<Sensor<?>>() {
                @Override public boolean apply(Sensor<?> input) {
                    Collection<Sensor<?>> exclusions = propagatingAllBut;
                    // TODO this anonymous inner class and getConfig check should be removed / confirmed for rebind compatibility.
                    // we *should* be regenerating these fields on each rebind (calling to this method),
                    // so serialization of this class shouldn't be needed (and should be skipped), but that needs to be checked.
                    if (propagatingAllBut==null) {
                        exclusions = getConfig(PROPAGATING_ALL_BUT);
                    }
                    return input != null && (exclusions==null || !exclusions.contains(input));
                }
            };
        }

        // exactly one of "propagate all" and "non-empty explicit mapping" must hold
        Preconditions.checkState(propagatingAll ^ sensorMapping.size() > 0,
                "Nothing to propagate; detected: propagatingAll (%s, excluding %s), sensorMapping (%s)", propagatingAll, getConfig(PROPAGATING_ALL_BUT), sensorMapping);

        if (propagatingAll) {
            // a null sensor is passed here; exclusions are then applied per event via sensorFilter
            subscribe(producer, null, this);
        } else {
            for (Sensor<?> sensor : sensorMapping.keySet()) {
                subscribe(producer, sensor, this);
            }
        }

        emitAllAttributes();
    }

    /**
     * Republishes the event's value under the destination sensor, unless the source sensor is
     * rejected by {@link #sensorFilter}.
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    public void onEvent(SensorEvent<Object> event) {
        // propagate upwards
        Sensor<?> sourceSensor = event.getSensor();

        // check the filter first: nothing else is needed for an excluded sensor
        if (!sensorFilter.apply(sourceSensor)) {
            return; // ignoring excluded sensor
        }

        Sensor<?> destinationSensor = getDestinationSensor(sourceSensor);

        if (LOG.isTraceEnabled()) {
            LOG.trace("enricher {} got {}, propagating via {}{}",
                    new Object[] {this, event, entity, (sourceSensor == destinationSensor ? "" : " (as "+destinationSensor+")")});
        }

        emit((Sensor)destinationSensor, event.getValue());
    }

    /** useful once sensors are added to emit all values; {@code null} values are skipped */
    public void emitAllAttributes() {
        emitAllAttributes(false);
    }

    /**
     * Copies the producer's current value of every propagated {@link AttributeSensor} onto this
     * enricher's entity, under the destination sensor.
     *
     * @param includeNullValues whether attributes whose current value is {@code null} are copied too
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public void emitAllAttributes(boolean includeNullValues) {
        Iterable<? extends Sensor<?>> sensorsToPopulate = propagatingAll
                ? Iterables.filter(producer.getEntityType().getSensors(), sensorFilter)
                : sensorMapping.keySet();

        for (Sensor<?> s : sensorsToPopulate) {
            if (s instanceof AttributeSensor) {
                // NB: the cast requires an attribute sensor to be mapped to an attribute sensor
                AttributeSensor destinationSensor = (AttributeSensor<?>) getDestinationSensor(s);
                Object v = producer.getAttribute((AttributeSensor<?>)s);
                // TODO we should keep a timestamp for the source sensor and echo it
                // (this pretends timestamps are current, which probably isn't the case when we are propagating)
                if (v != null || includeNullValues) {
                    entity.setAttribute(destinationSensor, v);
                }
            }
        }
    }

    /** Returns the sensor to publish under: the mapped sensor if there is one, else the source sensor itself. */
    private Sensor<?> getDestinationSensor(Sensor<?> sourceSensor) {
        return sensorMapping.containsKey(sourceSensor) ? sensorMapping.get(sourceSensor): sourceSensor;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/SensorPropagatingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/SensorPropagatingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/SensorPropagatingEnricher.java new file mode 100644 index 0000000..4c198f8 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/SensorPropagatingEnricher.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.brooklyn.enricher.stock;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

/**
 * An enricher which listens for the target sensor(s) on a source entity and republishes them on
 * the entity it is attached to, optionally under a different sensor.
 * <p>
 * Now superseded by syntax such as:
 *
 * <pre>{@code Enrichers.builder().propagating(XXX).from(source).build()}</pre>
 *
 * @deprecated since 0.7.0; use {@link Enrichers#builder()}
 *
 * @see Propagator if need to sub-class
 */
@Deprecated
public class SensorPropagatingEnricher extends AbstractEnricher implements SensorEventListener<Object> {

    public static final Logger log = LoggerFactory.getLogger(SensorPropagatingEnricher.class);

    /** the entity to listen to */
    private final Entity source;

    /** the sensors to listen to */
    private final Set<Sensor<?>> sensors;

    /**
     * renames to apply: source sensor to the sensor published on this enricher's entity;
     * a sensor with no entry here is propagated unchanged
     */
    private final Map<Sensor<?>, Sensor<?>> sensorMappings;

    /** Creates an enricher for every sensor the source's entity type declares at the time of the call. */
    public static SensorPropagatingEnricher newInstanceListeningToAllSensors(Entity source) {
        return newInstanceListeningToAllSensorsBut(source);
    }

    /**
     * Creates an enricher for every sensor the source's entity type declares at the time of the call,
     * except those listed in {@code excludes}.
     */
    public static SensorPropagatingEnricher newInstanceListeningToAllSensorsBut(Entity source, Sensor<?>... excludes) {
        Set<Sensor<?>> excluded = ImmutableSet.copyOf(excludes);
        Set<Sensor<?>> includes = Sets.newLinkedHashSet();

        for (Sensor<?> candidate : source.getEntityType().getSensors()) {
            if (!excluded.contains(candidate)) {
                includes.add(candidate);
            }
        }
        return new SensorPropagatingEnricher(source, includes);
    }

    /** Creates an enricher for exactly the given sensors of the source. */
    public static SensorPropagatingEnricher newInstanceListeningTo(Entity source, Sensor<?>... includes) {
        return new SensorPropagatingEnricher(source, includes);
    }

    /**
     * listens to sensors from source, propagates them here renamed according to the map
     *
     * Instead, consider calling:
     * <pre>
     * {@code
     * addEnricher(Enrichers.builder()
     *         .propagating(mapOfOldSensorNamesToNewSensorNames)
     *         .from(source)
     *         .build());
     * }
     * </pre>
     *
     * @deprecated since 0.7.0; use {@link Enrichers#builder()}
     */
    @Deprecated
    public static SensorPropagatingEnricher newInstanceRenaming(Entity source, Map<? extends Sensor<?>, ? extends Sensor<?>> sensors) {
        return new SensorPropagatingEnricher(source, sensors);
    }

    /**
     * @deprecated since 0.7.0; use {@link Enrichers#builder()}
     */
    @Deprecated
    public SensorPropagatingEnricher(Entity source, Sensor<?>... sensors) {
        this(source, ImmutableList.copyOf(sensors));
    }

    /**
     * Instead, consider calling:
     * <pre>
     * {@code
     * addEnricher(Enrichers.builder()
     *         .propagating(sensors)
     *         .from(source)
     *         .build());
     * }
     * </pre>
     *
     * @deprecated since 0.7.0; use {@link Enrichers#builder()}
     */
    @Deprecated
    public SensorPropagatingEnricher(Entity source, Collection<Sensor<?>> sensors) {
        this.source = source;
        this.sensors = ImmutableSet.copyOf(sensors);
        this.sensorMappings = ImmutableMap.of();
    }

    /**
     * Creates an enricher which listens to the keys of {@code sensors} on the source and publishes
     * each one under the sensor it is mapped to.
     */
    public SensorPropagatingEnricher(Entity source, Map<? extends Sensor<?>, ? extends Sensor<?>> sensors) {
        this.source = source;
        this.sensors = ImmutableSet.copyOf(sensors.keySet());
        this.sensorMappings = ImmutableMap.copyOf(sensors);
    }

    /** Subscribes to each configured sensor on the source entity. */
    @Override
    public void setEntity(EntityLocal entity) {
        super.setEntity(entity);
        for (Sensor<?> s : sensors) {
            subscribe(source, s, this);
        }
    }

    /**
     * Republishes the event's value on this enricher's entity under the destination sensor:
     * as an attribute when the destination is an {@link AttributeSensor}, otherwise as an emitted event.
     */
    @Override
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public void onEvent(SensorEvent<Object> event) {
        // propagate upwards
        Sensor<?> sourceSensor = event.getSensor();
        Sensor<?> destinationSensor = getDestinationSensor(sourceSensor);

        if (log.isTraceEnabled()) {
            log.trace("policy {} got {}, propagating via {}{}",
                    new Object[] {this, event, entity, (sourceSensor == destinationSensor ? "" : " (as "+destinationSensor+")")});
        }

        // Decide on the sensor that is actually cast and published (the destination), not on the source:
        // a mapping may pair sensors of different kinds, which previously caused a ClassCastException
        // (attribute -> plain sensor) or an emit against an attribute sensor (plain -> attribute).
        if (destinationSensor instanceof AttributeSensor) {
            entity.setAttribute((AttributeSensor)destinationSensor, event.getValue());
        } else {
            entity.emit((Sensor)destinationSensor, event.getValue());
        }
    }

    /** useful post-addition to emit current values; {@code null} values are skipped */
    public void emitAllAttributes() {
        emitAllAttributes(false);
    }

    /**
     * Copies the source's current value of every configured {@link AttributeSensor} onto this
     * enricher's entity, under the destination sensor.
     *
     * @param includeNullValues whether attributes whose current value is {@code null} are copied too
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public void emitAllAttributes(boolean includeNullValues) {
        for (Sensor<?> s : sensors) {
            if (s instanceof AttributeSensor) {
                // NB: the cast requires an attribute sensor to be mapped to an attribute sensor
                AttributeSensor destinationSensor = (AttributeSensor<?>) getDestinationSensor(s);
                Object v = source.getAttribute((AttributeSensor)s);
                if (v != null || includeNullValues) {
                    entity.setAttribute(destinationSensor, v);
                }
            }
        }
    }

    /** convenience, to be called by the host: adds this enricher to {@code host}, then emits current values */
    public SensorPropagatingEnricher addToEntityAndEmitAll(Entity host) {
        host.addEnricher(this);
        emitAllAttributes();
        return this;
    }

    /** Returns the sensor to publish under: the mapped sensor if there is one, else the source sensor itself. */
    private Sensor<?> getDestinationSensor(Sensor<?> sourceSensor) {
        return sensorMappings.containsKey(sourceSensor) ? sensorMappings.get(sourceSensor): sourceSensor;
    }
}
