http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java new file mode 100644 index 0000000..b909856 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java @@ -0,0 +1,823 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.sensor; + +import static com.google.common.base.Preconditions.checkNotNull; +import groovy.lang.Closure; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.ExecutionContext; +import org.apache.brooklyn.api.mgmt.SubscriptionHandle; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.mgmt.TaskAdaptable; +import org.apache.brooklyn.api.mgmt.TaskFactory; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.util.collections.CollectionFunctionals; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.task.BasicExecutionContext; +import org.apache.brooklyn.util.core.task.BasicTask; +import org.apache.brooklyn.util.core.task.DeferredSupplier; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.ParallelTask; +import org.apache.brooklyn.util.core.task.TaskInternal; +import org.apache.brooklyn.util.core.task.Tasks; +import 
org.apache.brooklyn.util.core.task.ValueResolver; +import org.apache.brooklyn.util.exceptions.CompoundRuntimeException; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.NotManagedException; +import org.apache.brooklyn.util.exceptions.RuntimeTimeoutException; +import org.apache.brooklyn.util.groovy.GroovyJavaMethods; +import org.apache.brooklyn.util.guava.Functionals; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.CountdownTimer; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +/** Conveniences for making tasks which run in entity {@link ExecutionContext}s, subscribing to attributes from other entities, possibly transforming those; + * these {@link Task} instances are typically passed in {@link EntityLocal#setConfig(ConfigKey, Object)}. + * <p> + * If you use these methods a lot it may be useful to add: + * <pre> + * {@code + * import static org.apache.brooklyn.core.sensor.DependentConfiguration.*; + * } + * </pre> + */ +public class DependentConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(DependentConfiguration.class); + + //not instantiable, only a static helper + private DependentConfiguration() {} + + /** + * Default readiness is Groovy truth. 
+ * + * @see #attributeWhenReady(Entity, AttributeSensor, Predicate) + */ + public static <T> Task<T> attributeWhenReady(Entity source, AttributeSensor<T> sensor) { + return attributeWhenReady(source, sensor, GroovyJavaMethods.truthPredicate()); + } + + /** As {@link #attributeWhenReady(Entity, AttributeSensor, Predicate)} with readiness given as a Groovy closure; a null closure means Groovy truth is used. */ public static <T> Task<T> attributeWhenReady(Entity source, AttributeSensor<T> sensor, Closure<Boolean> ready) { + Predicate<Object> readyPredicate = (ready != null) ? GroovyJavaMethods.<Object>predicateFromClosure(ready) : GroovyJavaMethods.truthPredicate(); + return attributeWhenReady(source, sensor, readyPredicate); + } + + /** Returns an unsubmitted {@link Task} which blocks until the given sensor on the given source entity gives a value that satisfies {@code ready}, then returns that value; + * particularly useful in Entity configuration, where config will block until Tasks have a value. + * A null {@code ready} leaves the builder's readiness test unset. + */ + public static <T> Task<T> attributeWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready) { + Builder<T, T> builder = builder().attributeWhenReady(source, sensor); + if (ready != null) builder.readiness(ready); + return builder.build(); + + } + + /** As {@link #attributePostProcessedWhenReady(Entity, AttributeSensor, Predicate, Function)} with both the readiness test and the post-processing given as Groovy closures; a null {@code ready} means Groovy truth is used. */ public static <T,V> Task<V> attributePostProcessedWhenReady(Entity source, AttributeSensor<T> sensor, Closure<Boolean> ready, Closure<V> postProcess) { + Predicate<? super T> readyPredicate = (ready != null) ? GroovyJavaMethods.predicateFromClosure(ready) : GroovyJavaMethods.truthPredicate(); + Function<? 
super T, V> postProcessFunction = GroovyJavaMethods.<T,V>functionFromClosure(postProcess); + return attributePostProcessedWhenReady(source, sensor, readyPredicate, postProcessFunction); + } + + public static <T,V> Task<V> attributePostProcessedWhenReady(Entity source, AttributeSensor<T> sensor, Closure<V> postProcess) { + return attributePostProcessedWhenReady(source, sensor, GroovyJavaMethods.truthPredicate(), GroovyJavaMethods.<T,V>functionFromClosure(postProcess)); + } + + public static <T> Task<T> valueWhenAttributeReady(Entity source, AttributeSensor<T> sensor, T value) { + return DependentConfiguration.<T,T>attributePostProcessedWhenReady(source, sensor, GroovyJavaMethods.truthPredicate(), Functions.constant(value)); + } + + public static <T,V> Task<V> valueWhenAttributeReady(Entity source, AttributeSensor<T> sensor, Function<? super T,V> valueProvider) { + return attributePostProcessedWhenReady(source, sensor, GroovyJavaMethods.truthPredicate(), valueProvider); + } + + public static <T,V> Task<V> valueWhenAttributeReady(Entity source, AttributeSensor<T> sensor, Closure<V> valueProvider) { + return attributePostProcessedWhenReady(source, sensor, GroovyJavaMethods.truthPredicate(), valueProvider); + } + + public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready, final Closure<V> postProcess) { + return attributePostProcessedWhenReady(source, sensor, ready, GroovyJavaMethods.<T,V>functionFromClosure(postProcess)); + } + + @SuppressWarnings("unchecked") + public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready, final Function<? 
super T,V> postProcess) { + Builder<T,T> builder1 = DependentConfiguration.builder().attributeWhenReady(source, sensor); + // messy generics here to support null postProcess; would be nice to disallow that here + Builder<T,V> builder; + if (postProcess != null) { + builder = builder1.postProcess(postProcess); + } else { + builder = (Builder<T,V>)builder1; + } + if (ready != null) builder.readiness(ready); + + return builder.build(); + } + + /** Waits, in the calling task, until the sensor on the source entity has a value satisfying {@code ready}, with no abort conditions; returns that value. */ public static <T> T waitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready) { + return waitInTaskForAttributeReady(source, sensor, ready, ImmutableList.<AttributeAndSensorCondition<?>>of()); + } + + /** As {@link #waitInTaskForAttributeReady(Entity, AttributeSensor, Predicate)}, but aborting with a {@link CompoundRuntimeException} if any of the given abort conditions holds; a default blocking-details message is used. */ public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions) { + String blockingDetails = "Waiting for ready from "+source+" "+sensor+" (subscription)"; + return waitInTaskForAttributeReady(source, sensor, ready, abortConditions, blockingDetails); + } + + // TODO would be nice to have an easy semantics for whenServiceUp (cf DynamicWebAppClusterImpl.whenServiceUp) + + /** Runs the wait synchronously via {@link WaitInTaskForAttributeReady#call()}; {@code blockingDetails} is set on the current task while it is blocked. + * @throws IllegalStateException if the value is not immediately ready and this is not invoked from a running task with an entity tag */ public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) { + return new WaitInTaskForAttributeReady<T,T>(source, sensor, ready, abortConditions, blockingDetails).call(); + } + + /** Callable which waits for a sensor value satisfying the readiness test and then post-processes it. */ protected static class WaitInTaskForAttributeReady<T,V> implements Callable<V> { + + /* This is a change since before Oct 2014. Previously it would continue to poll, + * (maybe finding a different error) if the target entity becomes unmanaged. + * Now it actively checks unmanaged by default, and still throws although it might + * now find a different problem. 
*/ + private final static boolean DEFAULT_IGNORE_UNMANAGED = false; + + protected final Entity source; + protected final AttributeSensor<T> sensor; + protected final Predicate<? super T> ready; + protected final List<AttributeAndSensorCondition<?>> abortSensorConditions; + protected final String blockingDetails; + protected final Function<? super T,? extends V> postProcess; + protected final Duration timeout; + protected final Maybe<V> onTimeout; + protected final boolean ignoreUnmanaged; + protected final Maybe<V> onUnmanaged; + // TODO onError Continue / Throw / Return(V) + + protected WaitInTaskForAttributeReady(Builder<T, V> builder) { + this.source = builder.source; + this.sensor = builder.sensor; + this.ready = builder.readiness; + this.abortSensorConditions = builder.abortSensorConditions; + this.blockingDetails = builder.blockingDetails; + this.postProcess = builder.postProcess; + this.timeout = builder.timeout; + this.onTimeout = builder.onTimeout; + this.ignoreUnmanaged = builder.ignoreUnmanaged; + this.onUnmanaged = builder.onUnmanaged; + } + + private WaitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? 
super T> ready, + List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) { + this.source = source; + this.sensor = sensor; + this.ready = ready; + this.abortSensorConditions = abortConditions; + this.blockingDetails = blockingDetails; + + this.timeout = Duration.PRACTICALLY_FOREVER; + this.onTimeout = Maybe.absent(); + this.ignoreUnmanaged = DEFAULT_IGNORE_UNMANAGED; + this.onUnmanaged = Maybe.absent(); + this.postProcess = null; + } + + @SuppressWarnings("unchecked") + protected V postProcess(T value) { + if (this.postProcess!=null) return postProcess.apply(value); + // if no post-processing assume the types are correct + return (V) value; + } + + protected boolean ready(T value) { + if (ready!=null) return ready.apply(value); + return GroovyJavaMethods.truth(value); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public V call() { + T value = source.getAttribute(sensor); + + // return immediately if either the ready predicate or the abort conditions hold + if (ready(value)) return postProcess(value); + + final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList(); + long start = System.currentTimeMillis(); + + for (AttributeAndSensorCondition abortCondition : abortSensorConditions) { + Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor); + if (abortCondition.predicate.apply(abortValue)) { + abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); + } + } + if (abortionExceptions.size() > 0) { + throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); + } + + TaskInternal<?> current = (TaskInternal<?>) Tasks.current(); + if (current == null) throw new IllegalStateException("Should only be invoked in a running task"); + Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current); + if (entity == null) throw new IllegalStateException("Should only be invoked in a running task 
with an entity tag; "+ + current+" has no entity tag ("+current.getStatusDetail(false)+")"); + + final LinkedList<T> publishedValues = new LinkedList<T>(); + final Semaphore semaphore = new Semaphore(0); // could use Exchanger + SubscriptionHandle subscription = null; + List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList(); + + try { + subscription = ((EntityInternal)entity).getSubscriptionContext().subscribe(source, sensor, new SensorEventListener<T>() { + @Override public void onEvent(SensorEvent<T> event) { + synchronized (publishedValues) { publishedValues.add(event.getValue()); } + semaphore.release(); + }}); + for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) { + abortSubscriptions.add(((EntityInternal)entity).getSubscriptionContext().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() { + @Override public void onEvent(SensorEvent<Object> event) { + if (abortCondition.predicate.apply(event.getValue())) { + abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); + semaphore.release(); + } + }})); + Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor); + if (abortCondition.predicate.apply(abortValue)) { + abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); + } + } + if (abortionExceptions.size() > 0) { + throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); + } + + CountdownTimer timer = timeout!=null ? 
timeout.countdownTimer() : null; + Duration maxPeriod = ValueResolver.PRETTY_QUICK_WAIT; + Duration nextPeriod = ValueResolver.REAL_QUICK_PERIOD; + while (true) { + // check the source on initial run (could be done outside the loop) + // and also (optionally) on each iteration in case it is more recent + value = source.getAttribute(sensor); + if (ready(value)) break; + + if (timer!=null) { + if (timer.getDurationRemaining().isShorterThan(nextPeriod)) { + nextPeriod = timer.getDurationRemaining(); + } + if (timer.isExpired()) { + if (onTimeout.isPresent()) return onTimeout.get(); + throw new RuntimeTimeoutException("Unsatisfied after "+Duration.sinceUtc(start)); + } + } + + String prevBlockingDetails = current.setBlockingDetails(blockingDetails); + try { + if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS)) { + // immediately release so we are available for the next check + semaphore.release(); + // if other permits have been made available (e.g. multiple notifications) drain them all as no point running multiple times + semaphore.drainPermits(); + } + } finally { + current.setBlockingDetails(prevBlockingDetails); + } + + // check any subscribed values which have come in first + while (true) { + synchronized (publishedValues) { + if (publishedValues.isEmpty()) break; + value = publishedValues.pop(); + } + if (ready(value)) break; + } + + // if unmanaged then ignore the other abort conditions + if (!ignoreUnmanaged && Entities.isNoLongerManaged(entity)) { + if (onUnmanaged.isPresent()) return onUnmanaged.get(); + throw new NotManagedException(entity); + } + + if (abortionExceptions.size() > 0) { + throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); + } + + nextPeriod = nextPeriod.times(2).upperBound(maxPeriod); + } + if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source); + return postProcess(value); + } catch (InterruptedException e) { + throw 
Exceptions.propagate(e); + } finally { + if (subscription != null) { + ((EntityInternal)entity).getSubscriptionContext().unsubscribe(subscription); + } + for (SubscriptionHandle handle : abortSubscriptions) { + ((EntityInternal)entity).getSubscriptionContext().unsubscribe(handle); + } + } + } + } + + /** + * Returns a {@link Task} which blocks until the given job returns, then returns the value of that job. + * + * @deprecated since 0.7; code will be moved into test utilities + */ + @Deprecated + public static <T> Task<T> whenDone(Callable<T> job) { + return new BasicTask<T>(MutableMap.of("tag", "whenDone", "displayName", "waiting for job"), job); + } + + /** + * Returns a {@link Task} which waits for the result of the first parameter, then applies the function in the second + * parameter to it, returning that result. + * + * Particularly useful in Entity configuration where config will block until Tasks have completed, + * allowing for example an {@link #attributeWhenReady(Entity, AttributeSensor, Predicate)} expression to be + * passed in the first argument then transformed by the function in the second argument to generate + * the value that is used for the configuration. + */ + public static <U,T> Task<T> transform(final Task<U> task, final Function<U,T> transformer) { + return transform(MutableMap.of("displayName", "transforming "+task), task, transformer); + } + + /** As {@link #transform(Task, Function)} with the transformation given as a Groovy closure. + * @see #transform(Task, Function) */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static <U,T> Task<T> transform(Task<U> task, Closure transformer) { + return transform(task, GroovyJavaMethods.functionFromClosure(transformer)); + } + + /** As {@link #transform(Task, Function)} with explicit flags for the returned task; when run, it submits {@code task} + * to the current execution context if it has not already been submitted, then applies the transformer to its result. + * @see #transform(Task, Function) */ + @SuppressWarnings({ "rawtypes" }) + public static <U,T> Task<T> transform(final Map flags, final TaskAdaptable<U> task, final Function<U,T> transformer) { + return new BasicTask<T>(flags, new Callable<T>() { + public T call() throws Exception { + if (!task.asTask().isSubmitted()) { + 
BasicExecutionContext.getCurrentExecutionContext().submit(task); + } + return transformer.apply(task.asTask().get()); + }}); + } + + /** Returns a task which waits for multiple other tasks (submitting if necessary) + * and performs arbitrary computation over the List of results. + * @see #transform(Task, Function) but note argument order is reversed (counterintuitive) to allow for varargs */ + public static <U,T> Task<T> transformMultiple(Function<List<U>,T> transformer, @SuppressWarnings("unchecked") TaskAdaptable<U> ...tasks) { + return transformMultiple(MutableMap.of("displayName", "transforming multiple"), transformer, tasks); + } + + /** @see #transformMultiple(Function, TaskAdaptable...) */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static <U,T> Task<T> transformMultiple(Closure transformer, TaskAdaptable<U> ...tasks) { + return transformMultiple(GroovyJavaMethods.functionFromClosure(transformer), tasks); + } + + /** @see #transformMultiple(Function, TaskAdaptable...) */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static <U,T> Task<T> transformMultiple(Map flags, Closure transformer, TaskAdaptable<U> ...tasks) { + return transformMultiple(flags, GroovyJavaMethods.functionFromClosure(transformer), tasks); + } + + /** @see #transformMultiple(Function, TaskAdaptable...) */ + @SuppressWarnings({ "rawtypes" }) + public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, @SuppressWarnings("unchecked") TaskAdaptable<U> ...tasks) { + return transformMultiple(flags, transformer, Arrays.asList(tasks)); + } + @SuppressWarnings({ "rawtypes" }) + public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, Collection<? 
extends TaskAdaptable<U>> tasks) { + if (tasks.size()==1) { + return transform(flags, Iterables.getOnlyElement(tasks), new Function<U,T>() { + @Override + @Nullable + public T apply(@Nullable U input) { + return transformer.apply(ImmutableList.of(input)); + } + }); + } + return transform(flags, new ParallelTask<U>(tasks), transformer); + } + + + /** Returns a {@link Task} whose result is a string formatted using String.format, + * where the arguments can be normal objects, tasks, {@link TaskFactory} instances or {@link DeferredSupplier} instances; + * tasks (including those created from task factories) will be waited on (submitted if necessary) and their results substituted in the call + * to String.format; deferred suppliers are resolved with {@code get()} at the time the string is formatted. + * <p> + * Example: + * <pre> + * {@code + * setConfig(URL, DependentConfiguration.formatString("%s:%s", + * DependentConfiguration.attributeWhenReady(target, Target.HOSTNAME), + * DependentConfiguration.attributeWhenReady(target, Target.PORT) ) ); + * } + * </pre> + */ + @SuppressWarnings("unchecked") + public static Task<String> formatString(final String spec, final Object ...args) { + List<TaskAdaptable<Object>> taskArgs = Lists.newArrayList(); + for (Object arg: args) { + if (arg instanceof TaskAdaptable) taskArgs.add((TaskAdaptable<Object>)arg); + else if (arg instanceof TaskFactory) taskArgs.add( ((TaskFactory<TaskAdaptable<Object>>)arg).newTask() ); + } + + return transformMultiple( + MutableMap.<String,String>of("displayName", "formatting '"+spec+"' with "+taskArgs.size()+" task"+(taskArgs.size()!=1?"s":"")), + new Function<List<Object>, String>() { + @Override public String apply(List<Object> input) { + Iterator<?> tri = input.iterator(); + Object[] vv = new Object[args.length]; + int i=0; + for (Object arg : args) { + if (arg instanceof TaskAdaptable || arg instanceof TaskFactory) vv[i] = tri.next(); + else if (arg instanceof DeferredSupplier) vv[i] = ((DeferredSupplier<?>) arg).get(); + else vv[i] = arg; + i++; + } + return String.format(spec, vv); + }}, + taskArgs); + } + + /** returns a task for parallel execution returning a list of values for the given 
sensor for the given entity list, + * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */ + public static <T> Task<List<T>> listAttributesWhenReady(AttributeSensor<T> sensor, Iterable<Entity> entities) { + return listAttributesWhenReady(sensor, entities, GroovyJavaMethods.truthPredicate()); + } + + /** As {@link #listAttributesWhenReady(AttributeSensor, Iterable, Predicate)} with readiness given as a Groovy closure; a null closure means Groovy truth is used. */ public static <T> Task<List<T>> listAttributesWhenReady(AttributeSensor<T> sensor, Iterable<Entity> entities, Closure<Boolean> readiness) { + Predicate<Object> readinessPredicate = (readiness != null) ? GroovyJavaMethods.<Object>predicateFromClosure(readiness) : GroovyJavaMethods.truthPredicate(); + return listAttributesWhenReady(sensor, entities, readinessPredicate); + } + + /** returns a task for parallel execution returning a list of the values of the given sensor on each of the given entities, + * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if null) */ + public static <T> Task<List<T>> listAttributesWhenReady(final AttributeSensor<T> sensor, Iterable<Entity> entities, Predicate<? 
super T> readiness) { + if (readiness == null) readiness = GroovyJavaMethods.truthPredicate(); + return builder().attributeWhenReadyFromMultiple(entities, sensor, readiness).build(); + } + + /** As {@link #waitForTask(Task, Entity, String)} with no context message. + * @see #waitForTask(Task, Entity, String) */ + public static <T> T waitForTask(Task<T> t, Entity context) throws InterruptedException { + return waitForTask(t, context, null); + } + + /** blocks until the given task completes, submitting if necessary, returning the result of that task; + * optional contextMessage is available in status if this is running in a task. + * The task is resolved using the execution context of the given context entity; + * an {@link ExecutionException} raised while resolving is rethrown using {@link Throwables#propagate(Throwable)}. + */ + @SuppressWarnings("unchecked") + public static <T> T waitForTask(Task<T> t, Entity context, String contextMessage) throws InterruptedException { + try { + return (T) Tasks.resolveValue(t, Object.class, ((EntityInternal)context).getExecutionContext(), contextMessage); + } catch (ExecutionException e) { + throw Throwables.propagate(e); + } + } + + /** A source entity, one of its sensors, and a predicate over that sensor's values; all three must be non-null. */ public static class AttributeAndSensorCondition<T> { + protected final Entity source; + protected final AttributeSensor<T> sensor; + protected final Predicate<? super T> predicate; + + public AttributeAndSensorCondition(Entity source, AttributeSensor<T> sensor, Predicate<? super T> predicate) { + this.source = checkNotNull(source, "source"); + this.sensor = checkNotNull(sensor, "sensor"); + this.predicate = checkNotNull(predicate, "predicate"); + } + } + + /** Entry point for the fluent builders; returns a new {@link ProtoBuilder}. */ public static ProtoBuilder builder() { + return new ProtoBuilder(); + } + + /** + * Builder for producing variants of attributeWhenReady. + */ + @Beta + public static class ProtoBuilder { + /** + * Will wait for the attribute on the given entity. + * If that entity reports {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort. 
+ */ + public <T2> Builder<T2,T2> attributeWhenReady(Entity source, AttributeSensor<T2> sensor) { + return new Builder<T2,T2>(source, sensor).abortIfOnFire(); + } + + /** + * Will wait for the attribute on the given entity, not aborting when it goes {@link Lifecycle#ON_FIRE}. + */ + public <T2> Builder<T2,T2> attributeWhenReadyAllowingOnFire(Entity source, AttributeSensor<T2> sensor) { + return new Builder<T2,T2>(source, sensor); + } + + /** Constructs a builder for a task for parallel execution returning a list of the values of the given sensor on each of the given entities, + * using groovy truth as the readiness test for the values */ + @Beta + public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor) { + return attributeWhenReadyFromMultiple(sources, sensor, GroovyJavaMethods.truthPredicate()); + } + /** As {@link #attributeWhenReadyFromMultiple(Iterable, AttributeSensor)} with an explicit readiness test. */ + @Beta + public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) { + return new MultiBuilder<T, T, List<T>>(sources, sensor, readiness); + } + } + + /** + * Builder for producing variants of attributeWhenReady which wait on a single source entity. + */ + public static class Builder<T,V> { + protected Entity source; + protected AttributeSensor<T> sensor; + protected Predicate<? super T> readiness; + protected Function<? super T, ? 
extends V> postProcess; + protected List<AttributeAndSensorCondition<?>> abortSensorConditions = Lists.newArrayList(); + protected String blockingDetails; + protected Duration timeout; + /* NOTE(review): no default is assigned to onTimeout or onUnmanaged here, and WaitInTaskForAttributeReady.call() invokes isPresent() on both; it looks like an expired timeout or an unmanaged entity would hit a null unless onTimeoutThrow()/onTimeoutReturn(..) and onUnmanagedThrow()/onUnmanagedReturn(..) were called first -- confirm */ protected Maybe<V> onTimeout; + protected boolean ignoreUnmanaged = WaitInTaskForAttributeReady.DEFAULT_IGNORE_UNMANAGED; + protected Maybe<V> onUnmanaged; + + protected Builder(Entity source, AttributeSensor<T> sensor) { + this.source = source; + this.sensor = sensor; + } + + /** + * Will wait for the attribute on the given entity. + * If that entity reports {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort. + * @deprecated since 0.7.0 use {@link DependentConfiguration#builder()} then {@link ProtoBuilder#attributeWhenReady(Entity, AttributeSensor)}, which already applies {@link #abortIfOnFire()} + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <T2> Builder<T2,T2> attributeWhenReady(Entity source, AttributeSensor<T2> sensor) { + this.source = checkNotNull(source, "source"); + this.sensor = (AttributeSensor) checkNotNull(sensor, "sensor"); + abortIfOnFire(); + return (Builder<T2, T2>) this; + } + /** Sets the readiness test from a Groovy closure, which must not be null. */ public Builder<T,V> readiness(Closure<Boolean> val) { + this.readiness = GroovyJavaMethods.predicateFromClosure(checkNotNull(val, "val")); + return this; + } + /** Sets the readiness test, which must not be null. */ public Builder<T,V> readiness(Predicate<? super T> val) { + this.readiness = checkNotNull(val, "ready"); + return this; + } + /** Sets the post-processing from a Groovy closure, which must not be null; returns this same builder re-typed to the new result type. */ @SuppressWarnings({ "unchecked", "rawtypes" }) + public <V2> Builder<T,V2> postProcess(Closure<V2> val) { + this.postProcess = (Function) GroovyJavaMethods.<T,V2>functionFromClosure(checkNotNull(val, "postProcess")); + return (Builder<T,V2>) this; + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <V2> Builder<T,V2> postProcess(final Function<? 
super T, V2> val) { + this.postProcess = (Function) checkNotNull(val, "postProcess"); + return (Builder<T,V2>) this; + } + /** Adds an abort condition which holds when the given sensor on the given entity has a value that is true by Groovy truth. */ public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2> sensor) { + return abortIf(source, sensor, GroovyJavaMethods.truthPredicate()); + } + /** Adds an abort condition which holds when the given sensor on the given entity has a value satisfying the predicate. */ public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) { + abortSensorConditions.add(new AttributeAndSensorCondition<T2>(source, sensor, predicate)); + return this; + } + /** Adds an abort condition for the current source entity having {@link Attributes#SERVICE_STATE_ACTUAL} equal to {@link Lifecycle#ON_FIRE}. */ public Builder<T,V> abortIfOnFire() { + abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE)); + return this; + } + /** Sets the message used as the waiting task's blocking details while it is blocked. */ public Builder<T,V> blockingDetails(String val) { + blockingDetails = val; + return this; + } + /** specifies an optional timeout; by default it waits forever, or until unmanaged or other abort condition */ + public Builder<T,V> timeout(Duration val) { + timeout = val; + return this; + } + /** On timeout, return the given value rather than throwing. */ public Builder<T,V> onTimeoutReturn(V val) { + onTimeout = Maybe.of(val); + return this; + } + /** On timeout, throw a {@link RuntimeTimeoutException}. */ public Builder<T,V> onTimeoutThrow() { + onTimeout = Maybe.<V>absent(); + return this; + } + /** If the entity in whose task the wait runs is no longer managed, return the given value rather than throwing. */ public Builder<T,V> onUnmanagedReturn(V val) { + onUnmanaged = Maybe.of(val); + return this; + } + /** If the entity in whose task the wait runs is no longer managed, throw a {@link NotManagedException}. */ public Builder<T,V> onUnmanagedThrow() { + onUnmanaged = Maybe.<V>absent(); + return this; + } + /** @since 0.7.0 included in case old behaviour of not checking whether the entity is managed is required + * (I can't see why it is; polling will likely give errors, once it is unmanaged this will never complete, + * and before management the current code will continue, so long as there are no other errors) */ @Deprecated + public Builder<T,V> onUnmanagedContinue() { + ignoreUnmanaged = true; + return this; + } + /** take advantage of the fact that this builder can build multiple times, allowing subclasses + * to change the source along the way */ + protected Builder<T,V> source(Entity source) { + this.source = source; + return this; + } + /** as {@link 
#source(Entity)} */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected Builder<T,V> sensor(AttributeSensor<? extends T> sensor) { + this.sensor = (AttributeSensor) sensor; + return this; + } + public Task<V> build() { + validate(); + + return Tasks.<V>builder().dynamic(false) + .name("waiting on "+sensor.getName()) + .description("Waiting on sensor "+sensor.getName()+" from "+source) + .tag("attributeWhenReady") + .body(new WaitInTaskForAttributeReady<T,V>(this)) + .build(); + } + + public V runNow() { + validate(); + return new WaitInTaskForAttributeReady<T,V>(this).call(); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void validate() { + checkNotNull(source, "Entity source"); + checkNotNull(sensor, "Sensor"); + if (readiness == null) readiness = GroovyJavaMethods.truthPredicate(); + if (postProcess == null) postProcess = (Function) Functions.identity(); + } + } + + /** + * Builder for producing variants of attributeWhenReady. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Beta + public static class MultiBuilder<T, V, V2> { + protected final String name; + protected final String descriptionBase; + protected final Builder<T,V> builder; + // if desired, the use of this multiSource could allow different conditions; + // but probably an easier API just for the caller to build the parallel task + protected final List<AttributeAndSensorCondition<?>> multiSource = Lists.newArrayList(); + protected Function<? super List<V>, V2> postProcessFromMultiple; + + /** returns a task for parallel execution returning a list of values of the given sensor list on the given entity, + * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */ + @Beta + protected MultiBuilder(Iterable<? extends Entity> sources, AttributeSensor<T> sensor) { + this(sources, sensor, GroovyJavaMethods.truthPredicate()); + } + @Beta + protected MultiBuilder(Iterable<? 
extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) { + builder = new Builder<T,V>(null, sensor); + builder.readiness(readiness); + + for (Entity s : checkNotNull(sources, "sources")) { + multiSource.add(new AttributeAndSensorCondition<T>(s, sensor, readiness)); + } + this.name = "waiting on "+sensor.getName(); + this.descriptionBase = "waiting on "+sensor.getName()+" "+readiness + +" from "+Iterables.size(sources)+" entit"+Strings.ies(sources); + } + + /** Apply post-processing to the entire list of results */ + public <V2b> MultiBuilder<T, V, V2b> postProcessFromMultiple(final Function<? super List<V>, V2b> val) { + this.postProcessFromMultiple = (Function) checkNotNull(val, "postProcessFromMulitple"); + return (MultiBuilder<T,V, V2b>) this; + } + /** Apply post-processing to the entire list of results + * See {@link CollectionFunctionals#all(Predicate)} and {@link CollectionFunctionals#quorum(org.apache.brooklyn.util.collections.QuorumCheck, Predicate) + * which allow useful arguments. */ + public MultiBuilder<T, V, Boolean> postProcessFromMultiple(final Predicate<? super List<V>> val) { + return postProcessFromMultiple(Functions.forPredicate(val)); + } + + public <V1> MultiBuilder<T, V1, V2> postProcess(Closure<V1> val) { + builder.postProcess(val); + return (MultiBuilder<T, V1, V2>) this; + } + public <V1> MultiBuilder<T, V1, V2> postProcess(final Function<? super T, V1> val) { + builder.postProcess(val); + return (MultiBuilder<T, V1, V2>) this; + } + public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor) { + builder.abortIf(source, sensor); + return this; + } + public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? 
super T2> predicate) { + builder.abortIf(source, sensor, predicate); + return this; + } + public MultiBuilder<T, V, V2> abortIfOnFire() { + builder.abortIfOnFire(); + return this; + } + public MultiBuilder<T, V, V2> blockingDetails(String val) { + builder.blockingDetails(val); + return this; + } + public MultiBuilder<T, V, V2> timeout(Duration val) { + builder.timeout(val); + return this; + } + public MultiBuilder<T, V, V2> onTimeoutReturn(V val) { + builder.onTimeoutReturn(val); + return this; + } + public MultiBuilder<T, V, V2> onTimeoutThrow() { + builder.onTimeoutThrow(); + return this; + } + public MultiBuilder<T, V, V2> onUnmanagedReturn(V val) { + builder.onUnmanagedReturn(val); + return this; + } + public MultiBuilder<T, V, V2> onUnmanagedThrow() { + builder.onUnmanagedThrow(); + return this; + } + + public Task<V2> build() { + List<Task<V>> tasks = MutableList.of(); + for (AttributeAndSensorCondition<?> source: multiSource) { + builder.source(source.source); + builder.sensor((AttributeSensor)source.sensor); + builder.readiness((Predicate)source.predicate); + tasks.add(builder.build()); + } + final Task<List<V>> parallelTask = Tasks.<List<V>>builder().parallel(true).addAll(tasks) + .name(name) + .description(descriptionBase+ + (builder.timeout!=null ? ", timeout "+builder.timeout : "")) + .build(); + + if (postProcessFromMultiple == null) { + // V2 should be the right type in normal operations + return (Task<V2>) parallelTask; + } else { + return Tasks.<V2>builder().name(name).description(descriptionBase) + .tag("attributeWhenReady") + .body(new Callable<V2>() { + @Override public V2 call() throws Exception { + List<V> prePostProgress = DynamicTasks.queue(parallelTask).get(); + return DynamicTasks.queue( + Tasks.<V2>builder().name("post-processing").description("Applying "+postProcessFromMultiple) + .body(Functionals.callable(postProcessFromMultiple, prePostProgress)) + .build()).get(); + } + }) + .build(); + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java new file mode 100644 index 0000000..79660ce --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.brooklyn.core.sensor;

import java.net.URI;

import net.minidev.json.JSONObject;

import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.effector.AddSensor;
import org.apache.brooklyn.sensor.feed.http.HttpFeed;
import org.apache.brooklyn.sensor.feed.http.HttpPollConfig;
import org.apache.brooklyn.sensor.feed.http.HttpValueFunctions;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.Beta;
import com.google.common.base.Functions;
import com.google.common.base.Supplier;

/**
 * Configurable {@link org.apache.brooklyn.api.entity.EntityInitializer} which adds an HTTP sensor feed to retrieve the
 * {@link JSONObject} from a JSON response in order to populate the sensor with the data at the {@code jsonPath}.
 *
 * @see SshCommandSensor
 * @see JmxAttributeSensor
 */
@Beta
public final class HttpRequestSensor<T> extends AddSensor<T> {

    private static final Logger LOG = LoggerFactory.getLogger(HttpRequestSensor.class);

    public static final ConfigKey<String> SENSOR_URI = ConfigKeys.newStringConfigKey("uri", "HTTP URI to poll for JSON");
    public static final ConfigKey<String> JSON_PATH = ConfigKeys.newStringConfigKey("jsonPath", "JSON path to select in HTTP response; default $", "$");
    public static final ConfigKey<String> USERNAME = ConfigKeys.newStringConfigKey("username", "Username for HTTP request, if required");
    public static final ConfigKey<String> PASSWORD = ConfigKeys.newStringConfigKey("password", "Password for HTTP request, if required");

    protected final Supplier<URI> uri;
    protected final String jsonPath;
    protected final String username;
    protected final String password;

    /**
     * Reads the JSON path and credentials from {@code params} immediately;
     * the URI is only looked up and parsed when the supplier is asked for it.
     */
    public HttpRequestSensor(final ConfigBag params) {
        super(params);

        uri = deferredUri(params);
        jsonPath = params.get(JSON_PATH);
        username = params.get(USERNAME);
        password = params.get(PASSWORD);
    }

    /** Returns a supplier which reads {@link #SENSOR_URI} from the bag on every call and parses it as a URI. */
    private static Supplier<URI> deferredUri(final ConfigBag params) {
        return new Supplier<URI>() {
            @Override
            public URI get() {
                String configured = params.get(SENSOR_URI);
                return URI.create(configured);
            }
        };
    }

    /**
     * Attaches an HTTP feed to the entity which polls the URI every {@code period}.
     * A 200 response sets the sensor from the JSON at {@code jsonPath};
     * any failure or exception sets it to null.
     */
    @Override
    public void apply(final EntityLocal entity) {
        super.apply(entity);

        // parameterised logging makes an explicit isDebugEnabled() guard unnecessary
        LOG.debug("Adding HTTP JSON sensor {} to {}", name, entity);

        HttpPollConfig<T> jsonPoll = new HttpPollConfig<T>(sensor)
                .checkSuccess(HttpValueFunctions.responseCodeEquals(200))
                .onFailureOrException(Functions.constant((T) null))
                .onSuccess(HttpValueFunctions.<T>jsonContentsFromPath(jsonPath))
                .period(period);

        HttpFeed.builder()
                .entity(entity)
                .baseUri(uri)
                .credentialsIfNotNull(username, password)
                .poll(jsonPoll)
                .build();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/core/sensor/PortAttributeSensorAndConfigKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/PortAttributeSensorAndConfigKey.java b/core/src/main/java/org/apache/brooklyn/core/sensor/PortAttributeSensorAndConfigKey.java new file mode 100644 index 0000000..3de0de6 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/PortAttributeSensorAndConfigKey.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.sensor; + +import java.util.Collection; +import java.util.Iterator; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.location.MachineProvisioningLocation; +import org.apache.brooklyn.api.location.PortRange; +import org.apache.brooklyn.api.location.PortSupplier; +import org.apache.brooklyn.api.mgmt.ManagementContext; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.entity.BrooklynConfigKeys; +import org.apache.brooklyn.core.internal.BrooklynInitialization; +import org.apache.brooklyn.core.location.Locations; +import org.apache.brooklyn.util.core.flags.TypeCoercions; +import org.apache.brooklyn.util.guava.Maybe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; + +/** + * A {@link Sensor} describing a port on a system, + * with a {@link ConfigKey} which can be configured with a port range + * (either a number e.g. 80, or a string e.g. "80" or "8080-8089" or even "80, 8080-8089, 8800+", or a list of these). + * <p> + * To convert at runtime a single port is chosen, respecting the entity. 
*/
public class PortAttributeSensorAndConfigKey extends AttributeSensorAndConfigKey<PortRange,Integer> {

    private static final long serialVersionUID = 4680651022807491321L;

    public static final Logger LOG = LoggerFactory.getLogger(PortAttributeSensorAndConfigKey.class);

    static { BrooklynInitialization.initAll(); }

    /** Creates a key with the given name, using the name as the description and no default. */
    public PortAttributeSensorAndConfigKey(String name) {
        this(name, name, null);
    }

    /** Creates a key with the given name and description and no default. */
    public PortAttributeSensorAndConfigKey(String name, String description) {
        this(name, description, null);
    }

    /** Creates a key with the given name, description and default value. */
    public PortAttributeSensorAndConfigKey(String name, String description, Object defaultValue) {
        super(PortRange.class, Integer.class, name, description, defaultValue);
    }

    /** Copies {@code orig}, replacing its default with {@code defaultValue} coerced to a {@link PortRange}. */
    public PortAttributeSensorAndConfigKey(PortAttributeSensorAndConfigKey orig, Object defaultValue) {
        super(orig, TypeCoercions.coerce(defaultValue, PortRange.class));
    }

    /**
     * Picks a single port from the configured range for the given entity.
     * <p>
     * One location is selected from the entity's locations. If it is a {@link PortSupplier} the port is
     * obtained from it; if it has none to offer, null is returned unless one of the "skip" flags is set.
     * Otherwise the first port in the range is returned unconfirmed.
     *
     * @return the chosen port, or null if {@code value} is null, no single location could be selected,
     *         or the location reported that no port in the range is available
     */
    @Override
    protected Integer convertConfigToSensor(PortRange value, Entity entity) {
        if (value==null) return null;

        Collection<? extends Location> locations = entity.getLocations();
        if (locations.isEmpty()) {
            LOG.warn("{}: ports not applicable, or not yet applicable, because has no locations; ignoring {}", entity, getName());
            return null;
        }

        Maybe<? extends Location> lo = selectLocation(locations);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Convert config to sensor for {} found locations: {}. Selected: {}", new Object[] {entity, locations, lo});
        }
        if (!lo.isPresent()) {
            // previously the key name was passed but had no placeholder, so it never reached the log
            LOG.warn("{}: ports not applicable, or not yet applicable, because has multiple locations {}; ignoring {}", new Object[] { entity, locations, getName() });
            return null;
        }

        Location l = lo.get();
        boolean skipCheck = skipPortCheck(l, entity);
        if (l instanceof PortSupplier) {
            int p = ((PortSupplier) l).obtainPort(value);
            if (p != -1) {
                LOG.debug("{}: choosing port {} for {}", new Object[] { entity, p, getName() });
                return p;
            }
            // If we are not skipping install or already started, fail now
            if (!skipCheck) {
                warnNoPortAvailable(entity, value);
                return null; // Definitively, no ports available
            }
        }
        // Ports may be available, we just can't tell from the location
        Integer v = (value.isEmpty() ? null : value.iterator().next());
        LOG.debug("{}: choosing port {} (unconfirmed) for {}", new Object[] { entity, v, getName() });
        return v;
    }

    /**
     * Selects the one location to ask for a port: the unique machine location if there is one,
     * else the only location which is not a machine provisioner, else the only location at all.
     */
    private static Maybe<? extends Location> selectLocation(Collection<? extends Location> locations) {
        Maybe<? extends Location> lo = Locations.findUniqueMachineLocation(locations);
        if (!lo.isPresent()) {
            // Try a unique location which isn't a machine provisioner
            Iterator<? extends Location> li = Iterables.filter(locations,
                    Predicates.not(Predicates.instanceOf(MachineProvisioningLocation.class))).iterator();
            if (li.hasNext()) lo = Maybe.of(li.next());
            // a second candidate makes the choice ambiguous, so discard the first
            if (li.hasNext()) lo = Maybe.absent();
        }
        // Fall back to selecting the single location
        if (!lo.isPresent() && locations.size() == 1) {
            lo = Maybe.of(locations.iterator().next());
        }
        return lo;
    }

    /**
     * Reads the skip-if-running, skip-installation and skip-start flags from the location and entity.
     * The first flag that is non-null decides the result, checked in the order below; false if none is set.
     */
    private static boolean skipPortCheck(Location l, Entity entity) {
        Optional<Boolean> locationRunning = Optional.fromNullable(l.getConfig(BrooklynConfigKeys.SKIP_ENTITY_START_IF_RUNNING));
        Optional<Boolean> entityRunning = Optional.fromNullable(entity.getConfig(BrooklynConfigKeys.SKIP_ENTITY_START_IF_RUNNING));
        Optional<Boolean> locationInstalled = Optional.fromNullable(l.getConfig(BrooklynConfigKeys.SKIP_ENTITY_INSTALLATION));
        Optional<Boolean> entityInstalled = Optional.fromNullable(entity.getConfig(BrooklynConfigKeys.SKIP_ENTITY_INSTALLATION));
        Optional<Boolean> entityStarted = Optional.fromNullable(entity.getConfig(BrooklynConfigKeys.SKIP_ENTITY_START));
        return locationRunning.or(entityRunning).or(locationInstalled).or(entityInstalled).or(entityStarted).or(false);
    }

    /** Logs why no port could be obtained, with wording that depends on the size of the requested range. */
    private void warnNoPortAvailable(Entity entity, PortRange value) {
        int rangeSize = Iterables.size(value);
        if (rangeSize == 0) {
            LOG.warn("{}: no port available for {} (empty range {})", new Object[] { entity, getName(), value });
        } else if (rangeSize == 1) {
            Integer pp = value.iterator().next();
            // privileged ports are 0-1023, so 1024 itself does not need root
            if (pp >= 1024) {
                LOG.warn("{}: port {} not available for {}", new Object[] { entity, pp, getName() });
            } else {
                LOG.warn("{}: port {} not available for {} (root may be required?)", new Object[] { entity, pp, getName() });
            }
        } else {
            LOG.warn("{}: no port available for {} (tried range {})", new Object[] { entity, getName(), value });
        }
    }

    /** Ports cannot be chosen without an entity, so this always warns and returns null. */
    @Override
    protected Integer convertConfigToSensor(PortRange value, ManagementContext managementContext) {
        LOG.warn("ports not applicable, because given managementContext rather than entity; ignoring {}", getName());
        return null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/core/sensor/Sensors.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/Sensors.java b/core/src/main/java/org/apache/brooklyn/core/sensor/Sensors.java new file mode 100644 index 0000000..05227d0 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/Sensors.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.sensor;

import static com.google.common.base.Preconditions.checkNotNull;

import java.net.InetAddress;
import java.net.URI;
import java.net.URL;

import javax.annotation.Nullable;

import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.AttributeSensor.SensorPersistenceMode;
import org.apache.brooklyn.core.config.render.RendererHints;
import org.apache.brooklyn.util.net.UserAndHostAndPort;
import org.apache.brooklyn.util.text.StringFunctions;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;

import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.net.HostAndPort;
import com.google.common.reflect.TypeToken;

/** Static factory methods for {@link AttributeSensor} instances, plus display hints for common value types. */
public class Sensors {

    /** Starts a builder for a sensor of the given generic type and name. */
    @Beta
    public static <T> Builder<T> builder(TypeToken<T> type, String name) {
        Builder<T> result = new Builder<T>();
        result.type(type);
        result.name(name);
        return result;
    }

    /** Starts a builder for a sensor of the given class and name. */
    @Beta
    public static <T> Builder<T> builder(Class<T> type, String name) {
        Builder<T> result = new Builder<T>();
        result.type(type);
        result.name(name);
        return result;
    }

    /** Fluent builder producing a {@link BasicAttributeSensor}. */
    @Beta
    public static class Builder<T> {
        private String name;
        private TypeToken<T> type;
        private String description;
        private SensorPersistenceMode persistence;

        protected Builder() { // use builder(type, name) instead
        }

        /** Sets the sensor name; must not be null. */
        public Builder<T> name(String val) {
            this.name = checkNotNull(val, "name");
            return this;
        }

        /** Sets the sensor type from a class, wrapping it in a {@link TypeToken}. */
        public Builder<T> type(Class<T> val) {
            TypeToken<T> token = TypeToken.of(val);
            return type(token);
        }

        /** Sets the sensor type; must not be null. */
        public Builder<T> type(TypeToken<T> val) {
            this.type = checkNotNull(val, "type");
            return this;
        }

        /** Sets the (optional) description. */
        public Builder<T> description(String val) {
            this.description = val;
            return this;
        }

        /** Sets the (optional) persistence mode. */
        public Builder<T> persistence(SensorPersistenceMode val) {
            this.persistence = val;
            return this;
        }

        /** Creates the sensor from the values collected so far. */
        public AttributeSensor<T> build() {
            return new BasicAttributeSensor<T>(type, name, description, persistence);
        }
    }

    /** Creates a sensor of the given class and name, with no description. */
    public static <T> AttributeSensor<T> newSensor(Class<T> type, String name) {
        return new BasicAttributeSensor<T>(type, name);
    }

    /** Creates a sensor of the given class, name and description. */
    public static <T> AttributeSensor<T> newSensor(Class<T> type, String name, String description) {
        return new BasicAttributeSensor<T>(type, name, description);
    }

    /** Creates a sensor of the given generic type, name and description. */
    public static <T> AttributeSensor<T> newSensor(TypeToken<T> type, String name, String description) {
        return new BasicAttributeSensor<T>(type, name, description);
    }

    // Typed convenience factories

    public static AttributeSensor<String> newStringSensor(String name) {
        return newSensor(String.class, name);
    }

    public static AttributeSensor<String> newStringSensor(String name, String description) {
        return newSensor(String.class, name, description);
    }

    public static AttributeSensor<Integer> newIntegerSensor(String name) {
        return newSensor(Integer.class, name);
    }

    public static AttributeSensor<Integer> newIntegerSensor(String name, String description) {
        return newSensor(Integer.class, name, description);
    }

    public static AttributeSensor<Long> newLongSensor(String name) {
        return newSensor(Long.class, name);
    }

    public static AttributeSensor<Long> newLongSensor(String name, String description) {
        return newSensor(Long.class, name, description);
    }

    public static AttributeSensor<Double> newDoubleSensor(String name) {
        return newSensor(Double.class, name);
    }

    public static AttributeSensor<Double> newDoubleSensor(String name, String description) {
        return newSensor(Double.class, name, description);
    }

    public static AttributeSensor<Boolean> newBooleanSensor(String name) {
        return newSensor(Boolean.class, name);
    }

    public static AttributeSensor<Boolean> newBooleanSensor(String name, String description) {
        return newSensor(Boolean.class, name, description);
    }

    // Extensions to sensors

    /** Creates a sensor with the type token and description of {@code sensor} under a new name. */
    public static <T> AttributeSensor<T> newSensorRenamed(String newName, AttributeSensor<T> sensor) {
        TypeToken<T> sameType = sensor.getTypeToken();
        return new BasicAttributeSensor<T>(sameType, newName, sensor.getDescription());
    }

    /** As {@link #newSensorRenamed(String, AttributeSensor)}, with {@code prefix} prepended to the existing name. */
    public static <T> AttributeSensor<T> newSensorWithPrefix(String prefix, AttributeSensor<T> sensor) {
        String prefixedName = prefix+sensor.getName();
        return newSensorRenamed(prefixedName, sensor);
    }

    // Display hints for common utility objects

    static {
        RendererHints.register(Duration.class, RendererHints.displayValue(Time.fromDurationToTimeStringRounded()));
        RendererHints.register(HostAndPort.class, RendererHints.displayValue(StringFunctions.toStringFunction()));
        RendererHints.register(UserAndHostAndPort.class, RendererHints.displayValue(StringFunctions.toStringFunction()));

        // addresses are shown as their textual host address; null stays null
        Function<InetAddress,String> hostAddress = new Function<InetAddress,String>() {
            @Override
            public String apply(@Nullable InetAddress input) {
                if (input == null) {
                    return null;
                }
                return input.getHostAddress();
            }
        };
        RendererHints.register(InetAddress.class, RendererHints.displayValue(hostAddress));

        RendererHints.register(URL.class, RendererHints.displayValue(StringFunctions.toStringFunction()));
        RendererHints.register(URL.class, RendererHints.openWithUrl(StringFunctions.toStringFunction()));
        RendererHints.register(URI.class, RendererHints.displayValue(StringFunctions.toStringFunction()));
        RendererHints.register(URI.class, RendererHints.openWithUrl(StringFunctions.toStringFunction()));
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java new file mode 100644 index 0000000..4a7b1d4 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.sensor; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.AddSensor; +import org.apache.brooklyn.sensor.enricher.Propagator; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.core.task.ValueResolver; +import org.apache.brooklyn.util.guava.Maybe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Supplier; + +/** + * Provides an initializer/feed which simply sets a given value. + * <p> + * {@link Task}/{@link Supplier} values are resolved when written, + * unlike config values which are resolved on each read. + * <p> + * This supports a {@link StaticSensor#SENSOR_PERIOD} + * which can be useful if the supplied value is such a function. + * However when the source is another sensor, + * consider using {@link Propagator} which listens for changes instead. 
*/
public class StaticSensor<T> extends AddSensor<T> {

    private static final Logger log = LoggerFactory.getLogger(StaticSensor.class);

    public static final ConfigKey<Object> STATIC_VALUE = ConfigKeys.newConfigKey(Object.class, "static.value");

    // the raw configured value, resolved only when apply(...) runs
    private final Object value;

    /** Reads the raw {@link #STATIC_VALUE} from the given parameters; it is resolved later, in {@link #apply(EntityLocal)}. */
    public StaticSensor(ConfigBag params) {
        super(params);
        value = params.get(STATIC_VALUE);
    }

    /**
     * Resolves the configured value to the sensor's type, waiting at most
     * {@link ValueResolver#PRETTY_QUICK_WAIT}, and sets the sensor on the entity if that succeeds.
     * If the value cannot be resolved the sensor is left untouched and a debug message is logged.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void apply(EntityLocal entity) {
        super.apply(entity);

        Maybe<T> v = Tasks.resolving(value).as((Class<T>)sensor.getType()).timeout(ValueResolver.PRETTY_QUICK_WAIT).getMaybe();
        // placeholders avoid building the message (and calling toString) when debug logging is off
        if (v.isPresent()) {
            log.debug("{} setting sensor {} to {}", new Object[] { this, sensor, v.get() });
            entity.setAttribute(sensor, v.get());
        } else {
            log.debug("{} not setting sensor {}; cannot resolve {}", new Object[] { this, sensor, value });
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/core/sensor/TemplatedStringAttributeSensorAndConfigKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/TemplatedStringAttributeSensorAndConfigKey.java b/core/src/main/java/org/apache/brooklyn/core/sensor/TemplatedStringAttributeSensorAndConfigKey.java new file mode 100644 index 0000000..953b7d8 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/TemplatedStringAttributeSensorAndConfigKey.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.sensor; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.mgmt.ManagementContext; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; +import org.apache.brooklyn.util.core.text.TemplateProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableMap; + +/** + * A {@link ConfigKey} which takes a freemarker-templated string, + * and whose value is converted to a sensor by processing the template + * with access to config and methods on the entity where it is set. 
*/
public class TemplatedStringAttributeSensorAndConfigKey extends BasicAttributeSensorAndConfigKey<String> {
    private static final long serialVersionUID = 4680651022807491321L;

    public static final Logger LOG = LoggerFactory.getLogger(TemplatedStringAttributeSensorAndConfigKey.class);

    /** Creates a key with the given name, using the name as the description and no default. */
    public TemplatedStringAttributeSensorAndConfigKey(String name) {
        this(name, name, null);
    }

    /** Creates a key with the given name and description and no default. */
    public TemplatedStringAttributeSensorAndConfigKey(String name, String description) {
        this(name, description, null);
    }

    /** Creates a key with the given name, description and default template. */
    public TemplatedStringAttributeSensorAndConfigKey(String name, String description, String defaultValue) {
        super(String.class, name, description, defaultValue);
    }

    /** Copies {@code orig}, replacing its default template. */
    public TemplatedStringAttributeSensorAndConfigKey(TemplatedStringAttributeSensorAndConfigKey orig, String defaultValue) {
        super(orig, defaultValue);
    }

    /** Processes the template against the entity, with no extra substitutions; a null template gives null. */
    @Override
    protected String convertConfigToSensor(String value, Entity entity) {
        if (value != null) {
            EntityInternal context = (EntityInternal) entity;
            return TemplateProcessor.processTemplateContents(value, context, ImmutableMap.<String,Object>of());
        }
        return null;
    }

    /** Processes the template against the management context, with no extra substitutions; a null template gives null. */
    @Override
    protected String convertConfigToSensor(String value, ManagementContext managementContext) {
        if (value != null) {
            ManagementContextInternal context = (ManagementContextInternal) managementContext;
            return TemplateProcessor.processTemplateContents(value, context, ImmutableMap.<String,Object>of());
        }
        return null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/core/server/entity/BrooklynMetrics.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/server/entity/BrooklynMetrics.java b/core/src/main/java/org/apache/brooklyn/core/server/entity/BrooklynMetrics.java index 5f060c2..6e9cef5 100644 --- a/core/src/main/java/org/apache/brooklyn/core/server/entity/BrooklynMetrics.java +++ b/core/src/main/java/org/apache/brooklyn/core/server/entity/BrooklynMetrics.java @@ -21,8 +21,8 @@ package 
org.apache.brooklyn.core.server.entity; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.ImplementedBy; import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.sensor.core.BasicAttributeSensor; -import org.apache.brooklyn.sensor.core.BasicAttributeSensorAndConfigKey; +import org.apache.brooklyn.core.sensor.BasicAttributeSensor; +import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey; import org.apache.brooklyn.util.core.flags.SetFromFlag; @ImplementedBy(BrooklynMetricsImpl.class) http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroup.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroup.java b/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroup.java index 00710ca..aa9ca90 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroup.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/AbstractGroup.java @@ -27,7 +27,7 @@ import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers; import org.apache.brooklyn.core.entity.trait.Changeable; -import org.apache.brooklyn.sensor.core.Sensors; +import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.util.collections.QuorumCheck; import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java index bf39663..b6ee23a 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java @@ -38,11 +38,11 @@ import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.factory.EntityFactory; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.entity.trait.MemberReplaceable; +import org.apache.brooklyn.core.sensor.BasicAttributeSensor; +import org.apache.brooklyn.core.sensor.BasicNotificationSensor; +import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.entity.group.zoneaware.BalancingNodePlacementStrategy; import org.apache.brooklyn.entity.group.zoneaware.ProportionalZoneFailureDetector; -import org.apache.brooklyn.sensor.core.BasicAttributeSensor; -import org.apache.brooklyn.sensor.core.BasicNotificationSensor; -import org.apache.brooklyn.sensor.core.Sensors; import org.apache.brooklyn.util.core.flags.SetFromFlag; import org.apache.brooklyn.util.time.Duration; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabric.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabric.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabric.java index fc53c21..943ed9b 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabric.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabric.java @@ -28,7 +28,7 @@ import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.factory.EntityFactory; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.entity.trait.Startable; -import org.apache.brooklyn.sensor.core.Sensors; +import 
org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.util.core.flags.SetFromFlag; import com.google.common.collect.ImmutableMap; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/entity/group/DynamicGroup.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicGroup.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicGroup.java index bb43d3b..c06aa58 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicGroup.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicGroup.java @@ -30,7 +30,7 @@ import org.apache.brooklyn.core.annotation.Effector; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.effector.MethodEffector; import org.apache.brooklyn.core.entity.trait.Startable; -import org.apache.brooklyn.sensor.core.Sensors; +import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.util.core.flags.SetFromFlag; import com.google.common.base.Predicate; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java index 5e9b75f..048a304 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java @@ -27,7 +27,7 @@ import org.apache.brooklyn.api.entity.ImplementedBy; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.sensor.core.Sensors; +import 
org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.util.core.flags.SetFromFlag; import com.google.common.annotations.Beta; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntity.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntity.java b/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntity.java index 645426a..793bac6 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntity.java +++ b/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntity.java @@ -26,8 +26,8 @@ import org.apache.brooklyn.api.entity.ImplementedBy; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.config.render.RendererHints; -import org.apache.brooklyn.sensor.core.AttributeSensorAndConfigKey; -import org.apache.brooklyn.sensor.core.Sensors; +import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey; +import org.apache.brooklyn.core.sensor.Sensors; import com.google.common.base.Function; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/location/winrm/AdvertiseWinrmLoginPolicy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/location/winrm/AdvertiseWinrmLoginPolicy.java b/core/src/main/java/org/apache/brooklyn/location/winrm/AdvertiseWinrmLoginPolicy.java index 0d5828d..0945407 100644 --- a/core/src/main/java/org/apache/brooklyn/location/winrm/AdvertiseWinrmLoginPolicy.java +++ b/core/src/main/java/org/apache/brooklyn/location/winrm/AdvertiseWinrmLoginPolicy.java @@ -26,9 +26,9 @@ import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; import 
org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.policy.AbstractPolicy; +import org.apache.brooklyn.core.sensor.Sensors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.brooklyn.sensor.core.Sensors; import com.google.common.annotations.Beta; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2a78e273/core/src/main/java/org/apache/brooklyn/sensor/core/AttributeMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/core/AttributeMap.java b/core/src/main/java/org/apache/brooklyn/sensor/core/AttributeMap.java deleted file mode 100644 index 4125a96..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/core/AttributeMap.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.sensor.core; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Map; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.core.BrooklynLogging; -import org.apache.brooklyn.core.entity.AbstractEntity; -import org.apache.brooklyn.util.core.flags.TypeCoercions; -import org.apache.brooklyn.util.guava.Maybe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; - -/** - * A {@link Map} of {@link Entity} attribute values. - */ -public final class AttributeMap implements Serializable { - - private static final long serialVersionUID = -6834883734250888344L; - - static final Logger log = LoggerFactory.getLogger(AttributeMap.class); - - private static enum Marker { - NULL; - } - - private final AbstractEntity entity; - - // Assumed to be something like a ConcurrentMap passed in. - private final Map<Collection<String>, Object> values; - - /** - * Creates a new AttributeMap. - * - * @param entity the EntityLocal this AttributeMap belongs to. 
- * @throws IllegalArgumentException if entity is null - */ - public AttributeMap(AbstractEntity entity, Map<Collection<String>, Object> storage) { - this.entity = checkNotNull(entity, "entity must be specified"); - this.values = checkNotNull(storage, "storage map must not be null"); - } - - public Map<Collection<String>, Object> asRawMap() { - return ImmutableMap.copyOf(values); - } - - public Map<String, Object> asMap() { - Map<String, Object> result = Maps.newLinkedHashMap(); - for (Map.Entry<Collection<String>, Object> entry : values.entrySet()) { - String sensorName = Joiner.on('.').join(entry.getKey()); - Object val = (isNull(entry.getValue())) ? null : entry.getValue(); - result.put(sensorName, val); - } - return result; - } - - /** - * Updates the value. - * - * @param path the path to the value. - * @param newValue the new value - * @return the old value. - * @throws IllegalArgumentException if path is null or empty - */ - // TODO path must be ordered(and legal to contain duplicates like "a.b.a"; list would be better - public <T> T update(Collection<String> path, T newValue) { - checkPath(path); - - if (newValue == null) { - newValue = typedNull(); - } - - if (log.isTraceEnabled()) { - log.trace("setting sensor {}={} for {}", new Object[] {path, newValue, entity}); - } - - @SuppressWarnings("unchecked") - T oldValue = (T) values.put(path, newValue); - return (isNull(oldValue)) ? 
null : oldValue; - } - - private void checkPath(Collection<String> path) { - Preconditions.checkNotNull(path, "path can't be null"); - Preconditions.checkArgument(!path.isEmpty(), "path can't be empty"); - } - - public <T> T update(AttributeSensor<T> attribute, T newValue) { - T oldValue = updateWithoutPublishing(attribute, newValue); - entity.emitInternal(attribute, newValue); - return oldValue; - } - - public <T> T updateWithoutPublishing(AttributeSensor<T> attribute, T newValue) { - if (log.isTraceEnabled()) { - Object oldValue = getValue(attribute); - if (!Objects.equal(oldValue, newValue != null)) { - log.trace("setting attribute {} to {} (was {}) on {}", new Object[] {attribute.getName(), newValue, oldValue, entity}); - } else { - log.trace("setting attribute {} to {} (unchanged) on {}", new Object[] {attribute.getName(), newValue, this}); - } - } - - T oldValue = (T) update(attribute.getNameParts(), newValue); - - return (isNull(oldValue)) ? null : oldValue; - } - - /** - * Where atomicity is desired, the methods in this class synchronize on the {@link #values} map. - */ - public <T> T modify(AttributeSensor<T> attribute, Function<? super T, Maybe<T>> modifier) { - synchronized (values) { - T oldValue = getValue(attribute); - Maybe<? 
extends T> newValue = modifier.apply(oldValue); - - if (newValue.isPresent()) { - if (log.isTraceEnabled()) log.trace("modified attribute {} to {} (was {}) on {}", new Object[] {attribute.getName(), newValue, oldValue, entity}); - return update(attribute, newValue.get()); - } else { - if (log.isTraceEnabled()) log.trace("modified attribute {} unchanged; not emitting on {}", new Object[] {attribute.getName(), newValue, this}); - return oldValue; - } - } - } - - public void remove(AttributeSensor<?> attribute) { - BrooklynLogging.log(log, BrooklynLogging.levelDebugOrTraceIfReadOnly(entity), - "removing attribute {} on {}", attribute.getName(), entity); - - remove(attribute.getNameParts()); - } - - // TODO path must be ordered(and legal to contain duplicates like "a.b.a"; list would be better - public void remove(Collection<String> path) { - checkPath(path); - - if (log.isTraceEnabled()) { - log.trace("removing sensor {} for {}", new Object[] {path, entity}); - } - - values.remove(path); - } - - /** - * Gets the value - * - * @param path the path of the value to get - * @return the value - * @throws IllegalArgumentException path is null or empty. - */ - public Object getValue(Collection<String> path) { - // TODO previously this would return a map of the sub-tree if the path matched a prefix of a group of sensors, - // or the leaf value if only one value. Arguably that is not required - what is/was the use-case? - // - checkPath(path); - Object result = values.get(path); - return (isNull(result)) ? null : result; - } - - @SuppressWarnings("unchecked") - public <T> T getValue(AttributeSensor<T> sensor) { - return (T) TypeCoercions.coerce(getValue(sensor.getNameParts()), sensor.getType()); - } - - @SuppressWarnings("unchecked") - private <T> T typedNull() { - return (T) Marker.NULL; - } - - private boolean isNull(Object t) { - return t == Marker.NULL; - } -}
