http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
 
b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
deleted file mode 100644
index 5c47fe7..0000000
--- 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
+++ /dev/null
@@ -1,597 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.couchbase;
-
-import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nonnull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.config.render.RendererHints;
-import brooklyn.enricher.Enrichers;
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.ServiceStateLogic;
-import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.effector.Effectors;
-import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
-import brooklyn.entity.group.DynamicClusterImpl;
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.entity.trait.Startable;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.DependentConfiguration;
-import brooklyn.event.feed.http.HttpFeed;
-import brooklyn.event.feed.http.HttpPollConfig;
-import brooklyn.event.feed.http.HttpValueFunctions;
-import brooklyn.event.feed.http.JsonFunctions;
-import brooklyn.location.access.BrooklynAccessUtils;
-import brooklyn.policy.PolicySpec;
-import brooklyn.util.collections.CollectionFunctionals;
-import brooklyn.util.collections.MutableSet;
-import brooklyn.util.collections.QuorumCheck;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.guava.Functionals;
-import brooklyn.util.guava.IfFunctions;
-import brooklyn.util.math.MathPredicates;
-import brooklyn.util.task.DynamicTasks;
-import brooklyn.util.task.TaskBuilder;
-import brooklyn.util.task.Tasks;
-import brooklyn.util.text.ByteSizeStrings;
-import brooklyn.util.text.StringFunctions;
-import brooklyn.util.text.Strings;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.net.HostAndPort;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-
-public class CouchbaseClusterImpl extends DynamicClusterImpl implements 
CouchbaseCluster {
-    
-    /*
-     * Refactoring required:
-     * 
-     * Currently, on start() the cluster waits for an arbitrary 
SERVICE_UP_TIME_OUT (3 minutes) before assuming that a quorate 
-     * number of servers are available. The servers are then added to the 
cluster, and a further wait period of  
-     * DELAY_BEFORE_ADVERTISING_CLUSTER (30 seconds) is used before 
advertising the cluster
-     * 
-     * DELAY_BEFORE_ADVERTISING_CLUSTER: It should be possible to refactor 
this away by adding a repeater that will poll
-     * the REST API of the primary node (once established) until the API 
indicates that the cluster is available
-     * 
-     * SERVICE_UP_TIME_OUT: The refactoring of this would be more substantial. 
One method would be to remove the bulk of the 
-     * logic from the start() method, and rely entirely on the membership 
tracking policy and the onServerPoolMemberChanged()
-     * method. The addition of a RUNNING sensor on the nodes would allow the 
cluster to determine that a node is up and
-     * running but has not yet been added to the cluster. The 
IS_CLUSTER_INITIALIZED key could be used to determine whether
-     * or not the cluster should be initialized, or a node simply added to an 
existing cluster. A repeater could be used
-     * in the driver's to ensure that the method does not return until the 
node has been fully added
-     * 
-     * There is an (incomplete) first-pass at this here: 
https://github.com/Nakomis/incubator-brooklyn/compare/couchbase-running-sensor
-     * however, there have been significant changes to the cluster 
initialization since that work was done so it will probably
-     * need to be re-done
-     * 
-     * Additionally, during bucket creation, a HttpPoll is used to check that 
the bucket has been created. This should be 
-     * refactored to use a Repeater in CouchbaseNodeSshDriver.bucketCreate() 
in a similar way to the one employed in
-     * CouchbaseNodeSshDriver.rebalance(). Were this done, this class could 
simply queue the bucket creation tasks
-     * 
-     */
-    
-    private static final Logger log = 
LoggerFactory.getLogger(CouchbaseClusterImpl.class);
-    private final Object mutex = new Object[0];
-    // Used to serialize bucket creation as only one bucket can be created at 
a time,
-    // so a feed is used to determine when a bucket has finished being created
-    private final AtomicReference<HttpFeed> resetBucketCreation = new 
AtomicReference<HttpFeed>();
-
-    public void init() {
-        log.info("Initializing the Couchbase cluster...");
-        super.init();
-        
-        addEnricher(
-            Enrichers.builder()
-                .transforming(COUCHBASE_CLUSTER_UP_NODES)
-                .from(this)
-                .publishing(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES)
-                .computing(new ListOfHostAndPort()).build() );
-        addEnricher(
-            Enrichers.builder()
-                .transforming(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES)
-                .from(this)
-                .publishing(COUCHBASE_CLUSTER_CONNECTION_URL)
-                .computing(
-                    IfFunctions.<List<String>>ifPredicate(
-                        
Predicates.compose(MathPredicates.lessThan(getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE)),
 
-                            CollectionFunctionals.sizeFunction(0)) )
-                    .value((String)null)
-                    .defaultApply(
-                        Functionals.chain(
-                            
CollectionFunctionals.<String,List<String>>limit(4), 
-                            StringFunctions.joiner(","),
-                            StringFunctions.formatter("http://%s/";))) )
-                .build() );
-        
-        Map<? extends AttributeSensor<? extends Number>, ? extends 
AttributeSensor<? extends Number>> enricherSetup = 
-            ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? 
extends Number>>builder()
-                .put(CouchbaseNode.OPS, CouchbaseCluster.OPS_PER_NODE)
-                .put(CouchbaseNode.COUCH_DOCS_DATA_SIZE, 
CouchbaseCluster.COUCH_DOCS_DATA_SIZE_PER_NODE)
-                .put(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, 
CouchbaseCluster.COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE)
-                .put(CouchbaseNode.EP_BG_FETCHED, 
CouchbaseCluster.EP_BG_FETCHED_PER_NODE)
-                .put(CouchbaseNode.MEM_USED, 
CouchbaseCluster.MEM_USED_PER_NODE)
-                .put(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, 
CouchbaseCluster.COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE)
-                .put(CouchbaseNode.CURR_ITEMS, 
CouchbaseCluster.CURR_ITEMS_PER_NODE)
-                .put(CouchbaseNode.VB_REPLICA_CURR_ITEMS, 
CouchbaseCluster.VB_REPLICA_CURR_ITEMS_PER_NODE)
-                .put(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, 
CouchbaseCluster.COUCH_VIEWS_DATA_SIZE_PER_NODE)
-                .put(CouchbaseNode.GET_HITS, 
CouchbaseCluster.GET_HITS_PER_NODE)
-                .put(CouchbaseNode.CMD_GET, CouchbaseCluster.CMD_GET_PER_NODE)
-                .put(CouchbaseNode.CURR_ITEMS_TOT, 
CouchbaseCluster.CURR_ITEMS_TOT_PER_NODE)
-            .build();
-        
-        for (AttributeSensor<? extends Number> nodeSensor : 
enricherSetup.keySet()) {
-            addSummingMemberEnricher(nodeSensor);
-            addAveragingMemberEnricher(nodeSensor, 
enricherSetup.get(nodeSensor));
-        }
-        
-        
addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
-            .from(IS_CLUSTER_INITIALIZED).computing(
-                IfFunctions.ifNotEquals(true).value("The cluster is not yet 
completely initialized")
-                    .defaultValue(null).build()).build() );
-    }
-    
-    private void addAveragingMemberEnricher(AttributeSensor<? extends Number> 
fromSensor, AttributeSensor<? extends Number> toSensor) {
-        addEnricher(Enrichers.builder()
-            .aggregating(fromSensor)
-            .publishing(toSensor)
-            .fromMembers()
-            .computingAverage()
-            .build()
-        );
-    }
-
-    private void addSummingMemberEnricher(AttributeSensor<? extends Number> 
source) {
-        addEnricher(Enrichers.builder()
-            .aggregating(source)
-            .publishing(source)
-            .fromMembers()
-            .computingSum()
-            .build()
-        );
-    }
-
-    @Override
-    protected void doStart() {
-        setAttribute(IS_CLUSTER_INITIALIZED, false);
-        
-        super.doStart();
-
-        connectSensors();
-        
-        setAttribute(BUCKET_CREATION_IN_PROGRESS, false);
-
-        //start timeout before adding the servers
-        Tasks.setBlockingDetails("Pausing while Couchbase stabilizes");
-        Time.sleep(getConfig(NODES_STARTED_STABILIZATION_DELAY));
-
-        Optional<Set<Entity>> upNodes = 
Optional.<Set<Entity>>fromNullable(getAttribute(COUCHBASE_CLUSTER_UP_NODES));
-        if (upNodes.isPresent() && !upNodes.get().isEmpty()) {
-
-            Tasks.setBlockingDetails("Adding servers to Couchbase");
-            
-            //TODO: select a new primary node if this one fails
-            Entity primaryNode = upNodes.get().iterator().next();
-            ((EntityInternal) 
primaryNode).setAttribute(CouchbaseNode.IS_PRIMARY_NODE, true);
-            setAttribute(COUCHBASE_PRIMARY_NODE, primaryNode);
-
-            Set<Entity> serversToAdd = MutableSet.<Entity>copyOf(getUpNodes());
-
-            if (serversToAdd.size() >= getQuorumSize() && serversToAdd.size() 
> 1) {
-                log.info("Number of SERVICE_UP nodes:{} in cluster:{} reached 
Quorum:{}, adding the servers", new Object[]{serversToAdd.size(), getId(), 
getQuorumSize()});
-                addServers(serversToAdd);
-
-                //wait for servers to be added to the couchbase server
-                try {
-                    Tasks.setBlockingDetails("Delaying before advertising 
cluster up");
-                    Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
-                } finally {
-                    Tasks.resetBlockingDetails();
-                }
-                
-                ((CouchbaseNode)getPrimaryNode()).rebalance();
-            } else {
-                if (getQuorumSize()>1) {
-                    log.warn(this+" is not quorate; will likely fail later, 
but proceeding for now");
-                }
-                for (Entity server: serversToAdd) {
-                    ((EntityInternal) 
server).setAttribute(CouchbaseNode.IS_IN_CLUSTER, true);
-                }
-            }
-                
-            if (getConfig(CREATE_BUCKETS)!=null) {
-                try {
-                    Tasks.setBlockingDetails("Creating buckets in Couchbase");
-
-                    createBuckets();
-                    DependentConfiguration.waitInTaskForAttributeReady(this, 
CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
-
-                } finally {
-                    Tasks.resetBlockingDetails();
-                }
-            }
-
-            if (getConfig(REPLICATION)!=null) {
-                try {
-                    Tasks.setBlockingDetails("Configuring replication rules");
-
-                    List<Map<String, Object>> replRules = 
getConfig(REPLICATION);
-                    for (Map<String, Object> replRule: replRules) {
-                        
DynamicTasks.queue(Effectors.invocation(getPrimaryNode(), 
CouchbaseNode.ADD_REPLICATION_RULE, replRule));
-                    }
-                    DynamicTasks.waitForLast();
-
-                } finally {
-                    Tasks.resetBlockingDetails();
-                }
-            }
-
-            setAttribute(IS_CLUSTER_INITIALIZED, true);
-            
-        } else {
-            throw new IllegalStateException("No up nodes available after 
starting");
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (resetBucketCreation.get() != null) {
-            resetBucketCreation.get().stop();
-        }
-        super.stop();
-    }
-
-    protected void connectSensors() {
-        addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
-                .displayName("Controller targets tracker")
-                .configure("group", this));
-    }
-    
-    private final static class ListOfHostAndPort implements 
Function<Set<Entity>, List<String>> {
-        @Override public List<String> apply(Set<Entity> input) {
-            List<String> addresses = Lists.newArrayList();
-            for (Entity entity : input) {
-                addresses.add(String.format("%s",
-                        
BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, 
entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT))));
-            }
-            return addresses;
-        }
-    }
-
-    public static class MemberTrackingPolicy extends 
AbstractMembershipTrackingPolicy {
-        @Override protected void onEntityChange(Entity member) {
-            ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
-        }
-
-        @Override protected void onEntityAdded(Entity member) {
-            ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
-        }
-
-        @Override protected void onEntityRemoved(Entity member) {
-            ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
-        }
-    };
-
-    protected synchronized void onServerPoolMemberChanged(Entity member) {
-        if (log.isTraceEnabled()) log.trace("For {}, considering membership of 
{} which is in locations {}",
-                new Object[]{this, member, member.getLocations()});
-
-        //FIXME: make use of servers to be added after cluster initialization.
-        synchronized (mutex) {
-            if (belongsInServerPool(member)) {
-
-                Optional<Set<Entity>> upNodes = 
Optional.fromNullable(getUpNodes());
-                if (upNodes.isPresent()) {
-
-                    if (!upNodes.get().contains(member)) {
-                        Set<Entity> newNodes = Sets.newHashSet(getUpNodes());
-                        newNodes.add(member);
-                        setAttribute(COUCHBASE_CLUSTER_UP_NODES, newNodes);
-
-                        //add to set of servers to be added.
-                        if (isClusterInitialized()) {
-                            addServer(member);
-                        }
-                    }
-                } else {
-                    Set<Entity> newNodes = Sets.newHashSet();
-                    newNodes.add(member);
-                    setAttribute(COUCHBASE_CLUSTER_UP_NODES, newNodes);
-
-                    if (isClusterInitialized()) {
-                        addServer(member);
-                    }
-                }
-            } else {
-                Set<Entity> upNodes = getUpNodes();
-                if (upNodes != null && upNodes.contains(member)) {
-                    upNodes.remove(member);
-                    setAttribute(COUCHBASE_CLUSTER_UP_NODES, upNodes);
-                    log.info("Removing couchbase node {}: {}; from cluster", 
new Object[]{this, member});
-                }
-            }
-            if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", 
this, member);
-        }
-    }
-
-    protected boolean belongsInServerPool(Entity member) {
-        if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) {
-            if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, 
eliminating because not up", this, member);
-            return false;
-        }
-        if (!getMembers().contains(member)) {
-            if (log.isTraceEnabled())
-                log.trace("Members of {}, checking {}, eliminating because not 
member", this, member);
-
-            return false;
-        }
-        if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, 
approving", this, member);
-
-        return true;
-    }
-
-
-    protected EntitySpec<?> getMemberSpec() {
-        EntitySpec<?> result = super.getMemberSpec();
-        if (result != null) return result;
-        return EntitySpec.create(CouchbaseNode.class);
-    }
-
-    @Override
-    public int getQuorumSize() {
-        Integer quorumSize = getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE);
-        if (quorumSize != null && quorumSize > 0)
-            return quorumSize;
-        // by default the quorum would be floor(initial_cluster_size/2) + 1
-        return (int) Math.floor(getConfig(INITIAL_SIZE) / 2) + 1;
-    }
-
-    protected int getActualSize() {
-        return 
Optional.fromNullable(getAttribute(CouchbaseCluster.ACTUAL_CLUSTER_SIZE)).or(-1);
-    }
-
-    private Set<Entity> getUpNodes() {
-        return getAttribute(COUCHBASE_CLUSTER_UP_NODES);
-    }
-
-    private CouchbaseNode getPrimaryNode() {
-        return (CouchbaseNode) getAttribute(COUCHBASE_PRIMARY_NODE);
-    }
-
-    @Override
-    protected void initEnrichers() {
-        
addEnricher(Enrichers.builder().updatingMap(ServiceStateLogic.SERVICE_NOT_UP_INDICATORS)
-            .from(COUCHBASE_CLUSTER_UP_NODES)
-            .computing(new Function<Set<Entity>, Object>() {
-                @Override
-                public Object apply(Set<Entity> input) {
-                    if (input==null) return "Couchbase up nodes not set";
-                    if (input.isEmpty()) return "No Couchbase up nodes";
-                    if (input.size() < getQuorumSize()) return "Couchbase up 
nodes not quorate";
-                    return null;
-                }
-            }).build());
-        
-        if (config().getLocalRaw(UP_QUORUM_CHECK).isAbsent()) {
-            // TODO Only leaving CouchbaseQuorumCheck here in case it is 
contained in persisted state.
-            // If so, need a transformer and then to delete it
-            @SuppressWarnings({ "unused", "hiding" })
-            @Deprecated
-            class CouchbaseQuorumCheck implements QuorumCheck {
-                @Override
-                public boolean isQuorate(int sizeHealthy, int totalSize) {
-                    // check members count passed in AND the sensor  
-                    if (sizeHealthy < getQuorumSize()) return false;
-                    return true;
-                }
-            }
-            config().set(UP_QUORUM_CHECK, new 
CouchbaseClusterImpl.CouchbaseQuorumCheck(this));
-        }
-        super.initEnrichers();
-    }
-    
-    static class CouchbaseQuorumCheck implements QuorumCheck {
-        private final CouchbaseCluster cluster;
-        CouchbaseQuorumCheck(CouchbaseCluster cluster) {
-            this.cluster = cluster;
-        }
-        @Override
-        public boolean isQuorate(int sizeHealthy, int totalSize) {
-            // check members count passed in AND the sensor  
-            if (sizeHealthy < cluster.getQuorumSize()) return false;
-            return true;
-        }
-    }
-    protected void addServers(Set<Entity> serversToAdd) {
-        Preconditions.checkNotNull(serversToAdd);
-        for (Entity s : serversToAdd) {
-            addServerSeveralTimes(s, 12, Duration.TEN_SECONDS);
-        }
-    }
-
-    /** try adding in a loop because we are seeing spurious port failures in 
AWS */
-    protected void addServerSeveralTimes(Entity s, int numRetries, Duration 
delayOnFailure) {
-        try {
-            addServer(s);
-        } catch (Exception e) {
-            Exceptions.propagateIfFatal(e);
-            if (numRetries<=0) throw Exceptions.propagate(e);
-            // retry once after sleep because we are getting some odd 
primary-change events
-            log.warn("Error adding "+s+" to "+this+", "+numRetries+" retries 
remaining, will retry after delay ("+e+")");
-            Time.sleep(delayOnFailure);
-            addServerSeveralTimes(s, numRetries-1, delayOnFailure);
-        }
-    }
-
-    protected void addServer(Entity serverToAdd) {
-        Preconditions.checkNotNull(serverToAdd);
-        if (serverToAdd.equals(getPrimaryNode())) {
-            // no need to add; but we pass it in anyway because it makes the 
calling logic easier
-            return;
-        }
-        if (!isMemberInCluster(serverToAdd)) {
-            HostAndPort webAdmin = 
HostAndPort.fromParts(serverToAdd.getAttribute(SoftwareProcess.SUBNET_HOSTNAME),
-                    
serverToAdd.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
-            String username = 
serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
-            String password = 
serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
-
-            if (isClusterInitialized()) {
-                Entities.invokeEffectorWithArgs(this, getPrimaryNode(), 
CouchbaseNode.SERVER_ADD_AND_REBALANCE, webAdmin.toString(), username, 
password).getUnchecked();
-            } else {
-                Entities.invokeEffectorWithArgs(this, getPrimaryNode(), 
CouchbaseNode.SERVER_ADD, webAdmin.toString(), username, 
password).getUnchecked();
-            }
-            //FIXME check feedback of whether the server was added.
-            ((EntityInternal) 
serverToAdd).setAttribute(CouchbaseNode.IS_IN_CLUSTER, true);
-        }
-    }
-
-    /** finds the cluster name specified for a node or a cluster, 
-     * using {@link CouchbaseCluster#CLUSTER_NAME} or falling back to the 
cluster (or node) ID. */
-    public static String getClusterName(Entity node) {
-        String name = node.getConfig(CLUSTER_NAME);
-        if (!Strings.isBlank(name)) return Strings.makeValidFilename(name);
-        return getClusterOrNode(node).getId();
-    }
-    
-    /** returns Couchbase cluster in ancestry, defaulting to the given node if 
none */
-    @Nonnull public static Entity getClusterOrNode(Entity node) {
-        Iterable<CouchbaseCluster> clusterNodes = 
Iterables.filter(Entities.ancestors(node), CouchbaseCluster.class);
-        return Iterables.getFirst(clusterNodes, node);
-    }
-    
-    public boolean isClusterInitialized() {
-        return 
Optional.fromNullable(getAttribute(IS_CLUSTER_INITIALIZED)).or(false);
-    }
-
-    public boolean isMemberInCluster(Entity e) {
-        return 
Optional.fromNullable(e.getAttribute(CouchbaseNode.IS_IN_CLUSTER)).or(false);
-    }
-    
-    public void createBuckets() {
-        //TODO: check for port conflicts if buckets are being created with a 
port
-        List<Map<String, Object>> bucketsToCreate = getConfig(CREATE_BUCKETS);
-        if (bucketsToCreate==null) return;
-        
-        Entity primaryNode = getPrimaryNode();
-
-        for (Map<String, Object> bucketMap : bucketsToCreate) {
-            String bucketName = bucketMap.containsKey("bucket") ? (String) 
bucketMap.get("bucket") : "default";
-            String bucketType = bucketMap.containsKey("bucket-type") ? 
(String) bucketMap.get("bucket-type") : "couchbase";
-            // default bucket must be on this port; other buckets can (must) 
specify their own (unique) port
-            Integer bucketPort = bucketMap.containsKey("bucket-port") ? 
(Integer) bucketMap.get("bucket-port") : 11211;
-            Integer bucketRamSize = bucketMap.containsKey("bucket-ramsize") ? 
(Integer) bucketMap.get("bucket-ramsize") : 100;
-            Integer bucketReplica = bucketMap.containsKey("bucket-replica") ? 
(Integer) bucketMap.get("bucket-replica") : 1;
-
-            createBucket(primaryNode, bucketName, bucketType, bucketPort, 
bucketRamSize, bucketReplica);
-        }
-    }
-
-    public void createBucket(final Entity primaryNode, final String 
bucketName, final String bucketType, final Integer bucketPort, final Integer 
bucketRamSize, final Integer bucketReplica) {
-        
DynamicTasks.queueIfPossible(TaskBuilder.<Void>builder().name("Creating bucket 
" + bucketName).body(
-                new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        
DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, 
CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
-                        if 
(CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
-                            
CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
-                        }
-                        
setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true);
-                        HostAndPort hostAndPort = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(primaryNode, 
primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
-
-                        
CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder()
-                                .entity(CouchbaseClusterImpl.this)
-                                .period(500, TimeUnit.MILLISECONDS)
-                                
.baseUri(String.format("http://%s/pools/default/buckets/%s";, hostAndPort, 
bucketName))
-                                
.credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), 
primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
-                                .poll(new 
HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS)
-                                        
.onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), 
JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() {
-                                            @Override
-                                            public Boolean apply(JsonElement 
input) {
-                                                // Wait until bucket has been 
created on all nodes and the couchApiBase element has been published 
(indicating that the bucket is useable)
-                                                JsonArray servers = 
input.getAsJsonArray();
-                                                if (servers.size() != 
CouchbaseClusterImpl.this.getMembers().size()) {
-                                                    return true;
-                                                }
-                                                for (JsonElement server : 
servers) {
-                                                    Object api = 
server.getAsJsonObject().get("couchApiBase");
-                                                    if (api == null || 
Strings.isEmpty(String.valueOf(api))) {
-                                                        return true;
-                                                    }
-                                                }
-                                                return false;
-                                            }
-                                        }))
-                                        .onFailureOrException(new 
Function<Object, Boolean>() {
-                                            @Override
-                                            public Boolean apply(Object input) 
{
-                                                if (input instanceof 
brooklyn.util.http.HttpToolResponse) {
-                                                    if 
(((brooklyn.util.http.HttpToolResponse) input).getResponseCode() == 404) {
-                                                        return true;
-                                                    }
-                                                }
-                                                if (input instanceof Throwable)
-                                                    
Exceptions.propagate((Throwable) input);
-                                                throw new 
IllegalStateException("Unexpected response when creating bucket:" + input);
-                                            }
-                                        }))
-                                .build());
-
-                        // TODO: Bail out if bucket creation fails, to allow 
next bucket to proceed
-                        
Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, 
CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, 
bucketReplica);
-                        
DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, 
CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
-                        if 
(CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
-                            
CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
-                        }
-                        return null;
-                    }
-                }
-        ).build()).orSubmitAndBlock();
-    }
-    
-    static {
-        RendererHints.register(COUCH_DOCS_DATA_SIZE_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-        RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-        RendererHints.register(MEM_USED_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-        RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-        RendererHints.register(COUCH_VIEWS_DATA_SIZE_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
 
b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
deleted file mode 100644
index 727f942..0000000
--- 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.couchbase;
-
-import java.net.URI;
-
-import org.apache.brooklyn.catalog.Catalog;
-import brooklyn.config.ConfigKey;
-import brooklyn.config.render.RendererHints;
-import brooklyn.entity.annotation.Effector;
-import brooklyn.entity.annotation.EffectorParam;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.MethodEffector;
-import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.effector.Effectors;
-import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
-import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
-import brooklyn.event.basic.Sensors;
-import brooklyn.util.flags.SetFromFlag;
-import brooklyn.util.text.ByteSizeStrings;
-
-@Catalog(name="CouchBase Node", description="Couchbase Server is an open 
source, distributed (shared-nothing architecture) "
-        + "NoSQL document-oriented database that is optimized for interactive 
applications.")
-@ImplementedBy(CouchbaseNodeImpl.class)
-public interface CouchbaseNode extends SoftwareProcess {
-
-    @SetFromFlag("adminUsername")
-    ConfigKey<String> COUCHBASE_ADMIN_USERNAME = 
ConfigKeys.newStringConfigKey("couchbase.adminUsername", "Username for the 
admin user on the node", "Administrator");
-
-    @SetFromFlag("adminPassword")
-    ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = 
ConfigKeys.newStringConfigKey("couchbase.adminPassword", "Password for the 
admin user on the node", "Password");
-
-    @SetFromFlag("version")
-    ConfigKey<String> SUGGESTED_VERSION = 
ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION,
-            "3.0.0");
-
-    @SetFromFlag("enterprise")
-    ConfigKey<Boolean> USE_ENTERPRISE = 
ConfigKeys.newBooleanConfigKey("couchbase.enterprise.enabled",
-        "Whether to use Couchbase Enterprise; if false uses the community 
version. Defaults to true.", true);
-
-    @SetFromFlag("downloadUrl")
-    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new 
BasicAttributeSensorAndConfigKey<String>(
-            SoftwareProcess.DOWNLOAD_URL, 
"http://packages.couchbase.com/releases/${version}/";
-                + 
"couchbase-server-${driver.communityOrEnterprise}${driver.downloadLinkPreVersionSeparator}${version}${driver.downloadLinkOsTagWithPrefix}");
-
-    @SetFromFlag("clusterInitRamSize")
-    BasicAttributeSensorAndConfigKey<Integer> COUCHBASE_CLUSTER_INIT_RAM_SIZE 
= new BasicAttributeSensorAndConfigKey<Integer>(
-            Integer.class, "couchbase.clusterInitRamSize", "initial ram size 
of the cluster", 300);
-
-    PortAttributeSensorAndConfigKey COUCHBASE_WEB_ADMIN_PORT = new 
PortAttributeSensorAndConfigKey("couchbase.webAdminPort", "Web Administration 
Port", "8091+");
-    PortAttributeSensorAndConfigKey COUCHBASE_API_PORT = new 
PortAttributeSensorAndConfigKey("couchbase.apiPort", "Couchbase API Port", 
"8092+");
-    PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_BUCKET_PORT = new 
PortAttributeSensorAndConfigKey("couchbase.internalBucketPort", "Internal 
Bucket Port", "11209");
-    PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_EXTERNAL_BUCKET_PORT = 
new PortAttributeSensorAndConfigKey("couchbase.internalExternalBucketPort", 
"Internal/External Bucket Port", "11210");
-    PortAttributeSensorAndConfigKey COUCHBASE_CLIENT_INTERFACE_PROXY = new 
PortAttributeSensorAndConfigKey("couchbase.clientInterfaceProxy", "Client 
interface (proxy)", "11211");
-    PortAttributeSensorAndConfigKey COUCHBASE_INCOMING_SSL_PROXY = new 
PortAttributeSensorAndConfigKey("couchbase.incomingSslProxy", "Incoming SSL 
Proxy", "11214");
-    PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_OUTGOING_SSL_PROXY = 
new PortAttributeSensorAndConfigKey("couchbase.internalOutgoingSslProxy", 
"Internal Outgoing SSL Proxy", "11215");
-    PortAttributeSensorAndConfigKey COUCHBASE_REST_HTTPS_FOR_SSL = new 
PortAttributeSensorAndConfigKey("couchbase.internalRestHttpsForSsl", "Internal 
REST HTTPS for SSL", "18091");
-    PortAttributeSensorAndConfigKey COUCHBASE_CAPI_HTTPS_FOR_SSL = new 
PortAttributeSensorAndConfigKey("couchbase.internalCapiHttpsForSsl", "Internal 
CAPI HTTPS for SSL", "18092");
-    PortAttributeSensorAndConfigKey ERLANG_PORT_MAPPER = new 
PortAttributeSensorAndConfigKey("couchbase.erlangPortMapper", "Erlang Port 
Mapper Daemon Listener Port (epmd)", "4369");
-    PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_START = new 
PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeStart", 
"Node data exchange Port Range Start", "21100+");
-    PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_END = new 
PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeEnd", "Node 
data exchange Port Range End", "21199+");
-
-    AttributeSensor<Boolean> IS_PRIMARY_NODE = 
Sensors.newBooleanSensor("couchbase.isPrimaryNode", "flag to determine if the 
current couchbase node is the primary node for the cluster");
-    AttributeSensor<Boolean> IS_IN_CLUSTER = 
Sensors.newBooleanSensor("couchbase.isInCluster", "flag to determine if the 
current couchbase node has been added to a cluster, "
-        + "including being the first / primary node");
-    AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI;
-    
-    // Interesting stats
-    AttributeSensor<Double> OPS = 
Sensors.newDoubleSensor("couchbase.stats.ops", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/ops");
-    AttributeSensor<Long> COUCH_DOCS_DATA_SIZE = 
Sensors.newLongSensor("couchbase.stats.couch.docs.data.size", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/couch_docs_data_size");
-    AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE = 
Sensors.newLongSensor("couchbase.stats.couch.docs.actual.disk.size", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/couch_docs_actual_disk_size");
-    AttributeSensor<Long> EP_BG_FETCHED = 
Sensors.newLongSensor("couchbase.stats.ep.bg.fetched", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/ep_bg_fetched");
-    AttributeSensor<Long> MEM_USED = 
Sensors.newLongSensor("couchbase.stats.mem.used", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/mem_used");
-    AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE = 
Sensors.newLongSensor("couchbase.stats.couch.views.actual.disk.size", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/couch_views_actual_disk_size");
-    AttributeSensor<Long> CURR_ITEMS = 
Sensors.newLongSensor("couchbase.stats.curr.items", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/curr_items");
-    AttributeSensor<Long> VB_REPLICA_CURR_ITEMS = 
Sensors.newLongSensor("couchbase.stats.vb.replica.curr.items", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/vb_replica_curr_items");
-    AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE = 
Sensors.newLongSensor("couchbase.stats.couch.views.data.size", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/couch_views_data_size");
-    AttributeSensor<Long> GET_HITS = 
Sensors.newLongSensor("couchbase.stats.get.hits", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/get_hits");
-    AttributeSensor<Double> CMD_GET = 
Sensors.newDoubleSensor("couchbase.stats.cmd.get", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/cmd_get");
-    AttributeSensor<Long> CURR_ITEMS_TOT = 
Sensors.newLongSensor("couchbase.stats.curr.items.tot", 
-            "Retrieved from pools/nodes/<current 
node>/interestingStats/curr_items_tot");
-    AttributeSensor<String> REBALANCE_STATUS = 
Sensors.newStringSensor("couchbase.rebalance.status", 
-            "Displays the current rebalance status from 
pools/nodes/rebalanceStatus");
-    
-    class MainUri {
-        public static final AttributeSensor<URI> MAIN_URI = 
Attributes.MAIN_URI;
-        
-        static {
-            // ROOT_URL does not need init because it refers to something 
already initialized
-            RendererHints.register(COUCHBASE_WEB_ADMIN_URL, 
RendererHints.namedActionWithUrl());
-
-            RendererHints.register(COUCH_DOCS_DATA_SIZE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-            RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-            RendererHints.register(MEM_USED, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-            RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-            RendererHints.register(COUCH_VIEWS_DATA_SIZE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
-        }
-    }
-    
-    // this long-winded reference is done just to trigger the initialization 
above
-    AttributeSensor<URI> MAIN_URI = MainUri.MAIN_URI;
-
-    MethodEffector<Void> SERVER_ADD = new 
MethodEffector<Void>(CouchbaseNode.class, "serverAdd");
-    MethodEffector<Void> SERVER_ADD_AND_REBALANCE = new 
MethodEffector<Void>(CouchbaseNode.class, "serverAddAndRebalance");
-    MethodEffector<Void> REBALANCE = new 
MethodEffector<Void>(CouchbaseNode.class, "rebalance");
-    MethodEffector<Void> BUCKET_CREATE = new 
MethodEffector<Void>(CouchbaseNode.class, "bucketCreate");
-    brooklyn.entity.Effector<Void> ADD_REPLICATION_RULE = 
Effectors.effector(Void.class, "addReplicationRule")
-        .description("Adds a replication rule from the indicated bucket on the 
cluster where this node is located "
-            + "to the indicated cluster and optional destination bucket")
-        .parameter(String.class, "fromBucket", "Bucket to be replicated")
-        .parameter(Object.class, "toCluster", "Entity (or ID) of the cluster 
to which this should replicate")
-        .parameter(String.class, "toBucket", "Destination bucket for 
replication in the toCluster, defaulting to the same as the fromBucket")
-        .buildAbstract();
-
-    @Effector(description = "add a server to a cluster")
-    public void serverAdd(@EffectorParam(name = "serverHostname") String 
serverToAdd, @EffectorParam(name = "username") String username, 
@EffectorParam(name = "password") String password);
-    
-    @Effector(description = "add a server to a cluster, and immediately 
rebalances")
-    public void serverAddAndRebalance(@EffectorParam(name = "serverHostname") 
String serverToAdd, @EffectorParam(name = "username") String username, 
@EffectorParam(name = "password") String password);
-
-    @Effector(description = "rebalance the couchbase cluster")
-    public void rebalance();
-    
-    @Effector(description = "create a new bucket")
-    public void bucketCreate(@EffectorParam(name = "bucketName") String 
bucketName, @EffectorParam(name = "bucketType") String bucketType, 
-            @EffectorParam(name = "bucketPort") Integer bucketPort, 
@EffectorParam(name = "bucketRamSize") Integer bucketRamSize, 
-            @EffectorParam(name = "bucketReplica") Integer bucketReplica);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
 
b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
deleted file mode 100644
index 37f2f74..0000000
--- 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.couchbase;
-
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.SoftwareProcessDriver;
-
-public interface CouchbaseNodeDriver extends SoftwareProcessDriver {
-    public String getOsTag();
-    public String getDownloadLinkPreVersionSeparator();
-    public String getDownloadLinkOsTagWithPrefix();
-    
-    public String getCommunityOrEnterprise();
-
-    public void serverAdd(String serverToAdd, String username, String 
password);
-
-    public void rebalance();
-    
-    public void bucketCreate(String bucketName, String bucketType, Integer 
bucketPort, Integer bucketRamSize, Integer bucketReplica);
-
-    public void serverAddAndRebalance(String serverToAdd, String username, 
String password);
-
-    public void addReplicationRule(Entity toCluster, String fromBucket, String 
toBucket);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
 
b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
deleted file mode 100644
index d7439ca..0000000
--- 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.couchbase;
-
-import static java.lang.String.format;
-
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.SoftwareProcessImpl;
-import brooklyn.entity.effector.EffectorBody;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.SensorEvent;
-import brooklyn.event.SensorEventListener;
-import brooklyn.event.feed.http.HttpFeed;
-import brooklyn.event.feed.http.HttpPollConfig;
-import brooklyn.event.feed.http.HttpValueFunctions;
-import brooklyn.event.feed.http.JsonFunctions;
-import brooklyn.location.MachineProvisioningLocation;
-import brooklyn.location.access.BrooklynAccessUtils;
-import brooklyn.location.cloud.CloudLocationConfig;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.collections.MutableSet;
-import brooklyn.util.config.ConfigBag;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.guava.Functionals;
-import brooklyn.util.guava.MaybeFunctions;
-import brooklyn.util.guava.TypeTokens;
-import brooklyn.util.http.HttpTool;
-import brooklyn.util.http.HttpToolResponse;
-import brooklyn.util.net.Urls;
-import brooklyn.util.task.Tasks;
-import brooklyn.util.text.Strings;
-import brooklyn.util.time.Duration;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Preconditions;
-import com.google.common.net.HostAndPort;
-import com.google.common.net.HttpHeaders;
-import com.google.common.net.MediaType;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-
-public class CouchbaseNodeImpl extends SoftwareProcessImpl implements 
CouchbaseNode {
-
-    private static final Logger log = 
LoggerFactory.getLogger(CouchbaseNodeImpl.class);
-
-    private volatile HttpFeed httpFeed;
-
-    @Override
-    public Class<CouchbaseNodeDriver> getDriverInterface() {
-        return CouchbaseNodeDriver.class;
-    }
-
-    @Override
-    public CouchbaseNodeDriver getDriver() {
-        return (CouchbaseNodeDriver) super.getDriver();
-    }
-
-    @Override
-    public void init() {
-        super.init();
-
-        subscribe(this, Attributes.SERVICE_UP, new 
SensorEventListener<Boolean>() {
-            @Override
-            public void onEvent(SensorEvent<Boolean> booleanSensorEvent) {
-                if (Boolean.TRUE.equals(booleanSensorEvent.getValue())) {
-                    Integer webPort = 
getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
-                    Preconditions.checkNotNull(webPort, 
CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT+" not set for %s; is an acceptable port 
available?", this);
-                    String hostAndPort = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(CouchbaseNodeImpl.this, 
webPort).toString();
-                    setAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL, 
URI.create(format("http://%s";, hostAndPort)));
-                }
-            }
-        });
-
-        getMutableEntityType().addEffector(ADD_REPLICATION_RULE, new 
EffectorBody<Void>() {
-            @Override
-            public Void call(ConfigBag parameters) {
-                addReplicationRule(parameters);
-                return null;
-            }
-        });
-    }
-
-    protected Map<String, Object> 
obtainProvisioningFlags(@SuppressWarnings("rawtypes") 
MachineProvisioningLocation location) {
-        ConfigBag result = 
ConfigBag.newInstance(super.obtainProvisioningFlags(location));
-        result.configure(CloudLocationConfig.OS_64_BIT, true);
-        return result.getAllConfig();
-    }
-
-    @Override
-    protected Collection<Integer> getRequiredOpenPorts() {
-        // TODO this creates a huge list of inbound ports; much better to 
define on a security group using range syntax!
-        int erlangRangeStart = 
getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_START).iterator().next();
-        int erlangRangeEnd = 
getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_END).iterator().next();
-
-        Set<Integer> newPorts = 
MutableSet.<Integer>copyOf(super.getRequiredOpenPorts());
-        newPorts.remove(erlangRangeStart);
-        newPorts.remove(erlangRangeEnd);
-        for (int i = erlangRangeStart; i <= erlangRangeEnd; i++)
-            newPorts.add(i);
-        return newPorts;
-    }
-
-    @Override
-    public void serverAdd(String serverToAdd, String username, String 
password) {
-        getDriver().serverAdd(serverToAdd, username, password);
-    }
-
-    @Override
-    public void serverAddAndRebalance(String serverToAdd, String username, 
String password) {
-        getDriver().serverAddAndRebalance(serverToAdd, username, password);
-    }
-
-    @Override
-    public void rebalance() {
-        getDriver().rebalance();
-    }
-
-    protected final static Function<HttpToolResponse, JsonElement> 
GET_THIS_NODE_STATS = Functionals.chain(
-        HttpValueFunctions.jsonContents(),
-        JsonFunctions.walk("nodes"),
-        new Function<JsonElement, JsonElement>() {
-            @Override public JsonElement apply(JsonElement input) {
-                JsonArray nodes = input.getAsJsonArray();
-                for (JsonElement element : nodes) {
-                    JsonElement thisNode = 
element.getAsJsonObject().get("thisNode");
-                    if (thisNode!=null && 
Boolean.TRUE.equals(thisNode.getAsBoolean())) {
-                        return 
element.getAsJsonObject().get("interestingStats");
-                    }
-                }
-                return null;
-        }}
-    );
-
-    protected final static <T> HttpPollConfig<T> 
getSensorFromNodeStat(AttributeSensor<T> sensor, String ...jsonPath) {
-        return new HttpPollConfig<T>(sensor)
-            .onSuccess(Functionals.chain(GET_THIS_NODE_STATS,
-                MaybeFunctions.<JsonElement>wrap(),
-                JsonFunctions.walkM(jsonPath),
-                
JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null)))
-            .onFailureOrException(Functions.<T>constant(null));
-    }
-
-    @Override
-    protected void postStart() {
-        super.postStart();
-        renameServerToPublicHostname();
-    }
-
-    protected void renameServerToPublicHostname() {
-        // 
http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#couchbase-getting-started-hostnames
-        URI apiUri = null;
-        try {
-            HostAndPort accessible = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(this, 
getAttribute(COUCHBASE_WEB_ADMIN_PORT));
-            apiUri = 
URI.create(String.format("http://%s:%d/node/controller/rename";, 
accessible.getHostText(), accessible.getPort()));
-            UsernamePasswordCredentials credentials = new 
UsernamePasswordCredentials(getConfig(COUCHBASE_ADMIN_USERNAME), 
getConfig(COUCHBASE_ADMIN_PASSWORD));
-            HttpToolResponse response = HttpTool.httpPost(
-                    // the uri is required by the HttpClientBuilder in order 
to set the AuthScope of the credentials
-                    
HttpTool.httpClientBuilder().uri(apiUri).credentials(credentials).build(),
-                    apiUri,
-                    MutableMap.of(
-                            HttpHeaders.CONTENT_TYPE, 
MediaType.FORM_DATA.toString(),
-                            HttpHeaders.ACCEPT, "*/*",
-                            // this appears needed; without it we get 
org.apache.http.NoHttpResponseException !?
-                            HttpHeaders.AUTHORIZATION, 
HttpTool.toBasicAuthorizationValue(credentials)),
-                    
Charsets.UTF_8.encode("hostname="+Urls.encode(accessible.getHostText())).array());
-            log.debug("Renamed Couchbase server "+this+" via "+apiUri+": 
"+response);
-            if (!HttpTool.isStatusCodeHealthy(response.getResponseCode())) {
-                log.warn("Invalid response code, renaming "+apiUri+": 
"+response);
-            }
-        } catch (Exception e) {
-            Exceptions.propagateIfFatal(e);
-            log.warn("Error renaming server, using "+apiUri+": "+e, e);
-        }
-    }
-
-    public void connectSensors() {
-        super.connectSensors();
-        connectServiceUpIsRunning();
-
-        HostAndPort hostAndPort = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(this, 
this.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
-        httpFeed = HttpFeed.builder()
-            .entity(this)
-            .period(Duration.seconds(3))
-            .baseUri("http://"; + hostAndPort + "/pools/nodes/")
-            
.credentialsIfNotNull(getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), 
getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
-            .poll(getSensorFromNodeStat(CouchbaseNode.OPS, "ops"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_DATA_SIZE, 
"couch_docs_data_size"))
-            
.poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, 
"couch_docs_actual_disk_size"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.EP_BG_FETCHED, 
"ep_bg_fetched"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.MEM_USED, "mem_used"))
-            
.poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, 
"couch_views_actual_disk_size"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS, 
"curr_items"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.VB_REPLICA_CURR_ITEMS, 
"vb_replica_curr_items"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, 
"couch_views_data_size"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.GET_HITS, "get_hits"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.CMD_GET, "cmd_get"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS_TOT, 
"curr_items_tot"))
-            .poll(new HttpPollConfig<String>(CouchbaseNode.REBALANCE_STATUS)
-                    
.onSuccess(HttpValueFunctions.jsonContents("rebalanceStatus", String.class))
-                    .onFailureOrException(Functions.constant("Could not 
retrieve")))
-            .build();
-    }
-
-    public void disconnectSensors() {
-        super.disconnectSensors();
-        disconnectServiceUpIsRunning();
-        if (httpFeed != null) {
-            httpFeed.stop();
-        }
-    }
-
-    @Override
-    public void bucketCreate(String bucketName, String bucketType, Integer 
bucketPort, Integer bucketRamSize, Integer bucketReplica) {
-        if (Strings.isBlank(bucketType)) bucketType = "couchbase";
-        if (bucketRamSize==null || bucketRamSize<=0) bucketRamSize = 200;
-        if (bucketReplica==null || bucketReplica<0) bucketReplica = 1;
-
-        getDriver().bucketCreate(bucketName, bucketType, bucketPort, 
bucketRamSize, bucketReplica);
-    }
-
-    /** exposed through {@link CouchbaseNode#ADD_REPLICATION_RULE} */
-    protected void addReplicationRule(ConfigBag ruleArgs) {
-        Object toClusterO = 
Preconditions.checkNotNull(ruleArgs.getStringKey("toCluster"), "toCluster must 
not be null");
-        if (toClusterO instanceof String) {
-            toClusterO = getManagementContext().lookup((String)toClusterO);
-        }
-        Entity toCluster = Tasks.resolving(toClusterO, 
Entity.class).context(getExecutionContext()).get();
-
-        String fromBucket = Preconditions.checkNotNull( 
(String)ruleArgs.getStringKey("fromBucket"), "fromBucket must be specified" );
-
-        String toBucket = (String)ruleArgs.getStringKey("toBucket");
-        if (toBucket==null) toBucket = fromBucket;
-
-        if (!ruleArgs.getUnusedConfig().isEmpty()) {
-            throw new IllegalArgumentException("Unsupported replication rule 
data: "+ruleArgs.getUnusedConfig());
-        }
-
-        getDriver().addReplicationRule(toCluster, fromBucket, toBucket);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
 
b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
deleted file mode 100644
index 6dd97d6..0000000
--- 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.couchbase;
-
-import static brooklyn.util.ssh.BashCommands.*;
-import static java.lang.String.format;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import org.apache.http.auth.UsernamePasswordCredentials;
-
-import brooklyn.entity.Entity;
-import brooklyn.entity.Group;
-import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.ServiceStateLogic;
-import brooklyn.entity.drivers.downloads.BasicDownloadRequirement;
-import brooklyn.entity.drivers.downloads.DownloadProducerFromUrlAttribute;
-import brooklyn.entity.software.SshEffectorTasks;
-import brooklyn.event.basic.DependentConfiguration;
-import brooklyn.event.feed.http.HttpValueFunctions;
-import brooklyn.location.OsDetails;
-import brooklyn.location.access.BrooklynAccessUtils;
-import brooklyn.location.basic.SshMachineLocation;
-import brooklyn.management.Task;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.http.HttpTool;
-import brooklyn.util.http.HttpToolResponse;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.ssh.BashCommands;
-import brooklyn.util.task.DynamicTasks;
-import brooklyn.util.task.TaskBuilder;
-import brooklyn.util.task.TaskTags;
-import brooklyn.util.task.Tasks;
-import brooklyn.util.text.NaturalOrderComparator;
-import brooklyn.util.text.StringEscapes.BashStringEscapes;
-import brooklyn.util.text.Strings;
-import brooklyn.util.time.Duration;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.net.HostAndPort;
-
-public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver 
implements CouchbaseNodeDriver {
-
-    public CouchbaseNodeSshDriver(final CouchbaseNodeImpl entity, final 
SshMachineLocation machine) {
-        super(entity, machine);
-    }
-
-    public static String couchbaseCli(String cmd) {
-        return "/opt/couchbase/bin/couchbase-cli " + cmd + " ";
-    }
-
-    @Override
-    public void preInstall() {
-        resolver = Entities.newDownloader(this);
-        setExpandedInstallDir(getInstallDir());
-    }
-
-    @Override
-    public void install() {
-        //for reference 
https://github.com/urbandecoder/couchbase/blob/master/recipes/server.rb
-        //installation instructions 
(http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#preparing-to-install)
-
-        List<String> urls = resolver.getTargets();
-        String saveAs = resolver.getFilename();
-
-        OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
-
-        if (osDetails.isLinux()) {
-            List<String> commands = installLinux(urls, saveAs);
-            //FIXME installation return error but the server is up and running.
-            newScript(INSTALLING)
-                    .body.append(commands).execute();
-        } else {
-            Tasks.markInessential();
-            throw new IllegalStateException("Unsupported OS for installing 
Couchbase. Will continue but may fail later.");
-        }
-    }
-
-    private List<String> installLinux(List<String> urls, String saveAs) {
-
-        log.info("Installing " + getEntity() + " using couchbase-server-{} 
{}", getCommunityOrEnterprise(), getVersion());
-
-        String apt = chainGroup(
-                installPackage(MutableMap.of("apt", "python-httplib2 
libssl0.9.8"), null),
-                sudo(format("dpkg -i %s", saveAs)));
-
-        String yum = chainGroup(
-                "which yum",
-                // The following prevents failure on RHEL AWS nodes:
-                // https://forums.aws.amazon.com/thread.jspa?threadID=100509
-                ok(sudo("sed -i.bk s/^enabled=1$/enabled=0/ 
/etc/yum/pluginconf.d/subscription-manager.conf")),
-                ok(sudo("yum check-update")),
-                sudo("yum install -y pkgconfig"),
-                // RHEL requires openssl version 098
-                sudo("[ -f /etc/redhat-release ] && (grep -i \"red hat\" 
/etc/redhat-release && sudo yum install -y openssl098e) || :"),
-                sudo(format("rpm --install %s", saveAs)));
-
-        String link = new DownloadProducerFromUrlAttribute().apply(new 
BasicDownloadRequirement(this)).getPrimaryLocations().iterator().next();
-        return ImmutableList.<String>builder()
-                .add(INSTALL_CURL)
-                .addAll(Arrays.asList(INSTALL_CURL,
-                        
BashCommands.require(BashCommands.alternatives(BashCommands.simpleDownloadUrlAs(urls,
 saveAs),
-                                        // Referer link is required for 3.0.0; 
note mis-spelling is correct, as per http://en.wikipedia.org/wiki/HTTP_referer
-                                        "curl -f -L -k " + 
BashStringEscapes.wrapBash(link)
-                                                + " -H 'Referer: 
http://www.couchbase.com/downloads'"
-                                                + " -o " + saveAs),
-                                "Could not retrieve " + saveAs + " (from " + 
urls.size() + " sites)", 9)))
-                .add(alternatives(apt, yum))
-                .build();
-    }
-
-    @Override
-    public void customize() {
-        //TODO: add linux tweaks for couchbase
-        //http://blog.couchbase.com/often-overlooked-linux-os-tweaks
-        //http://blog.couchbase.com/kirk
-
-        //turn off swappiness
-        //vm.swappiness=0
-        //sudo echo 0 > /proc/sys/vm/swappiness
-
-        //os page cache = 20%
-
-        //disable THP
-        //sudo echo never > /sys/kernel/mm/transparent_hugepage/enabled
-        //sudo echo never > /sys/kernel/mm/transparent_hugepage/defrag
-
-        //turn off transparent huge pages
-        //limit page cache disty bytes
-        //control the rate page cache is flused ... vm.dirty_*
-    }
-
-    @Override
-    public void launch() {
-        String clusterPrefix = "--cluster-" + (isPreV3() ? "init-" : "");
-        // in v30, the cluster arguments were changed, and it became mandatory 
to supply a url + password (if there is none, these are ignored)
-        newScript(LAUNCHING)
-                .body.append(
-                sudo("/etc/init.d/couchbase-server start"),
-                "for i in {0..120}\n" +
-                        "do\n" +
-                        "    if [ $i -eq 120 ]; then echo REST API unavailable 
after 120 seconds, failing; exit 1; fi;\n" +
-                        "    curl -s " + String.format("http://localhost:%s";, 
getWebPort()) + " > /dev/null && echo REST API available after $i seconds && 
break\n" +
-                        "    sleep 1\n" +
-                        "done\n" +
-                        couchbaseCli("cluster-init") +
-                        (isPreV3() ? getCouchbaseHostnameAndPort() : 
getCouchbaseHostnameAndCredentials()) +
-                        " " + clusterPrefix + "username=" + getUsername() +
-                        " " + clusterPrefix + "password=" + getPassword() +
-                        " " + clusterPrefix + "port=" + getWebPort() +
-                        " " + clusterPrefix + "ramsize=" + 
getClusterInitRamSize())
-                .execute();
-    }
-
-    @Override
-    public boolean isRunning() {
-        //TODO add a better way to check if couchbase server is running
-        return (newScript(CHECK_RUNNING)
-                .body.append(format("curl -u %s:%s 
http://localhost:%s/pools/nodes";, getUsername(), getPassword(), getWebPort()))
-                .execute() == 0);
-    }
-
-    @Override
-    public void stop() {
-        newScript(STOPPING)
-                .body.append(sudo("/etc/init.d/couchbase-server stop"))
-                .execute();
-    }
-
-    @Override
-    public String getVersion() {
-        return entity.getConfig(CouchbaseNode.SUGGESTED_VERSION);
-    }
-
-    @Override
-    public String getOsTag() {
-        return newDownloadLinkSegmentComputer().getOsTag();
-    }
-
-    protected DownloadLinkSegmentComputer newDownloadLinkSegmentComputer() {
-        return new DownloadLinkSegmentComputer(getLocation().getOsDetails(), 
!isPreV3(), Strings.toString(getEntity()));
-    }
-
-    public static class DownloadLinkSegmentComputer {
-        // links are:
-        // 
http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.rpm
-        // 
http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.deb
-        // ^^^ preV3 is _ everywhere
-        // 
http://packages.couchbase.com/releases/3.0.0/couchbase-server-community_3.0.0-ubuntu12.04_amd64.deb
-        // ^^^ most V3 is _${version}-
-        // 
http://packages.couchbase.com/releases/3.0.0/couchbase-server-community-3.0.0-centos6.x86_64.rpm
-        // ^^^ but RHEL is -${version}-
-
-        @Nullable
-        private final OsDetails os;
-        @Nonnull
-        private final boolean isV3OrLater;
-        @Nonnull
-        private final String context;
-        @Nonnull
-        private final String osName;
-        @Nonnull
-        private final boolean isRpm;
-        @Nonnull
-        private final boolean is64bit;
-
-        public DownloadLinkSegmentComputer(@Nullable OsDetails os, boolean 
isV3OrLater, @Nonnull String context) {
-            this.os = os;
-            this.isV3OrLater = isV3OrLater;
-            this.context = context;
-            if (os == null) {
-                // guess centos as RPM is sensible default
-                log.warn("No details known for OS of " + context + "; assuming 
64-bit RPM distribution of Couchbase");
-                osName = "centos";
-                isRpm = true;
-                is64bit = true;
-                return;
-            }
-            osName = os.getName().toLowerCase();
-            isRpm = !(osName.contains("deb") || osName.contains("ubuntu"));
-            is64bit = os.is64bit();
-        }
-
-        /**
-         * separator after the version number used to be _ but is - in 3.0 and 
later
-         */
-        public String getPreVersionSeparator() {
-            if (!isV3OrLater) return "_";
-            if (isRpm) return "-";
-            return "_";
-        }
-
-        public String getOsTag() {
-            // couchbase only provide certain versions; if on other platforms 
let's suck-it-and-see
-            String family;
-            if (osName.contains("debian")) family = "debian7_";
-            else if (osName.contains("ubuntu")) family = "ubuntu12.04_";
-            else if (osName.contains("centos") || osName.contains("rhel") || 
(osName.contains("red") && osName.contains("hat")))
-                family = "centos6.";
-            else {
-                log.warn("Unrecognised OS " + os + " of " + context + "; 
assuming RPM distribution of Couchbase");
-                family = "centos6.";
-            }
-
-            if (!is64bit && !isV3OrLater) {
-                // NB: 32-bit binaries aren't (yet?) available for v30
-                log.warn("32-bit binaries for Couchbase might not be 
available, when deploying " + context);
-            }
-            String arch = !is64bit ? "x86" : !isRpm && isV3OrLater ? "amd64" : 
"x86_64";
-            String fileExtension = isRpm ? ".rpm" : ".deb";
-
-            if (isV3OrLater)
-                return family + arch + fileExtension;
-            else
-                return arch + fileExtension;
-        }
-
-        public String getOsTagWithPrefix() {
-            return (!isV3OrLater ? "_" : "-") + getOsTag();
-        }
-    }
-
-    @Override
-    public String getDownloadLinkOsTagWithPrefix() {
-        return newDownloadLinkSegmentComputer().getOsTagWithPrefix();
-    }
-
-    @Override
-    public String getDownloadLinkPreVersionSeparator() {
-        return newDownloadLinkSegmentComputer().getPreVersionSeparator();
-    }
-
-    private boolean isPreV3() {
-        return 
NaturalOrderComparator.INSTANCE.compare(getEntity().getConfig(CouchbaseNode.SUGGESTED_VERSION),
 "3.0") < 0;
-    }
-
-    @Override
-    public String getCommunityOrEnterprise() {
-        Boolean isEnterprise = 
getEntity().getConfig(CouchbaseNode.USE_ENTERPRISE);
-        return isEnterprise ? "enterprise" : "community";
-    }
-
-    private String getUsername() {
-        return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
-    }
-
-    private String getPassword() {
-        return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
-    }
-
-    private String getWebPort() {
-        return "" + 
entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
-    }
-
-    private String getCouchbaseHostnameAndCredentials() {
-        return format("-c %s:%s -u %s -p %s", getSubnetHostname(), 
getWebPort(), getUsername(), getPassword());
-    }
-
-    private String getCouchbaseHostnameAndPort() {
-        return format("-c %s:%s", getSubnetHostname(), getWebPort());
-    }
-
-    private String getClusterInitRamSize() {
-        return 
entity.getConfig(CouchbaseNode.COUCHBASE_CLUSTER_INIT_RAM_SIZE).toString();
-    }
-
-    @Override
-    public void rebalance() {
-        entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "explicitly 
started");
-        newScript("rebalance")
-                .body.append(
-                couchbaseCli("rebalance") + 
getCouchbaseHostnameAndCredentials())
-                .failOnNonZeroResultCode()
-                .execute();
-
-        // wait until the re-balance is started
-        // (if it's quick, this might miss it, but it will only block for 30s 
if so)
-        Repeater.create()
-                .backoff(Repeater.DEFAULT_REAL_QUICK_PERIOD, 2, 
Duration.millis(500))
-                .limitTimeTo(Duration.THIRTY_SECONDS)
-                .until(new Callable<Boolean>() {
-                           @Override
-                           public Boolean call() throws Exception {
-                               for (HostAndPort nodeHostAndPort : 
getNodesHostAndPort()) {
-                                   if 
(isNodeRebalancing(nodeHostAndPort.toString())) {
-                                       return true;
-                                   }
-                               }
-                               return false;
-                           }
-                       }
-                ).run();
-
-        entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "waiting for 
completion");
-        // Wait until the Couchbase node finishes the re-balancing
-        Task<Boolean> reBalance = TaskBuilder.<Boolean>builder()
-                .name("Waiting until node is rebalancing")
-                .body(new Callable<Boolean>() {
-                    @Override
-                    public Boolean call() throws Exception {
-                        return Repeater.create()
-                                .backoff(Duration.ONE_SECOND, 1.2, 
Duration.TEN_SECONDS)
-                                .limitTimeTo(Duration.FIVE_MINUTES)
-                                .until(new Callable<Boolean>() {
-                                    @Override
-                                    public Boolean call() throws Exception {
-                                        for (HostAndPort nodeHostAndPort : 
getNodesHostAndPort()) {
-                                            if 
(isNodeRebalancing(nodeHostAndPort.toString())) {
-                                                return false;
-                                            }
-                                        }
-                                        return true;
-                                    }
-                                })
-                                .run();
-                        }
-                })
-                .build();
-        Boolean completed = DynamicTasks.queueIfPossible(reBalance)
-                .orSubmitAndBlock()
-                .andWaitForSuccess();
-        if (completed) {
-            entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "completed");
-            
ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), 
"rebalancing");
-            log.info("Rebalanced cluster via primary node {}", getEntity());
-        } else {
-            entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "timed out");
-            
ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(getEntity(), 
"rebalancing", "rebalance did not complete within time limit");
-            log.warn("Timeout rebalancing cluster via primary node {}", 
getEntity());
-        }
-    }
-
-    private Iterable<HostAndPort> getNodesHostAndPort() {
-        Group group = Iterables.getFirst(getEntity().getGroups(), null);
-        if (group == null) return Lists.newArrayList();
-        return 
Iterables.transform(group.getAttribute(CouchbaseCluster.COUCHBASE_CLUSTER_UP_NODES),
-                new Function<Entity, HostAndPort>() {
-                    @Override
-                    public HostAndPort apply(Entity input) {
-                        return 
BrooklynAccessUtils.getBrooklynAccessibleAddress(input, 
input.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
-                    }
-                });
-    }
-
-    private boolean isNodeRebalancing(String nodeHostAndPort) {
-        HttpToolResponse response = getApiResponse("http://"; + nodeHostAndPort 
+ "/pools/default/rebalanceProgress");
-        if (response.getResponseCode() != 200) {
-            throw new IllegalStateException("failed retrieving rebalance 
status: " + response);
-        }
-        return !"none".equals(HttpValueFunctions.jsonContents("status", 
String.class).apply(response));
-    }
-
-    private HttpToolResponse getApiResponse(String uri) {
-        return HttpTool.httpGet(HttpTool.httpClientBuilder()
-                        // the uri is required by the HttpClientBuilder in 
order to set the AuthScope of the credentials
-                        .uri(uri)
-                        .credentials(new 
UsernamePasswordCredentials(getUsername(), getPassword()))
-                        .build(),
-                URI.create(uri),
-                ImmutableMap.<String, String>of());
-    }
-
-    @Override
-    public void serverAdd(String serverToAdd, String username, String 
password) {
-        newScript("serverAdd").body.append(couchbaseCli("server-add")
-                + getCouchbaseHostnameAndCredentials() +
-                " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) +
-                " --server-add-username=" + 
BashStringEscapes.wrapBash(username) +
-                " --server-add-password=" + 
BashStringEscapes.wrapBash(password))
-                .failOnNonZeroResultCode()
-                .execute();
-    }
-
-    @Override
-    public void serverAddAndRebalance(String serverToAdd, String username, 
String password) {
-        
newScript("serverAddAndRebalance").body.append(couchbaseCli("rebalance")
-                + getCouchbaseHostnameAndCredentials() +
-                " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) +
-                " --server-add-username=" + 
BashStringEscapes.wrapBash(username) +
-                " --server-add-password=" + 
BashStringEscapes.wrapBash(password))
-                .failOnNonZeroResultCode()
-                .execute();
-        entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "triggered as part 
of server-add");
-    }
-
-    @Override
-    public void bucketCreate(String bucketName, String bucketType, Integer 
bucketPort, Integer bucketRamSize, Integer bucketReplica) {
-        log.info("Adding bucket: {} to cluster {} primary node: {}", new 
Object[]{bucketName, CouchbaseClusterImpl.getClusterOrNode(getEntity()), 
getEntity()});
-
-        newScript("bucketCreate").body.append(couchbaseCli("bucket-create")
-                + getCouchbaseHostnameAndCredentials() +
-                " --bucket=" + BashStringEscapes.wrapBash(bucketName) +
-                " --bucket-type=" + BashStringEscapes.wrapBash(bucketType) +
-                " --bucket-port=" + bucketPort +
-                " --bucket-ramsize=" + bucketRamSize +
-                " --bucket-replica=" + bucketReplica)
-                .failOnNonZeroResultCode()
-                .execute();
-    }
-
-    @Override
-    public void addReplicationRule(Entity toCluster, String fromBucket, String 
toBucket) {
-        
DynamicTasks.queue(DependentConfiguration.attributeWhenReady(toCluster, 
Attributes.SERVICE_UP)).getUnchecked();
-
-        String destName = CouchbaseClusterImpl.getClusterName(toCluster);
-
-        log.info("Setting up XDCR for " + fromBucket + " from " + 
CouchbaseClusterImpl.getClusterName(getEntity()) + " (via " + getEntity() + ") "
-                + "to " + destName + " (" + toCluster + ")");
-
-        Entity destPrimaryNode = 
toCluster.getAttribute(CouchbaseCluster.COUCHBASE_PRIMARY_NODE);
-        String destHostname = 
destPrimaryNode.getAttribute(Attributes.HOSTNAME);
-        String destUsername = 
toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
-        String destPassword = 
toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
-
-        // on the REST API there is mention of a 'type' 'continuous' but i 
don't see other refs to this
-
-        // PROTOCOL   Select REST protocol or memcached for replication. xmem 
indicates memcached while capi indicates REST protocol.
-        // looks like xmem is the default; leave off for now
-//        String replMode = "xmem";
-
-        DynamicTasks.queue(TaskTags.markInessential(SshEffectorTasks.ssh(
-                couchbaseCli("xdcr-setup") +
-                        getCouchbaseHostnameAndCredentials() +
-                        " --create" +
-                        " --xdcr-cluster-name=" + 
BashStringEscapes.wrapBash(destName) +
-                        " --xdcr-hostname=" + 
BashStringEscapes.wrapBash(destHostname) +
-                        " --xdcr-username=" + 
BashStringEscapes.wrapBash(destUsername) +
-                        " --xdcr-password=" + 
BashStringEscapes.wrapBash(destPassword)
-        ).summary("create xdcr destination " + destName).newTask()));
-
-        // would be nice to auto-create bucket, but we'll need to know the 
parameters; the port in particular is tedious
-//        ((CouchbaseNode)destPrimaryNode).bucketCreate(toBucket, "couchbase", 
null, 0, 0);
-
-        DynamicTasks.queue(SshEffectorTasks.ssh(
-                couchbaseCli("xdcr-replicate") +
-                        getCouchbaseHostnameAndCredentials() +
-                        " --create" +
-                        " --xdcr-cluster-name=" + 
BashStringEscapes.wrapBash(destName) +
-                        " --xdcr-from-bucket=" + 
BashStringEscapes.wrapBash(fromBucket) +
-                        " --xdcr-to-bucket=" + 
BashStringEscapes.wrapBash(toBucket)
-//            + " --xdcr-replication-mode="+replMode
-        ).summary("configure replication for " + fromBucket + " to " + 
destName + ":" + toBucket).newTask());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java
 
b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java
deleted file mode 100644
index c0740ee..0000000
--- 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.couchbase;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
-import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
-import brooklyn.event.basic.Sensors;
-import brooklyn.util.flags.SetFromFlag;
-
-@ImplementedBy(CouchbaseSyncGatewayImpl.class)
-public interface CouchbaseSyncGateway extends SoftwareProcess {
-
-    @SetFromFlag("version")
-    ConfigKey<String> SUGGESTED_VERSION = 
ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION,
-            "1.0-beta3.1");
-
-    @SetFromFlag("downloadUrl")
-    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new 
BasicAttributeSensorAndConfigKey<String>(
-            SoftwareProcess.DOWNLOAD_URL, 
"http://packages.couchbase.com/releases/couchbase-sync-gateway/1.0-beta/couchbase-sync-gateway-community_${version}_${driver.osTag}";);
-    
-    @SetFromFlag("couchbaseServer")
-    ConfigKey<Entity> COUCHBASE_SERVER = ConfigKeys.newConfigKey(Entity.class, 
"couchbaseSyncGateway.couchbaseNode", 
-            "Couchbase server node or cluster the sync gateway connects to");
-
-    @SetFromFlag("serverPool")
-    ConfigKey<String> COUCHBASE_SERVER_POOL = 
ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverPool", 
-            "Couchbase Server pool name in which to find buckets", "default");
-    
-    @SetFromFlag("couchbaseServerBucket")
-    ConfigKey<String> COUCHBASE_SERVER_BUCKET = 
ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverBucket", 
-            "Name of the Couchbase bucket to use", "sync_gateway");
-
-    @SetFromFlag("pretty")
-    ConfigKey<Boolean> PRETTY = 
ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.pretty", 
-            "Pretty-print JSON responses. This is useful for debugging, but 
reduces performance.", false);
-
-    @SetFromFlag("verbose")
-    ConfigKey<Boolean> VERBOSE = 
ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.verbose", 
-            "Logs more information about requests.", false);
-
-    AttributeSensor<String> COUCHBASE_SERVER_WEB_URL = 
Sensors.newStringSensor("couchbaseSyncGateway.serverWebUrl", 
-            "The Url and web port of the couchbase server to connect to");
-    
-    AttributeSensor<String> MANAGEMENT_URL = 
Sensors.newStringSensor("coucbaseSyncGateway.managementUrl", 
-            "Management URL for Couchbase Sycn Gateway");
-
-    PortAttributeSensorAndConfigKey SYNC_REST_API_PORT = new 
PortAttributeSensorAndConfigKey("couchbaseSyncGateway.syncRestPort", 
-            "Port the Sync REST API listens on", "4984");
-    
-    PortAttributeSensorAndConfigKey ADMIN_REST_API_PORT = new 
PortAttributeSensorAndConfigKey("couchbaseSyncGateway.adminRestPort", 
-            "Port the Admin REST API listens on", "4985");
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java
 
b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java
deleted file mode 100644
index 148ec0b..0000000
--- 
a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.couchbase;
-
-import brooklyn.entity.basic.SoftwareProcessDriver;
-
-public interface CouchbaseSyncGatewayDriver extends SoftwareProcessDriver {
-
-    public String getOsTag();
-    
-}
\ No newline at end of file


Reply via email to