http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java new file mode 100644 index 0000000..d34d0a5 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.windows; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import io.cloudsoft.winrm4j.winrm.WinRmToolResponse; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.ExecutionContext; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.EffectorTasks; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.feed.AbstractFeed; +import org.apache.brooklyn.core.feed.PollHandler; +import org.apache.brooklyn.core.feed.Poller; +import org.apache.brooklyn.core.sensor.Sensors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.location.winrm.WinRmMachineLocation; +import org.apache.brooklyn.util.core.flags.TypeCoercions; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.time.Duration; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.reflect.TypeToken; + +/** + * A sensor feed that retrieves performance counters from a Windows host and posts the values to sensors. + * + * <p>To use this feed, you must provide the entity, and a collection of mappings between Windows performance counter + * names and Brooklyn attribute sensors.</p> + * + * <p>This feed uses SSH to invoke the windows utility <tt>typeperf</tt> to query for a specific set of performance + * counters, by name. The values are extracted from the response, and published to the entity's sensors.</p> + * + * <p>Example:</p> + * + * {@code + * @Override + * protected void connectSensors() { + * WindowsPerformanceCounterFeed feed = WindowsPerformanceCounterFeed.builder() + * .entity(entity) + * .addSensor("\\Processor(_total)\\% Idle Time", CPU_IDLE_TIME) + * .addSensor("\\Memory\\Available MBytes", AVAILABLE_MEMORY) + * .build(); + * } + * } + * + * @since 0.6.0 + * @author richardcloudsoft + */ +public class WindowsPerformanceCounterFeed extends AbstractFeed { + + private static final Logger log = LoggerFactory.getLogger(WindowsPerformanceCounterFeed.class); + + // This pattern matches CSV line(s) with the date in the first field, and at least one further field. + protected static final Pattern lineWithPerfData = Pattern.compile("^\"[\\d:/\\-. ]+\",\".*\"$", Pattern.MULTILINE); + private static final Joiner JOINER_ON_SPACE = Joiner.on(' '); + private static final Joiner JOINER_ON_COMMA = Joiner.on(','); + private static final int OUTPUT_COLUMN_WIDTH = 100; + + @SuppressWarnings("serial") + public static final ConfigKey<Collection<WindowsPerformanceCounterPollConfig<?>>> POLLS = ConfigKeys.newConfigKey( + new TypeToken<Collection<WindowsPerformanceCounterPollConfig<?>>>() {}, + "polls"); + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private EntityLocal entity; + private Set<WindowsPerformanceCounterPollConfig<?>> polls = Sets.newLinkedHashSet(); + private Duration period = Duration.of(30, TimeUnit.SECONDS); + private String uniqueTag; + private volatile boolean built; + + public Builder entity(EntityLocal val) { + this.entity = checkNotNull(val, "entity"); + return this; + } + public Builder addSensor(WindowsPerformanceCounterPollConfig<?> config) { + polls.add(config); + return this; + } + public Builder addSensor(String performanceCounterName, AttributeSensor<?> sensor) { + return addSensor(new WindowsPerformanceCounterPollConfig(sensor).performanceCounterName(checkNotNull(performanceCounterName, "performanceCounterName"))); + } + public Builder addSensors(Map<String, AttributeSensor> sensors) { + for (Map.Entry<String, AttributeSensor> entry : sensors.entrySet()) { + addSensor(entry.getKey(), entry.getValue()); + } + return this; + } + public Builder period(Duration period) { + this.period = checkNotNull(period, "period"); + return this; + } + public Builder period(long millis) { + return period(millis, TimeUnit.MILLISECONDS); + } + public Builder period(long val, TimeUnit units) { + return period(Duration.of(val, units)); + } + public Builder uniqueTag(String uniqueTag) { + this.uniqueTag = uniqueTag; + return this; + } + public WindowsPerformanceCounterFeed build() { + built = true; + WindowsPerformanceCounterFeed result = new WindowsPerformanceCounterFeed(this); + result.setEntity(checkNotNull(entity, "entity")); + result.start(); + return result; + } + @Override + protected void finalize() { + if (!built) log.warn("WindowsPerformanceCounterFeed.Builder created, but build() never called"); + } + } + + /** + * For rebind; do not call directly; use builder + */ + public WindowsPerformanceCounterFeed() { + } + + protected WindowsPerformanceCounterFeed(Builder builder) { + List<WindowsPerformanceCounterPollConfig<?>> polls = Lists.newArrayList(); + for (WindowsPerformanceCounterPollConfig<?> config : builder.polls) { + if (!config.isEnabled()) continue; + @SuppressWarnings({ "unchecked", "rawtypes" }) + WindowsPerformanceCounterPollConfig<?> configCopy = new WindowsPerformanceCounterPollConfig(config); + if (configCopy.getPeriod() < 0) configCopy.period(builder.period); + polls.add(configCopy); + } + config().set(POLLS, polls); + initUniqueTag(builder.uniqueTag, polls); + } + + @Override + protected void preStart() { + Collection<WindowsPerformanceCounterPollConfig<?>> polls = getConfig(POLLS); + + long minPeriod = Integer.MAX_VALUE; + List<String> performanceCounterNames = Lists.newArrayList(); + for (WindowsPerformanceCounterPollConfig<?> config : polls) { + minPeriod = Math.min(minPeriod, config.getPeriod()); + performanceCounterNames.add(config.getPerformanceCounterName()); + } + + Iterable<String> allParams = ImmutableList.<String>builder() + .add("(Get-Counter") + .add("-Counter") + .add(JOINER_ON_COMMA.join(Iterables.transform(performanceCounterNames, QuoteStringFunction.INSTANCE))) + .add("-SampleInterval") + .add("2") // TODO: extract SampleInterval as a config key + .add(").CounterSamples") + .add("|") + .add("Format-Table") + .add(String.format("@{Expression={$_.Path};width=%d},@{Expression={$_.CookedValue};width=%<d}", OUTPUT_COLUMN_WIDTH)) + .add("-HideTableHeaders") + .add("|") + .add("Out-String") + .add("-Width") + .add(String.valueOf(OUTPUT_COLUMN_WIDTH * 2)) + .build(); + String command = JOINER_ON_SPACE.join(allParams); + log.debug("Windows performance counter poll command for {} will be: {}", entity, command); + + GetPerformanceCountersJob<WinRmToolResponse> job = new GetPerformanceCountersJob(getEntity(), command); + getPoller().scheduleAtFixedRate( + new CallInEntityExecutionContext(entity, job), + new SendPerfCountersToSensors(getEntity(), polls), + minPeriod); + } + + private static class GetPerformanceCountersJob<T> implements Callable<T> { + + private final Entity entity; + private final String command; + + GetPerformanceCountersJob(Entity entity, String command) { + this.entity = entity; + this.command = command; + } + + @Override + public T call() throws Exception { + WinRmMachineLocation machine = EffectorTasks.getWinRmMachine(entity); + WinRmToolResponse response = machine.executePsScript(command); + return (T)response; + } + } + + @SuppressWarnings("unchecked") + protected Poller<WinRmToolResponse> getPoller() { + return (Poller<WinRmToolResponse>) super.getPoller(); + } + + /** + * A {@link java.util.concurrent.Callable} that wraps another {@link java.util.concurrent.Callable}, where the + * inner {@link java.util.concurrent.Callable} is executed in the context of a + * specific entity. + * + * @param <T> The type of the {@link java.util.concurrent.Callable}. + */ + private static class CallInEntityExecutionContext<T> implements Callable<T> { + private final Callable<T> job; + private EntityLocal entity; + + private CallInEntityExecutionContext(EntityLocal entity, Callable<T> job) { + this.job = job; + this.entity = entity; + } + + @Override + public T call() throws Exception { + ExecutionContext executionContext = ((EntityInternal) entity).getManagementSupport().getExecutionContext(); + return executionContext.submit(Maps.newHashMap(), job).get(); + } + } + + @VisibleForTesting + static class SendPerfCountersToSensors implements PollHandler<WinRmToolResponse> { + private final EntityLocal entity; + private final List<WindowsPerformanceCounterPollConfig<?>> polls; + private final Set<AttributeSensor<?>> failedAttributes = Sets.newLinkedHashSet(); + private static final Pattern MACHINE_NAME_LOOKBACK_PATTERN = Pattern.compile(String.format("(?<=\\\\\\\\.{0,%d})\\\\.*", OUTPUT_COLUMN_WIDTH)); + + public SendPerfCountersToSensors(EntityLocal entity, Collection<WindowsPerformanceCounterPollConfig<?>> polls) { + this.entity = entity; + this.polls = ImmutableList.copyOf(polls); + } + + @Override + public boolean checkSuccess(WinRmToolResponse val) { + // TODO not just using statusCode; also looking at absence of stderr. + // Status code is (empirically) unreliable: it returns 0 sometimes even when failed + // (but never returns non-zero on success). + if (val.getStatusCode() != 0) return false; + String stderr = val.getStdErr(); + if (stderr == null || stderr.length() != 0) return false; + String out = val.getStdOut(); + if (out == null || out.length() == 0) return false; + return true; + } + + @Override + public void onSuccess(WinRmToolResponse val) { + for (String pollResponse : val.getStdOut().split("\r\n")) { + if (Strings.isNullOrEmpty(pollResponse)) { + continue; + } + String path = pollResponse.substring(0, OUTPUT_COLUMN_WIDTH - 1); + // The performance counter output prepends the sensor name with "\\<machinename>" so we need to remove it + Matcher machineNameLookbackMatcher = MACHINE_NAME_LOOKBACK_PATTERN.matcher(path); + if (!machineNameLookbackMatcher.find()) { + continue; + } + String name = machineNameLookbackMatcher.group(0).trim(); + String rawValue = pollResponse.substring(OUTPUT_COLUMN_WIDTH).replaceAll("^\\s+", ""); + WindowsPerformanceCounterPollConfig<?> config = getPollConfig(name); + Class<?> clazz = config.getSensor().getType(); + AttributeSensor<Object> attribute = (AttributeSensor<Object>) Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription()); + try { + Object value = TypeCoercions.coerce(rawValue, TypeToken.of(clazz)); + entity.setAttribute(attribute, value); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + if (failedAttributes.add(attribute)) { + log.warn("Failed to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute}); + } else { + if (log.isTraceEnabled()) log.trace("Failed (repeatedly) to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute}); + } + } + } + } + + @Override + public void onFailure(WinRmToolResponse val) { + log.error("Windows Performance Counter query did not respond as expected. exitcode={} stdout={} stderr={}", + new Object[]{val.getStatusCode(), val.getStdOut(), val.getStdErr()}); + for (WindowsPerformanceCounterPollConfig<?> config : polls) { + Class<?> clazz = config.getSensor().getType(); + AttributeSensor<?> attribute = Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription()); + entity.setAttribute(attribute, null); + } + } + + @Override + public void onException(Exception exception) { + log.error("Detected exception while retrieving Windows Performance Counters from entity " + + entity.getDisplayName(), exception); + for (WindowsPerformanceCounterPollConfig<?> config : polls) { + entity.setAttribute(Sensors.newSensor(config.getSensor().getClass(), config.getPerformanceCounterName(), config.getDescription()), null); + } + } + + @Override + public String getDescription() { + return "" + polls; + } + + @Override + public String toString() { + return super.toString()+"["+getDescription()+"]"; + } + + private WindowsPerformanceCounterPollConfig<?> getPollConfig(String sensorName) { + for (WindowsPerformanceCounterPollConfig<?> poll : polls) { + if (poll.getPerformanceCounterName().equalsIgnoreCase(sensorName)) { + return poll; + } + } + throw new IllegalStateException(String.format("%s not found in configured polls: %s", sensorName, polls)); + } + } + + static class PerfCounterValueIterator implements Iterator<String> { + + // This pattern matches the contents of the first field, and optionally matches the rest of the line as + // further fields. Feed the second match back into the pattern again to get the next field, and repeat until + // all fields are discovered. + protected static final Pattern splitPerfData = Pattern.compile("^\"([^\\\"]*)\"((,\"[^\\\"]*\")*)$"); + + private Matcher matcher; + + public PerfCounterValueIterator(String input) { + matcher = splitPerfData.matcher(input); + // Throw away the first element (the timestamp) (and also confirm that we have a pattern match) + checkArgument(hasNext(), "input "+input+" does not match expected pattern "+splitPerfData.pattern()); + next(); + } + + @Override + public boolean hasNext() { + return matcher != null && matcher.find(); + } + + @Override + public String next() { + String next = matcher.group(1); + + String remainder = matcher.group(2); + if (!Strings.isNullOrEmpty(remainder)) { + assert remainder.startsWith(","); + remainder = remainder.substring(1); + matcher = splitPerfData.matcher(remainder); + } else { + matcher = null; + } + + return next; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private static enum QuoteStringFunction implements Function<String, String> { + INSTANCE; + + @Nullable + @Override + public String apply(@Nullable String input) { + return input != null ? "\"" + input + "\"" : null; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java new file mode 100644 index 0000000..1391c3e --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.windows; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.feed.PollConfig; + +import com.google.common.base.Function; +import com.google.common.base.Functions; + +public class WindowsPerformanceCounterPollConfig<T> extends PollConfig<Object, T, WindowsPerformanceCounterPollConfig<T>>{ + + private String performanceCounterName; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public WindowsPerformanceCounterPollConfig(AttributeSensor<T> sensor) { + super(sensor); + description(sensor.getDescription()); + onSuccess((Function)Functions.identity()); + } + + public WindowsPerformanceCounterPollConfig(WindowsPerformanceCounterPollConfig<T> other) { + super(other); + this.performanceCounterName = other.performanceCounterName; + } + + public String getPerformanceCounterName() { + return performanceCounterName; + } + + public WindowsPerformanceCounterPollConfig<T> performanceCounterName(String val) { + this.performanceCounterName = val; return this; + } + + @Override protected String toStringPollSource() { return performanceCounterName; } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java deleted file mode 100644 index 327181a..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collection; - -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.mgmt.rebind.RebindSupport; -import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento; -import org.apache.brooklyn.api.sensor.Feed; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.BrooklynFeatureEnablement; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.mgmt.rebind.BasicFeedRebindSupport; -import org.apache.brooklyn.core.objs.AbstractEntityAdjunct; -import org.apache.brooklyn.util.javalang.JavaClassNames; -import org.apache.brooklyn.util.text.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Captures common fields and processes for sensor feeds. - * These generally poll or subscribe to get sensor values for an entity. - * They make it easy to poll over http, jmx, etc. - */ -public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed { - - private static final Logger log = LoggerFactory.getLogger(AbstractFeed.class); - - public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("feed.onlyIfServiceUp", "", false); - - private final Object pollerStateMutex = new Object(); - private transient volatile Poller<?> poller; - private transient volatile boolean activated; - private transient volatile boolean suspended; - - public AbstractFeed() { - } - - /** - * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)} - */ - @Deprecated - public AbstractFeed(EntityLocal entity) { - this(entity, false); - } - - /** - * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)} and {@code setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp)} - */ - @Deprecated - public AbstractFeed(EntityLocal entity, boolean onlyIfServiceUp) { - this.entity = checkNotNull(entity, "entity"); - setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp); - } - - // Ensure idempotent, as called in builders (in case not registered with entity), and also called - // when registering with entity - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - if (BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_FEED_REGISTRATION_PROPERTY)) { - ((EntityInternal)entity).feeds().addFeed(this); - } - } - - protected void initUniqueTag(String uniqueTag, Object ...valsForDefault) { - if (Strings.isNonBlank(uniqueTag)) this.uniqueTag = uniqueTag; - else this.uniqueTag = getDefaultUniqueTag(valsForDefault); - } - - protected String getDefaultUniqueTag(Object ...valsForDefault) { - StringBuilder sb = new StringBuilder(); - sb.append(JavaClassNames.simpleClassName(this)); - if (valsForDefault.length==0) { - sb.append("@"); - sb.append(hashCode()); - } else if (valsForDefault.length==1 && valsForDefault[0] instanceof Collection){ - sb.append(Strings.toUniqueString(valsForDefault[0], 80)); - } else { - sb.append("["); - boolean first = true; - for (Object x: valsForDefault) { - if (!first) sb.append(";"); - else first = false; - sb.append(Strings.toUniqueString(x, 80)); - } - sb.append("]"); - } - return sb.toString(); - } - - @Override - public void start() { - if (log.isDebugEnabled()) log.debug("Starting feed {} for {}", this, entity); - if (activated) { - throw new IllegalStateException(String.format("Attempt to start feed %s of entity %s when already running", - this, entity)); - } - if (poller != null) { - throw new IllegalStateException(String.format("Attempt to re-start feed %s of entity %s", this, entity)); - } - - poller = new Poller<Object>(entity, getConfig(ONLY_IF_SERVICE_UP)); - activated = true; - preStart(); - synchronized (pollerStateMutex) { - // don't start poller if we are suspended - if (!suspended) { - poller.start(); - } - } - } - - @Override - public void suspend() { - synchronized (pollerStateMutex) { - if (activated && !suspended) { - poller.stop(); - } - suspended = true; - } - } - - @Override - public void resume() { - synchronized (pollerStateMutex) { - if (activated && suspended) { - poller.start(); - } - suspended = false; - } - } - - @Override - public void destroy() { - stop(); - } - - @Override - public void stop() { - if (!activated) { - log.debug("Ignoring attempt to stop feed {} of entity {} when not running", this, entity); - return; - } - if (log.isDebugEnabled()) log.debug("stopping feed {} for {}", this, entity); - - activated = false; - preStop(); - synchronized (pollerStateMutex) { - if (!suspended) { - poller.stop(); - } - } - postStop(); - super.destroy(); - } - - @Override - public boolean isActivated() { - return activated; - } - - public EntityLocal getEntity() { - return entity; - } - - protected boolean isConnected() { - // TODO Default impl will result in multiple logs for same error if becomes unreachable - // (e.g. if ssh gets NoRouteToHostException, then every AttributePollHandler for that - // feed will log.warn - so if polling for 10 sensors/attributes will get 10 log messages). - // Would be nice if reduced this logging duplication. - // (You can reduce it by providing a better 'isConnected' implementation of course.) - return isRunning() && entity!=null && !((EntityInternal)entity).getManagementSupport().isNoLongerManaged(); - } - - @Override - public boolean isSuspended() { - return suspended; - } - - @Override - public boolean isRunning() { - return isActivated() && !isSuspended() && !isDestroyed() && getPoller()!=null && getPoller().isRunning(); - } - - @Override - public RebindSupport<FeedMemento> getRebindSupport() { - return new BasicFeedRebindSupport(this); - } - - @Override - protected void onChanged() { - // TODO Auto-generated method stub - } - - /** - * For overriding. - */ - protected void preStart() { - } - - /** - * For overriding. - */ - protected void preStop() { - } - - /** - * For overriding. - */ - protected void postStop() { - } - - /** - * For overriding, where sub-class can change return-type generics! - */ - protected Poller<?> getPoller() { - return poller; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java deleted file mode 100644 index eac972e..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.core.entity.Attributes; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; -import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition; -import org.apache.brooklyn.util.core.flags.TypeCoercions; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; - -/** - * Handler for when polling an entity's attribute. On each poll result the entity's attribute is set. - * - * Calls to onSuccess and onError will happen sequentially, but may be called from different threads - * each time. Note that no guarantees of a synchronized block exist, so additional synchronization - * would be required for the Java memory model's "happens before" relationship. - * - * @author aled - */ -public class AttributePollHandler<V> implements PollHandler<V> { - - public static final Logger log = LoggerFactory.getLogger(AttributePollHandler.class); - - private final FeedConfig<V,?,?> config; - private final EntityLocal entity; - @SuppressWarnings("rawtypes") - private final AttributeSensor sensor; - private final AbstractFeed feed; - private final boolean suppressDuplicates; - - // allow 30 seconds before logging at WARN, if there has been no success yet; - // after success WARN immediately - // TODO these should both be configurable - private Duration logWarningGraceTimeOnStartup = Duration.THIRTY_SECONDS; - private Duration logWarningGraceTime = Duration.millis(0); - - // internal state to look after whether to log warnings - private volatile Long lastSuccessTime = null; - private volatile Long currentProblemStartTime = null; - private volatile boolean currentProblemLoggedAsWarning = false; - private volatile boolean lastWasProblem = false; - - - public AttributePollHandler(FeedConfig<V,?,?> config, EntityLocal entity, AbstractFeed feed) { - this.config = checkNotNull(config, "config"); - this.entity = checkNotNull(entity, "entity"); - this.sensor = checkNotNull(config.getSensor(), "sensor"); - this.feed = checkNotNull(feed, "feed"); - this.suppressDuplicates = config.getSupressDuplicates(); - } - - @Override - public boolean checkSuccess(V val) { - // Always true if no checkSuccess predicate was configured. - return !config.hasCheckSuccessHandler() || config.getCheckSuccess().apply(val); - } - - @Override - public void onSuccess(V val) { - if (lastWasProblem) { - if (currentProblemLoggedAsWarning) { - log.info("Success (following previous problem) reading "+getBriefDescription()); - } else { - log.debug("Success (following previous problem) reading "+getBriefDescription()); - } - lastWasProblem = false; - currentProblemStartTime = null; - currentProblemLoggedAsWarning = false; - } - lastSuccessTime = System.currentTimeMillis(); - if (log.isTraceEnabled()) log.trace("poll for {} got: {}", new Object[] {getBriefDescription(), val}); - - try { - setSensor(transformValueOnSuccess(val)); - } catch (Exception e) { - if (feed.isConnected()) { - log.warn("unable to compute "+getBriefDescription()+"; on val="+val, e); - } else { - if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; val="+val+" (when inactive)", e); - } - } - } - - /** allows post-processing, such as applying a success handler; - * default applies the onSuccess handler (which is recommended) */ - protected Object transformValueOnSuccess(V val) { - return config.hasSuccessHandler() ? config.getOnSuccess().apply(val) : val; - } - - @Override - public void onFailure(V val) { - if (!config.hasFailureHandler()) { - onException(new Exception("checkSuccess of "+this+" for "+getBriefDescription()+" was false but poller has no failure handler")); - } else { - logProblem("failure", val); - - try { - setSensor(config.hasFailureHandler() ? config.getOnFailure().apply((V)val) : val); - } catch (Exception e) { - if (feed.isConnected()) { - log.warn("Error computing " + getBriefDescription() + "; val=" + val+": "+ e, e); - } else { - if (log.isDebugEnabled()) - log.debug("Error computing " + getBriefDescription() + "; val=" + val + " (when inactive)", e); - } - } - } - } - - @Override - public void onException(Exception exception) { - if (!feed.isConnected()) { - if (log.isTraceEnabled()) log.trace("Read of {} in {} gave exception (while not connected or not yet connected): {}", new Object[] {this, getBriefDescription(), exception}); - } else { - logProblem("exception", exception); - } - - if (config.hasExceptionHandler()) { - try { - setSensor( config.getOnException().apply(exception) ); - } catch (Exception e) { - if (feed.isConnected()) { - log.warn("unable to compute "+getBriefDescription()+"; on exception="+exception, e); - } else { - if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; exception="+exception+" (when inactive)", e); - } - } - } - } - - protected void logProblem(String type, Object val) { - if (lastWasProblem && currentProblemLoggedAsWarning) { - if (log.isTraceEnabled()) - log.trace("Recurring {} reading {} in {}: {}", new Object[] {type, this, getBriefDescription(), val}); - } else { - long nowTime = System.currentTimeMillis(); - // get a non-volatile value - Long currentProblemStartTimeCache = currentProblemStartTime; - long expiryTime = - (lastSuccessTime!=null && !isTransitioningOrStopped()) ? lastSuccessTime+logWarningGraceTime.toMilliseconds() : - currentProblemStartTimeCache!=null ? currentProblemStartTimeCache+logWarningGraceTimeOnStartup.toMilliseconds() : - nowTime+logWarningGraceTimeOnStartup.toMilliseconds(); - if (!lastWasProblem) { - if (expiryTime <= nowTime) { - currentProblemLoggedAsWarning = true; - if (entity==null || !Entities.isNoLongerManaged(entity)) { - log.warn("Read of " + getBriefDescription() + " gave " + type + ": " + val); - } else { - log.debug("Read of " + getBriefDescription() + " gave " + type + ": " + val); - } - if (log.isDebugEnabled() && val instanceof Throwable) - log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val); - } else { - if (log.isDebugEnabled()) - log.debug("Read of " + getBriefDescription() + " gave " + type + " (in grace period): " + val); - } - lastWasProblem = true; - currentProblemStartTime = nowTime; - } else { - if (expiryTime <= nowTime) { - currentProblemLoggedAsWarning = true; - log.warn("Read of " + getBriefDescription() + " gave " + type + - " (grace period expired, occurring for "+Duration.millis(nowTime - currentProblemStartTimeCache)+ - (config.hasExceptionHandler() ? "" : ", no exception handler set for sensor")+ - ")"+ - ": " + val); - if (log.isDebugEnabled() && val instanceof Throwable) - log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val); - } else { - if (log.isDebugEnabled()) - log.debug("Recurring {} reading {} in {} (still in grace period): {}", new Object[] {type, this, getBriefDescription(), val}); - } - } - } - } - - protected boolean isTransitioningOrStopped() { - if (entity==null) return false; - Transition expected = entity.getAttribute(Attributes.SERVICE_STATE_EXPECTED); - if (expected==null) return false; - return (expected.getState()==Lifecycle.STARTING || expected.getState()==Lifecycle.STOPPING || expected.getState()==Lifecycle.STOPPED); - } - - @SuppressWarnings("unchecked") - protected void setSensor(Object v) { - if (Entities.isNoLongerManaged(entity)) { - if (Tasks.isInterrupted()) return; - log.warn(""+entity+" is not managed; feed "+this+" setting "+sensor+" to "+v+" at this time is not supported ("+Tasks.current()+")"); - } - - if (v == FeedConfig.UNCHANGED) { - // nothing - } else if (v == FeedConfig.REMOVE) { - ((EntityInternal)entity).removeAttribute(sensor); - } else if (sensor == FeedConfig.NO_SENSOR) { - // nothing - } else { - Object coercedV = TypeCoercions.coerce(v, sensor.getType()); - if (suppressDuplicates && Objects.equal(coercedV, entity.getAttribute(sensor))) { - // no change; nothing - } else { - entity.setAttribute(sensor, coercedV); - } - } - } - - @Override - public String toString() { - return super.toString()+"["+getDescription()+"]"; - } - - @Override - public String getDescription() { - return sensor.getName()+" @ "+entity.getId()+" <- "+config; - } - - protected String getBriefDescription() { - return ""+entity+"->"+(sensor==FeedConfig.NO_SENSOR ? "(dynamic sensors)" : ""+sensor); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java deleted file mode 100644 index 7938cc4..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed; - -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.mgmt.ManagementContext; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey; -import org.apache.brooklyn.core.sensor.TemplatedStringAttributeSensorAndConfigKey; - - -/** Simple config adapter for setting {@link AttributeSensorAndConfigKey} sensor values from the config value or config default */ -public class ConfigToAttributes { - - //normally just applied once, statically, not registered... - public static void apply(EntityLocal entity) { - for (Sensor<?> it : entity.getEntityType().getSensors()) { - if (it instanceof AttributeSensorAndConfigKey) { - apply(entity, (AttributeSensorAndConfigKey<?,?>)it); - } - } - } - - /** - * Convenience for ensuring an individual sensor is set from its config key - * (e.g. sub-classes of DynamicWebAppCluster that don't want to set HTTP_PORT etc!) - */ - public static <T> T apply(EntityLocal entity, AttributeSensorAndConfigKey<?,T> key) { - T v = entity.getAttribute(key); - if (v!=null) return v; - v = key.getAsSensorValue(entity); - if (v!=null) entity.setAttribute(key, v); - return v; - } - - /** - * Convenience for transforming a config value (e.g. processing a {@link TemplatedStringAttributeSensorAndConfigKey}), - * outside of the context of an entity. - */ - public static <T> T transform(ManagementContext managementContext, AttributeSensorAndConfigKey<?,T> key) { - return key.getAsSensorValue(managementContext); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java deleted file mode 100644 index 4433e83..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed; - -import java.util.List; - -import com.google.common.collect.ImmutableList; - -/** - * A poll handler that delegates each call to a set of poll handlers. - * - * @author aled - */ -public class DelegatingPollHandler<V> implements PollHandler<V> { - - private final List<AttributePollHandler<? super V>> delegates; - - public DelegatingPollHandler(Iterable<AttributePollHandler<? super V>> delegates) { - super(); - this.delegates = ImmutableList.copyOf(delegates); - } - - @Override - public boolean checkSuccess(V val) { - for (AttributePollHandler<? super V> delegate : delegates) { - if (!delegate.checkSuccess(val)) - return false; - } - return true; - } - - @Override - public void onSuccess(V val) { - for (AttributePollHandler<? super V> delegate : delegates) { - delegate.onSuccess(val); - } - } - - @Override - public void onFailure(V val) { - for (AttributePollHandler<? super V> delegate : delegates) { - delegate.onFailure(val); - } - } - - @Override - public void onException(Exception exception) { - for (AttributePollHandler<? super V> delegate : delegates) { - delegate.onException(exception); - } - } - - @Override - public String toString() { - return super.toString()+"["+getDescription()+"]"; - } - - @Override - public String getDescription() { - if (delegates.isEmpty()) - return "(empty delegate list)"; - if (delegates.size()==1) - return delegates.get(0).getDescription(); - StringBuilder sb = new StringBuilder(); - sb.append("["); - int count = 0; - for (AttributePollHandler<? super V> delegate : delegates) { - if (count>0) sb.append("; "); - sb.append(delegate.getDescription()); - if (count>2) { - sb.append("; ..."); - break; - } - count++; - } - sb.append("]"); - return sb.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java deleted file mode 100644 index 32ea2ab..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.sensor.Sensors; -import org.apache.brooklyn.sensor.feed.http.HttpPollConfig; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.guava.Functionals; -import org.apache.brooklyn.util.javalang.JavaClassNames; -import org.apache.brooklyn.util.text.Strings; - -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Objects; -import com.google.common.base.Predicate; - -/** - * Configuration for a poll, or a subscription etc, that is being added to a feed. - * - * @author aled - */ -public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> { - - /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. - * @deprecated since 0.7.0 use UNCHANGED */ - public static final Object UNSET = Entities.UNCHANGED; - /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. */ - public static final Object UNCHANGED = Entities.UNCHANGED; - /** The onSuccess or onError functions can return this value to indicate that the sensor value should be removed - * (cf 'null', but useful in dynamic situations) */ - public static final Object REMOVE = Entities.REMOVE; - - /** Indicates that no sensor is being used here. This sensor is suppressed, - * but is useful where you want to use the feeds with custom success/exception/failure functions - * which directly set multiple sensors, e.g. dynamically based on the poll response. - * <p> - * See {@link HttpPollConfig#forMultiple()} and its usages. - * (It can work for any poll config, but conveniences have not been supplied for others.) */ - public static final AttributeSensor<Void> NO_SENSOR = Sensors.newSensor(Void.class, "brooklyn.no.sensor"); - - private final AttributeSensor<T> sensor; - private Function<? super V, T> onsuccess; - private Function<? super V, T> onfailure; - private Function<? super Exception, T> onexception; - private Predicate<? super V> checkSuccess; - private boolean suppressDuplicates; - private boolean enabled = true; - - public FeedConfig(AttributeSensor<T> sensor) { - this.sensor = checkNotNull(sensor, "sensor"); - } - - public FeedConfig(FeedConfig<V, T, F> other) { - this.sensor = other.sensor; - this.onsuccess = other.onsuccess; - this.onfailure = other.onfailure; - this.onexception = other.onexception; - this.checkSuccess = other.checkSuccess; - this.suppressDuplicates = other.suppressDuplicates; - this.enabled = other.enabled; - } - - @SuppressWarnings("unchecked") - protected F self() { - return (F) this; - } - - public AttributeSensor<T> getSensor() { - return sensor; - } - - public Predicate<? super V> getCheckSuccess() { - return checkSuccess; - } - - public Function<? super V, T> getOnSuccess() { - return onsuccess; - } - - public Function<? super V, T> getOnFailure() { - return onfailure; - } - - public Function<? super Exception, T> getOnException() { - return onexception; - } - - public boolean getSupressDuplicates() { - return suppressDuplicates; - } - - public boolean isEnabled() { - return enabled; - } - - /** sets the predicate used to check whether a feed run is successful */ - public F checkSuccess(Predicate<? super V> val) { - this.checkSuccess = checkNotNull(val, "checkSuccess"); - return self(); - } - /** as {@link #checkSuccess(Predicate)} */ - public F checkSuccess(final Function<? super V,Boolean> val) { - return checkSuccess(Functionals.predicate(val)); - } - @SuppressWarnings("unused") - /** @deprecated since 0.7.0, kept for rebind */ @Deprecated - private F checkSuccessLegacy(final Function<? super V,Boolean> val) { - return checkSuccess(new Predicate<V>() { - @Override - public boolean apply(V input) { - return val.apply(input); - } - }); - } - - public F onSuccess(Function<? super V,T> val) { - this.onsuccess = checkNotNull(val, "onSuccess"); - return self(); - } - - public F setOnSuccess(T val) { - return onSuccess(Functions.constant(val)); - } - - /** a failure is when the connection is fine (no exception) but the other end returns a result object V - * which the feed can tell indicates a failure (e.g. HTTP code 404) */ - public F onFailure(Function<? super V,T> val) { - this.onfailure = checkNotNull(val, "onFailure"); - return self(); - } - - public F setOnFailure(T val) { - return onFailure(Functions.constant(val)); - } - - /** registers a callback to be used {@link #onSuccess(Function)} and {@link #onFailure(Function)}, - * i.e. whenever a result comes back, but not in case of exceptions being thrown (ie problems communicating) */ - public F onResult(Function<? super V, T> val) { - onSuccess(val); - return onFailure(val); - } - - public F setOnResult(T val) { - return onResult(Functions.constant(val)); - } - - /** an exception is when there is an error in the communication */ - public F onException(Function<? super Exception,T> val) { - this.onexception = checkNotNull(val, "onException"); - return self(); - } - - public F setOnException(T val) { - return onException(Functions.constant(val)); - } - - /** convenience for indicating a behaviour to occur for both - * {@link #onException(Function)} - * (error connecting) and - * {@link #onFailure(Function)} - * (successful communication but failure report from remote end) */ - public F onFailureOrException(Function<Object,T> val) { - onFailure(val); - return onException(val); - } - - public F setOnFailureOrException(T val) { - return onFailureOrException(Functions.constant(val)); - } - - public F suppressDuplicates(boolean val) { - suppressDuplicates = val; - return self(); - } - - /** - * Whether this feed is enabled (defaulting to true). - */ - public F enabled(boolean val) { - enabled = val; - return self(); - } - - public boolean hasSuccessHandler() { - return this.onsuccess != null; - } - - public boolean hasFailureHandler() { - return this.onfailure != null; - } - - public boolean hasExceptionHandler() { - return this.onexception != null; - } - - public boolean hasCheckSuccessHandler() { - return this.checkSuccess != null; - } - - - @Override - public String toString() { - StringBuilder result = new StringBuilder(); - result.append(toStringBaseName()); - result.append("["); - boolean contents = false; - Object source = toStringPollSource(); - AttributeSensor<T> s = getSensor(); - if (Strings.isNonBlank(Strings.toString(source))) { - result.append(Strings.toUniqueString(source, 40)); - if (s!=null) { - result.append("->"); - result.append(s.getName()); - } - contents = true; - } else if (s!=null) { - result.append(s.getName()); - contents = true; - } - MutableList<Object> fields = toStringOtherFields(); - if (fields!=null) { - for (Object field: fields) { - if (Strings.isNonBlank(Strings.toString(field))) { - if (contents) result.append(";"); - contents = true; - result.append(field); - } - } - } - result.append("]"); - return result.toString(); - } - - /** can be overridden to supply a simpler base name than the class name */ - protected String toStringBaseName() { - return JavaClassNames.simpleClassName(this); - } - /** can be overridden to supply add'l info for the {@link #toString()}; subclasses can add to the returned value */ - protected MutableList<Object> toStringOtherFields() { - return MutableList.<Object>of(); - } - /** can be overridden to supply add'l info for the {@link #toString()}, placed before the sensor with -> */ - protected Object toStringPollSource() { - return null; - } - /** all configs should supply a unique tag element, inserted into the feed */ - protected String getUniqueTag() { - return toString(); - } - - /** returns fields which should be used for equality, including by default {@link #toStringOtherFields()} and {@link #toStringPollSource()}; - * subclasses can add to the returned value */ - protected MutableList<Object> equalsFields() { - MutableList<Object> result = MutableList.of().appendIfNotNull(getSensor()).appendIfNotNull(toStringPollSource()); - for (Object field: toStringOtherFields()) result.appendIfNotNull(field); - return result; - } - - @Override - public int hashCode() { - int hc = super.hashCode(); - for (Object f: equalsFields()) - hc = Objects.hashCode(hc, f); - return hc; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!super.equals(obj)) return false; - PollConfig<?,?,?> other = (PollConfig<?,?,?>) obj; - if (!Objects.equal(getUniqueTag(), other.getUniqueTag())) return false; - if (!Objects.equal(equalsFields(), other.equalsFields())) return false; - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java deleted file mode 100644 index 01a561b..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.time.Duration; - -/** - * Configuration for polling, which is being added to a feed (e.g. to poll a given URL over http). - * - * @author aled - */ -public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> { - - private long period = -1; - private String description; - - public PollConfig(AttributeSensor<T> sensor) { - super(sensor); - } - - public PollConfig(PollConfig<V,T,F> other) { - super(other); - this.period = other.period; - } - - public long getPeriod() { - return period; - } - - public F period(Duration val) { - checkArgument(val.toMilliseconds() >= 0, "period must be greater than or equal to zero"); - this.period = val.toMilliseconds(); - return self(); - } - - public F period(long val) { - checkArgument(val >= 0, "period must be greater than or equal to zero"); - this.period = val; return self(); - } - - public F period(long val, TimeUnit units) { - checkArgument(val >= 0, "period must be greater than or equal to zero"); - return period(units.toMillis(val)); - } - - public F description(String description) { - this.description = description; - return self(); - } - - public String getDescription() { - return description; - } - - @Override protected MutableList<Object> toStringOtherFields() { - return super.toStringOtherFields().appendIfNotNull(description); - } - - @Override - protected MutableList<Object> equalsFields() { - return super.equalsFields().appendIfNotNull(period); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java deleted file mode 100644 index 175c76f..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed; - -/** - * Notified by the Poller of the result for each job, on each poll. - * - * @author aled - */ -public interface PollHandler<V> { - - public boolean checkSuccess(V val); - - public void onSuccess(V val); - - public void onFailure(V val); - - public void onException(Exception exception); - - public String getDescription(); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java deleted file mode 100644 index f6e8e24..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed; - -import java.util.LinkedHashSet; -import java.util.Set; -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.core.entity.Attributes; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.task.DynamicSequentialTask; -import org.apache.brooklyn.util.core.task.ScheduledTask; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; - - -/** - * For executing periodic polls. - * Jobs are added to the schedule, and then the poller is started. - * The jobs will then be executed periodically, and the handler called for the result/failure. - * - * Assumes the schedule+start will be done single threaded, and that stop will not be done concurrently. - */ -public class Poller<V> { - public static final Logger log = LoggerFactory.getLogger(Poller.class); - - private final EntityLocal entity; - private final boolean onlyIfServiceUp; - private final Set<Callable<?>> oneOffJobs = new LinkedHashSet<Callable<?>>(); - private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>(); - private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>(); - private final Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>(); - private volatile boolean started = false; - - private static class PollJob<V> { - final PollHandler<? super V> handler; - final Duration pollPeriod; - final Runnable wrappedJob; - private boolean loggedPreviousException = false; - - PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) { - this.handler = handler; - this.pollPeriod = period; - - wrappedJob = new Runnable() { - public void run() { - try { - V val = job.call(); - loggedPreviousException = false; - if (handler.checkSuccess(val)) { - handler.onSuccess(val); - } else { - handler.onFailure(val); - } - } catch (Exception e) { - if (loggedPreviousException) { - if (log.isTraceEnabled()) log.trace("PollJob for {}, repeated consecutive failures, handling {} using {}", new Object[] {job, e, handler}); - } else { - if (log.isDebugEnabled()) log.debug("PollJob for {} handling {} using {}", new Object[] {job, e, handler}); - loggedPreviousException = true; - } - handler.onException(e); - } - } - }; - } - } - - /** @deprecated since 0.7.0, pass in whether should run onlyIfServiceUp */ - @Deprecated - public Poller(EntityLocal entity) { - this(entity, false); - } - public Poller(EntityLocal entity, boolean onlyIfServiceUp) { - this.entity = entity; - this.onlyIfServiceUp = onlyIfServiceUp; - } - - /** Submits a one-off poll job; recommended that callers supply to-String so that task has a decent description */ - public void submit(Callable<?> job) { - if (started) { - throw new IllegalStateException("Cannot submit additional tasks after poller has started"); - } - oneOffJobs.add(job); - } - - public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long period) { - scheduleAtFixedRate(job, handler, Duration.millis(period)); - } - public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) { - if (started) { - throw new IllegalStateException("Cannot schedule additional tasks after poller has started"); - } - PollJob<V> foo = new PollJob<V>(job, handler, period); - pollJobs.add(foo); - } - - @SuppressWarnings({ "unchecked" }) - public void start() { - // TODO Previous incarnation of this logged this logged polledSensors.keySet(), but we don't know that anymore - // Is that ok, are can we do better? - - if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", new Object[] {entity, this}); - if (started) { - throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", - this, entity)); - } - - started = true; - - for (final Callable<?> oneOffJob : oneOffJobs) { - Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).name("Poll").description("One-time poll job "+oneOffJob).build(); - oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task)); - } - - for (final PollJob<V> pollJob : pollJobs) { - final String scheduleName = pollJob.handler.getDescription(); - if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) { - Callable<Task<?>> pollingTaskFactory = new Callable<Task<?>>() { - public Task<?> call() { - DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), - new Callable<Void>() { public Void call() { - if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { - return null; - } - pollJob.wrappedJob.run(); - return null; - } } ); - BrooklynTaskTags.setTransient(task); - return task; - } - }; - ScheduledTask task = new ScheduledTask(MutableMap.of("period", pollJob.pollPeriod, "displayName", "scheduled:"+scheduleName), pollingTaskFactory); - tasks.add((ScheduledTask)Entities.submit(entity, task)); - } else { - if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this}); - } - } - } - - public void stop() { - if (log.isDebugEnabled()) log.debug("Stopping poll for {} (using {})", new Object[] {entity, this}); - if (!started) { - throw new IllegalStateException(String.format("Attempt to stop poller %s of entity %s when not running", - this, entity)); - } - - started = false; - for (Task<?> task : oneOffTasks) { - if (task != null) task.cancel(true); - } - for (ScheduledTask task : tasks) { - if (task != null) task.cancel(); - } - oneOffTasks.clear(); - tasks.clear(); - } - - public boolean isRunning() { - boolean hasActiveTasks = false; - for (Task<?> task: tasks) { - if (task.isBegun() && !task.isDone()) { - hasActiveTasks = true; - break; - } - } - if (!started && hasActiveTasks) { - log.warn("Poller should not be running, but has active tasks, tasks: "+tasks); - } - return started && hasActiveTasks; - } - - protected boolean isEmpty() { - return pollJobs.isEmpty(); - } - - public String toString() { - return Objects.toStringHelper(this).add("entity", entity).toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java deleted file mode 100644 index 1cb6861..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed.function; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.sensor.feed.AbstractFeed; -import org.apache.brooklyn.sensor.feed.AttributePollHandler; -import org.apache.brooklyn.sensor.feed.DelegatingPollHandler; -import org.apache.brooklyn.util.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; -import com.google.common.reflect.TypeToken; - -/** - * Provides a feed of attribute values, by periodically invoking functions. - * - * Example usage (e.g. in an entity that extends SoftwareProcessImpl): - * <pre> - * {@code - * private FunctionFeed feed; - * - * //@Override - * protected void connectSensors() { - * super.connectSensors(); - * - * feed = FunctionFeed.builder() - * .entity(this) - * .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP) - * .period(500, TimeUnit.MILLISECONDS) - * .callable(new Callable<Boolean>() { - * public Boolean call() throws Exception { - * return getDriver().isRunning(); - * } - * }) - * .onExceptionOrFailure(Functions.constant(Boolan.FALSE)) - * .build(); - * } - * - * {@literal @}Override - * protected void disconnectSensors() { - * super.disconnectSensors(); - * if (feed != null) feed.stop(); - * } - * } - * </pre> - * - * @author aled - */ -public class FunctionFeed extends AbstractFeed { - - private static final Logger log = LoggerFactory.getLogger(FunctionFeed.class); - - // Treat as immutable once built - @SuppressWarnings("serial") - public static final ConfigKey<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>> POLLS = ConfigKeys.newConfigKey( - new TypeToken<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>>() {}, - "polls"); - - public static Builder builder() { - return new Builder(); - } - - public static Builder builder(String uniqueTag) { - return new Builder().uniqueTag(uniqueTag); - } - - public static class Builder { - private EntityLocal entity; - private boolean onlyIfServiceUp = false; - private long period = 500; - private TimeUnit periodUnits = TimeUnit.MILLISECONDS; - private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList(); - private String uniqueTag; - private volatile boolean built; - - public Builder entity(EntityLocal val) { - this.entity = val; - return this; - } - public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } - public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { - this.onlyIfServiceUp = onlyIfServiceUp; - return this; - } - public Builder period(Duration d) { - return period(d.toMilliseconds(), TimeUnit.MILLISECONDS); - } - public Builder period(long millis) { - return period(millis, TimeUnit.MILLISECONDS); - } - public Builder period(long val, TimeUnit units) { - this.period = val; - this.periodUnits = units; - return this; - } - public Builder poll(FunctionPollConfig<?,?> config) { - polls.add(config); - return this; - } - public Builder uniqueTag(String uniqueTag) { - this.uniqueTag = uniqueTag; - return this; - } - public FunctionFeed build() { - built = true; - FunctionFeed result = new FunctionFeed(this); - result.setEntity(checkNotNull(entity, "entity")); - result.start(); - return result; - } - @Override - protected void finalize() { - if (!built) log.warn("FunctionFeed.Builder created, but build() never called"); - } - } - - private static class FunctionPollIdentifier { - final Callable<?> job; - - private FunctionPollIdentifier(Callable<?> job) { - this.job = checkNotNull(job, "job"); - } - - @Override - public int hashCode() { - return Objects.hashCode(job); - } - - @Override - public boolean equals(Object other) { - return (other instanceof FunctionPollIdentifier) && Objects.equal(job, ((FunctionPollIdentifier)other).job); - } - } - - /** - * For rebind; do not call directly; use builder - */ - public FunctionFeed() { - } - - protected FunctionFeed(Builder builder) { - setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp); - - SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>> polls = HashMultimap.<FunctionPollIdentifier,FunctionPollConfig<?,?>>create(); - for (FunctionPollConfig<?,?> config : builder.polls) { - if (!config.isEnabled()) continue; - @SuppressWarnings({ "rawtypes", "unchecked" }) - FunctionPollConfig<?,?> configCopy = new FunctionPollConfig(config); - if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); - Callable<?> job = config.getCallable(); - polls.put(new FunctionPollIdentifier(job), configCopy); - } - setConfig(POLLS, polls); - initUniqueTag(builder.uniqueTag, polls.values()); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - protected void preStart() { - SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?, ?>> polls = getConfig(POLLS); - for (final FunctionPollIdentifier pollInfo : polls.keySet()) { - Set<FunctionPollConfig<?,?>> configs = polls.get(pollInfo); - long minPeriod = Integer.MAX_VALUE; - Set<AttributePollHandler<?>> handlers = Sets.newLinkedHashSet(); - - for (FunctionPollConfig<?,?> config : configs) { - handlers.add(new AttributePollHandler(config, entity, this)); - if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); - } - - getPoller().scheduleAtFixedRate( - (Callable)pollInfo.job, - new DelegatingPollHandler(handlers), - minPeriod); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java deleted file mode 100644 index 7b91988..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed.function; - -import static com.google.common.base.Preconditions.checkNotNull; -import groovy.lang.Closure; - -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.sensor.feed.FeedConfig; -import org.apache.brooklyn.sensor.feed.PollConfig; -import org.apache.brooklyn.util.groovy.GroovyJavaMethods; -import org.apache.brooklyn.util.guava.Functionals; -import org.apache.brooklyn.util.javalang.JavaClassNames; - -import com.google.common.base.Supplier; - -public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfig<S, T>> { - - private Callable<?> callable; - - public static <T> FunctionPollConfig<?, T> forSensor(AttributeSensor<T> sensor) { - return new FunctionPollConfig<Object, T>(sensor); - } - - public FunctionPollConfig(AttributeSensor<T> sensor) { - super(sensor); - } - - public FunctionPollConfig(FunctionPollConfig<S, T> other) { - super(other); - callable = other.callable; - } - - public Callable<? extends Object> getCallable() { - return callable; - } - - /** - * The {@link Callable} to be invoked on each poll. - * <p> - * Note this <em>must</em> use generics, otherwise the return type of subsequent chained - * calls will (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)} will - * return the wrong type. - */ - @SuppressWarnings("unchecked") - public <newS> FunctionPollConfig<newS, T> callable(Callable<? extends newS> val) { - this.callable = checkNotNull(val, "callable"); - return (FunctionPollConfig<newS, T>) this; - } - - /** - * Supplies the value to be returned by each poll. - * <p> - * Note this <em>must</em> use generics, otherwise the return type of subsequent chained - * calls will (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)} will - * return the wrong type. - */ - @SuppressWarnings("unchecked") - public <newS> FunctionPollConfig<newS, T> supplier(final Supplier<? extends newS> val) { - this.callable = Functionals.callable( checkNotNull(val, "supplier") ); - return (FunctionPollConfig<newS, T>) this; - } - - /** @deprecated since 0.7.0, kept for legacy compatibility when deserializing */ - @SuppressWarnings({ "unchecked", "unused" }) - private <newS> FunctionPollConfig<newS, T> supplierLegacy(final Supplier<? extends newS> val) { - checkNotNull(val, "supplier"); - this.callable = new Callable<newS>() { - @Override - public newS call() throws Exception { - return val.get(); - } - }; - return (FunctionPollConfig<newS, T>) this; - } - - public FunctionPollConfig<S, T> closure(Closure<?> val) { - this.callable = GroovyJavaMethods.callableFromClosure(checkNotNull(val, "closure")); - return this; - } - - @Override protected String toStringBaseName() { return "fn"; } - @Override protected String toStringPollSource() { - if (callable==null) return null; - String cs = callable.toString(); - if (!cs.contains( ""+Integer.toHexString(callable.hashCode()) )) { - return cs; - } - // if hashcode is in callable it's probably a custom internal; return class name - return JavaClassNames.simpleClassName(callable); - } - -}
