Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master b3ce25696 -> b118c1194


Fix Cassandra node in private network

- Improve how service-up is set.
- Ensure that the mapped ip:port is used (so that the connection goes
  through NAT etc., where necessary).

Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/bc5c9e4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/bc5c9e4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/bc5c9e4e

Branch: refs/heads/master
Commit: bc5c9e4e1fbbfc25d4836e72c6d87e1cb444069d
Parents: 7ca2781
Author: Aled Sage <[email protected]>
Authored: Tue Sep 15 09:13:23 2015 +0100
Committer: Aled Sage <[email protected]>
Committed: Tue Sep 15 09:13:23 2015 +0100

----------------------------------------------------------------------
 .../nosql/cassandra/CassandraNodeImpl.java      | 72 +++++++++++++-------
 1 file changed, 47 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bc5c9e4e/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
index b753df6..578fa6b 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
@@ -40,9 +40,11 @@ import org.apache.brooklyn.core.effector.EffectorBody;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.location.Machines;
+import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
 import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
 import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.enricher.stock.Enrichers;
 import org.apache.brooklyn.entity.java.JavaAppUtils;
 import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
 import org.apache.brooklyn.feed.function.FunctionFeed;
@@ -59,6 +61,7 @@ import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.text.TemplateProcessor;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Functionals;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
@@ -76,6 +79,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.net.HostAndPort;
 
 /**
  * Implementation of {@link CassandraNode}.
@@ -527,34 +531,14 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN
                 .entity(this)
                 .period(3000, TimeUnit.MILLISECONDS)
                 .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
-                        .onException(Functions.constant((Long)null))
-                        .callable(new Callable<Long>() {
-                            public Long call() {
-                                try {
-                                    long start = System.currentTimeMillis();
-                                    Socket s = new 
Socket(getAttribute(Attributes.HOSTNAME), getThriftPort());
-                                    s.close();
-                                    long latency = System.currentTimeMillis() 
- start;
-                                    computeServiceUp();
-                                    return latency;
-                                } catch (Exception e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Cassandra thrift port poll 
failure: "+e);
-                                    setAttribute(SERVICE_UP, false);
-                                    return null;
-                                }
-                            }
-                            public void computeServiceUp() {
-                                // this will wait an additional poll period 
after thrift port is up,
-                                // as the caller will not have set yet, but 
that will help ensure it is really healthy!
-                                setAttribute(SERVICE_UP,
-                                        
getAttribute(THRIFT_PORT_LATENCY)!=null && getAttribute(THRIFT_PORT_LATENCY)>=0 
&& 
-                                        
Boolean.TRUE.equals(getAttribute(SERVICE_UP_JMX)));
-                            }})
+                        .onException(Functions.constant(-1L))
+                        .callable(new 
ThriftLatencyChecker(CassandraNodeImpl.this))
                         .enabled(retrieveUsageMetrics))
                 .build();
         
         jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
+        
+        connectServiceUpIsRunning();
     }
     
     protected void connectEnrichers() {
@@ -575,12 +559,24 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN
             addEnricher(new RollingTimeWindowMeanEnricher<Double>(this, 
WRITES_PER_SECOND_LAST, 
                     WRITES_PER_SECOND_IN_WINDOW, windowPeriod));
         }
+        
+        // service-up checks
+        
addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
+                .from(THRIFT_PORT_LATENCY)
+                .computing(Functionals.ifEquals(-1L).value("Thrift latency 
polling failed") )
+                .build());
+        
+        
addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
+                .from(SERVICE_UP_JMX)
+                .computing(Functionals.ifEquals(false).value("JMX reports not 
up") )
+                .build());
     }
     
     @Override
     public void disconnectSensors() {
         super.disconnectSensors();
-
+        
+        disconnectServiceUpIsRunning();
         if (jmxFeed != null) jmxFeed.stop();
         if (jmxMxBeanFeed != null) jmxMxBeanFeed.stop();
         if (jmxHelper != null) jmxHelper.terminate();
@@ -603,4 +599,30 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN
         return getDriver().executeScriptAsync(commands).block().getStdout();
     }
     
+    private static class ThriftLatencyChecker implements Callable<Long> {
+        private final CassandraNode entity;
+        
+        public ThriftLatencyChecker(CassandraNode entity) {
+            this.entity = entity;
+        }
+        public Long call() {
+            Integer privatePort = entity.getThriftPort();
+            if (privatePort == null) return -1L;
+            
+            HostAndPort hp = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, privatePort);
+
+            try {
+                long start = System.currentTimeMillis();
+                Socket s = new Socket(hp.getHostText(), hp.getPort());
+                s.close();
+                long latency = System.currentTimeMillis() - start;
+                return latency;
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                if (log.isDebugEnabled())
+                    log.debug("Cassandra thrift port poll failure: "+e);
+                return -1L;
+            }
+        }
+    }
 }

Reply via email to