Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 9e7b523bf -> 7fbcf8488


Riak Linux - add more sensors


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/bdffc952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/bdffc952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/bdffc952

Branch: refs/heads/master
Commit: bdffc9520a5b9b7c11e5fb6266fa43595c2ea301
Parents: e08d320
Author: Valentin Aitken <[email protected]>
Authored: Mon Mar 23 20:29:58 2015 +0200
Committer: Valentin Aitken <[email protected]>
Committed: Mon Mar 23 22:31:44 2015 +0200

----------------------------------------------------------------------
 .../brooklyn/entity/nosql/riak/RiakNode.java    | 55 +++++++++++++-------
 .../entity/nosql/riak/RiakNodeImpl.java         | 14 +++--
 2 files changed, 48 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bdffc952/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java 
b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
index c1f1bf8..12724d1 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
@@ -29,12 +29,10 @@ import brooklyn.entity.basic.MethodEffector;
 import brooklyn.entity.basic.SoftwareProcess;
 import brooklyn.entity.proxying.ImplementedBy;
 import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.AttributeSensorAndConfigKey;
-import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
-import brooklyn.event.basic.Sensors;
-import brooklyn.event.basic.TemplatedStringAttributeSensorAndConfigKey;
+import brooklyn.event.basic.*;
 import brooklyn.util.flags.SetFromFlag;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.reflect.TypeToken;
 
 @Catalog(name="Riak Node", description="Riak is a distributed NoSQL key-value 
data store that offers "
@@ -116,24 +114,24 @@ public interface RiakNode extends SoftwareProcess {
     PortAttributeSensorAndConfigKey SEARCH_SOLR_PORT = new 
PortAttributeSensorAndConfigKey("riak.search.solr.port", "Solr port", "8093+");
     PortAttributeSensorAndConfigKey SEARCH_SOLR_JMX_PORT = new 
PortAttributeSensorAndConfigKey("riak.search.solr.jmx_port", "Solr port", 
"8985+");
 
-    AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("node.gets");
-    AttributeSensor<Integer> NODE_GETS_TOTAL = 
Sensors.newIntegerSensor("node.gets.total");
-    AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("node.puts");
-    AttributeSensor<Integer> NODE_PUTS_TOTAL = 
Sensors.newIntegerSensor("node.puts.total");
-    AttributeSensor<Integer> VNODE_GETS = 
Sensors.newIntegerSensor("vnode.gets");
-    AttributeSensor<Integer> VNODE_GETS_TOTAL = 
Sensors.newIntegerSensor("vnode.gets.total");
+    AttributeSensor<Integer> NODE_GETS = 
Sensors.newIntegerSensor("riak.node.gets");
+    AttributeSensor<Integer> NODE_GETS_TOTAL = 
Sensors.newIntegerSensor("riak.node.gets.total");
+    AttributeSensor<Integer> NODE_PUTS = 
Sensors.newIntegerSensor("riak.node.puts");
+    AttributeSensor<Integer> NODE_PUTS_TOTAL = 
Sensors.newIntegerSensor("riak.node.puts.total");
+    AttributeSensor<Integer> VNODE_GETS = 
Sensors.newIntegerSensor("riak.vnode.gets");
+    AttributeSensor<Integer> VNODE_GETS_TOTAL = 
Sensors.newIntegerSensor("riak.vnode.gets.total");
 
     //Sensors for Riak Node Counters (within 1 minute window or lifetime of 
node.
     
//http://docs.basho.com/riak/latest/ops/running/stats-and-monitoring/#Statistics-from-Riak
-    AttributeSensor<Integer> VNODE_PUTS = 
Sensors.newIntegerSensor("vnode.puts");
-    AttributeSensor<Integer> VNODE_PUTS_TOTAL = 
Sensors.newIntegerSensor("vnode.puts.total");
-    AttributeSensor<Integer> READ_REPAIRS_TOTAL = 
Sensors.newIntegerSensor("read.repairs.total");
-    AttributeSensor<Integer> COORD_REDIRS_TOTAL = 
Sensors.newIntegerSensor("coord.redirs.total");
+    AttributeSensor<Integer> VNODE_PUTS = 
Sensors.newIntegerSensor("riak.vnode.puts");
+    AttributeSensor<Integer> VNODE_PUTS_TOTAL = 
Sensors.newIntegerSensor("riak.vnode.puts.total");
+    AttributeSensor<Integer> READ_REPAIRS_TOTAL = 
Sensors.newIntegerSensor("riak.read.repairs.total");
+    AttributeSensor<Integer> COORD_REDIRS_TOTAL = 
Sensors.newIntegerSensor("riak.coord.redirs.total");
     //Additional Riak node counters
-    AttributeSensor<Integer> MEMORY_PROCESSES_USED = 
Sensors.newIntegerSensor("memory.processes.used");
-    AttributeSensor<Integer> SYS_PROCESS_COUNT = 
Sensors.newIntegerSensor("sys.process.count");
-    AttributeSensor<Integer> PBC_CONNECTS = 
Sensors.newIntegerSensor("pbc.connects");
-    AttributeSensor<Integer> PBC_ACTIVE = 
Sensors.newIntegerSensor("pbc.active");
+    AttributeSensor<Integer> MEMORY_PROCESSES_USED = 
Sensors.newIntegerSensor("riak.memory.processes.used");
+    AttributeSensor<Integer> SYS_PROCESS_COUNT = 
Sensors.newIntegerSensor("riak.sys.process.count");
+    AttributeSensor<Integer> PBC_CONNECTS = 
Sensors.newIntegerSensor("riak.pbc.connects");
+    AttributeSensor<Integer> PBC_ACTIVE = 
Sensors.newIntegerSensor("riak.pbc.active");
     @SuppressWarnings("serial")
     AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new 
TypeToken<List<String>>() {},
             "ring.members", "all the riak nodes in the ring");
@@ -142,6 +140,27 @@ public interface RiakNode extends SoftwareProcess {
     MethodEffector<Void> LEAVE_RIAK_CLUSTER = new 
MethodEffector<Void>(RiakNode.class, "leaveCluster");
     MethodEffector<Void> COMMIT_RIAK_CLUSTER = new 
MethodEffector<Void>(RiakNode.class, "commitCluster");
 
+    AttributeSensor<Integer> RIAK_NODE_GET_FSM_TIME_MEAN = 
Sensors.newIntegerSensor("riak.node_get_fsm_time_mean", "Time between reception 
of client read request and subsequent response to client");
+    AttributeSensor<Integer> RIAK_NODE_PUT_FSM_TIME_MEAN = 
Sensors.newIntegerSensor("riak.node_put_fsm_time_mean", "Time between reception 
of client write request and subsequent response to client");
+    AttributeSensor<Integer> RIAK_OBJECT_COUNTER_MERGE_TIME_MEAN = 
Sensors.newIntegerSensor("riak.object_counter_merge_time_mean", "Time it takes 
to perform an Update Counter operation");
+    AttributeSensor<Integer> RIAK_OBJECT_SET_MERGE_TIME_MEAN = 
Sensors.newIntegerSensor("riak.object_set_merge_time_mean", "Time it takes to 
perform an Update Set operation");
+    AttributeSensor<Integer> RIAK_OBJECT_MAP_MERGE_TIME_MEAN = 
Sensors.newIntegerSensor("riak.object_map_merge_time_mean", "Time it takes to 
perform an Update Map operation");
+    AttributeSensor<Integer> RIAK_SEARCH_QUERY_LATENCY_MEDIAN = 
Sensors.newIntegerSensor("riak.search_query_latency_median", "Search query 
latency");
+    AttributeSensor<Integer> RIAK_SEARCH_INDEX_LATENCY_MEDIAN = 
Sensors.newIntegerSensor("riak.search_index_latency_median", "Time it takes 
Search to index a new document");
+    AttributeSensor<Integer> RIAK_CONSISTENT_GET_TIME_MEAN = 
Sensors.newIntegerSensor("riak.consistent_get_time_mean", "Strongly consistent 
read latency");
+    AttributeSensor<Integer> RIAK_CONSISTENT_PUT_TIME_MEAN = 
Sensors.newIntegerSensor("riak.consistent_put_time_mean", "Strongly consistent 
write latency");
+
+    List<AttributeSensor<Integer>> ONE_MINUTE_STRING_SENSORS = 
ImmutableList.of(RIAK_NODE_GET_FSM_TIME_MEAN, RIAK_NODE_PUT_FSM_TIME_MEAN,
+            RIAK_OBJECT_COUNTER_MERGE_TIME_MEAN, 
RIAK_OBJECT_SET_MERGE_TIME_MEAN, RIAK_OBJECT_MAP_MERGE_TIME_MEAN,
+            RIAK_SEARCH_QUERY_LATENCY_MEDIAN, RIAK_SEARCH_INDEX_LATENCY_MEDIAN,
+            RIAK_CONSISTENT_GET_TIME_MEAN, RIAK_CONSISTENT_PUT_TIME_MEAN);
+
+    ConfigKey<Boolean> DO_OS_TUNING = new BasicConfigKey<Boolean>(
+            Boolean.class,
+            "do.os.tunning",
+            "Does sysctl OS optimizations. By default it is true",
+            true);
+
     // accessors, for use from template file
     Integer getRiakWebPort();
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bdffc952/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java 
b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
index 0667a7a..992c77c 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.SoftwareProcessImpl;
 import brooklyn.entity.webapp.WebAppServiceMethods;
+import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.AttributeSensorAndConfigKey;
 import brooklyn.event.feed.http.HttpFeed;
 import brooklyn.event.feed.http.HttpPollConfig;
@@ -104,7 +105,7 @@ public class RiakNodeImpl extends SoftwareProcessImpl 
implements RiakNode {
         connectServiceUpIsRunning();
         HostAndPort accessible = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getRiakWebPort());
 
-        httpFeed = HttpFeed.builder()
+        HttpFeed.Builder httpFeedBuilder = HttpFeed.builder()
                 .entity(this)
                 .period(500, TimeUnit.MILLISECONDS)
                 .baseUri(String.format("http://%s/stats", 
accessible.toString()))
@@ -161,8 +162,15 @@ public class RiakNodeImpl extends SoftwareProcessImpl 
implements RiakNode {
                                     }
                                 }
                         ))
-                        
.onFailureOrException(Functions.constant(Arrays.asList(new String[0]))))
-                .build();
+                        
.onFailureOrException(Functions.constant(Arrays.asList(new String[0]))));
+
+        for (AttributeSensor<Integer> sensor : ONE_MINUTE_STRING_SENSORS) {
+            httpFeedBuilder.poll(new HttpPollConfig<Integer>(sensor)
+                    
.onSuccess(HttpValueFunctions.jsonContents(sensor.getName().substring(5), 
Integer.class))
+                    .onFailureOrException(Functions.constant(-1)));
+        }
+
+        httpFeed = httpFeedBuilder.build();
 
         WebAppServiceMethods.connectWebAppServerPolicies(this);
     }

Reply via email to