Repository: incubator-brooklyn Updated Branches: refs/heads/master b3ce25696 -> b118c1194
Fix Cassandra node in private network - Improve how service-up is set. - Ensure that mapped ip:port is used (so goes through NAT etc, where necessary). Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/bc5c9e4e Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/bc5c9e4e Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/bc5c9e4e Branch: refs/heads/master Commit: bc5c9e4e1fbbfc25d4836e72c6d87e1cb444069d Parents: 7ca2781 Author: Aled Sage <[email protected]> Authored: Tue Sep 15 09:13:23 2015 +0100 Committer: Aled Sage <[email protected]> Committed: Tue Sep 15 09:13:23 2015 +0100 ---------------------------------------------------------------------- .../nosql/cassandra/CassandraNodeImpl.java | 72 +++++++++++++------- 1 file changed, 47 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bc5c9e4e/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java index b753df6..578fa6b 100644 --- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java @@ -40,9 +40,11 @@ import org.apache.brooklyn.core.effector.EffectorBody; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.location.Machines; +import org.apache.brooklyn.core.location.access.BrooklynAccessUtils; import org.apache.brooklyn.core.location.cloud.CloudLocationConfig; import org.apache.brooklyn.core.sensor.DependentConfiguration; import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.enricher.stock.Enrichers; import org.apache.brooklyn.entity.java.JavaAppUtils; import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; import org.apache.brooklyn.feed.function.FunctionFeed; @@ -59,6 +61,7 @@ import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.text.TemplateProcessor; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.guava.Functionals; import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; @@ -76,6 +79,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; /** * Implementation of {@link CassandraNode}. @@ -527,34 +531,14 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN .entity(this) .period(3000, TimeUnit.MILLISECONDS) .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY) - .onException(Functions.constant((Long)null)) - .callable(new Callable<Long>() { - public Long call() { - try { - long start = System.currentTimeMillis(); - Socket s = new Socket(getAttribute(Attributes.HOSTNAME), getThriftPort()); - s.close(); - long latency = System.currentTimeMillis() - start; - computeServiceUp(); - return latency; - } catch (Exception e) { - if (log.isDebugEnabled()) - log.debug("Cassandra thrift port poll failure: "+e); - setAttribute(SERVICE_UP, false); - return null; - } - } - public void computeServiceUp() { - // this will wait an additional poll period after thrift port is up, - // as the caller will not have set yet, but that will help ensure it is really healthy! - setAttribute(SERVICE_UP, - getAttribute(THRIFT_PORT_LATENCY)!=null && getAttribute(THRIFT_PORT_LATENCY)>=0 && - Boolean.TRUE.equals(getAttribute(SERVICE_UP_JMX))); - }}) + .onException(Functions.constant(-1L)) + .callable(new ThriftLatencyChecker(CassandraNodeImpl.this)) .enabled(retrieveUsageMetrics)) .build(); jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this); + + connectServiceUpIsRunning(); } protected void connectEnrichers() { @@ -575,12 +559,24 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN addEnricher(new RollingTimeWindowMeanEnricher<Double>(this, WRITES_PER_SECOND_LAST, WRITES_PER_SECOND_IN_WINDOW, windowPeriod)); } + + // service-up checks + addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) + .from(THRIFT_PORT_LATENCY) + .computing(Functionals.ifEquals(-1L).value("Thrift latency polling failed") ) + .build()); + + addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) + .from(SERVICE_UP_JMX) + .computing(Functionals.ifEquals(false).value("JMX reports not up") ) + .build()); } @Override public void disconnectSensors() { super.disconnectSensors(); - + + disconnectServiceUpIsRunning(); if (jmxFeed != null) jmxFeed.stop(); if (jmxMxBeanFeed != null) jmxMxBeanFeed.stop(); if (jmxHelper != null) jmxHelper.terminate(); @@ -603,4 +599,30 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN return getDriver().executeScriptAsync(commands).block().getStdout(); } + private static class ThriftLatencyChecker implements Callable<Long> { + private final CassandraNode entity; + + public ThriftLatencyChecker(CassandraNode entity) { + this.entity = entity; + } + public Long call() { + Integer privatePort = entity.getThriftPort(); + if (privatePort == null) return -1L; + + HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, privatePort); + + try { + long start = System.currentTimeMillis(); + Socket s = new Socket(hp.getHostText(), hp.getPort()); + s.close(); + long latency = System.currentTimeMillis() - start; + return latency; + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + if (log.isDebugEnabled()) + log.debug("Cassandra thrift port poll failure: "+e); + return -1L; + } + } + } }
