Riak: support disabling direct HTTP connection (HTTP monitoring of the node)
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d01dc7c8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d01dc7c8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d01dc7c8 Branch: refs/heads/master Commit: d01dc7c843b34da76ea54cbe921c376728f6c5d6 Parents: b1a57d1 Author: Aled Sage <[email protected]> Authored: Tue Nov 24 13:24:56 2015 +0000 Committer: Aled Sage <[email protected]> Committed: Tue Nov 24 13:53:46 2015 +0000 ---------------------------------------------------------------------- .../brooklyn/entity/nosql/riak/RiakNode.java | 3 + .../entity/nosql/riak/RiakNodeImpl.java | 139 ++++++++++--------- .../entity/nosql/riak/RiakNodeEc2LiveTest.java | 32 ++++- 3 files changed, 103 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d01dc7c8/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNode.java index 358e8cb..e21260a 100644 --- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNode.java +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNode.java @@ -109,6 +109,9 @@ public interface RiakNode extends SoftwareProcess, UsesJava { @SetFromFlag("riakPbPort") PortAttributeSensorAndConfigKey RIAK_PB_PORT = new PortAttributeSensorAndConfigKey("riak.pbPort", "Riak Protocol Buffers Port", "8087+"); + @SetFromFlag("useHttpMonitoring") + ConfigKey<Boolean> USE_HTTP_MONITORING = ConfigKeys.newConfigKey("httpMonitoring.enabled", "HTTP(S) monitoring enabled", Boolean.TRUE); + AttributeSensor<Boolean> 
RIAK_PACKAGE_INSTALL = Sensors.newBooleanSensor( "riak.install.package", "Flag to indicate whether Riak was installed using an OS package"); AttributeSensor<Boolean> RIAK_ON_PATH = Sensors.newBooleanSensor( http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d01dc7c8/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeImpl.java index 995c469..60358a4 100644 --- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeImpl.java +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeImpl.java @@ -44,9 +44,9 @@ import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.guava.Functionals; import org.apache.brooklyn.util.time.Duration; -import com.google.common.base.Preconditions; import com.google.common.base.Function; import com.google.common.base.Functions; +import com.google.common.base.Preconditions; import com.google.common.collect.ContiguousSet; import com.google.common.collect.DiscreteDomain; import com.google.common.collect.Range; @@ -116,74 +116,76 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode { connectServiceUpIsRunning(); HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getRiakWebPort()); - HttpFeed.Builder httpFeedBuilder = HttpFeed.builder() - .entity(this) - .period(500, TimeUnit.MILLISECONDS) - .baseUri(String.format("http://%s/stats", accessible.toString())) - .poll(new HttpPollConfig<Integer>(NODE_GETS) - .onSuccess(HttpValueFunctions.jsonContents("node_gets", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(NODE_GETS_TOTAL) - 
.onSuccess(HttpValueFunctions.jsonContents("node_gets_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(NODE_PUTS) - .onSuccess(HttpValueFunctions.jsonContents("node_puts", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(NODE_PUTS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("node_puts_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(VNODE_GETS) - .onSuccess(HttpValueFunctions.jsonContents("vnode_gets", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(VNODE_GETS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("vnode_gets_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(VNODE_PUTS) - .onSuccess(HttpValueFunctions.jsonContents("vnode_puts", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(VNODE_PUTS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("vnode_puts_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(READ_REPAIRS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("read_repairs_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(COORD_REDIRS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("coord_redirs_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(MEMORY_PROCESSES_USED) - .onSuccess(HttpValueFunctions.jsonContents("memory_processes_used", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(SYS_PROCESS_COUNT) - .onSuccess(HttpValueFunctions.jsonContents("sys_process_count", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(PBC_CONNECTS) - 
.onSuccess(HttpValueFunctions.jsonContents("pbc_connects", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(PBC_ACTIVE) - .onSuccess(HttpValueFunctions.jsonContents("pbc_active", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<List<String>>(RING_MEMBERS) - .onSuccess(Functionals.chain( - HttpValueFunctions.jsonContents("ring_members", String[].class), - new Function<String[], List<String>>() { - @Nullable - @Override - public List<String> apply(@Nullable String[] strings) { - return Arrays.asList(strings); + if (isHttpMonitoringEnabled()) { + HttpFeed.Builder httpFeedBuilder = HttpFeed.builder() + .entity(this) + .period(500, TimeUnit.MILLISECONDS) + .baseUri(String.format("http://%s/stats", accessible.toString())) + .poll(new HttpPollConfig<Integer>(NODE_GETS) + .onSuccess(HttpValueFunctions.jsonContents("node_gets", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(NODE_GETS_TOTAL) + .onSuccess(HttpValueFunctions.jsonContents("node_gets_total", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(NODE_PUTS) + .onSuccess(HttpValueFunctions.jsonContents("node_puts", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(NODE_PUTS_TOTAL) + .onSuccess(HttpValueFunctions.jsonContents("node_puts_total", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(VNODE_GETS) + .onSuccess(HttpValueFunctions.jsonContents("vnode_gets", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(VNODE_GETS_TOTAL) + .onSuccess(HttpValueFunctions.jsonContents("vnode_gets_total", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(VNODE_PUTS) + .onSuccess(HttpValueFunctions.jsonContents("vnode_puts", 
Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(VNODE_PUTS_TOTAL) + .onSuccess(HttpValueFunctions.jsonContents("vnode_puts_total", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(READ_REPAIRS_TOTAL) + .onSuccess(HttpValueFunctions.jsonContents("read_repairs_total", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(COORD_REDIRS_TOTAL) + .onSuccess(HttpValueFunctions.jsonContents("coord_redirs_total", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(MEMORY_PROCESSES_USED) + .onSuccess(HttpValueFunctions.jsonContents("memory_processes_used", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(SYS_PROCESS_COUNT) + .onSuccess(HttpValueFunctions.jsonContents("sys_process_count", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(PBC_CONNECTS) + .onSuccess(HttpValueFunctions.jsonContents("pbc_connects", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<Integer>(PBC_ACTIVE) + .onSuccess(HttpValueFunctions.jsonContents("pbc_active", Integer.class)) + .onFailureOrException(Functions.constant(-1))) + .poll(new HttpPollConfig<List<String>>(RING_MEMBERS) + .onSuccess(Functionals.chain( + HttpValueFunctions.jsonContents("ring_members", String[].class), + new Function<String[], List<String>>() { + @Nullable + @Override + public List<String> apply(@Nullable String[] strings) { + return Arrays.asList(strings); + } } - } - )) - .onFailureOrException(Functions.constant(Arrays.asList(new String[0])))); - - for (AttributeSensor<Integer> sensor : ONE_MINUTE_SENSORS) { - httpFeedBuilder.poll(new HttpPollConfig<Integer>(sensor) - .period(Duration.ONE_MINUTE) - .onSuccess(HttpValueFunctions.jsonContents(sensor.getName().substring(5), 
Integer.class)) - .onFailureOrException(Functions.constant(-1))); + )) + .onFailureOrException(Functions.constant(Arrays.asList(new String[0])))); + + for (AttributeSensor<Integer> sensor : ONE_MINUTE_SENSORS) { + httpFeedBuilder.poll(new HttpPollConfig<Integer>(sensor) + .period(Duration.ONE_MINUTE) + .onSuccess(HttpValueFunctions.jsonContents(sensor.getName().substring(5), Integer.class)) + .onFailureOrException(Functions.constant(-1))); + } + + httpFeed = httpFeedBuilder.build(); } - - httpFeed = httpFeedBuilder.build(); - + enrichers().add(Enrichers.builder().combining(NODE_GETS, NODE_PUTS).computingSum().publishing(NODE_OPS).build()); enrichers().add(Enrichers.builder().combining(NODE_GETS_TOTAL, NODE_PUTS_TOTAL).computingSum().publishing(NODE_OPS_TOTAL).build()); WebAppServiceMethods.connectWebAppServerPolicies(this); @@ -243,6 +245,9 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode { getDriver().recoverFailedNode(nodeName); } + protected boolean isHttpMonitoringEnabled() { + return Boolean.TRUE.equals(getConfig(USE_HTTP_MONITORING)); + } @Override public Integer getRiakWebPort() { return getAttribute(RiakNode.RIAK_WEB_PORT); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d01dc7c8/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeEc2LiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeEc2LiveTest.java index 907f8b3..fa01e3e 100644 --- a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeEc2LiveTest.java +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeEc2LiveTest.java @@ -18,8 +18,14 @@ */ package org.apache.brooklyn.entity.nosql.riak; +import static org.testng.Assert.assertNotNull; + import 
org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.location.cloud.CloudLocationConfig; import org.apache.brooklyn.entity.AbstractEc2LiveTest; import org.apache.brooklyn.test.EntityTestUtils; import org.slf4j.Logger; @@ -27,6 +33,7 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; public class RiakNodeEc2LiveTest extends AbstractEc2LiveTest { @@ -42,9 +49,26 @@ public class RiakNodeEc2LiveTest extends AbstractEc2LiveTest { } - @Test(enabled = false) - public void testDummy() { - } // Convince TestNG IDE integration that this really does have test methods - + @Test(groups = {"Live"}) + public void testWithOnlyPort22() throws Exception { + // CentOS-6.3-x86_64-GA-EBS-02-85586466-5b6c-4495-b580-14f72b4bcf51-ami-bb9af1d2.1 + jcloudsLocation = mgmt.getLocationRegistry().resolve(LOCATION_SPEC, ImmutableMap.of( + "tags", ImmutableList.of(getClass().getName()), + "imageId", "us-east-1/ami-a96b01c0", + "hardwareId", SMALL_HARDWARE_ID)); + RiakNode server = app.createAndManageChild(EntitySpec.create(RiakNode.class) + .configure(RiakNode.PROVISIONING_PROPERTIES.subKey(CloudLocationConfig.INBOUND_PORTS.getName()), ImmutableList.of(22)) + .configure(RiakNode.USE_HTTP_MONITORING, false)); + + app.start(ImmutableList.of(jcloudsLocation)); + + EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_UP, true); + EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + + Integer port = server.getAttribute(RiakNode.RIAK_PB_PORT); + assertNotNull(port); + + assertViaSshLocalPortListeningEventually(server, port); + } }
