Repository: incubator-brooklyn Updated Branches: refs/heads/master 17baa8b7a -> 9657f5f3b
sensors at riak cluster level and other minor tidies for riak and couchbase Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/33f1e49e Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/33f1e49e Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/33f1e49e Branch: refs/heads/master Commit: 33f1e49e5919ce58f600336f9929947be2778e7d Parents: 2556b29 Author: Alex Heneveld <[email protected]> Authored: Fri Jun 19 08:31:19 2015 -0700 Committer: Alex Heneveld <[email protected]> Committed: Fri Jun 19 08:32:53 2015 -0700 ---------------------------------------------------------------------- .../nosql/couchbase/CouchbaseClusterImpl.java | 1 + .../nosql/couchbase/CouchbaseNodeImpl.java | 3 -- .../brooklyn/entity/nosql/riak/RiakCluster.java | 4 ++ .../entity/nosql/riak/RiakClusterImpl.java | 55 +++++++++++++++++--- .../brooklyn/entity/nosql/riak/RiakNode.java | 11 ++-- .../entity/nosql/riak/RiakNodeImpl.java | 4 ++ 6 files changed, 65 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/33f1e49e/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java index dcdafb6..5c47fe7 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java @@ -414,6 +414,7 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas if (config().getLocalRaw(UP_QUORUM_CHECK).isAbsent()) { // TODO Only leaving CouchbaseQuorumCheck here in case it is contained in persisted state. // If so, need a transformer and then to delete it + @SuppressWarnings({ "unused", "hiding" }) @Deprecated class CouchbaseQuorumCheck implements QuorumCheck { @Override http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/33f1e49e/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java index e4b1c0a..d7439ca 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java @@ -21,7 +21,6 @@ package brooklyn.entity.nosql.couchbase; import static java.lang.String.format; import java.net.URI; -import java.nio.charset.Charset; import java.util.Collection; import java.util.Map; import java.util.Set; @@ -37,7 +36,6 @@ import brooklyn.entity.effector.EffectorBody; import brooklyn.event.AttributeSensor; import brooklyn.event.SensorEvent; import brooklyn.event.SensorEventListener; -import brooklyn.event.basic.DependentConfiguration; import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; import brooklyn.event.feed.http.HttpValueFunctions; @@ -55,7 +53,6 @@ import brooklyn.util.guava.TypeTokens; import brooklyn.util.http.HttpTool; import brooklyn.util.http.HttpToolResponse; import brooklyn.util.net.Urls; -import brooklyn.util.task.DynamicTasks; import brooklyn.util.task.Tasks; import brooklyn.util.text.Strings; import brooklyn.util.time.Duration; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/33f1e49e/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java index 41aa6be..876ebde 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java @@ -58,4 +58,8 @@ public interface RiakCluster extends DynamicCluster { AttributeSensor<URI> RIAK_CONSOLE_URI = Attributes.MAIN_URI; + AttributeSensor<Integer> NODE_GETS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.gets.1m.perNode", "Gets in the last minute, averaged across cluster"); + AttributeSensor<Integer> NODE_PUTS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.puts.1m.perNode", "Puts in the last minute, averaged across cluster"); + AttributeSensor<Integer> NODE_OPS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.ops.1m.perNode", "Sum of node gets and puts in the last minute, averaged across cluster"); + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/33f1e49e/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java index 637346e..7b256c0 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java @@ -41,10 +41,11 @@ import brooklyn.entity.group.AbstractMembershipTrackingPolicy; import brooklyn.entity.group.DynamicClusterImpl; import brooklyn.entity.proxying.EntitySpec; import brooklyn.entity.trait.Startable; +import brooklyn.event.AttributeSensor; import brooklyn.event.basic.DependentConfiguration; -import brooklyn.location.Location; import brooklyn.policy.EnricherSpec; import brooklyn.policy.PolicySpec; +import brooklyn.util.task.Tasks; import brooklyn.util.time.Duration; import brooklyn.util.time.Time; @@ -53,6 +54,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -71,13 +73,20 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { } @Override - public void start(Collection<? extends Location> locations) { - super.start(locations); + protected void doStart() { + super.doStart(); connectSensors(); - Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER)); + try { + Duration delay = getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER); + Tasks.setBlockingDetails("Sleeping for "+delay+" before advertising cluster available"); + Time.sleep(delay); + } finally { + Tasks.resetBlockingDetails(); + } //FIXME: add a quorum to tolerate failed nodes before setting on fire. + @SuppressWarnings("unchecked") Optional<Entity> anyNode = Iterables.tryFind(getMembers(), Predicates.and( Predicates.instanceOf(RiakNode.class), EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), @@ -91,8 +100,9 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { } protected EntitySpec<?> getMemberSpec() { - return getConfig(MEMBER_SPEC, EntitySpec.create(RiakNode.class)); - + EntitySpec<?> result = config().get(MEMBER_SPEC); + if (result!=null) return result; + return EntitySpec.create(RiakNode.class); } protected void connectSensors() { @@ -112,6 +122,38 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { .fromMembers() .build(); addEnricher(first); + + Map<? extends AttributeSensor<? extends Number>, ? extends AttributeSensor<? extends Number>> enricherSetup = + ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? extends Number>>builder() + .put(RiakNode.NODE_PUTS, RiakCluster.NODE_PUTS_1MIN_PER_NODE) + .put(RiakNode.NODE_GETS, RiakCluster.NODE_GETS_1MIN_PER_NODE) + .put(RiakNode.NODE_OPS, RiakCluster.NODE_OPS_1MIN_PER_NODE) + .build(); + // construct sum and average over cluster + for (AttributeSensor<? extends Number> nodeSensor : enricherSetup.keySet()) { + addSummingMemberEnricher(nodeSensor); + addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor)); + } + } + + private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor, AttributeSensor<? extends Number> toSensor) { + addEnricher(Enrichers.builder() + .aggregating(fromSensor) + .publishing(toSensor) + .fromMembers() + .computingAverage() + .build() + ); + } + + private void addSummingMemberEnricher(AttributeSensor<? extends Number> source) { + addEnricher(Enrichers.builder() + .aggregating(source) + .publishing(source) + .fromMembers() + .computingSum() + .build() + ); } protected void onServerPoolMemberChanged(final Entity member) { @@ -161,6 +203,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { } else { if (nodes != null && nodes.containsKey(member)) { DependentConfiguration.attributeWhenReady(member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Predicates.equalTo(false)).blockUntilEnded(Duration.TWO_MINUTES); + @SuppressWarnings("unchecked") Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and( Predicates.instanceOf(RiakNode.class), EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/33f1e49e/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java index d82c3f4..1f29366 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java @@ -126,10 +126,10 @@ public interface RiakNode extends SoftwareProcess { PortAttributeSensorAndConfigKey SEARCH_SOLR_PORT = new PortAttributeSensorAndConfigKey("search.solr.port", "Solr port", "8093+"); PortAttributeSensorAndConfigKey SEARCH_SOLR_JMX_PORT = new PortAttributeSensorAndConfigKey("search.solr.jmx_port", "Solr port", "8985+"); - AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("riak.node.gets"); - AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.node.gets.total"); - AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("riak.node.puts"); - AttributeSensor<Integer> NODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.node.puts.total"); + AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("riak.node.gets", "Gets in the last minute"); + AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.node.gets.total", "Total gets since node started"); + AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("riak.node.puts", "Puts in the last minute"); + AttributeSensor<Integer> NODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.node.puts.total", "Total puts since node started"); AttributeSensor<Integer> VNODE_GETS = Sensors.newIntegerSensor("riak.vnode.gets"); AttributeSensor<Integer> VNODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.vnode.gets.total"); @@ -147,6 +147,9 @@ public interface RiakNode extends SoftwareProcess { @SuppressWarnings("serial") AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>() {}, "ring.members", "all the riak nodes in the ring"); + + AttributeSensor<Integer> NODE_OPS = Sensors.newIntegerSensor("riak.node.ops", "Sum of node gets and puts in the last minute"); + AttributeSensor<Integer> NODE_OPS_TOTAL = Sensors.newIntegerSensor("riak.node.ops.total", "Sum of node gets and puts since the node started"); MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster"); MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster"); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/33f1e49e/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java index dea8b87..590cb3a 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import brooklyn.enricher.Enrichers; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.SoftwareProcessImpl; import brooklyn.entity.webapp.WebAppServiceMethods; @@ -78,6 +79,7 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode { maxOpenFiles, defaultMaxOpenFiles); } + @SuppressWarnings("rawtypes") public boolean isPackageDownloadUrlProvided() { AttributeSensorAndConfigKey[] downloadProperties = { DOWNLOAD_URL_RHEL_CENTOS, DOWNLOAD_URL_UBUNTU, DOWNLOAD_URL_DEBIAN }; for (AttributeSensorAndConfigKey property : downloadProperties) { @@ -182,6 +184,8 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode { httpFeed = httpFeedBuilder.build(); + addEnricher(Enrichers.builder().combining(NODE_GETS, NODE_PUTS).computingSum().publishing(NODE_OPS).build()); + addEnricher(Enrichers.builder().combining(NODE_GETS_TOTAL, NODE_PUTS_TOTAL).computingSum().publishing(NODE_OPS_TOTAL).build()); WebAppServiceMethods.connectWebAppServerPolicies(this); }
