http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
deleted file mode 100644
index 5c7b8fd..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
+++ /dev/null
@@ -1,594 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.cassandra;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.math.BigInteger;
-import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-import javax.management.ObjectName;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.enricher.RollingTimeWindowMeanEnricher;
-import brooklyn.enricher.TimeWeightedDeltaEnricher;
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.SoftwareProcessImpl;
-import brooklyn.entity.effector.EffectorBody;
-import brooklyn.entity.java.JavaAppUtils;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.DependentConfiguration;
-import brooklyn.event.basic.Sensors;
-import brooklyn.event.feed.function.FunctionFeed;
-import brooklyn.event.feed.function.FunctionPollConfig;
-import brooklyn.event.feed.jmx.JmxAttributePollConfig;
-import brooklyn.event.feed.jmx.JmxFeed;
-import brooklyn.event.feed.jmx.JmxHelper;
-import brooklyn.event.feed.jmx.JmxOperationPollConfig;
-import brooklyn.location.MachineLocation;
-import brooklyn.location.MachineProvisioningLocation;
-import brooklyn.location.basic.Machines;
-import brooklyn.location.cloud.CloudLocationConfig;
-import brooklyn.util.collections.MutableSet;
-import brooklyn.util.config.ConfigBag;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.guava.Maybe;
-import brooklyn.util.text.Strings;
-import brooklyn.util.text.TemplateProcessor;
-import brooklyn.util.time.Duration;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Splitter;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Implementation of {@link CassandraNode}.
- */
-public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraNode {
-
-    private static final Logger log = LoggerFactory.getLogger(CassandraNodeImpl.class);
-
-    private final AtomicReference<Boolean> detectedCloudSensors = new AtomicReference<Boolean>(false);
-    
-    public CassandraNodeImpl() {
-    }
-    
-    @Override
-    public void init() {
-        super.init();
-        
-        getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() {
-            @Override
-            public String call(ConfigBag parameters) {
-                return executeScript((String)parameters.getStringKey("commands"));
-            }
-        });
-        
-        Entities.checkRequiredUrl(this, getCassandraConfigTemplateUrl());
-        Entities.getRequiredUrlConfig(this, CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL);
-        
-        connectEnrichers();
-    }
-    
-    /**
-     * Some clouds (e.g. Rackspace) give us VMs that have two nics: one for private and one for public.
-     * If the private IP is used then it doesn't work, even for a cluster purely internal to Rackspace!
-     * 
-     * TODO Ugly. Need to understand more and find a better fix. Perhaps in Cassandra itself if necessary.
-     * Also need to investigate further:
-     *  - does it still fail if BroadcastAddress is set to private IP?
-     *  - is `openIptables` opening it up for both interfaces?
-     *  - for aws->rackspace comms between nodes (thus using the public IP), will it be listening on an accessible port?
-     *  - ideally do a check, open a server on one port on the machine, see if it is contactable on the public address;
-     *    and set that as a flag on the cloud
-     */
-    protected void setCloudPreferredSensorNames() {
-        if (detectedCloudSensors.get()) return;
-        synchronized (detectedCloudSensors) {
-            if (detectedCloudSensors.get()) return;
-
-            MachineProvisioningLocation<?> loc = getProvisioningLocation();
-            if (loc != null) {
-                try {
-                    Method method = loc.getClass().getMethod("getProvider");
-                    method.setAccessible(true);
-                    String provider = (String) method.invoke(loc);
-                    String result = "(nothing special)";
-                    if (provider!=null) {
-                        if (provider.contains("rackspace") || provider.contains("cloudservers") || provider.contains("softlayer")) {
-                            /* These clouds have 2 NICs and it has to be consistent, so use public IP here to allow external access;
-                             * (TODO internal access could be configured to improve performance / lower cost, 
-                             * if we know all nodes are visible to each other) */
-                            if (getConfig(LISTEN_ADDRESS_SENSOR)==null)
-                                setConfig(LISTEN_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName());
-                            if (getConfig(BROADCAST_ADDRESS_SENSOR)==null)
-                                setConfig(BROADCAST_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName());
-                            result = "public IP for both listen and broadcast";
-                        } else if (provider.contains("google-compute")) {
-                            /* Google nodes cannot reach themselves/each-other on the public IP,
-                             * and there is no hostname, so use private IP here */
-                            if (getConfig(LISTEN_ADDRESS_SENSOR)==null)
-                                setConfig(LISTEN_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName());
-                            if (getConfig(BROADCAST_ADDRESS_SENSOR)==null)
-                                setConfig(BROADCAST_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName());
-                            result = "private IP for both listen and broadcast";
-                        }
-                    }
-                    log.debug("Cassandra NICs inferred {} for {}; using location {}, based on provider {}", new Object[] {result, this, loc, provider});
-                } catch (Exception e) {
-                    log.debug("Cassandra NICs auto-detection failed for {} in location {}: {}", new Object[] {this, loc, e});
-                }
-            }
-            detectedCloudSensors.set(true);
-        }
-    }
-    
-    @Override
-    protected void preStart() {
-        super.preStart();
-        setCloudPreferredSensorNames();
-    }
-    
-    // Used for freemarker
-    public String getMajorMinorVersion() {
-        String version = getConfig(CassandraNode.SUGGESTED_VERSION);
-        if (Strings.isBlank(version)) return "";
-        List<String> versionParts = ImmutableList.copyOf(Splitter.on(".").split(version));
-        return versionParts.get(0) + (versionParts.size() > 1 ? "."+versionParts.get(1) : "");
-    }
-    
-    public String getCassandraConfigTemplateUrl() {
-        String templatedUrl = getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL);
-        return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of());
-    }
-
-    @Override public Integer getGossipPort() { return getAttribute(CassandraNode.GOSSIP_PORT); }
-    @Override public Integer getSslGossipPort() { return getAttribute(CassandraNode.SSL_GOSSIP_PORT); }
-    @Override public Integer getThriftPort() { return getAttribute(CassandraNode.THRIFT_PORT); }
-    @Override public Integer getNativeTransportPort() { return getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); }
-    @Override public String getClusterName() { return getAttribute(CassandraNode.CLUSTER_NAME); }
-    
-    @Override public int getNumTokensPerNode() {
-        return getConfig(CassandraNode.NUM_TOKENS_PER_NODE);
-    }
-
-    @Deprecated
-    @Override public BigInteger getToken() {
-        BigInteger token = getAttribute(CassandraNode.TOKEN);
-        if (token == null) {
-            token = getConfig(CassandraNode.TOKEN);
-        }
-        return token;
-    }
-    
-    @Override public Set<BigInteger> getTokens() {
-        // Prefer an already-set attribute over the config.
-        // Prefer TOKENS over TOKEN.
-        Set<BigInteger> tokens = getAttribute(CassandraNode.TOKENS);
-        if (tokens == null) {
-            BigInteger token = getAttribute(CassandraNode.TOKEN);
-            if (token != null) {
-                tokens = ImmutableSet.of(token);
-            }
-        }
-        if (tokens == null) {
-            tokens = getConfig(CassandraNode.TOKENS);
-        }
-        if (tokens == null) {
-            BigInteger token = getConfig(CassandraNode.TOKEN);
-            if (token != null) {
-                tokens = ImmutableSet.of(token);
-            }
-        }
-        return tokens;
-    }
-    
-    @Deprecated
-    @Override public String getTokenAsString() {
-        BigInteger token = getToken();
-        if (token==null) return "";
-        return ""+token;
-    }
-
-    @Override public String getTokensAsString() {
-        // TODO check what is required when replacing failed node.
-        // with vnodes in Cassandra 2.x, don't bother supplying token
-        Set<BigInteger> tokens = getTokens();
-        if (tokens == null) return "";
-        return Joiner.on(",").join(tokens);
-    }
-    
-    @Override public String getListenAddress() {
-        String sensorName = getConfig(LISTEN_ADDRESS_SENSOR);
-        if (Strings.isNonBlank(sensorName))
-            return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked();
-        
-        String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS);
-        return Strings.isNonBlank(subnetAddress) ? subnetAddress : getAttribute(CassandraNode.ADDRESS);
-    }
-    @Override public String getBroadcastAddress() {
-        String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR);
-        if (Strings.isNonBlank(sensorName))
-            return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked();
-        
-        String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME);
-        if (snitchName.equals("Ec2MultiRegionSnitch") || snitchName.contains("MultiCloudSnitch")) {
-            // http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html
-            // describes that the listen_address is set to the private IP, and the broadcast_address is set to the public IP.
-            return getAttribute(CassandraNode.ADDRESS);
-        } else if (!getDriver().isClustered()) {
-            return getListenAddress();
-        } else {
-            // In other situations, prefer the hostname, so other regions can see it
-            // *Unless* hostname resolves at the target to a local-only interface which is different to ADDRESS
-            // (workaround for issue deploying to localhost)
-            String hostname = getAttribute(CassandraNode.HOSTNAME);
-            try {
-                String resolvedAddress = getDriver().getResolvedAddress(hostname);
-                if (resolvedAddress==null) {
-                    log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" could not be resolved at remote machine");
-                    return getListenAddress();
-                }
-                if (resolvedAddress.equals("127.0.0.1")) {
-                    log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" resolves to 127.0.0.1");
-                    return getListenAddress();
-                }
-                return hostname;
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                log.warn("Error resolving hostname "+hostname+" for "+this+": "+e, e);
-                return hostname;
-            }
-        }
-    }
-    /** not always the private IP, if public IP has been insisted on for broadcast, e.g. setting up a rack topology */
-    // have not confirmed this does the right thing in all clouds ... only used for rack topology however
-    public String getPrivateIp() {
-        String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR);
-        if (Strings.isNonBlank(sensorName)) {
-            return getAttribute(Sensors.newStringSensor(sensorName));
-        } else {
-            String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS);
-            return Strings.isNonBlank(subnetAddress) ? subnetAddress : getAttribute(CassandraNode.ADDRESS);
-        }
-    }
-    public String getPublicIp() {
-        // may need to be something else in google
-        return getAttribute(CassandraNode.ADDRESS);
-    }
-
-    @Override public String getRpcAddress() {
-        String sensorName = getConfig(RPC_ADDRESS_SENSOR);
-        if (Strings.isNonBlank(sensorName))
-            return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked();
-        return "0.0.0.0";
-    }
-    
-    @Override public String getSeeds() { 
-        Set<Entity> seeds = getConfig(CassandraNode.INITIAL_SEEDS);
-        if (seeds==null) {
-            log.warn("No seeds available when requested for "+this, new 
Throwable("source of no Cassandra seeds when requested"));
-            return null;
-        }
-        String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME);
-        MutableSet<String> seedsHostnames = MutableSet.of();
-        for (Entity entity : seeds) {
-            // tried removing ourselves if there are other nodes, but that is 
a BAD idea!
-            // blows up with a "java.lang.RuntimeException: No other nodes 
seen!"
-            
-            if (snitchName.equals("Ec2MultiRegionSnitch") || 
snitchName.contains("MultiCloudSnitch")) {
-                // 
http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html
-                // says the seeds should be public IPs.
-                seedsHostnames.add(entity.getAttribute(CassandraNode.ADDRESS));
-            } else {
-                String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR);
-                if (Strings.isNonBlank(sensorName)) {
-                    
seedsHostnames.add(entity.getAttribute(Sensors.newStringSensor(sensorName)));
-                } else {
-                    Maybe<String> optionalSeedHostname = 
Machines.findSubnetOrPublicHostname(entity);
-                    if (optionalSeedHostname.isPresent()) {
-                        String seedHostname = optionalSeedHostname.get();
-                        seedsHostnames.add(seedHostname);
-                    } else {
-                        log.warn("In node {}, seed hostname missing for {}; 
not including in seeds list", this, entity);
-                    }
-                }
-            }
-        }
-        
-        String result = Strings.join(seedsHostnames, ",");
-        log.info("Seeds for {}: {}", this, result);
-        return result;
-    }
-
-    // referenced by cassandra-rackdc.properties, read by some of the cassandra snitches
-    public String getDatacenterName() {
-        String name = getAttribute(CassandraNode.DATACENTER_NAME);
-        if (name == null) {
-            MachineLocation machine = getMachineOrNull();
-            MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation();
-            if (machine != null) {
-                name = machine.getConfig(CloudLocationConfig.CLOUD_REGION_ID);
-            }
-            if (name == null && provisioningLocation != null) {
-                name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_REGION_ID);
-            }
-            if (name == null) {
-                name = "UNKNOWN_DATACENTER";
-            }
-            setAttribute((AttributeSensor<String>)DATACENTER_NAME, name);
-        }
-        return name;
-    }
-
-    public String getRackName() {
-        String name = getAttribute(CassandraNode.RACK_NAME);
-        if (name == null) {
-            MachineLocation machine = getMachineOrNull();
-            MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation();
-            if (machine != null) {
-                name = machine.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID);
-            }
-            if (name == null && provisioningLocation != null) {
-                name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID);
-            }
-            if (name == null) {
-                name = "UNKNOWN_RACK";
-            }
-            setAttribute((AttributeSensor<String>)RACK_NAME, name);
-        }
-        return name;
-    }
-
-    @Override
-    public Class<? extends CassandraNodeDriver> getDriverInterface() {
-        return CassandraNodeDriver.class;
-    }
-    
-    @Override
-    public CassandraNodeDriver getDriver() {
-        return (CassandraNodeDriver) super.getDriver();
-    }
-
-    private volatile JmxFeed jmxFeed;
-    private volatile FunctionFeed functionFeed;
-    private JmxFeed jmxMxBeanFeed;
-    private JmxHelper jmxHelper;
-    private ObjectName storageServiceMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=StorageService");
-    private ObjectName readStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=ReadStage");
-    private ObjectName mutationStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=MutationStage");
-    private ObjectName snitchMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo");
-
-    
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Override
-    protected void connectSensors() {
-        // "cassandra" isn't really a protocol, but okay for now
-        setAttribute(DATASTORE_URL, "cassandra://"+getAttribute(HOSTNAME)+":"+getAttribute(THRIFT_PORT));
-        
-        super.connectSensors();
-
-        jmxHelper = new JmxHelper(this);
-        jmxFeed = JmxFeed.builder()
-                .entity(this)
-                .period(3000, TimeUnit.MILLISECONDS)
-                .helper(jmxHelper)
-                .pollAttribute(new 
JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
-                        .objectName(storageServiceMBean)
-                        .attributeName("Initialized")
-                        
.onSuccess(Functions.forPredicate(Predicates.notNull()))
-                        .onException(Functions.constant(false)))
-                .pollAttribute(new 
JmxAttributePollConfig<Set<BigInteger>>(TOKENS)
-                        .objectName(storageServiceMBean)
-                        .attributeName("TokenToEndpointMap")
-                        .onSuccess(new Function<Object, Set<BigInteger>>() {
-                            @Override
-                            public Set<BigInteger> apply(@Nullable Object arg) 
{
-                                Map input = (Map)arg;
-                                if (input == null || input.isEmpty()) return 
null;
-                                // FIXME does not work on aws-ec2, uses 
RFC1918 address
-                                Predicate<String> self = 
Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), 
getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME)));
-                                Set<String> tokens = Maps.filterValues(input, 
self).keySet();
-                                Set<BigInteger> result = 
Sets.newLinkedHashSet();
-                                for (String token : tokens) {
-                                    result.add(new BigInteger(token));
-                                }
-                                return result;
-                            }})
-                        
.onException(Functions.<Set<BigInteger>>constant(null)))
-                .pollAttribute(new JmxAttributePollConfig<BigInteger>(TOKEN)
-                        .objectName(storageServiceMBean)
-                        .attributeName("TokenToEndpointMap")
-                        .onSuccess(new Function<Object, BigInteger>() {
-                            @Override
-                            public BigInteger apply(@Nullable Object arg) {
-                                Map input = (Map)arg;
-                                // TODO remove duplication from setting TOKENS
-                                if (input == null || input.isEmpty()) return 
null;
-                                // FIXME does not work on aws-ec2, uses 
RFC1918 address
-                                Predicate<String> self = 
Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), 
getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME)));
-                                Set<String> tokens = Maps.filterValues(input, 
self).keySet();
-                                String token = Iterables.getFirst(tokens, 
null);
-                                return (token != null) ? new BigInteger(token) 
: null;
-                            }})
-                        .onException(Functions.<BigInteger>constant(null)))
-                .pollOperation(new 
JmxOperationPollConfig<String>(DATACENTER_NAME)
-                        .period(60, TimeUnit.SECONDS)
-                        .objectName(snitchMBean)
-                        .operationName("getDatacenter")
-                        
.operationParams(ImmutableList.of(getBroadcastAddress()))
-                        .onException(Functions.<String>constant(null)))
-                .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME)
-                        .period(60, TimeUnit.SECONDS)
-                        .objectName(snitchMBean)
-                        .operationName("getRack")
-                        
.operationParams(ImmutableList.of(getBroadcastAddress()))
-                        .onException(Functions.<String>constant(null)))
-                .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS)
-                        .objectName(storageServiceMBean)
-                        .attributeName("TokenToEndpointMap")
-                        .onSuccess(new Function<Object, Integer>() {
-                            @Override
-                            public Integer apply(@Nullable Object arg) {
-                                Map input = (Map)arg;
-                                if (input == null || input.isEmpty()) return 0;
-                                return input.size();
-                            }
-                        })
-                        .onException(Functions.constant(-1)))
-                .pollAttribute(new 
JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT)
-                        .objectName(storageServiceMBean)
-                        .attributeName("LiveNodes")
-                        .onSuccess(new Function<Object, Integer>() {
-                            @Override
-                            public Integer apply(@Nullable Object arg) {
-                                List input = (List)arg;
-                                if (input == null || input.isEmpty()) return 0;
-                                return input.size();
-                            }
-                        })
-                        .onException(Functions.constant(-1)))
-                .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE)
-                        .objectName(readStageMBean)
-                        .attributeName("ActiveCount")
-                        .onException(Functions.constant((Integer)null)))
-                .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING)
-                        .objectName(readStageMBean)
-                        .attributeName("PendingTasks")
-                        .onException(Functions.constant((Long)null)))
-                .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED)
-                        .objectName(readStageMBean)
-                        .attributeName("CompletedTasks")
-                        .onException(Functions.constant((Long)null)))
-                .pollAttribute(new 
JmxAttributePollConfig<Integer>(WRITE_ACTIVE)
-                        .objectName(mutationStageMBean)
-                        .attributeName("ActiveCount")
-                        .onException(Functions.constant((Integer)null)))
-                .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING)
-                        .objectName(mutationStageMBean)
-                        .attributeName("PendingTasks")
-                        .onException(Functions.constant((Long)null)))
-                .pollAttribute(new 
JmxAttributePollConfig<Long>(WRITE_COMPLETED)
-                        .objectName(mutationStageMBean)
-                        .attributeName("CompletedTasks")
-                        .onException(Functions.constant((Long)null)))
-                .build();
-        
-        functionFeed = FunctionFeed.builder()
-                .entity(this)
-                .period(3000, TimeUnit.MILLISECONDS)
-                .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
-                        .onException(Functions.constant((Long)null))
-                        .callable(new Callable<Long>() {
-                            public Long call() {
-                                try {
-                                    long start = System.currentTimeMillis();
-                                    Socket s = new 
Socket(getAttribute(Attributes.HOSTNAME), getThriftPort());
-                                    s.close();
-                                    long latency = System.currentTimeMillis() 
- start;
-                                    computeServiceUp();
-                                    return latency;
-                                } catch (Exception e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Cassandra thrift port poll 
failure: "+e);
-                                    setAttribute(SERVICE_UP, false);
-                                    return null;
-                                }
-                            }
-                            public void computeServiceUp() {
-                                // this will wait an additional poll period 
after thrift port is up,
-                                // as the caller will not have set yet, but 
that will help ensure it is really healthy!
-                                setAttribute(SERVICE_UP,
-                                        
getAttribute(THRIFT_PORT_LATENCY)!=null && getAttribute(THRIFT_PORT_LATENCY)>=0 
&& 
-                                        
Boolean.TRUE.equals(getAttribute(SERVICE_UP_JMX)));
-                            }
-                        }))
-                .build();
-        
-        jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
-    }
-    
-    protected void connectEnrichers() {
-        connectEnrichers(Duration.TEN_SECONDS);
-    }
-    
-    protected void connectEnrichers(Duration windowPeriod) {
-        JavaAppUtils.connectJavaAppServerPolicies(this);
-
-        addEnricher(TimeWeightedDeltaEnricher.<Long>getPerSecondDeltaEnricher(this, READ_COMPLETED, READS_PER_SECOND_LAST));
-        addEnricher(TimeWeightedDeltaEnricher.<Long>getPerSecondDeltaEnricher(this, WRITE_COMPLETED, WRITES_PER_SECOND_LAST));
-        
-        if (windowPeriod!=null) {
-            addEnricher(new RollingTimeWindowMeanEnricher<Long>(this, THRIFT_PORT_LATENCY, 
-                    THRIFT_PORT_LATENCY_IN_WINDOW, windowPeriod));
-            addEnricher(new RollingTimeWindowMeanEnricher<Double>(this, READS_PER_SECOND_LAST, 
-                    READS_PER_SECOND_IN_WINDOW, windowPeriod));
-            addEnricher(new RollingTimeWindowMeanEnricher<Double>(this, WRITES_PER_SECOND_LAST, 
-                    WRITES_PER_SECOND_IN_WINDOW, windowPeriod));
-        }
-    }
-    
-    @Override
-    public void disconnectSensors() {
-        super.disconnectSensors();
-
-        if (jmxFeed != null) jmxFeed.stop();
-        if (jmxMxBeanFeed != null) jmxMxBeanFeed.stop();
-        if (jmxHelper != null) jmxHelper.terminate();
-        if (functionFeed != null) functionFeed.stop();
-    }
-
-    @Override
-    public void setToken(String token) {
-        try {
-            if (!jmxHelper.isConnected()) jmxHelper.connect();
-            jmxHelper.operation(storageServiceMBean, "move", token);
-            log.info("Moved server {} to token {}", getId(), token);
-        } catch (IOException ioe) {
-            Throwables.propagate(ioe);
-        }
-    }
-    
-    @Override
-    public String executeScript(String commands) {
-        return getDriver().executeScriptAsync(commands).block().getStdout();
-    }
-    
-}

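For reference, the provider-based listen/broadcast address choice made by setCloudPreferredSensorNames in the deleted CassandraNodeImpl above boils down to a small decision function. The following is only an illustrative sketch of that logic in plain Java; the AddressPreference enum and the chooseAddressPreference name are hypothetical and not part of Brooklyn.

// Illustrative sketch only: mirrors the provider checks in the deleted
// CassandraNodeImpl.setCloudPreferredSensorNames; names here are hypothetical.
public class AddressPreferenceSketch {

    enum AddressPreference { PUBLIC_IP, PRIVATE_IP, DEFAULT }

    // Two-NIC clouds (Rackspace/Cloudservers/SoftLayer) need the public IP for both
    // listen and broadcast; Google Compute nodes cannot reach each other on the
    // public IP, so the private (subnet) address is used; otherwise leave defaults.
    static AddressPreference chooseAddressPreference(String provider) {
        if (provider == null) return AddressPreference.DEFAULT;
        if (provider.contains("rackspace") || provider.contains("cloudservers") || provider.contains("softlayer")) {
            return AddressPreference.PUBLIC_IP;
        }
        if (provider.contains("google-compute")) {
            return AddressPreference.PRIVATE_IP;
        }
        return AddressPreference.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(chooseAddressPreference("rackspace-cloudservers-uk")); // PUBLIC_IP
        System.out.println(chooseAddressPreference("google-compute-engine"));     // PRIVATE_IP
        System.out.println(chooseAddressPreference("aws-ec2"));                   // DEFAULT
    }
}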
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java
deleted file mode 100644
index 44651ba..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.cassandra;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityLocal;
-import brooklyn.entity.database.DatastoreMixins;
-import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
-import brooklyn.entity.java.UsesJmx;
-import brooklyn.entity.software.SshEffectorTasks;
-import brooklyn.event.basic.DependentConfiguration;
-import brooklyn.location.Location;
-import brooklyn.location.access.BrooklynAccessUtils;
-import brooklyn.location.basic.Machines;
-import brooklyn.location.basic.SshMachineLocation;
-import brooklyn.management.TaskWrapper;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.collections.MutableSet;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.guava.Maybe;
-import brooklyn.util.net.Networking;
-import brooklyn.util.os.Os;
-import brooklyn.util.ssh.BashCommands;
-import brooklyn.util.stream.Streams;
-import brooklyn.util.task.DynamicTasks;
-import brooklyn.util.task.Tasks;
-import brooklyn.util.task.system.ProcessTaskWrapper;
-import brooklyn.util.text.Identifiers;
-import brooklyn.util.text.Strings;
-import brooklyn.util.text.TemplateProcessor;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-/**
- * Start a {@link CassandraNode} in a {@link Location} accessible over ssh.
- */
-public class CassandraNodeSshDriver extends JavaSoftwareProcessSshDriver implements CassandraNodeDriver {
-
-    private static final Logger log = LoggerFactory.getLogger(CassandraNodeSshDriver.class);
-
-    protected Maybe<String> resolvedAddressCache = Maybe.absent();
-
-    public CassandraNodeSshDriver(CassandraNodeImpl entity, SshMachineLocation machine) {
-        super(entity, machine);
-    }
-
-    @Override
-    protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(),"cassandra.log"); }
-
-    @Override
-    public Integer getGossipPort() { return entity.getAttribute(CassandraNode.GOSSIP_PORT); }
-
-    @Override
-    public Integer getSslGossipPort() { return entity.getAttribute(CassandraNode.SSL_GOSSIP_PORT); }
-
-    @Override
-    public Integer getThriftPort() { return entity.getAttribute(CassandraNode.THRIFT_PORT); }
-
-    @Override
-    public Integer getNativeTransportPort() { return entity.getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); }
-
-    @Override
-    public String getClusterName() { return entity.getAttribute(CassandraNode.CLUSTER_NAME); }
-
-    @Override
-    public String getCassandraConfigTemplateUrl() {
-        String templatedUrl = entity.getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL);
-        return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of());
-    }
-
-    @Override
-    public String getCassandraConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_CONFIG_FILE_NAME); }
-
-    public String getEndpointSnitchName() { return entity.getConfig(CassandraNode.ENDPOINT_SNITCH_NAME); }
-
-    public String getCassandraRackdcConfigTemplateUrl() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL); }
-
-    public String getCassandraRackdcConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_FILE_NAME); }
-
-    public String getMirrorUrl() { return entity.getConfig(CassandraNode.MIRROR_URL); }
-
-    protected String getDefaultUnpackedDirectoryName() {
-        return "apache-cassandra-"+getVersion();
-    }
-
-    protected boolean isV2() {
-        String version = getVersion();
-        return version.startsWith("2.");
-    }
-
-    @Override
-    public boolean installJava() {
-        if (isV2()) {
-            return checkForAndInstallJava("1.8");
-        } else {
-            return super.installJava();
-        }
-    }
-
-    @Override
-    public void preInstall() {
-        resolver = Entities.newDownloader(this);
-        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(getDefaultUnpackedDirectoryName())));
-    }
-
-    @Override
-    public void install() {
-        List<String> urls = resolver.getTargets();
-        String saveAs = resolver.getFilename();
-
-        List<String> commands = ImmutableList.<String>builder()
-                .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
-                .add(BashCommands.INSTALL_TAR)
-                .add("tar xzfv " + saveAs)
-                .build();
-
-        newScript(INSTALLING)
-                .body.append(commands)
-                .execute();
-    }
-
-    @Override
-    public Set<Integer> getPortsUsed() {
-        return ImmutableSet.<Integer>builder()
-                .addAll(super.getPortsUsed())
-                .addAll(getPortMap().values())
-                .build();
-    }
-
-    protected Map<String, Integer> getPortMap() {
-        return ImmutableMap.<String, Integer>builder()
-                .put("jmxPort", entity.getAttribute(UsesJmx.JMX_PORT))
-                .put("rmiPort", entity.getAttribute(UsesJmx.RMI_REGISTRY_PORT))
-                .put("gossipPort", getGossipPort())
-                .put("sslGossipPort", getSslGossipPort())
-                .put("thriftPort", getThriftPort())
-                .build();
-    }
-
-    @Override
-    public void customize() {
-        log.debug("Customizing {} (Cluster {})", entity, getClusterName());
-        Networking.checkPortsValid(getPortMap());
-
-        customizeInitialSeeds();
-
-        String logFileEscaped = getLogFileLocation().replace("/", "\\/"); // escape slashes
-
-        ImmutableList.Builder<String> commands = new ImmutableList.Builder<String>()
-                .add(String.format("cp -R %s/{bin,conf,lib,interface,pylib,tools} .", getExpandedInstallDir()))
-                .add("mkdir -p data")
-                .add("mkdir -p brooklyn_commands")
-                .add(String.format("sed -i.bk 's/log4j.appender.R.File=.*/log4j.appender.R.File=%s/g' %s/conf/log4j-server.properties", logFileEscaped, getRunDir()))
-                .add(String.format("sed -i.bk '/JMX_PORT/d' %s/conf/cassandra-env.sh", getRunDir()))
-                // Script sets 180k on Linux which gives Java error:  The stack size specified is too small, Specify at least 228k
-                .add(String.format("sed -i.bk 's/-Xss180k/-Xss280k/g' %s/conf/cassandra-env.sh", getRunDir()));
-
-        newScript(CUSTOMIZING)
-                .body.append(commands.build())
-                .failOnNonZeroResultCode()
-                .execute();
-
-        // Copy the cassandra.yaml configuration file across
-        String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraConfigFileName());
-        copyTemplate(getCassandraConfigTemplateUrl(), destinationConfigFile);
-
-        // Copy the cassandra-rackdc.properties configuration file across
-        String rackdcDestinationFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraRackdcConfigFileName());
-        copyTemplate(getCassandraRackdcConfigTemplateUrl(), rackdcDestinationFile);
-
-        customizeCopySnitch();
-    }
-
-    protected void customizeCopySnitch() {
-        // Copy the custom snitch jar file across
-        String customSnitchJarUrl = entity.getConfig(CassandraNode.CUSTOM_SNITCH_JAR_URL);
-        if (Strings.isNonBlank(customSnitchJarUrl)) {
-            int lastSlashIndex = customSnitchJarUrl.lastIndexOf("/");
-            String customSnitchJarName = (lastSlashIndex > 0) ? customSnitchJarUrl.substring(lastSlashIndex+1) : "customBrooklynSnitch.jar";
-            String jarDestinationFile = Os.mergePathsUnix(getRunDir(), "lib", customSnitchJarName);
-            InputStream customSnitchJarStream = checkNotNull(resource.getResourceFromUrl(customSnitchJarUrl), "%s could not be loaded", customSnitchJarUrl);
-            try {
-                getMachine().copyTo(customSnitchJarStream, jarDestinationFile);
-            } finally {
-                Streams.closeQuietly(customSnitchJarStream);
-            }
-        }
-    }
-
-    protected void customizeInitialSeeds() {
-        if (entity.getConfig(CassandraNode.INITIAL_SEEDS)==null) {
-            if (isClustered()) {
-                entity.setConfig(CassandraNode.INITIAL_SEEDS,
-                    DependentConfiguration.attributeWhenReady(entity.getParent(), CassandraDatacenter.CURRENT_SEEDS));
-            } else {
-                entity.setConfig(CassandraNode.INITIAL_SEEDS, MutableSet.<Entity>of(entity));
-            }
-        }
-    }
-
-    @Override
-    public boolean isClustered() {
-        return entity.getParent() instanceof CassandraDatacenter;
-    }
-
-    @Override
-    public void launch() {
-        String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
-        Set<Entity> seeds = getEntity().getConfig(CassandraNode.INITIAL_SEEDS);
-        List<Entity> ancestors = getCassandraAncestors();
-        log.info("Launching " + entity + ": " +
-                "cluster "+getClusterName()+", " +
-                "hostname (public) " + getEntity().getAttribute(Attributes.HOSTNAME) + ", " +
-                "hostname (subnet) " + subnetHostname + ", " +
-                "seeds "+((CassandraNode)entity).getSeeds()+" (from "+seeds+")");
-
-        boolean isFirst = seeds.iterator().next().equals(entity);
-        if (isClustered() && !isFirst && CassandraDatacenter.WAIT_FOR_FIRST) {
-            // wait for the first node
-            long firstStartTime = Entities.submit(entity, DependentConfiguration.attributeWhenReady(
-                ancestors.get(ancestors.size()-1), CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC)).getUnchecked();
-            // optionally force a delay before starting subsequent nodes; see comment at CassandraCluster.DELAY_AFTER_FIRST
-            Duration toWait = Duration.millis(firstStartTime + CassandraDatacenter.DELAY_AFTER_FIRST.toMilliseconds() - System.currentTimeMillis());
-            if (toWait.toMilliseconds()>0) {
-                log.info("Launching " + entity + ": delaying launch of non-first node by "+toWait+" to prevent schema disagreements");
-                Tasks.setBlockingDetails("Pausing to ensure first node has time to start");
-                Time.sleep(toWait);
-                Tasks.resetBlockingDetails();
-            }
-        }
-
-        List<Entity> queuedStart = null;
-        if (CassandraDatacenter.DELAY_BETWEEN_STARTS!=null && !ancestors.isEmpty()) {
-            Entity root = ancestors.get(ancestors.size()-1);
-            // TODO currently use the class as a semaphore; messy, and obviously will not federate;
-            // should develop a brooklyn framework semaphore (similar to that done on SshMachineLocation)
-            // and use it - note however the synch block is very very short so relatively safe at least
-            synchronized (CassandraNode.class) {
-                queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
-                if (queuedStart==null) {
-                    queuedStart = new ArrayList<Entity>();
-                    ((EntityLocal)root).setAttribute(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
-                }
-                queuedStart.add(getEntity());
-                ((EntityLocal)root).setAttribute(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
-            }
-            do {
-                // get it again in case it is backed by something external
-                queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
-                if (queuedStart.get(0).equals(getEntity())) break;
-                synchronized (queuedStart) {
-                    try {
-                        queuedStart.wait(1000);
-                    } catch (InterruptedException e) {
-                        Exceptions.propagate(e);
-                    }
-                }
-            } while (true);
-
-            // TODO should look at last start time... but instead we always wait
-            CassandraDatacenter.DELAY_BETWEEN_STARTS.countdownTimer().waitForExpiryUnchecked();
-        }
-
-        try {
-            // Relies on `bin/cassandra -p <pidfile>`, rather than us writing 
pid file ourselves.
-            newScript(MutableMap.of(USE_PID_FILE, false), LAUNCHING)
-                    .body.append(
-                            // log the date to attempt to debug occasional 
http://wiki.apache.org/cassandra/FAQ#schema_disagreement
-                            // (can be caused by machines out of synch 
time-wise; but in our case it seems to be caused by other things!)
-                            "echo date on cassandra server `hostname` when 
launching is `date`",
-                            launchEssentialCommand(),
-                            "echo after essential command")
-                    .execute();
-            if (!isClustered()) {
-                InputStream creationScript = 
DatastoreMixins.getDatabaseCreationScript(entity);
-                if (creationScript!=null) {
-                    Tasks.setBlockingDetails("Pausing to ensure Cassandra 
(singleton) has started before running creation script");
-                    Time.sleep(Duration.seconds(20));
-                    Tasks.resetBlockingDetails();
-                    
executeScriptAsync(Streams.readFullyString(creationScript));
-                }
-            }
-            if (isClustered() && isFirst) {
-                for (Entity ancestor: getCassandraAncestors()) {
-                    
((EntityLocal)ancestor).setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC,
 System.currentTimeMillis());
-                }
-            }
-        } finally {
-            if (queuedStart!=null) {
-                Entity head = queuedStart.remove(0);
-                checkArgument(head.equals(getEntity()), "first queued node was 
"+head+" but we are "+getEntity());
-                synchronized (queuedStart) {
-                    queuedStart.notifyAll();
-                }
-            }
-        }
-    }
-
-    /** returns cassandra-related ancestors (datacenter, fabric), with 
datacenter first and fabric last */
-    protected List<Entity> getCassandraAncestors() {
-        List<Entity> result = new ArrayList<Entity>();
-        Entity ancestor = getEntity().getParent();
-        while (ancestor!=null) {
-            if (ancestor instanceof CassandraDatacenter || ancestor instanceof 
CassandraFabric)
-                result.add(ancestor);
-            ancestor = ancestor.getParent();
-        }
-        return result;
-    }
-
-    protected String launchEssentialCommand() {
-        if (isV2()) {
-            return String.format("./bin/cassandra -p %s > 
./cassandra-console.log 2>&1", getPidFile());
-        } else {
-            // TODO Could probably get rid of the nohup here, as script does 
equivalent itself
-            // with `exec ... <&- &`
-            return String.format("nohup ./bin/cassandra -p %s > 
./cassandra-console.log 2>&1 &", getPidFile());
-        }
-    }
-
-    public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "cassandra.pid"); }
-
-    @Override
-    public boolean isRunning() {
-        return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
-    }
-
-    @Override
-    public void stop() {
-        newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected Map<String,String> getCustomJavaSystemProperties() {
-        return MutableMap.<String, String>builder()
-                .putAll(super.getCustomJavaSystemProperties())
-                .put("cassandra.config", getCassandraConfigFileName())
-                .build();
-    }
-
-    @Override
-    public Map<String, String> getShellEnvironment() {
-        return MutableMap.<String, String>builder()
-                .putAll(super.getShellEnvironment())
-                .put("CASSANDRA_HOME", getRunDir())
-                .put("CASSANDRA_CONF", Os.mergePathsUnix(getRunDir(), "conf"))
-                .renameKey("JAVA_OPTS", "JVM_OPTS")
-                .build();
-    }
-
-    @Override
-    public ProcessTaskWrapper<Integer> executeScriptAsync(String commands) {
-        String fileToRun = Os.mergePathsUnix("brooklyn_commands", 
"cassandra-commands-"+Identifiers.makeRandomId(8));
-        TaskWrapper<Void> task = 
SshEffectorTasks.put(Os.mergePathsUnix(getRunDir(), fileToRun))
-                .machine(getMachine())
-                .contents(commands)
-                .summary("copying cassandra script to execute "+fileToRun)
-                .newTask();
-        
DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity()).andWaitForSuccess();
-        return executeScriptFromInstalledFileAsync(fileToRun);
-    }
-
-    public ProcessTaskWrapper<Integer> 
executeScriptFromInstalledFileAsync(String fileToRun) {
-        ProcessTaskWrapper<Integer> task = SshEffectorTasks.ssh(
-                        "cd "+getRunDir(),
-                        scriptInvocationCommand(getThriftPort(), fileToRun))
-                .machine(getMachine())
-                .summary("executing cassandra script "+fileToRun)
-                .newTask();
-        DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity());
-        return task;
-    }
-
-    protected String scriptInvocationCommand(Integer optionalThriftPort, 
String fileToRun) {
-        return "bin/cassandra-cli " +
-                (optionalThriftPort != null ? "--port " + optionalThriftPort : 
"") +
-                " --file "+fileToRun;
-    }
-
-    @Override
-    public String getResolvedAddress(String hostname) {
-        return resolvedAddressCache.or(BrooklynAccessUtils.resolvedAddressSupplier(getEntity(), getMachine(), hostname));
-    }
-
-}

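For reference, the "queued start nodes" coordination in the deleted CassandraNodeSshDriver.launch above (add yourself to a shared list, block until you are at its head, then remove yourself and wake the next waiter in the finally block) is a FIFO hand-off built from a shared list plus wait/notifyAll. The following is only a minimal plain-Java sketch of that pattern, with a local list standing in for the QUEUED_START_NODES attribute; the class and method names are hypothetical.

// Illustrative sketch only: FIFO start gate similar to the queued-start logic in launch().
import java.util.ArrayList;
import java.util.List;

public class QueuedStartSketch {
    private final List<String> queue = new ArrayList<String>();

    // Join the queue, then block until this node is at its head.
    public void enter(String node) throws InterruptedException {
        synchronized (queue) {
            queue.add(node);
            while (!queue.get(0).equals(node)) {
                queue.wait(1000);
            }
        }
    }

    // Leave the queue and wake the next waiter, mirroring the finally block in launch().
    public void exit(String node) {
        synchronized (queue) {
            if (!queue.isEmpty() && queue.get(0).equals(node)) {
                queue.remove(0);
            }
            queue.notifyAll();
        }
    }
}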
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerator.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerator.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerator.java
deleted file mode 100644
index 8503ad7..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.cassandra;
-
-import java.math.BigInteger;
-import java.util.Set;
-
-public interface TokenGenerator {
-
-    BigInteger max();
-    BigInteger min();
-    BigInteger range();
-
-    void setOrigin(BigInteger shift);
-    
-    BigInteger newToken();
-    
-    BigInteger getTokenForReplacementNode(BigInteger oldToken);
-    
-    Set<BigInteger> getTokensForReplacementNode(Set<BigInteger> oldTokens);
-    
-    /**
-     * Indicates that we are starting a new cluster of the given number of nodes,
-     * so expect that number of consecutive calls to {@link #newToken()}.
-     * 
-     * @param numNewNodes
-     */
-    void growingCluster(int numNewNodes);
-
-    void shrinkingCluster(Set<BigInteger> nodesToRemove);
-    
-    void refresh(Set<BigInteger> currentNodes);
-}

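The TokenGenerator contract above (growingCluster followed by repeated newToken calls) is implemented in the next diff by spacing tokens evenly around the ring. The following is only a self-contained sketch of that arithmetic for the Murmur3 range [-2^63, 2^63-1], using the same formula as generateEquidistantTokens; the class name is hypothetical.

// Illustrative sketch only: equidistant initial tokens over the Murmur3 ring,
// token_i = min + range * i / numTokens, matching TokenGenerators below.
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class EquidistantTokensSketch {
    static final BigInteger MIN = BigInteger.valueOf(2).pow(63).negate();
    static final BigInteger RANGE = BigInteger.valueOf(2).pow(64);

    static List<BigInteger> equidistantTokens(int numTokens) {
        List<BigInteger> result = new ArrayList<BigInteger>();
        for (int i = 0; i < numTokens; i++) {
            // evenly spaced point i of numTokens around the ring, starting at MIN
            result.add(RANGE.multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(numTokens)).add(MIN));
        }
        return result;
    }

    public static void main(String[] args) {
        // For 4 nodes this prints -2^63, -2^62, 0, 2^62
        System.out.println(equidistantTokens(4));
    }
}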
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerators.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerators.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerators.java
deleted file mode 100644
index 70dd0f6..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerators.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.cassandra;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import brooklyn.util.collections.MutableList;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class TokenGenerators {
-
-    /**
-     * Sub-classes are recommended to call {@link #checkRangeValid()} at construction time.
-     */
-    public static abstract class AbstractTokenGenerator implements TokenGenerator, Serializable {
-        
-        private static final long serialVersionUID = -1884526356161711176L;
-        
-        public static final BigInteger TWO = BigInteger.valueOf(2);
-        
-        public abstract BigInteger max();
-        public abstract BigInteger min();
-        public abstract BigInteger range();
-
-        private final Set<BigInteger> currentTokens = Sets.newTreeSet();
-        private final List<BigInteger> nextTokens = Lists.newArrayList();
-        private BigInteger origin = BigInteger.ZERO;
-        
-        protected void checkRangeValid() {
-            Preconditions.checkState(range().equals(max().subtract(min()).add(BigInteger.ONE)),
-                    "min=%s; max=%s; range=%s", min(), max(), range());
-        }
-        
-        @Override
-        public void setOrigin(BigInteger shift) {
-            this.origin = Preconditions.checkNotNull(shift, "shift");
-        }
-        
-        /**
-         * Unless we're explicitly starting a new cluster or resizing by a pre-defined number of nodes, then
-         * let Cassandra decide (i.e. return null).
-         */
-        @Override
-        public synchronized BigInteger newToken() {
-            BigInteger result = (nextTokens.isEmpty()) ? null : nextTokens.remove(0);
-            if (result != null) currentTokens.add(result);
-            return result;
-        }
-
-        @Override
-        public synchronized BigInteger getTokenForReplacementNode(BigInteger oldToken) {
-            checkNotNull(oldToken, "oldToken");
-            return normalize(oldToken.subtract(BigInteger.ONE));
-        }
-
-        @Override
-        public synchronized Set<BigInteger> getTokensForReplacementNode(Set<BigInteger> oldTokens) {
-            checkNotNull(oldTokens, "oldTokens");
-            Set<BigInteger> result = Sets.newLinkedHashSet();
-            for (BigInteger oldToken : oldTokens) {
-                result.add(getTokenForReplacementNode(oldToken));
-            }
-            return result;
-        }
-        
-        @Override
-        public synchronized void growingCluster(int numNewNodes) {
-            if (currentTokens.isEmpty() && nextTokens.isEmpty()) {
-                nextTokens.addAll(generateEquidistantTokens(numNewNodes));
-            } else {
-                // simple strategy which iteratively finds best midpoint
-                for (int i=0; i<numNewNodes; i++) {
-                    nextTokens.add(generateBestNextToken());
-                }
-            }
-        }
-
-        @Override
-        public synchronized void shrinkingCluster(Set<BigInteger> nodesToRemove) {
-            currentTokens.removeAll(nodesToRemove);
-        }
-
-        @Override
-        public synchronized void refresh(Set<BigInteger> currentNodes) {
-            currentTokens.clear();
-            currentTokens.addAll(currentNodes);
-        }
-
-        private List<BigInteger> generateEquidistantTokens(int numTokens) {
-            List<BigInteger> result = Lists.newArrayList();
-            for (int i = 0; i < numTokens; i++) {
-                BigInteger token = range().multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(numTokens)).add(min());
-                token = normalize(token.add(origin));
-                result.add(token);
-            }
-            return result;
-        }
-        
-        private BigInteger normalize(BigInteger input) {
-            while (input.compareTo(min()) < 0)
-                input = input.add(range());
-            while (input.compareTo(max()) > 0)
-                input = input.subtract(range());
-            return input;
-        }
-        
-        private BigInteger generateBestNextToken() {
-            List<BigInteger> allTokens = MutableList.<BigInteger>of().appendAll(currentTokens).appendAll(nextTokens);
-            Collections.sort(allTokens);
-            Iterator<BigInteger> ti = allTokens.iterator();
-            
-            BigInteger thisValue = ti.next();
-            BigInteger prevValue = allTokens.get(allTokens.size()-1).subtract(range());
-            
-            BigInteger bestNewTokenSoFar = normalize(prevValue.add(thisValue).divide(TWO));
-            BigInteger biggestRangeSizeSoFar = thisValue.subtract(prevValue);
-            
-            while (ti.hasNext()) {
-                prevValue = thisValue;
-                thisValue = ti.next();
-                
-                BigInteger rangeHere = thisValue.subtract(prevValue);
-                if (rangeHere.compareTo(biggestRangeSizeSoFar) > 0) {
-                    bestNewTokenSoFar = prevValue.add(thisValue).divide(TWO);
-                    biggestRangeSizeSoFar = rangeHere;
-                }
-            }
-            return bestNewTokenSoFar;
-        }
-
-    }
-
-    public static class PosNeg63TokenGenerator extends AbstractTokenGenerator {
-        private static final long serialVersionUID = 7327403957176106754L;
-        
-        public static final BigInteger MIN_TOKEN = TWO.pow(63).negate();
-        public static final BigInteger MAX_TOKEN = TWO.pow(63).subtract(BigInteger.ONE);
-        public static final BigInteger RANGE = TWO.pow(64);
-
-        public PosNeg63TokenGenerator() {
-            checkRangeValid();
-        }
-
-        @Override public BigInteger max() { return MAX_TOKEN; }
-        @Override public BigInteger min() { return MIN_TOKEN; }
-        @Override public BigInteger range() { return RANGE; }
-    }
-    
-    /** Token generator used by Cassandra pre v1.2. */
-    public static class NonNeg127TokenGenerator extends AbstractTokenGenerator {
-        private static final long serialVersionUID = 1357426905711548198L;
-        
-        public static final BigInteger MIN_TOKEN = BigInteger.ZERO;
-        public static final BigInteger MAX_TOKEN = TWO.pow(127).subtract(BigInteger.ONE);
-        public static final BigInteger RANGE = TWO.pow(127);
-
-        public NonNeg127TokenGenerator() {
-            checkRangeValid();
-        }
-        
-        @Override public BigInteger max() { return MAX_TOKEN; }
-        @Override public BigInteger min() { return MIN_TOKEN; }
-        @Override public BigInteger range() { return RANGE; }
-    }
-    
-}
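
For reference, a minimal sketch of how the deleted generator was typically driven: seed a fresh cluster with equidistant tokens, hand them out one per node, and derive a replacement token for a failed node (old token minus one, wrapped back into range). The harness class below and its expected-output comments are illustrative assumptions, not part of the removed file.

import java.math.BigInteger;

import brooklyn.entity.nosql.cassandra.TokenGenerators;

// Illustrative harness (assumed name); relies only on the TokenGenerators class shown above.
public class TokenGeneratorsExample {
    public static void main(String[] args) {
        TokenGenerators.PosNeg63TokenGenerator generator = new TokenGenerators.PosNeg63TokenGenerator();

        // Fresh four-node cluster: growingCluster pre-computes equidistant tokens across [-2^63, 2^63 - 1].
        generator.growingCluster(4);
        for (int i = 0; i < 4; i++) {
            System.out.println("node " + i + " token: " + generator.newToken());
        }
        // Prints -2^63, -2^62, 0 and 2^62 (origin defaults to zero).

        // Replacement for a failed node: one less than the old token, normalized into range,
        // so replacing the node at MIN_TOKEN wraps around to MAX_TOKEN.
        BigInteger replacement = generator.getTokenForReplacementNode(
                TokenGenerators.PosNeg63TokenGenerator.MIN_TOKEN);
        System.out.println("replacement for MIN_TOKEN: " + replacement);
    }
}

Note that newToken() returns null once the pre-computed list is exhausted, which is what lets ordinary, unplanned growth fall back to Cassandra's own token assignment.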

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java
deleted file mode 100644
index e824b71..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.couchbase;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.catalog.Catalog;
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.group.DynamicCluster;
-import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.Sensors;
-import brooklyn.util.flags.SetFromFlag;
-import brooklyn.util.time.Duration;
-
-import com.google.common.reflect.TypeToken;
-
-@Catalog(name="Couchbase Cluster", description="Couchbase is an open source, distributed (shared-nothing architecture) "
-        + "NoSQL document-oriented database that is optimized for interactive applications.")
-@ImplementedBy(CouchbaseClusterImpl.class)
-public interface CouchbaseCluster extends DynamicCluster {
-
-    AttributeSensor<Integer> ACTUAL_CLUSTER_SIZE = Sensors.newIntegerSensor("coucbase.cluster.actualClusterSize", "returns the actual number of nodes in the cluster");
-
-    @SuppressWarnings("serial")
-    AttributeSensor<Set<Entity>> COUCHBASE_CLUSTER_UP_NODES = Sensors.newSensor(new TypeToken<Set<Entity>>() {
-    }, "couchbase.cluster.clusterEntities", "the set of service up nodes");
-
-    @SuppressWarnings("serial")
-    AttributeSensor<List<String>> COUCHBASE_CLUSTER_BUCKETS = Sensors.newSensor(new TypeToken<List<String>>() {
-    }, "couchbase.cluster.buckets", "Names of all the buckets in the couchbase cluster");
-
-    AttributeSensor<Entity> COUCHBASE_PRIMARY_NODE = Sensors.newSensor(Entity.class, "couchbase.cluster.primaryNode", "The primary couchbase node to query and issue add-server and rebalance on");
-
-    AttributeSensor<Boolean> IS_CLUSTER_INITIALIZED = Sensors.newBooleanSensor("couchbase.cluster.isClusterInitialized", "flag to emit if the couchbase cluster was initialized");
-
-    @SetFromFlag("clusterName")
-    ConfigKey<String> CLUSTER_NAME = ConfigKeys.newStringConfigKey("couchbase.cluster.name", "Optional name for this cluster");
-
-    @SetFromFlag("intialQuorumSize")
-    ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey("couchbase.cluster.intialQuorumSize", "Initial cluster quorum size - number of initial nodes that must have been successfully started to report success (if < 0, then use value of INITIAL_SIZE)",
-            -1);
-
-    @SetFromFlag("delayBeforeAdvertisingCluster")
-    ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.TEN_SECONDS);
-
-    // TODO not sure if this is needed; previously waited 3m (SERVICE_UP_TIME_OUT) but that seems absurdly long
-    @SetFromFlag("postStartStabilizationDelay")
-    ConfigKey<Duration> NODES_STARTED_STABILIZATION_DELAY = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.postStartStabilizationDelay", "Delay after nodes have been started before treating it as a cluster", Duration.TEN_SECONDS);
-    
-    @SetFromFlag("adminUsername")
-    ConfigKey<String> COUCHBASE_ADMIN_USERNAME = CouchbaseNode.COUCHBASE_ADMIN_USERNAME;
-
-    @SetFromFlag("adminPassword")
-    ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = CouchbaseNode.COUCHBASE_ADMIN_PASSWORD;
-
-    @SuppressWarnings("serial")
-    AttributeSensor<List<String>> COUCHBASE_CLUSTER_UP_NODE_ADDRESSES = Sensors.newSensor(new TypeToken<List<String>>() {},
-            "couchbase.cluster.node.addresses", "List of host:port of all active nodes in the cluster (http admin port, and public hostname/IP)");
-    AttributeSensor<String> COUCHBASE_CLUSTER_CONNECTION_URL = Sensors.newStringSensor(
-            "couchbase.cluster.connection.url", "Couchbase-style URL to connect to the cluster (e.g. http://127.0.0.1:8091/ or couchbase://10.0.0.1,10.0.0.2/)");
-    
-    // Interesting stats
-    AttributeSensor<Double> OPS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.ops",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/ops");
-    AttributeSensor<Double> EP_BG_FETCHED_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.ep.bg.fetched",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/ep_bg_fetched");
-    AttributeSensor<Double> CURR_ITEMS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.curr.items",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/curr_items");
-    AttributeSensor<Double> VB_REPLICA_CURR_ITEMS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.vb.replica.curr.items",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/vb_replica_curr_items");
-    AttributeSensor<Double> GET_HITS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.get.hits",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/get_hits");
-    AttributeSensor<Double> CMD_GET_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.cmd.get",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/cmd_get");
-    AttributeSensor<Double> CURR_ITEMS_TOT_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.curr.items.tot",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/curr_items_tot");
-    // Although these are Double (after aggregation), they need to be coerced to Long for ByteSizeStrings rendering
-    AttributeSensor<Long> COUCH_DOCS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.docs.data.size",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_docs_data_size");
-    AttributeSensor<Long> MEM_USED_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.mem.used",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/mem_used");
-    AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.actual.disk.size",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_views_actual_disk_size");
-    AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.docs.actual.disk.size",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size");
-    AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.data.size",
-            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_views_data_size");
-    
-    AttributeSensor<Boolean> BUCKET_CREATION_IN_PROGRESS = Sensors.newBooleanSensor("couchbase.cluster.bucketCreationInProgress", "Indicates that a bucket is currently being created, and " +
-            "further bucket creation should be deferred");
-
-    /**
-     * createBuckets is a list of all the buckets to be created on the couchbase cluster.
-     * The buckets will be created on the primary node of the cluster.
-     * Each map entry for a bucket should contain the following parameters:
-     * - <"bucket", (String) name of the bucket (default: default)>
-     * - <"bucket-type", (String) name of bucket type (default: couchbase)>
-     * - <"bucket-port", (Integer) the bucket port to connect to (default: 11222)>
-     * - <"bucket-ramsize", (Integer) ram size allowed for bucket (default: 200)>
-     * - <"bucket-replica", (Integer) number of replicas for the bucket (default: 1)>
-     */
-    @SuppressWarnings("serial")
-    @SetFromFlag("createBuckets")
-    ConfigKey<List<Map<String, Object>>> CREATE_BUCKETS = ConfigKeys.newConfigKey(new TypeToken<List<Map<String, Object>>>() {},
-            "couchbase.cluster.createBuckets", "a list of all dedicated port buckets to be created on the couchbase cluster");
-    
-    @SuppressWarnings("serial")
-    @SetFromFlag("replication")
-    ConfigKey<List<Map<String,Object>>> REPLICATION = ConfigKeys.newConfigKey(new TypeToken<List<Map<String,Object>>>() {},
-            "couchbase.cluster.replicationConfiguration", "List of replication rules to configure, each rule including target (id of another cluster) and mode (unidirectional or bidirectional)");
-
-    int getQuorumSize();
-}
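
To make the removed interface easier to read in isolation, here is a minimal usage sketch under stated assumptions: the class name, three-node size and bucket values are invented for illustration, and it assumes the EntitySpec.create(...).configure(...) pattern used elsewhere in Brooklyn (INITIAL_SIZE is inherited from DynamicCluster).

import java.util.List;
import java.util.Map;

import brooklyn.entity.nosql.couchbase.CouchbaseCluster;
import brooklyn.entity.proxying.EntitySpec;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

// Illustrative sketch (assumed class name); shows how the deleted config keys were typically wired into a spec.
public class CouchbaseClusterSpecExample {
    public static EntitySpec<CouchbaseCluster> threeNodeSpec() {
        // One bucket definition, using the parameter names documented on CREATE_BUCKETS above.
        List<Map<String, Object>> buckets = ImmutableList.<Map<String, Object>>of(
                ImmutableMap.<String, Object>of(
                        "bucket", "default",
                        "bucket-type", "couchbase",
                        "bucket-ramsize", 200,
                        "bucket-replica", 1));
        return EntitySpec.create(CouchbaseCluster.class)
                .configure(CouchbaseCluster.INITIAL_SIZE, 3)           // inherited from DynamicCluster
                .configure(CouchbaseCluster.CLUSTER_NAME, "demo-cluster")
                .configure(CouchbaseCluster.CREATE_BUCKETS, buckets);
    }
}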
