Rename o.a.b.sensor.feed to o.a.b.feed and o.a.b.core.feed Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/daf40919 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/daf40919 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/daf40919
Branch: refs/heads/master Commit: daf40919b6fa20f16b6cd7d9eb1ad9f79baaa8f4 Parents: 2a78e27 Author: Aled Sage <[email protected]> Authored: Wed Aug 19 22:56:13 2015 +0100 Committer: Aled Sage <[email protected]> Committed: Wed Aug 19 22:56:13 2015 +0100 ---------------------------------------------------------------------- .../brooklyn/core/entity/AbstractEntity.java | 4 +- .../apache/brooklyn/core/feed/AbstractFeed.java | 240 ++++++ .../core/feed/AttributePollHandler.java | 248 +++++++ .../brooklyn/core/feed/ConfigToAttributes.java | 59 ++ .../core/feed/DelegatingPollHandler.java | 96 +++ .../apache/brooklyn/core/feed/FeedConfig.java | 297 ++++++++ .../apache/brooklyn/core/feed/PollConfig.java | 85 +++ .../apache/brooklyn/core/feed/PollHandler.java | 38 + .../org/apache/brooklyn/core/feed/Poller.java | 205 ++++++ .../mgmt/rebind/BasicEntityRebindSupport.java | 2 +- .../mgmt/rebind/BasicFeedRebindSupport.java | 2 +- .../core/mgmt/rebind/RebindIteration.java | 2 +- .../mgmt/rebind/dto/MementosGenerators.java | 2 +- .../sensor/AttributeSensorAndConfigKey.java | 2 +- .../brooklyn/core/sensor/HttpRequestSensor.java | 6 +- .../entity/group/DynamicMultiGroupImpl.java | 4 +- .../brooklyn/entity/stock/DataEntityImpl.java | 4 +- .../brooklyn/feed/function/FunctionFeed.java | 208 ++++++ .../feed/function/FunctionPollConfig.java | 111 +++ .../org/apache/brooklyn/feed/http/HttpFeed.java | 382 ++++++++++ .../brooklyn/feed/http/HttpPollConfig.java | 160 ++++ .../brooklyn/feed/http/HttpPollValue.java | 40 + .../apache/brooklyn/feed/http/HttpPolls.java | 39 + .../brooklyn/feed/http/HttpValueFunctions.java | 154 ++++ .../brooklyn/feed/http/JsonFunctions.java | 235 ++++++ .../apache/brooklyn/feed/shell/ShellFeed.java | 273 +++++++ .../brooklyn/feed/shell/ShellPollConfig.java | 125 ++++ .../org/apache/brooklyn/feed/ssh/SshFeed.java | 290 ++++++++ .../apache/brooklyn/feed/ssh/SshPollConfig.java | 142 ++++ .../apache/brooklyn/feed/ssh/SshPollValue.java | 60 ++ 
.../brooklyn/feed/ssh/SshValueFunctions.java | 73 ++ .../windows/WindowsPerformanceCounterFeed.java | 412 +++++++++++ .../WindowsPerformanceCounterPollConfig.java | 53 ++ .../brooklyn/sensor/feed/AbstractFeed.java | 240 ------ .../sensor/feed/AttributePollHandler.java | 248 ------- .../sensor/feed/ConfigToAttributes.java | 59 -- .../sensor/feed/DelegatingPollHandler.java | 96 --- .../apache/brooklyn/sensor/feed/FeedConfig.java | 297 -------- .../apache/brooklyn/sensor/feed/PollConfig.java | 85 --- .../brooklyn/sensor/feed/PollHandler.java | 38 - .../org/apache/brooklyn/sensor/feed/Poller.java | 205 ------ .../sensor/feed/function/FunctionFeed.java | 208 ------ .../feed/function/FunctionPollConfig.java | 111 --- .../brooklyn/sensor/feed/http/HttpFeed.java | 382 ---------- .../sensor/feed/http/HttpPollConfig.java | 160 ---- .../sensor/feed/http/HttpPollValue.java | 40 - .../brooklyn/sensor/feed/http/HttpPolls.java | 39 - .../sensor/feed/http/HttpValueFunctions.java | 154 ---- .../sensor/feed/http/JsonFunctions.java | 235 ------ .../brooklyn/sensor/feed/shell/ShellFeed.java | 273 ------- .../sensor/feed/shell/ShellPollConfig.java | 125 ---- .../brooklyn/sensor/feed/ssh/SshFeed.java | 290 -------- .../brooklyn/sensor/feed/ssh/SshPollConfig.java | 142 ---- .../brooklyn/sensor/feed/ssh/SshPollValue.java | 60 -- .../sensor/feed/ssh/SshValueFunctions.java | 73 -- .../windows/WindowsPerformanceCounterFeed.java | 412 ----------- .../WindowsPerformanceCounterPollConfig.java | 53 -- .../util/core/http/HttpToolResponse.java | 2 +- .../core/feed/ConfigToAttributesTest.java | 70 ++ .../apache/brooklyn/core/feed/PollerTest.java | 108 +++ .../core/location/TestPortSupplierLocation.java | 2 +- .../core/mgmt/rebind/RebindFeedTest.java | 16 +- .../feed/function/FunctionFeedTest.java | 315 ++++++++ .../feed/http/HttpFeedIntegrationTest.java | 160 ++++ .../apache/brooklyn/feed/http/HttpFeedTest.java | 392 ++++++++++ .../feed/http/HttpValueFunctionsTest.java | 94 +++ 
.../brooklyn/feed/http/JsonFunctionsTest.java | 130 ++++ .../feed/shell/ShellFeedIntegrationTest.java | 226 ++++++ .../feed/ssh/SshFeedIntegrationTest.java | 264 +++++++ .../WindowsPerformanceCounterFeedLiveTest.java | 104 +++ .../WindowsPerformanceCounterFeedTest.java | 132 ++++ .../sensor/feed/ConfigToAttributesTest.java | 70 -- .../apache/brooklyn/sensor/feed/PollerTest.java | 108 --- .../sensor/feed/function/FunctionFeedTest.java | 315 -------- .../feed/http/HttpFeedIntegrationTest.java | 160 ---- .../brooklyn/sensor/feed/http/HttpFeedTest.java | 392 ---------- .../feed/http/HttpValueFunctionsTest.java | 94 --- .../sensor/feed/http/JsonFunctionsTest.java | 130 ---- .../feed/shell/ShellFeedIntegrationTest.java | 226 ------ .../sensor/feed/ssh/SshFeedIntegrationTest.java | 264 ------- .../WindowsPerformanceCounterFeedLiveTest.java | 104 --- .../WindowsPerformanceCounterFeedTest.java | 132 ---- .../policy/enricher/HttpLatencyDetector.java | 6 +- .../entity/database/derby/DerbyDatabase.java | 2 +- .../entity/database/derby/DerbySchema.java | 6 +- .../postgresql/PostgreSqlNodeSaltImpl.java | 4 +- .../entity/salt/SaltStackMasterImpl.java | 3 +- .../entity/monitoring/zabbix/ZabbixFeed.java | 10 +- .../monitoring/zabbix/ZabbixPollConfig.java | 6 +- .../monitoring/zabbix/ZabbixServerImpl.java | 6 +- .../nosql/hazelcast/HazelcastNodeImpl.java | 6 +- .../brooklynnode/BrooklynClusterImpl.java | 4 +- .../brooklynnode/BrooklynEntityMirrorImpl.java | 4 +- .../entity/brooklynnode/BrooklynNodeImpl.java | 10 +- .../SetHighAvailabilityModeEffectorBody.java | 4 +- .../brooklyn/entity/chef/ChefAttributeFeed.java | 8 +- .../entity/chef/ChefAttributePollConfig.java | 2 +- .../brooklyn/entity/java/JavaAppUtils.java | 6 +- .../entity/java/JmxAttributeSensor.java | 6 +- .../apache/brooklyn/entity/java/JmxSupport.java | 2 +- .../entity/java/VanillaJavaAppImpl.java | 2 +- .../entity/machine/MachineEntityImpl.java | 6 +- .../base/AbstractSoftwareProcessSshDriver.java | 2 +- 
.../software/base/SoftwareProcessImpl.java | 4 +- .../MachineLifecycleEffectorTasks.java | 2 +- .../feed/jmx/JmxAttributePollConfig.java | 74 ++ .../org/apache/brooklyn/feed/jmx/JmxFeed.java | 423 +++++++++++ .../org/apache/brooklyn/feed/jmx/JmxHelper.java | 724 +++++++++++++++++++ .../feed/jmx/JmxNotificationFilters.java | 64 ++ .../jmx/JmxNotificationSubscriptionConfig.java | 95 +++ .../feed/jmx/JmxOperationPollConfig.java | 121 ++++ .../brooklyn/feed/jmx/JmxValueFunctions.java | 95 +++ .../sensor/feed/jmx/JmxAttributePollConfig.java | 74 -- .../brooklyn/sensor/feed/jmx/JmxFeed.java | 423 ----------- .../brooklyn/sensor/feed/jmx/JmxHelper.java | 724 ------------------- .../sensor/feed/jmx/JmxNotificationFilters.java | 64 -- .../jmx/JmxNotificationSubscriptionConfig.java | 95 --- .../sensor/feed/jmx/JmxOperationPollConfig.java | 121 ---- .../sensor/feed/jmx/JmxValueFunctions.java | 95 --- .../brooklyn/sensor/ssh/SshCommandSensor.java | 6 +- .../winrm/WindowsPerformanceCounterSensors.java | 2 +- .../BrooklynNodeIntegrationTest.java | 2 +- .../entity/brooklynnode/BrooklynNodeTest.java | 2 +- .../brooklynnode/SameBrooklynNodeImpl.java | 6 +- .../brooklynnode/SelectMasterEffectorTest.java | 6 +- .../mysql/ChefSoloDriverToyMySqlEntity.java | 4 +- .../brooklyn/entity/java/EntityPollingTest.java | 4 +- .../entity/java/VanillaJavaAppTest.java | 2 +- .../base/lifecycle/ScriptHelperTest.java | 4 +- .../software/base/test/jmx/JmxService.java | 2 +- .../apache/brooklyn/feed/jmx/JmxFeedTest.java | 422 +++++++++++ .../apache/brooklyn/feed/jmx/JmxHelperTest.java | 311 ++++++++ .../brooklyn/feed/jmx/RebindJmxFeedTest.java | 148 ++++ .../brooklyn/sensor/feed/jmx/JmxFeedTest.java | 422 ----------- .../brooklyn/sensor/feed/jmx/JmxHelperTest.java | 311 -------- .../sensor/feed/jmx/RebindJmxFeedTest.java | 148 ---- .../entity/database/crate/CrateNodeImpl.java | 8 +- .../database/mariadb/MariaDbNodeImpl.java | 6 +- .../entity/database/mysql/MySqlClusterImpl.java | 4 +- 
.../entity/database/mysql/MySqlNodeImpl.java | 6 +- .../PostgreSqlNodeChefImplFromScratch.java | 4 +- .../messaging/activemq/ActiveMQBrokerImpl.java | 4 +- .../activemq/ActiveMQDestinationImpl.java | 4 +- .../messaging/activemq/ActiveMQQueueImpl.java | 4 +- .../entity/messaging/kafka/KafkaBrokerImpl.java | 6 +- .../messaging/kafka/KafkaClusterImpl.java | 2 +- .../entity/messaging/qpid/QpidBrokerImpl.java | 6 +- .../messaging/qpid/QpidDestinationImpl.java | 4 +- .../entity/messaging/qpid/QpidQueueImpl.java | 4 +- .../entity/messaging/rabbit/RabbitQueue.java | 6 +- .../entity/messaging/storm/StormImpl.java | 4 +- .../entity/zookeeper/AbstractZooKeeperImpl.java | 6 +- .../entity/monitoring/monit/MonitNodeImpl.java | 6 +- .../nosql/cassandra/CassandraNodeImpl.java | 12 +- .../nosql/couchbase/CouchbaseClusterImpl.java | 8 +- .../nosql/couchbase/CouchbaseNodeImpl.java | 8 +- .../nosql/couchbase/CouchbaseNodeSshDriver.java | 2 +- .../couchbase/CouchbaseSyncGatewayImpl.java | 6 +- .../entity/nosql/couchdb/CouchDBNodeImpl.java | 6 +- .../elasticsearch/ElasticSearchNodeImpl.java | 8 +- .../entity/nosql/mongodb/MongoDBServerImpl.java | 4 +- .../mongodb/sharding/MongoDBRouterImpl.java | 4 +- .../entity/nosql/redis/RedisStoreImpl.java | 8 +- .../entity/nosql/riak/RiakNodeImpl.java | 6 +- .../entity/nosql/solr/SolrServerImpl.java | 6 +- .../ElasticSearchClusterIntegrationTest.java | 2 +- .../ElasticSearchNodeIntegrationTest.java | 2 +- .../entity/osgi/karaf/KarafContainerImpl.java | 10 +- .../entity/proxy/AbstractControllerImpl.java | 2 +- .../AbstractNonProvisionedControllerImpl.java | 2 +- .../entity/proxy/nginx/NginxControllerImpl.java | 8 +- .../ControlledDynamicWebAppClusterImpl.java | 2 +- .../entity/webapp/jboss/JBoss6ServerImpl.java | 4 +- .../entity/webapp/jboss/JBoss7ServerImpl.java | 6 +- .../entity/webapp/jetty/Jetty6ServerImpl.java | 4 +- .../webapp/nodejs/NodeJsWebAppServiceImpl.java | 8 +- .../entity/webapp/tomcat/TomcatServerImpl.java | 4 +- 
.../qa/load/SimulatedJBoss7ServerImpl.java | 10 +- .../qa/load/SimulatedMySqlNodeImpl.java | 4 +- .../qa/load/SimulatedNginxControllerImpl.java | 10 +- .../brooklynnode/DeployBlueprintTest.java | 2 +- 181 files changed, 8717 insertions(+), 8718 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java index 994961c..0ec5903 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java @@ -63,6 +63,8 @@ import org.apache.brooklyn.core.entity.internal.EntityConfigMap; import org.apache.brooklyn.core.entity.lifecycle.PolicyDescriptor; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic; +import org.apache.brooklyn.core.feed.AbstractFeed; +import org.apache.brooklyn.core.feed.ConfigToAttributes; import org.apache.brooklyn.core.internal.BrooklynInitialization; import org.apache.brooklyn.core.internal.storage.BrooklynStorage; import org.apache.brooklyn.core.internal.storage.Reference; @@ -82,8 +84,6 @@ import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey; import org.apache.brooklyn.core.sensor.BasicNotificationSensor; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.sensor.enricher.AbstractEnricher; -import org.apache.brooklyn.sensor.feed.AbstractFeed; -import org.apache.brooklyn.sensor.feed.ConfigToAttributes; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import 
org.apache.brooklyn.util.collections.MutableSet; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java new file mode 100644 index 0000000..a31b73e --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.feed; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.rebind.RebindSupport; +import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento; +import org.apache.brooklyn.api.sensor.Feed; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.BrooklynFeatureEnablement; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.mgmt.rebind.BasicFeedRebindSupport; +import org.apache.brooklyn.core.objs.AbstractEntityAdjunct; +import org.apache.brooklyn.util.javalang.JavaClassNames; +import org.apache.brooklyn.util.text.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Captures common fields and processes for sensor feeds. + * These generally poll or subscribe to get sensor values for an entity. + * They make it easy to poll over http, jmx, etc. 
+ */ +public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed { + + private static final Logger log = LoggerFactory.getLogger(AbstractFeed.class); + + public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("feed.onlyIfServiceUp", "", false); + + private final Object pollerStateMutex = new Object(); + private transient volatile Poller<?> poller; + private transient volatile boolean activated; + private transient volatile boolean suspended; + + public AbstractFeed() { + } + + /** + * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)} + */ + @Deprecated + public AbstractFeed(EntityLocal entity) { + this(entity, false); + } + + /** + * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)} and {@code setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp)} + */ + @Deprecated + public AbstractFeed(EntityLocal entity, boolean onlyIfServiceUp) { + this.entity = checkNotNull(entity, "entity"); + setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp); + } + + // Ensure idempotent, as called in builders (in case not registered with entity), and also called + // when registering with entity + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + if (BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_FEED_REGISTRATION_PROPERTY)) { + ((EntityInternal)entity).feeds().addFeed(this); + } + } + + protected void initUniqueTag(String uniqueTag, Object ...valsForDefault) { + if (Strings.isNonBlank(uniqueTag)) this.uniqueTag = uniqueTag; + else this.uniqueTag = getDefaultUniqueTag(valsForDefault); + } + + protected String getDefaultUniqueTag(Object ...valsForDefault) { + StringBuilder sb = new StringBuilder(); + sb.append(JavaClassNames.simpleClassName(this)); + if (valsForDefault.length==0) { + sb.append("@"); + sb.append(hashCode()); + } else if (valsForDefault.length==1 && valsForDefault[0] instanceof Collection){ + 
sb.append(Strings.toUniqueString(valsForDefault[0], 80)); + } else { + sb.append("["); + boolean first = true; + for (Object x: valsForDefault) { + if (!first) sb.append(";"); + else first = false; + sb.append(Strings.toUniqueString(x, 80)); + } + sb.append("]"); + } + return sb.toString(); + } + + @Override + public void start() { + if (log.isDebugEnabled()) log.debug("Starting feed {} for {}", this, entity); + if (activated) { + throw new IllegalStateException(String.format("Attempt to start feed %s of entity %s when already running", + this, entity)); + } + if (poller != null) { + throw new IllegalStateException(String.format("Attempt to re-start feed %s of entity %s", this, entity)); + } + + poller = new Poller<Object>(entity, getConfig(ONLY_IF_SERVICE_UP)); + activated = true; + preStart(); + synchronized (pollerStateMutex) { + // don't start poller if we are suspended + if (!suspended) { + poller.start(); + } + } + } + + @Override + public void suspend() { + synchronized (pollerStateMutex) { + if (activated && !suspended) { + poller.stop(); + } + suspended = true; + } + } + + @Override + public void resume() { + synchronized (pollerStateMutex) { + if (activated && suspended) { + poller.start(); + } + suspended = false; + } + } + + @Override + public void destroy() { + stop(); + } + + @Override + public void stop() { + if (!activated) { + log.debug("Ignoring attempt to stop feed {} of entity {} when not running", this, entity); + return; + } + if (log.isDebugEnabled()) log.debug("stopping feed {} for {}", this, entity); + + activated = false; + preStop(); + synchronized (pollerStateMutex) { + if (!suspended) { + poller.stop(); + } + } + postStop(); + super.destroy(); + } + + @Override + public boolean isActivated() { + return activated; + } + + public EntityLocal getEntity() { + return entity; + } + + protected boolean isConnected() { + // TODO Default impl will result in multiple logs for same error if becomes unreachable + // (e.g. 
if ssh gets NoRouteToHostException, then every AttributePollHandler for that + // feed will log.warn - so if polling for 10 sensors/attributes will get 10 log messages). + // Would be nice if reduced this logging duplication. + // (You can reduce it by providing a better 'isConnected' implementation of course.) + return isRunning() && entity!=null && !((EntityInternal)entity).getManagementSupport().isNoLongerManaged(); + } + + @Override + public boolean isSuspended() { + return suspended; + } + + @Override + public boolean isRunning() { + return isActivated() && !isSuspended() && !isDestroyed() && getPoller()!=null && getPoller().isRunning(); + } + + @Override + public RebindSupport<FeedMemento> getRebindSupport() { + return new BasicFeedRebindSupport(this); + } + + @Override + protected void onChanged() { + // TODO Auto-generated method stub + } + + /** + * For overriding. + */ + protected void preStart() { + } + + /** + * For overriding. + */ + protected void preStop() { + } + + /** + * For overriding. + */ + protected void postStop() { + } + + /** + * For overriding, where sub-class can change return-type generics! + */ + protected Poller<?> getPoller() { + return poller; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java new file mode 100644 index 0000000..c266836 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.feed; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition; +import org.apache.brooklyn.util.core.flags.TypeCoercions; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; + +/** + * Handler for when polling an entity's attribute. On each poll result the entity's attribute is set. + * + * Calls to onSuccess and onError will happen sequentially, but may be called from different threads + * each time. Note that no guarantees of a synchronized block exist, so additional synchronization + * would be required for the Java memory model's "happens before" relationship. 
+ * + * @author aled + */ +public class AttributePollHandler<V> implements PollHandler<V> { + + public static final Logger log = LoggerFactory.getLogger(AttributePollHandler.class); + + private final FeedConfig<V,?,?> config; + private final EntityLocal entity; + @SuppressWarnings("rawtypes") + private final AttributeSensor sensor; + private final AbstractFeed feed; + private final boolean suppressDuplicates; + + // allow 30 seconds before logging at WARN, if there has been no success yet; + // after success WARN immediately + // TODO these should both be configurable + private Duration logWarningGraceTimeOnStartup = Duration.THIRTY_SECONDS; + private Duration logWarningGraceTime = Duration.millis(0); + + // internal state to look after whether to log warnings + private volatile Long lastSuccessTime = null; + private volatile Long currentProblemStartTime = null; + private volatile boolean currentProblemLoggedAsWarning = false; + private volatile boolean lastWasProblem = false; + + + public AttributePollHandler(FeedConfig<V,?,?> config, EntityLocal entity, AbstractFeed feed) { + this.config = checkNotNull(config, "config"); + this.entity = checkNotNull(entity, "entity"); + this.sensor = checkNotNull(config.getSensor(), "sensor"); + this.feed = checkNotNull(feed, "feed"); + this.suppressDuplicates = config.getSupressDuplicates(); + } + + @Override + public boolean checkSuccess(V val) { + // Always true if no checkSuccess predicate was configured. 
+ return !config.hasCheckSuccessHandler() || config.getCheckSuccess().apply(val); + } + + @Override + public void onSuccess(V val) { + if (lastWasProblem) { + if (currentProblemLoggedAsWarning) { + log.info("Success (following previous problem) reading "+getBriefDescription()); + } else { + log.debug("Success (following previous problem) reading "+getBriefDescription()); + } + lastWasProblem = false; + currentProblemStartTime = null; + currentProblemLoggedAsWarning = false; + } + lastSuccessTime = System.currentTimeMillis(); + if (log.isTraceEnabled()) log.trace("poll for {} got: {}", new Object[] {getBriefDescription(), val}); + + try { + setSensor(transformValueOnSuccess(val)); + } catch (Exception e) { + if (feed.isConnected()) { + log.warn("unable to compute "+getBriefDescription()+"; on val="+val, e); + } else { + if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; val="+val+" (when inactive)", e); + } + } + } + + /** allows post-processing, such as applying a success handler; + * default applies the onSuccess handler (which is recommended) */ + protected Object transformValueOnSuccess(V val) { + return config.hasSuccessHandler() ? config.getOnSuccess().apply(val) : val; + } + + @Override + public void onFailure(V val) { + if (!config.hasFailureHandler()) { + onException(new Exception("checkSuccess of "+this+" for "+getBriefDescription()+" was false but poller has no failure handler")); + } else { + logProblem("failure", val); + + try { + setSensor(config.hasFailureHandler() ? 
config.getOnFailure().apply((V)val) : val); + } catch (Exception e) { + if (feed.isConnected()) { + log.warn("Error computing " + getBriefDescription() + "; val=" + val+": "+ e, e); + } else { + if (log.isDebugEnabled()) + log.debug("Error computing " + getBriefDescription() + "; val=" + val + " (when inactive)", e); + } + } + } + } + + @Override + public void onException(Exception exception) { + if (!feed.isConnected()) { + if (log.isTraceEnabled()) log.trace("Read of {} in {} gave exception (while not connected or not yet connected): {}", new Object[] {this, getBriefDescription(), exception}); + } else { + logProblem("exception", exception); + } + + if (config.hasExceptionHandler()) { + try { + setSensor( config.getOnException().apply(exception) ); + } catch (Exception e) { + if (feed.isConnected()) { + log.warn("unable to compute "+getBriefDescription()+"; on exception="+exception, e); + } else { + if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; exception="+exception+" (when inactive)", e); + } + } + } + } + + protected void logProblem(String type, Object val) { + if (lastWasProblem && currentProblemLoggedAsWarning) { + if (log.isTraceEnabled()) + log.trace("Recurring {} reading {} in {}: {}", new Object[] {type, this, getBriefDescription(), val}); + } else { + long nowTime = System.currentTimeMillis(); + // get a non-volatile value + Long currentProblemStartTimeCache = currentProblemStartTime; + long expiryTime = + (lastSuccessTime!=null && !isTransitioningOrStopped()) ? lastSuccessTime+logWarningGraceTime.toMilliseconds() : + currentProblemStartTimeCache!=null ? 
currentProblemStartTimeCache+logWarningGraceTimeOnStartup.toMilliseconds() : + nowTime+logWarningGraceTimeOnStartup.toMilliseconds(); + if (!lastWasProblem) { + if (expiryTime <= nowTime) { + currentProblemLoggedAsWarning = true; + if (entity==null || !Entities.isNoLongerManaged(entity)) { + log.warn("Read of " + getBriefDescription() + " gave " + type + ": " + val); + } else { + log.debug("Read of " + getBriefDescription() + " gave " + type + ": " + val); + } + if (log.isDebugEnabled() && val instanceof Throwable) + log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val); + } else { + if (log.isDebugEnabled()) + log.debug("Read of " + getBriefDescription() + " gave " + type + " (in grace period): " + val); + } + lastWasProblem = true; + currentProblemStartTime = nowTime; + } else { + if (expiryTime <= nowTime) { + currentProblemLoggedAsWarning = true; + log.warn("Read of " + getBriefDescription() + " gave " + type + + " (grace period expired, occurring for "+Duration.millis(nowTime - currentProblemStartTimeCache)+ + (config.hasExceptionHandler() ? 
"" : ", no exception handler set for sensor")+ + ")"+ + ": " + val); + if (log.isDebugEnabled() && val instanceof Throwable) + log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val); + } else { + if (log.isDebugEnabled()) + log.debug("Recurring {} reading {} in {} (still in grace period): {}", new Object[] {type, this, getBriefDescription(), val}); + } + } + } + } + + protected boolean isTransitioningOrStopped() { + if (entity==null) return false; + Transition expected = entity.getAttribute(Attributes.SERVICE_STATE_EXPECTED); + if (expected==null) return false; + return (expected.getState()==Lifecycle.STARTING || expected.getState()==Lifecycle.STOPPING || expected.getState()==Lifecycle.STOPPED); + } + + @SuppressWarnings("unchecked") + protected void setSensor(Object v) { + if (Entities.isNoLongerManaged(entity)) { + if (Tasks.isInterrupted()) return; + log.warn(""+entity+" is not managed; feed "+this+" setting "+sensor+" to "+v+" at this time is not supported ("+Tasks.current()+")"); + } + + if (v == FeedConfig.UNCHANGED) { + // nothing + } else if (v == FeedConfig.REMOVE) { + ((EntityInternal)entity).removeAttribute(sensor); + } else if (sensor == FeedConfig.NO_SENSOR) { + // nothing + } else { + Object coercedV = TypeCoercions.coerce(v, sensor.getType()); + if (suppressDuplicates && Objects.equal(coercedV, entity.getAttribute(sensor))) { + // no change; nothing + } else { + entity.setAttribute(sensor, coercedV); + } + } + } + + @Override + public String toString() { + return super.toString()+"["+getDescription()+"]"; + } + + @Override + public String getDescription() { + return sensor.getName()+" @ "+entity.getId()+" <- "+config; + } + + protected String getBriefDescription() { + return ""+entity+"->"+(sensor==FeedConfig.NO_SENSOR ? 
"(dynamic sensors)" : ""+sensor); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java b/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java new file mode 100644 index 0000000..d455e80 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.feed; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.ManagementContext; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey; +import org.apache.brooklyn.core.sensor.TemplatedStringAttributeSensorAndConfigKey; + + +/** Simple config adapter for setting {@link AttributeSensorAndConfigKey} sensor values from the config value or config default */ +public class ConfigToAttributes { + + //normally just applied once, statically, not registered... 
+ public static void apply(EntityLocal entity) { + for (Sensor<?> it : entity.getEntityType().getSensors()) { + if (it instanceof AttributeSensorAndConfigKey) { + apply(entity, (AttributeSensorAndConfigKey<?,?>)it); + } + } + } + + /** + * Convenience for ensuring an individual sensor is set from its config key + * (e.g. sub-classes of DynamicWebAppCluster that don't want to set HTTP_PORT etc!) + */ + public static <T> T apply(EntityLocal entity, AttributeSensorAndConfigKey<?,T> key) { + T v = entity.getAttribute(key); + if (v!=null) return v; + v = key.getAsSensorValue(entity); + if (v!=null) entity.setAttribute(key, v); + return v; + } + + /** + * Convenience for transforming a config value (e.g. processing a {@link TemplatedStringAttributeSensorAndConfigKey}), + * outside of the context of an entity. + */ + public static <T> T transform(ManagementContext managementContext, AttributeSensorAndConfigKey<?,T> key) { + return key.getAsSensorValue(managementContext); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java new file mode 100644 index 0000000..fae7dd6 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.feed; + +import java.util.List; + +import com.google.common.collect.ImmutableList; + +/** + * A poll handler that delegates each call to a set of poll handlers. + * + * @author aled + */ +public class DelegatingPollHandler<V> implements PollHandler<V> { + + private final List<AttributePollHandler<? super V>> delegates; + + public DelegatingPollHandler(Iterable<AttributePollHandler<? super V>> delegates) { + super(); + this.delegates = ImmutableList.copyOf(delegates); + } + + @Override + public boolean checkSuccess(V val) { + for (AttributePollHandler<? super V> delegate : delegates) { + if (!delegate.checkSuccess(val)) + return false; + } + return true; + } + + @Override + public void onSuccess(V val) { + for (AttributePollHandler<? super V> delegate : delegates) { + delegate.onSuccess(val); + } + } + + @Override + public void onFailure(V val) { + for (AttributePollHandler<? super V> delegate : delegates) { + delegate.onFailure(val); + } + } + + @Override + public void onException(Exception exception) { + for (AttributePollHandler<? super V> delegate : delegates) { + delegate.onException(exception); + } + } + + @Override + public String toString() { + return super.toString()+"["+getDescription()+"]"; + } + + @Override + public String getDescription() { + if (delegates.isEmpty()) + return "(empty delegate list)"; + if (delegates.size()==1) + return delegates.get(0).getDescription(); + StringBuilder sb = new StringBuilder(); + sb.append("["); + int count = 0; + for (AttributePollHandler<? 
super V> delegate : delegates) { + if (count>0) sb.append("; "); + sb.append(delegate.getDescription()); + if (count>2) { + sb.append("; ..."); + break; + } + count++; + } + sb.append("]"); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java new file mode 100644 index 0000000..4d06680 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.feed; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.feed.http.HttpPollConfig; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.guava.Functionals; +import org.apache.brooklyn.util.javalang.JavaClassNames; +import org.apache.brooklyn.util.text.Strings; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Objects; +import com.google.common.base.Predicate; + +/** + * Configuration for a poll, or a subscription etc, that is being added to a feed. + * + * @author aled + */ +public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> { + + /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. + * @deprecated since 0.7.0 use UNCHANGED */ + public static final Object UNSET = Entities.UNCHANGED; + /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. */ + public static final Object UNCHANGED = Entities.UNCHANGED; + /** The onSuccess or onError functions can return this value to indicate that the sensor value should be removed + * (cf 'null', but useful in dynamic situations) */ + public static final Object REMOVE = Entities.REMOVE; + + /** Indicates that no sensor is being used here. This sensor is suppressed, + * but is useful where you want to use the feeds with custom success/exception/failure functions + * which directly set multiple sensors, e.g. dynamically based on the poll response. + * <p> + * See {@link HttpPollConfig#forMultiple()} and its usages. + * (It can work for any poll config, but conveniences have not been supplied for others.) 
*/ + public static final AttributeSensor<Void> NO_SENSOR = Sensors.newSensor(Void.class, "brooklyn.no.sensor"); + + private final AttributeSensor<T> sensor; + private Function<? super V, T> onsuccess; + private Function<? super V, T> onfailure; + private Function<? super Exception, T> onexception; + private Predicate<? super V> checkSuccess; + private boolean suppressDuplicates; + private boolean enabled = true; + + public FeedConfig(AttributeSensor<T> sensor) { + this.sensor = checkNotNull(sensor, "sensor"); + } + + public FeedConfig(FeedConfig<V, T, F> other) { + this.sensor = other.sensor; + this.onsuccess = other.onsuccess; + this.onfailure = other.onfailure; + this.onexception = other.onexception; + this.checkSuccess = other.checkSuccess; + this.suppressDuplicates = other.suppressDuplicates; + this.enabled = other.enabled; + } + + @SuppressWarnings("unchecked") + protected F self() { + return (F) this; + } + + public AttributeSensor<T> getSensor() { + return sensor; + } + + public Predicate<? super V> getCheckSuccess() { + return checkSuccess; + } + + public Function<? super V, T> getOnSuccess() { + return onsuccess; + } + + public Function<? super V, T> getOnFailure() { + return onfailure; + } + + public Function<? super Exception, T> getOnException() { + return onexception; + } + + public boolean getSupressDuplicates() { + return suppressDuplicates; + } + + public boolean isEnabled() { + return enabled; + } + + /** sets the predicate used to check whether a feed run is successful */ + public F checkSuccess(Predicate<? super V> val) { + this.checkSuccess = checkNotNull(val, "checkSuccess"); + return self(); + } + /** as {@link #checkSuccess(Predicate)} */ + public F checkSuccess(final Function<? super V,Boolean> val) { + return checkSuccess(Functionals.predicate(val)); + } + @SuppressWarnings("unused") + /** @deprecated since 0.7.0, kept for rebind */ @Deprecated + private F checkSuccessLegacy(final Function<? 
super V,Boolean> val) { + return checkSuccess(new Predicate<V>() { + @Override + public boolean apply(V input) { + return val.apply(input); + } + }); + } + + public F onSuccess(Function<? super V,T> val) { + this.onsuccess = checkNotNull(val, "onSuccess"); + return self(); + } + + public F setOnSuccess(T val) { + return onSuccess(Functions.constant(val)); + } + + /** a failure is when the connection is fine (no exception) but the other end returns a result object V + * which the feed can tell indicates a failure (e.g. HTTP code 404) */ + public F onFailure(Function<? super V,T> val) { + this.onfailure = checkNotNull(val, "onFailure"); + return self(); + } + + public F setOnFailure(T val) { + return onFailure(Functions.constant(val)); + } + + /** registers a callback to be used {@link #onSuccess(Function)} and {@link #onFailure(Function)}, + * i.e. whenever a result comes back, but not in case of exceptions being thrown (ie problems communicating) */ + public F onResult(Function<? super V, T> val) { + onSuccess(val); + return onFailure(val); + } + + public F setOnResult(T val) { + return onResult(Functions.constant(val)); + } + + /** an exception is when there is an error in the communication */ + public F onException(Function<? 
super Exception,T> val) { + this.onexception = checkNotNull(val, "onException"); + return self(); + } + + public F setOnException(T val) { + return onException(Functions.constant(val)); + } + + /** convenience for indicating a behaviour to occur for both + * {@link #onException(Function)} + * (error connecting) and + * {@link #onFailure(Function)} + * (successful communication but failure report from remote end) */ + public F onFailureOrException(Function<Object,T> val) { + onFailure(val); + return onException(val); + } + + public F setOnFailureOrException(T val) { + return onFailureOrException(Functions.constant(val)); + } + + public F suppressDuplicates(boolean val) { + suppressDuplicates = val; + return self(); + } + + /** + * Whether this feed is enabled (defaulting to true). + */ + public F enabled(boolean val) { + enabled = val; + return self(); + } + + public boolean hasSuccessHandler() { + return this.onsuccess != null; + } + + public boolean hasFailureHandler() { + return this.onfailure != null; + } + + public boolean hasExceptionHandler() { + return this.onexception != null; + } + + public boolean hasCheckSuccessHandler() { + return this.checkSuccess != null; + } + + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append(toStringBaseName()); + result.append("["); + boolean contents = false; + Object source = toStringPollSource(); + AttributeSensor<T> s = getSensor(); + if (Strings.isNonBlank(Strings.toString(source))) { + result.append(Strings.toUniqueString(source, 40)); + if (s!=null) { + result.append("->"); + result.append(s.getName()); + } + contents = true; + } else if (s!=null) { + result.append(s.getName()); + contents = true; + } + MutableList<Object> fields = toStringOtherFields(); + if (fields!=null) { + for (Object field: fields) { + if (Strings.isNonBlank(Strings.toString(field))) { + if (contents) result.append(";"); + contents = true; + result.append(field); + } + } + } + result.append("]"); + 
return result.toString(); + } + + /** can be overridden to supply a simpler base name than the class name */ + protected String toStringBaseName() { + return JavaClassNames.simpleClassName(this); + } + /** can be overridden to supply add'l info for the {@link #toString()}; subclasses can add to the returned value */ + protected MutableList<Object> toStringOtherFields() { + return MutableList.<Object>of(); + } + /** can be overridden to supply add'l info for the {@link #toString()}, placed before the sensor with -> */ + protected Object toStringPollSource() { + return null; + } + /** all configs should supply a unique tag element, inserted into the feed */ + protected String getUniqueTag() { + return toString(); + } + + /** returns fields which should be used for equality, including by default {@link #toStringOtherFields()} and {@link #toStringPollSource()}; + * subclasses can add to the returned value */ + protected MutableList<Object> equalsFields() { + MutableList<Object> result = MutableList.of().appendIfNotNull(getSensor()).appendIfNotNull(toStringPollSource()); + for (Object field: toStringOtherFields()) result.appendIfNotNull(field); + return result; + } + + @Override + public int hashCode() { + int hc = super.hashCode(); + for (Object f: equalsFields()) + hc = Objects.hashCode(hc, f); + return hc; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!super.equals(obj)) return false; + PollConfig<?,?,?> other = (PollConfig<?,?,?>) obj; + if (!Objects.equal(getUniqueTag(), other.getUniqueTag())) return false; + if (!Objects.equal(equalsFields(), other.equalsFields())) return false; + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java 
b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java new file mode 100644 index 0000000..133431b --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.feed; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.time.Duration; + +/** + * Configuration for polling, which is being added to a feed (e.g. to poll a given URL over http). 
+ * + * @author aled + */ +public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> { + + private long period = -1; + private String description; + + public PollConfig(AttributeSensor<T> sensor) { + super(sensor); + } + + public PollConfig(PollConfig<V,T,F> other) { + super(other); + this.period = other.period; + } + + public long getPeriod() { + return period; + } + + public F period(Duration val) { + checkArgument(val.toMilliseconds() >= 0, "period must be greater than or equal to zero"); + this.period = val.toMilliseconds(); + return self(); + } + + public F period(long val) { + checkArgument(val >= 0, "period must be greater than or equal to zero"); + this.period = val; return self(); + } + + public F period(long val, TimeUnit units) { + checkArgument(val >= 0, "period must be greater than or equal to zero"); + return period(units.toMillis(val)); + } + + public F description(String description) { + this.description = description; + return self(); + } + + public String getDescription() { + return description; + } + + @Override protected MutableList<Object> toStringOtherFields() { + return super.toStringOtherFields().appendIfNotNull(description); + } + + @Override + protected MutableList<Object> equalsFields() { + return super.equalsFields().appendIfNotNull(period); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java new file mode 100644 index 0000000..a63ebde --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Notified by the Poller of the result for each job, on each poll.
 * <p>
 * For every poll the Poller first asks {@link #checkSuccess(Object)} about the value the
 * job returned and then reports it through {@link #onSuccess(Object)} or
 * {@link #onFailure(Object)} accordingly; an exception raised during the poll is
 * reported through {@link #onException(Exception)} instead.
 *
 * @param <V> type of the value produced by the polled job
 *
 * @author aled
 */
public interface PollHandler<V> {

    /** @return whether {@code val} should be treated as a successful poll result */
    boolean checkSuccess(V val);

    /** Called with the polled value when {@link #checkSuccess(Object)} accepted it. */
    void onSuccess(V val);

    /** Called with the polled value when {@link #checkSuccess(Object)} rejected it. */
    void onFailure(V val);

    /** Called when the poll threw an exception instead of completing normally. */
    void onException(Exception exception);

    /** @return a description of this handler; the Poller uses it to name the scheduled poll */
    String getDescription();

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.feed; + +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.task.DynamicSequentialTask; +import org.apache.brooklyn.util.core.task.ScheduledTask; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; + + +/** + * For executing periodic polls. + * Jobs are added to the schedule, and then the poller is started. + * The jobs will then be executed periodically, and the handler called for the result/failure. + * + * Assumes the schedule+start will be done single threaded, and that stop will not be done concurrently. 
+ */ +public class Poller<V> { + public static final Logger log = LoggerFactory.getLogger(Poller.class); + + private final EntityLocal entity; + private final boolean onlyIfServiceUp; + private final Set<Callable<?>> oneOffJobs = new LinkedHashSet<Callable<?>>(); + private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>(); + private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>(); + private final Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>(); + private volatile boolean started = false; + + private static class PollJob<V> { + final PollHandler<? super V> handler; + final Duration pollPeriod; + final Runnable wrappedJob; + private boolean loggedPreviousException = false; + + PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) { + this.handler = handler; + this.pollPeriod = period; + + wrappedJob = new Runnable() { + public void run() { + try { + V val = job.call(); + loggedPreviousException = false; + if (handler.checkSuccess(val)) { + handler.onSuccess(val); + } else { + handler.onFailure(val); + } + } catch (Exception e) { + if (loggedPreviousException) { + if (log.isTraceEnabled()) log.trace("PollJob for {}, repeated consecutive failures, handling {} using {}", new Object[] {job, e, handler}); + } else { + if (log.isDebugEnabled()) log.debug("PollJob for {} handling {} using {}", new Object[] {job, e, handler}); + loggedPreviousException = true; + } + handler.onException(e); + } + } + }; + } + } + + /** @deprecated since 0.7.0, pass in whether should run onlyIfServiceUp */ + @Deprecated + public Poller(EntityLocal entity) { + this(entity, false); + } + public Poller(EntityLocal entity, boolean onlyIfServiceUp) { + this.entity = entity; + this.onlyIfServiceUp = onlyIfServiceUp; + } + + /** Submits a one-off poll job; recommended that callers supply to-String so that task has a decent description */ + public void submit(Callable<?> job) { + if (started) { + throw new 
IllegalStateException("Cannot submit additional tasks after poller has started"); + } + oneOffJobs.add(job); + } + + public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long period) { + scheduleAtFixedRate(job, handler, Duration.millis(period)); + } + public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) { + if (started) { + throw new IllegalStateException("Cannot schedule additional tasks after poller has started"); + } + PollJob<V> foo = new PollJob<V>(job, handler, period); + pollJobs.add(foo); + } + + @SuppressWarnings({ "unchecked" }) + public void start() { + // TODO Previous incarnation of this logged this logged polledSensors.keySet(), but we don't know that anymore + // Is that ok, are can we do better? + + if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", new Object[] {entity, this}); + if (started) { + throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", + this, entity)); + } + + started = true; + + for (final Callable<?> oneOffJob : oneOffJobs) { + Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).name("Poll").description("One-time poll job "+oneOffJob).build(); + oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task)); + } + + for (final PollJob<V> pollJob : pollJobs) { + final String scheduleName = pollJob.handler.getDescription(); + if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) { + Callable<Task<?>> pollingTaskFactory = new Callable<Task<?>>() { + public Task<?> call() { + DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), + new Callable<Void>() { public Void call() { + if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { + return null; + } + pollJob.wrappedJob.run(); + return null; + } } ); + BrooklynTaskTags.setTransient(task); 
+ return task; + } + }; + ScheduledTask task = new ScheduledTask(MutableMap.of("period", pollJob.pollPeriod, "displayName", "scheduled:"+scheduleName), pollingTaskFactory); + tasks.add((ScheduledTask)Entities.submit(entity, task)); + } else { + if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this}); + } + } + } + + public void stop() { + if (log.isDebugEnabled()) log.debug("Stopping poll for {} (using {})", new Object[] {entity, this}); + if (!started) { + throw new IllegalStateException(String.format("Attempt to stop poller %s of entity %s when not running", + this, entity)); + } + + started = false; + for (Task<?> task : oneOffTasks) { + if (task != null) task.cancel(true); + } + for (ScheduledTask task : tasks) { + if (task != null) task.cancel(); + } + oneOffTasks.clear(); + tasks.clear(); + } + + public boolean isRunning() { + boolean hasActiveTasks = false; + for (Task<?> task: tasks) { + if (task.isBegun() && !task.isDone()) { + hasActiveTasks = true; + break; + } + } + if (!started && hasActiveTasks) { + log.warn("Poller should not be running, but has active tasks, tasks: "+tasks); + } + return started && hasActiveTasks; + } + + protected boolean isEmpty() { + return pollJobs.isEmpty(); + } + + public String toString() { + return Objects.toStringHelper(this).add("entity", entity).toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java index 2e4a971..2a5e92e 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java +++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java @@ -34,11 +34,11 @@ import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.feed.AbstractFeed; import org.apache.brooklyn.core.mgmt.rebind.dto.MementosGenerators; import org.apache.brooklyn.core.policy.AbstractPolicy; import org.apache.brooklyn.entity.group.AbstractGroupImpl; import org.apache.brooklyn.sensor.enricher.AbstractEnricher; -import org.apache.brooklyn.sensor.feed.AbstractFeed; import org.apache.brooklyn.util.exceptions.Exceptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java index 479fbbf..4630be1 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java @@ -20,7 +20,7 @@ package org.apache.brooklyn.core.mgmt.rebind; import org.apache.brooklyn.api.mgmt.rebind.RebindContext; import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento; -import org.apache.brooklyn.sensor.feed.AbstractFeed; +import org.apache.brooklyn.core.feed.AbstractFeed; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.FlagUtils; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java index c3e8030..e9478ef 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java @@ -67,6 +67,7 @@ import org.apache.brooklyn.core.catalog.internal.CatalogUtils; import org.apache.brooklyn.core.entity.AbstractApplication; import org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.feed.AbstractFeed; import org.apache.brooklyn.core.location.AbstractLocation; import org.apache.brooklyn.core.location.internal.LocationInternal; import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContext; @@ -86,7 +87,6 @@ import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory; import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory; import org.apache.brooklyn.core.policy.AbstractPolicy; import org.apache.brooklyn.sensor.enricher.AbstractEnricher; -import org.apache.brooklyn.sensor.feed.AbstractFeed; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.flags.FlagUtils; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java index 761341b..929b63c 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java +++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java @@ -51,6 +51,7 @@ import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.catalog.internal.CatalogItemDo; import org.apache.brooklyn.core.entity.EntityDynamicType; import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.feed.AbstractFeed; import org.apache.brooklyn.core.location.internal.LocationInternal; import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils; import org.apache.brooklyn.core.mgmt.rebind.AbstractBrooklynObjectRebindSupport; @@ -58,7 +59,6 @@ import org.apache.brooklyn.core.mgmt.rebind.TreeUtils; import org.apache.brooklyn.core.objs.BrooklynTypes; import org.apache.brooklyn.core.policy.AbstractPolicy; import org.apache.brooklyn.sensor.enricher.AbstractEnricher; -import org.apache.brooklyn.sensor.feed.AbstractFeed; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.FlagUtils; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java index 940d949..f76baaa 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java @@ -27,7 +27,7 @@ import org.apache.brooklyn.core.config.BasicConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.entity.BrooklynConfigKeys; -import org.apache.brooklyn.sensor.feed.ConfigToAttributes; +import 
org.apache.brooklyn.core.feed.ConfigToAttributes; import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.exceptions.Exceptions; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java index 79660ce..542fc01 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java @@ -26,9 +26,9 @@ import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.effector.AddSensor; -import org.apache.brooklyn.sensor.feed.http.HttpFeed; -import org.apache.brooklyn.sensor.feed.http.HttpPollConfig; -import org.apache.brooklyn.sensor.feed.http.HttpValueFunctions; +import org.apache.brooklyn.feed.http.HttpFeed; +import org.apache.brooklyn.feed.http.HttpPollConfig; +import org.apache.brooklyn.feed.http.HttpValueFunctions; import org.apache.brooklyn.util.core.config.ConfigBag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java index a6880b7..e914bd2 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java +++ 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java @@ -30,8 +30,8 @@ import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.entity.Group; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.sensor.feed.function.FunctionFeed; -import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig; +import org.apache.brooklyn.feed.function.FunctionFeed; +import org.apache.brooklyn.feed.function.FunctionPollConfig; import org.apache.brooklyn.util.collections.MutableMap; import com.google.common.base.Function; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java b/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java index af34e0d..7f10d5b 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java +++ b/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java @@ -24,8 +24,8 @@ import java.util.Map; import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.core.entity.AbstractEntity; -import org.apache.brooklyn.sensor.feed.function.FunctionFeed; -import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig; +import org.apache.brooklyn.feed.function.FunctionFeed; +import org.apache.brooklyn.feed.function.FunctionPollConfig; import com.google.common.base.Functions; import com.google.common.base.Supplier; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java new file mode 100644 index 0000000..55db890 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.feed.function; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.feed.AbstractFeed; +import org.apache.brooklyn.core.feed.AttributePollHandler; +import org.apache.brooklyn.core.feed.DelegatingPollHandler; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import com.google.common.reflect.TypeToken; + +/** + * Provides a feed of attribute values, by periodically invoking functions. + * + * Example usage (e.g. 
in an entity that extends SoftwareProcessImpl): + * <pre> + * {@code + * private FunctionFeed feed; + * + * //@Override + * protected void connectSensors() { + * super.connectSensors(); + * + * feed = FunctionFeed.builder() + * .entity(this) + * .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP) + * .period(500, TimeUnit.MILLISECONDS) + * .callable(new Callable<Boolean>() { + * public Boolean call() throws Exception { + * return getDriver().isRunning(); + * } + * }) + * .onExceptionOrFailure(Functions.constant(Boolean.FALSE))) + * .build(); + * } + * + * {@literal @}Override + * protected void disconnectSensors() { + * super.disconnectSensors(); + * if (feed != null) feed.stop(); + * } + * } + * </pre> + * + * @author aled + */ +public class FunctionFeed extends AbstractFeed { + + private static final Logger log = LoggerFactory.getLogger(FunctionFeed.class); + + // Treat as immutable once built + @SuppressWarnings("serial") + public static final ConfigKey<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>> POLLS = ConfigKeys.newConfigKey( + new TypeToken<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>>() {}, + "polls"); + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(String uniqueTag) { + return new Builder().uniqueTag(uniqueTag); + } + + public static class Builder { + private EntityLocal entity; + private boolean onlyIfServiceUp = false; + private long period = 500; + private TimeUnit periodUnits = TimeUnit.MILLISECONDS; + private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList(); + private String uniqueTag; + private volatile boolean built; + + public Builder entity(EntityLocal val) { + this.entity = val; + return this; + } + public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } + public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { + this.onlyIfServiceUp = onlyIfServiceUp; + return this; + } + public Builder period(Duration d) { + return
period(d.toMilliseconds(), TimeUnit.MILLISECONDS); + } + public Builder period(long millis) { + return period(millis, TimeUnit.MILLISECONDS); + } + public Builder period(long val, TimeUnit units) { + this.period = val; + this.periodUnits = units; + return this; + } + public Builder poll(FunctionPollConfig<?,?> config) { + polls.add(config); + return this; + } + public Builder uniqueTag(String uniqueTag) { + this.uniqueTag = uniqueTag; + return this; + } + public FunctionFeed build() { + built = true; + FunctionFeed result = new FunctionFeed(this); + result.setEntity(checkNotNull(entity, "entity")); + result.start(); + return result; + } + @Override + protected void finalize() { + if (!built) log.warn("FunctionFeed.Builder created, but build() never called"); + } + } + + private static class FunctionPollIdentifier { + final Callable<?> job; + + private FunctionPollIdentifier(Callable<?> job) { + this.job = checkNotNull(job, "job"); + } + + @Override + public int hashCode() { + return Objects.hashCode(job); + } + + @Override + public boolean equals(Object other) { + return (other instanceof FunctionPollIdentifier) && Objects.equal(job, ((FunctionPollIdentifier)other).job); + } + } + + /** + * For rebind; do not call directly; use builder + */ + public FunctionFeed() { + } + + protected FunctionFeed(Builder builder) { + setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp); + + SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>> polls = HashMultimap.<FunctionPollIdentifier,FunctionPollConfig<?,?>>create(); + for (FunctionPollConfig<?,?> config : builder.polls) { + if (!config.isEnabled()) continue; + @SuppressWarnings({ "rawtypes", "unchecked" }) + FunctionPollConfig<?,?> configCopy = new FunctionPollConfig(config); + if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); + Callable<?> job = config.getCallable(); + polls.put(new FunctionPollIdentifier(job), configCopy); + } + setConfig(POLLS, polls); + 
initUniqueTag(builder.uniqueTag, polls.values()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + protected void preStart() { + SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?, ?>> polls = getConfig(POLLS); + for (final FunctionPollIdentifier pollInfo : polls.keySet()) { + Set<FunctionPollConfig<?,?>> configs = polls.get(pollInfo); + long minPeriod = Integer.MAX_VALUE; + Set<AttributePollHandler<?>> handlers = Sets.newLinkedHashSet(); + + for (FunctionPollConfig<?,?> config : configs) { + handlers.add(new AttributePollHandler(config, entity, this)); + if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); + } + + getPoller().scheduleAtFixedRate( + (Callable)pollInfo.job, + new DelegatingPollHandler(handlers), + minPeriod); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java new file mode 100644 index 0000000..4951868 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.function; + +import static com.google.common.base.Preconditions.checkNotNull; +import groovy.lang.Closure; + +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.feed.FeedConfig; +import org.apache.brooklyn.core.feed.PollConfig; +import org.apache.brooklyn.util.groovy.GroovyJavaMethods; +import org.apache.brooklyn.util.guava.Functionals; +import org.apache.brooklyn.util.javalang.JavaClassNames; + +import com.google.common.base.Supplier; + +public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfig<S, T>> { + + private Callable<?> callable; + + public static <T> FunctionPollConfig<?, T> forSensor(AttributeSensor<T> sensor) { + return new FunctionPollConfig<Object, T>(sensor); + } + + public FunctionPollConfig(AttributeSensor<T> sensor) { + super(sensor); + } + + public FunctionPollConfig(FunctionPollConfig<S, T> other) { + super(other); + callable = other.callable; + } + + public Callable<? extends Object> getCallable() { + return callable; + } + + /** + * The {@link Callable} to be invoked on each poll. + * <p> + * Note this <em>must</em> use generics, otherwise subsequent chained + * calls (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)}) will + * return the wrong type. + */ + @SuppressWarnings("unchecked") + public <newS> FunctionPollConfig<newS, T> callable(Callable<? 
extends newS> val) { + this.callable = checkNotNull(val, "callable"); + return (FunctionPollConfig<newS, T>) this; + } + + /** + * Supplies the value to be returned by each poll. + * <p> + * Note this <em>must</em> use generics, otherwise subsequent chained + * calls (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)}) will + * return the wrong type. + */ + @SuppressWarnings("unchecked") + public <newS> FunctionPollConfig<newS, T> supplier(final Supplier<? extends newS> val) { + this.callable = Functionals.callable( checkNotNull(val, "supplier") ); + return (FunctionPollConfig<newS, T>) this; + } + + /** @deprecated since 0.7.0, kept for legacy compatibility when deserializing */ + @SuppressWarnings({ "unchecked", "unused" }) + private <newS> FunctionPollConfig<newS, T> supplierLegacy(final Supplier<? extends newS> val) { + checkNotNull(val, "supplier"); + this.callable = new Callable<newS>() { + @Override + public newS call() throws Exception { + return val.get(); + } + }; + return (FunctionPollConfig<newS, T>) this; + } + + public FunctionPollConfig<S, T> closure(Closure<?> val) { + this.callable = GroovyJavaMethods.callableFromClosure(checkNotNull(val, "closure")); + return this; + } + + @Override protected String toStringBaseName() { return "fn"; } + @Override protected String toStringPollSource() { + if (callable==null) return null; + String cs = callable.toString(); + if (!cs.contains( ""+Integer.toHexString(callable.hashCode()) )) { + return cs; + } + // if hashcode is in callable it's probably a custom internal; return class name + return JavaClassNames.simpleClassName(callable); + } + +}
