Cassandra: support disabling direct connection
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/0904ed97 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/0904ed97 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/0904ed97 Branch: refs/heads/master Commit: 0904ed972813e659d96b706b31333818f4bfae3e Parents: d32693a Author: Aled Sage <[email protected]> Authored: Tue Nov 24 13:25:58 2015 +0000 Committer: Aled Sage <[email protected]> Committed: Tue Nov 24 13:53:47 2015 +0000 ---------------------------------------------------------------------- .../entity/nosql/cassandra/CassandraNode.java | 3 + .../nosql/cassandra/CassandraNodeImpl.java | 239 ++++++++++--------- .../cassandra/CassandraNodeEc2LiveTest.java | 32 +++ 3 files changed, 159 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0904ed97/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java index 0f42be5..fb937ae 100644 --- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java @@ -141,6 +141,9 @@ public interface CassandraNode extends DatastoreMixins.DatastoreCommon, Software BasicAttributeSensorAndConfigKey<Set<BigInteger>> TOKENS = new BasicAttributeSensorAndConfigKey<Set<BigInteger>>( new TypeToken<Set<BigInteger>>() {}, "cassandra.tokens", "Cassandra Tokens"); + @SetFromFlag("useThriftMonitoring") + ConfigKey<Boolean> USE_THRIFT_MONITORING = ConfigKeys.newConfigKey("thriftMonitoring.enabled", "Thrift-port monitoring enabled", Boolean.TRUE); + AttributeSensor<Integer> PEERS = Sensors.newIntegerSensor( "cassandra.peers", "Number of peers in cluster"); AttributeSensor<Integer> LIVE_NODE_COUNT = Sensors.newIntegerSensor( "cassandra.liveNodeCount", "Number of live nodes in cluster"); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0904ed97/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java index e08c99a..43bf4b2 100644 --- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java @@ -46,6 +46,7 @@ import org.apache.brooklyn.core.sensor.DependentConfiguration; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.enricher.stock.Enrichers; import org.apache.brooklyn.entity.java.JavaAppUtils; +import org.apache.brooklyn.entity.java.UsesJmx; import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.feed.function.FunctionPollConfig; @@ -397,115 +398,119 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN jmxHelper = new JmxHelper(this); boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS); - jmxFeed = JmxFeed.builder() - .entity(this) - .period(3000, TimeUnit.MILLISECONDS) - .helper(jmxHelper) - .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX) - .objectName(storageServiceMBean) - .attributeName("Initialized") - .onSuccess(Functions.forPredicate(Predicates.notNull())) - .onException(Functions.constant(false)) - .suppressDuplicates(true)) - .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS) - .objectName(storageServiceMBean) - .attributeName("TokenToEndpointMap") - .onSuccess(new Function<Object, Set<BigInteger>>() { - @Override - public Set<BigInteger> apply(@Nullable Object arg) { - Map input = (Map)arg; - if (input == null || input.isEmpty()) return null; - // FIXME does not work on aws-ec2, uses RFC1918 address - Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME))); - Set<String> tokens = Maps.filterValues(input, self).keySet(); - Set<BigInteger> result = Sets.newLinkedHashSet(); - for (String token : tokens) { - result.add(new BigInteger(token)); + if (getDriver().isJmxEnabled()) { + jmxFeed = JmxFeed.builder() + .entity(this) + .period(3000, TimeUnit.MILLISECONDS) + .helper(jmxHelper) + .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX) + .objectName(storageServiceMBean) + .attributeName("Initialized") + .onSuccess(Functions.forPredicate(Predicates.notNull())) + .onException(Functions.constant(false)) + .suppressDuplicates(true)) + .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS) + .objectName(storageServiceMBean) + .attributeName("TokenToEndpointMap") + .onSuccess(new Function<Object, Set<BigInteger>>() { + @Override + public Set<BigInteger> apply(@Nullable Object arg) { + Map input = (Map)arg; + if (input == null || input.isEmpty()) return null; + // FIXME does not work on aws-ec2, uses RFC1918 address + Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME))); + Set<String> tokens = Maps.filterValues(input, self).keySet(); + Set<BigInteger> result = Sets.newLinkedHashSet(); + for (String token : tokens) { + result.add(new BigInteger(token)); + } + return result; + }}) + .onException(Functions.<Set<BigInteger>>constant(null)) + .suppressDuplicates(true)) + .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME) + .period(60, TimeUnit.SECONDS) + .objectName(snitchMBean) + .operationName("getDatacenter") + .operationParams(ImmutableList.of(getBroadcastAddress())) + .onException(Functions.<String>constant(null)) + .suppressDuplicates(true)) + .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME) + .period(60, TimeUnit.SECONDS) + .objectName(snitchMBean) + .operationName("getRack") + .operationParams(ImmutableList.of(getBroadcastAddress())) + .onException(Functions.<String>constant(null)) + .suppressDuplicates(true)) + .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS) + .objectName(storageServiceMBean) + .attributeName("TokenToEndpointMap") + .onSuccess(new Function<Object, Integer>() { + @Override + public Integer apply(@Nullable Object arg) { + Map input = (Map)arg; + if (input == null || input.isEmpty()) return 0; + return input.size(); } - return result; - }}) - .onException(Functions.<Set<BigInteger>>constant(null)) - .suppressDuplicates(true)) - .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME) - .period(60, TimeUnit.SECONDS) - .objectName(snitchMBean) - .operationName("getDatacenter") - .operationParams(ImmutableList.of(getBroadcastAddress())) - .onException(Functions.<String>constant(null)) - .suppressDuplicates(true)) - .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME) - .period(60, TimeUnit.SECONDS) - .objectName(snitchMBean) - .operationName("getRack") - .operationParams(ImmutableList.of(getBroadcastAddress())) - .onException(Functions.<String>constant(null)) - .suppressDuplicates(true)) - .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS) - .objectName(storageServiceMBean) - .attributeName("TokenToEndpointMap") - .onSuccess(new Function<Object, Integer>() { - @Override - public Integer apply(@Nullable Object arg) { - Map input = (Map)arg; - if (input == null || input.isEmpty()) return 0; - return input.size(); - } - }) - .onException(Functions.constant(-1))) - .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT) - .objectName(storageServiceMBean) - .attributeName("LiveNodes") - .onSuccess(new Function<Object, Integer>() { - @Override - public Integer apply(@Nullable Object arg) { - List input = (List)arg; - if (input == null || input.isEmpty()) return 0; - return input.size(); - } - }) - .onException(Functions.constant(-1))) - .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE) - .objectName(readStageMBean) - .attributeName("ActiveCount") - .onException(Functions.constant((Integer)null)) - .enabled(retrieveUsageMetrics)) - .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING) - .objectName(readStageMBean) - .attributeName("PendingTasks") - .onException(Functions.constant((Long)null)) - .enabled(retrieveUsageMetrics)) - .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED) - .objectName(readStageMBean) - .attributeName("CompletedTasks") - .onException(Functions.constant((Long)null)) - .enabled(retrieveUsageMetrics)) - .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE) - .objectName(mutationStageMBean) - .attributeName("ActiveCount") - .onException(Functions.constant((Integer)null)) - .enabled(retrieveUsageMetrics)) - .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING) - .objectName(mutationStageMBean) - .attributeName("PendingTasks") - .onException(Functions.constant((Long)null)) - .enabled(retrieveUsageMetrics)) - .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED) - .objectName(mutationStageMBean) - .attributeName("CompletedTasks") - .onException(Functions.constant((Long)null)) - .enabled(retrieveUsageMetrics)) - .build(); - - functionFeed = FunctionFeed.builder() - .entity(this) - .period(3000, TimeUnit.MILLISECONDS) - .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY) - .onException(Functions.constant(-1L)) - .callable(new ThriftLatencyChecker(CassandraNodeImpl.this)) - .enabled(retrieveUsageMetrics)) - .build(); + }) + .onException(Functions.constant(-1))) + .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT) + .objectName(storageServiceMBean) + .attributeName("LiveNodes") + .onSuccess(new Function<Object, Integer>() { + @Override + public Integer apply(@Nullable Object arg) { + List input = (List)arg; + if (input == null || input.isEmpty()) return 0; + return input.size(); + } + }) + .onException(Functions.constant(-1))) + .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE) + .objectName(readStageMBean) + .attributeName("ActiveCount") + .onException(Functions.constant((Integer)null)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING) + .objectName(readStageMBean) + .attributeName("PendingTasks") + .onException(Functions.constant((Long)null)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED) + .objectName(readStageMBean) + .attributeName("CompletedTasks") + .onException(Functions.constant((Long)null)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE) + .objectName(mutationStageMBean) + .attributeName("ActiveCount") + .onException(Functions.constant((Integer)null)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING) + .objectName(mutationStageMBean) + .attributeName("PendingTasks") + .onException(Functions.constant((Long)null)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED) + .objectName(mutationStageMBean) + .attributeName("CompletedTasks") + .onException(Functions.constant((Long)null)) + .enabled(retrieveUsageMetrics)) + .build(); + + jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this); + } - jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this); + if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) { + functionFeed = FunctionFeed.builder() + .entity(this) + .period(3000, TimeUnit.MILLISECONDS) + .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY) + .onException(Functions.constant(-1L)) + .callable(new ThriftLatencyChecker(CassandraNodeImpl.this)) + .enabled(retrieveUsageMetrics)) + .build(); + } connectServiceUpIsRunning(); } @@ -530,15 +535,19 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN } // service-up checks - enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) - .from(THRIFT_PORT_LATENCY) - .computing(Functionals.ifEquals(-1L).value("Thrift latency polling failed") ) - .build()); + if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) { + enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) + .from(THRIFT_PORT_LATENCY) + .computing(Functionals.ifEquals(-1L).value("Thrift latency polling failed") ) + .build()); + } - enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) - .from(SERVICE_UP_JMX) - .computing(Functionals.ifEquals(false).value("JMX reports not up") ) - .build()); + if (Boolean.TRUE.equals(getConfig(UsesJmx.USE_JMX))) { + enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) + .from(SERVICE_UP_JMX) + .computing(Functionals.ifEquals(false).value("JMX reports not up") ) + .build()); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0904ed97/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java index 3899c3b..c1b8b04 100644 --- a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java @@ -18,15 +18,23 @@ */ package org.apache.brooklyn.entity.nosql.cassandra; +import static org.testng.Assert.assertNotNull; + import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.location.cloud.CloudLocationConfig; import org.apache.brooklyn.entity.AbstractEc2LiveTest; import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample; import org.apache.brooklyn.test.EntityTestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; public class CassandraNodeEc2LiveTest extends AbstractEc2LiveTest { @@ -46,4 +54,28 @@ public class CassandraNodeEc2LiveTest extends AbstractEc2LiveTest { AstyanaxSample astyanax = new AstyanaxSample(cassandra); astyanax.astyanaxTest(); } + + @Test(groups = {"Live"}) + public void testWithOnlyPort22() throws Exception { + // CentOS-6.3-x86_64-GA-EBS-02-85586466-5b6c-4495-b580-14f72b4bcf51-ami-bb9af1d2.1 + jcloudsLocation = mgmt.getLocationRegistry().resolve(LOCATION_SPEC, ImmutableMap.of( + "tags", ImmutableList.of(getClass().getName()), + "imageId", "us-east-1/ami-a96b01c0", + "hardwareId", SMALL_HARDWARE_ID)); + + CassandraNode server = app.createAndManageChild(EntitySpec.create(CassandraNode.class) + .configure(CassandraNode.PROVISIONING_PROPERTIES.subKey(CloudLocationConfig.INBOUND_PORTS.getName()), ImmutableList.of(22)) + .configure(CassandraNode.USE_JMX, false) + .configure(CassandraNode.USE_THRIFT_MONITORING, false)); + + app.start(ImmutableList.of(jcloudsLocation)); + + EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_UP, true); + EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + + Integer port = server.getAttribute(CassandraNode.THRIFT_PORT); + assertNotNull(port); + + assertViaSshLocalPortListeningEventually(server, port); + } }
