http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
new file mode 100644
index 0000000..f279987
--- /dev/null
+++ 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.couchbase;
+
+import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.render.RendererHints;
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.DependentConfiguration;
+import brooklyn.event.feed.http.HttpFeed;
+import brooklyn.event.feed.http.HttpPollConfig;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
+import brooklyn.location.access.BrooklynAccessUtils;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.collections.CollectionFunctionals;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.collections.QuorumCheck;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.guava.IfFunctions;
+import brooklyn.util.math.MathPredicates;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.TaskBuilder;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.text.ByteSizeStrings;
+import brooklyn.util.text.StringFunctions;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.net.HostAndPort;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+
+public class CouchbaseClusterImpl extends DynamicClusterImpl implements 
CouchbaseCluster {
+    
+    /*
+     * Refactoring required:
+     * 
+     * Currently, on start() the cluster waits for an arbitrary 
SERVICE_UP_TIME_OUT (3 minutes) before assuming that a quorate 
+     * number of servers are available. The servers are then added to the 
cluster, and a further wait period of  
+     * DELAY_BEFORE_ADVERTISING_CLUSTER (30 seconds) is used before 
advertising the cluster
+     * 
+     * DELAY_BEFORE_ADVERTISING_CLUSTER: It should be possible to refactor 
this away by adding a repeater that will poll
+     * the REST API of the primary node (once established) until the API 
indicates that the cluster is available
+     * 
+     * SERVICE_UP_TIME_OUT: The refactoring of this would be more substantial. 
One method would be to remove the bulk of the 
+     * logic from the start() method, and rely entirely on the membership 
tracking policy and the onServerPoolMemberChanged()
+     * method. The addition of a RUNNING sensor on the nodes would allow the 
cluster to determine that a node is up and
+     * running but has not yet been added to the cluster. The 
IS_CLUSTER_INITIALIZED key could be used to determine whether
+     * or not the cluster should be initialized, or a node simply added to an 
existing cluster. A repeater could be used
+     * in the driver's to ensure that the method does not return until the 
node has been fully added
+     * 
+     * There is an (incomplete) first-pass at this here: 
https://github.com/Nakomis/incubator-brooklyn/compare/couchbase-running-sensor
+     * however, there have been significant changes to the cluster 
initialization since that work was done so it will probably
+     * need to be re-done
+     * 
+     * Additionally, during bucket creation, a HttpPoll is used to check that 
the bucket has been created. This should be 
+     * refactored to use a Repeater in CouchbaseNodeSshDriver.bucketCreate() 
in a similar way to the one employed in
+     * CouchbaseNodeSshDriver.rebalance(). Were this done, this class could 
simply queue the bucket creation tasks
+     * 
+     */
+    
+    private static final Logger log = 
LoggerFactory.getLogger(CouchbaseClusterImpl.class);
+    private final Object mutex = new Object[0];
+    // Used to serialize bucket creation as only one bucket can be created at 
a time,
+    // so a feed is used to determine when a bucket has finished being created
+    private final AtomicReference<HttpFeed> resetBucketCreation = new 
AtomicReference<HttpFeed>();
+
+    public void init() {
+        log.info("Initializing the Couchbase cluster...");
+        super.init();
+        
+        addEnricher(
+            Enrichers.builder()
+                .transforming(COUCHBASE_CLUSTER_UP_NODES)
+                .from(this)
+                .publishing(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES)
+                .computing(new ListOfHostAndPort()).build() );
+        addEnricher(
+            Enrichers.builder()
+                .transforming(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES)
+                .from(this)
+                .publishing(COUCHBASE_CLUSTER_CONNECTION_URL)
+                .computing(
+                    IfFunctions.<List<String>>ifPredicate(
+                        
Predicates.compose(MathPredicates.lessThan(getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE)),
 
+                            CollectionFunctionals.sizeFunction(0)) )
+                    .value((String)null)
+                    .defaultApply(
+                        Functionals.chain(
+                            
CollectionFunctionals.<String,List<String>>limit(4), 
+                            StringFunctions.joiner(","),
+                            StringFunctions.formatter("http://%s/";))) )
+                .build() );
+        
+        Map<? extends AttributeSensor<? extends Number>, ? extends 
AttributeSensor<? extends Number>> enricherSetup = 
+            ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? 
extends Number>>builder()
+                .put(CouchbaseNode.OPS, CouchbaseCluster.OPS_PER_NODE)
+                .put(CouchbaseNode.COUCH_DOCS_DATA_SIZE, 
CouchbaseCluster.COUCH_DOCS_DATA_SIZE_PER_NODE)
+                .put(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, 
CouchbaseCluster.COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE)
+                .put(CouchbaseNode.EP_BG_FETCHED, 
CouchbaseCluster.EP_BG_FETCHED_PER_NODE)
+                .put(CouchbaseNode.MEM_USED, 
CouchbaseCluster.MEM_USED_PER_NODE)
+                .put(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, 
CouchbaseCluster.COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE)
+                .put(CouchbaseNode.CURR_ITEMS, 
CouchbaseCluster.CURR_ITEMS_PER_NODE)
+                .put(CouchbaseNode.VB_REPLICA_CURR_ITEMS, 
CouchbaseCluster.VB_REPLICA_CURR_ITEMS_PER_NODE)
+                .put(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, 
CouchbaseCluster.COUCH_VIEWS_DATA_SIZE_PER_NODE)
+                .put(CouchbaseNode.GET_HITS, 
CouchbaseCluster.GET_HITS_PER_NODE)
+                .put(CouchbaseNode.CMD_GET, CouchbaseCluster.CMD_GET_PER_NODE)
+                .put(CouchbaseNode.CURR_ITEMS_TOT, 
CouchbaseCluster.CURR_ITEMS_TOT_PER_NODE)
+            .build();
+        
+        for (AttributeSensor<? extends Number> nodeSensor : 
enricherSetup.keySet()) {
+            addSummingMemberEnricher(nodeSensor);
+            addAveragingMemberEnricher(nodeSensor, 
enricherSetup.get(nodeSensor));
+        }
+        
+        
addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
+            .from(IS_CLUSTER_INITIALIZED).computing(
+                IfFunctions.ifNotEquals(true).value("The cluster is not yet 
completely initialized")
+                    .defaultValue(null).build()).build() );
+    }
+    
+    private void addAveragingMemberEnricher(AttributeSensor<? extends Number> 
fromSensor, AttributeSensor<? extends Number> toSensor) {
+        addEnricher(Enrichers.builder()
+            .aggregating(fromSensor)
+            .publishing(toSensor)
+            .fromMembers()
+            .computingAverage()
+            .build()
+        );
+    }
+
+    private void addSummingMemberEnricher(AttributeSensor<? extends Number> 
source) {
+        addEnricher(Enrichers.builder()
+            .aggregating(source)
+            .publishing(source)
+            .fromMembers()
+            .computingSum()
+            .build()
+        );
+    }
+
+    @Override
+    protected void doStart() {
+        setAttribute(IS_CLUSTER_INITIALIZED, false);
+        
+        super.doStart();
+
+        connectSensors();
+        
+        setAttribute(BUCKET_CREATION_IN_PROGRESS, false);
+
+        //start timeout before adding the servers
+        Tasks.setBlockingDetails("Pausing while Couchbase stabilizes");
+        Time.sleep(getConfig(NODES_STARTED_STABILIZATION_DELAY));
+
+        Optional<Set<Entity>> upNodes = 
Optional.<Set<Entity>>fromNullable(getAttribute(COUCHBASE_CLUSTER_UP_NODES));
+        if (upNodes.isPresent() && !upNodes.get().isEmpty()) {
+
+            Tasks.setBlockingDetails("Adding servers to Couchbase");
+            
+            //TODO: select a new primary node if this one fails
+            Entity primaryNode = upNodes.get().iterator().next();
+            ((EntityInternal) 
primaryNode).setAttribute(CouchbaseNode.IS_PRIMARY_NODE, true);
+            setAttribute(COUCHBASE_PRIMARY_NODE, primaryNode);
+
+            Set<Entity> serversToAdd = MutableSet.<Entity>copyOf(getUpNodes());
+
+            if (serversToAdd.size() >= getQuorumSize() && serversToAdd.size() 
> 1) {
+                log.info("Number of SERVICE_UP nodes:{} in cluster:{} reached 
Quorum:{}, adding the servers", new Object[]{serversToAdd.size(), getId(), 
getQuorumSize()});
+                addServers(serversToAdd);
+
+                //wait for servers to be added to the couchbase server
+                try {
+                    Tasks.setBlockingDetails("Delaying before advertising 
cluster up");
+                    Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
+                } finally {
+                    Tasks.resetBlockingDetails();
+                }
+                
+                ((CouchbaseNode)getPrimaryNode()).rebalance();
+            } else {
+                if (getQuorumSize()>1) {
+                    log.warn(this+" is not quorate; will likely fail later, 
but proceeding for now");
+                }
+                for (Entity server: serversToAdd) {
+                    ((EntityInternal) 
server).setAttribute(CouchbaseNode.IS_IN_CLUSTER, true);
+                }
+            }
+                
+            if (getConfig(CREATE_BUCKETS)!=null) {
+                try {
+                    Tasks.setBlockingDetails("Creating buckets in Couchbase");
+
+                    createBuckets();
+                    DependentConfiguration.waitInTaskForAttributeReady(this, 
CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
+
+                } finally {
+                    Tasks.resetBlockingDetails();
+                }
+            }
+
+            if (getConfig(REPLICATION)!=null) {
+                try {
+                    Tasks.setBlockingDetails("Configuring replication rules");
+
+                    List<Map<String, Object>> replRules = 
getConfig(REPLICATION);
+                    for (Map<String, Object> replRule: replRules) {
+                        
DynamicTasks.queue(Effectors.invocation(getPrimaryNode(), 
CouchbaseNode.ADD_REPLICATION_RULE, replRule));
+                    }
+                    DynamicTasks.waitForLast();
+
+                } finally {
+                    Tasks.resetBlockingDetails();
+                }
+            }
+
+            setAttribute(IS_CLUSTER_INITIALIZED, true);
+            
+        } else {
+            throw new IllegalStateException("No up nodes available after 
starting");
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (resetBucketCreation.get() != null) {
+            resetBucketCreation.get().stop();
+        }
+        super.stop();
+    }
+
+    protected void connectSensors() {
+        addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName("Controller targets tracker")
+                .configure("group", this));
+    }
+    
+    private final static class ListOfHostAndPort implements 
Function<Set<Entity>, List<String>> {
+        @Override public List<String> apply(Set<Entity> input) {
+            List<String> addresses = Lists.newArrayList();
+            for (Entity entity : input) {
+                addresses.add(String.format("%s",
+                        
BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, 
entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT))));
+            }
+            return addresses;
+        }
+    }
+
+    public static class MemberTrackingPolicy extends 
AbstractMembershipTrackingPolicy {
+        @Override protected void onEntityChange(Entity member) {
+            ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
+        }
+
+        @Override protected void onEntityAdded(Entity member) {
+            ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
+        }
+
+        @Override protected void onEntityRemoved(Entity member) {
+            ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
+        }
+    };
+
+    protected synchronized void onServerPoolMemberChanged(Entity member) {
+        if (log.isTraceEnabled()) log.trace("For {}, considering membership of 
{} which is in locations {}",
+                new Object[]{this, member, member.getLocations()});
+
+        //FIXME: make use of servers to be added after cluster initialization.
+        synchronized (mutex) {
+            if (belongsInServerPool(member)) {
+
+                Optional<Set<Entity>> upNodes = 
Optional.fromNullable(getUpNodes());
+                if (upNodes.isPresent()) {
+
+                    if (!upNodes.get().contains(member)) {
+                        Set<Entity> newNodes = Sets.newHashSet(getUpNodes());
+                        newNodes.add(member);
+                        setAttribute(COUCHBASE_CLUSTER_UP_NODES, newNodes);
+
+                        //add to set of servers to be added.
+                        if (isClusterInitialized()) {
+                            addServer(member);
+                        }
+                    }
+                } else {
+                    Set<Entity> newNodes = Sets.newHashSet();
+                    newNodes.add(member);
+                    setAttribute(COUCHBASE_CLUSTER_UP_NODES, newNodes);
+
+                    if (isClusterInitialized()) {
+                        addServer(member);
+                    }
+                }
+            } else {
+                Set<Entity> upNodes = getUpNodes();
+                if (upNodes != null && upNodes.contains(member)) {
+                    upNodes.remove(member);
+                    setAttribute(COUCHBASE_CLUSTER_UP_NODES, upNodes);
+                    log.info("Removing couchbase node {}: {}; from cluster", 
new Object[]{this, member});
+                }
+            }
+            if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", 
this, member);
+        }
+    }
+
+    protected boolean belongsInServerPool(Entity member) {
+        if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) {
+            if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, 
eliminating because not up", this, member);
+            return false;
+        }
+        if (!getMembers().contains(member)) {
+            if (log.isTraceEnabled())
+                log.trace("Members of {}, checking {}, eliminating because not 
member", this, member);
+
+            return false;
+        }
+        if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, 
approving", this, member);
+
+        return true;
+    }
+
+
+    protected EntitySpec<?> getMemberSpec() {
+        EntitySpec<?> result = super.getMemberSpec();
+        if (result != null) return result;
+        return EntitySpec.create(CouchbaseNode.class);
+    }
+
+    @Override
+    public int getQuorumSize() {
+        Integer quorumSize = getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE);
+        if (quorumSize != null && quorumSize > 0)
+            return quorumSize;
+        // by default the quorum would be floor(initial_cluster_size/2) + 1
+        return (int) Math.floor(getConfig(INITIAL_SIZE) / 2) + 1;
+    }
+
+    protected int getActualSize() {
+        return 
Optional.fromNullable(getAttribute(CouchbaseCluster.ACTUAL_CLUSTER_SIZE)).or(-1);
+    }
+
+    private Set<Entity> getUpNodes() {
+        return getAttribute(COUCHBASE_CLUSTER_UP_NODES);
+    }
+
+    private CouchbaseNode getPrimaryNode() {
+        return (CouchbaseNode) getAttribute(COUCHBASE_PRIMARY_NODE);
+    }
+
+    @Override
+    protected void initEnrichers() {
+        
addEnricher(Enrichers.builder().updatingMap(ServiceStateLogic.SERVICE_NOT_UP_INDICATORS)
+            .from(COUCHBASE_CLUSTER_UP_NODES)
+            .computing(new Function<Set<Entity>, Object>() {
+                @Override
+                public Object apply(Set<Entity> input) {
+                    if (input==null) return "Couchbase up nodes not set";
+                    if (input.isEmpty()) return "No Couchbase up nodes";
+                    if (input.size() < getQuorumSize()) return "Couchbase up 
nodes not quorate";
+                    return null;
+                }
+            }).build());
+        
+        if (config().getLocalRaw(UP_QUORUM_CHECK).isAbsent()) {
+            // TODO Only leaving CouchbaseQuorumCheck here in case it is 
contained in persisted state.
+            // If so, need a transformer and then to delete it
+            @SuppressWarnings({ "unused", "hiding" })
+            @Deprecated
+            class CouchbaseQuorumCheck implements QuorumCheck {
+                @Override
+                public boolean isQuorate(int sizeHealthy, int totalSize) {
+                    // check members count passed in AND the sensor  
+                    if (sizeHealthy < getQuorumSize()) return false;
+                    return true;
+                }
+            }
+            config().set(UP_QUORUM_CHECK, new 
CouchbaseClusterImpl.CouchbaseQuorumCheck(this));
+        }
+        super.initEnrichers();
+    }
+    
+    static class CouchbaseQuorumCheck implements QuorumCheck {
+        private final CouchbaseCluster cluster;
+        CouchbaseQuorumCheck(CouchbaseCluster cluster) {
+            this.cluster = cluster;
+        }
+        @Override
+        public boolean isQuorate(int sizeHealthy, int totalSize) {
+            // check members count passed in AND the sensor  
+            if (sizeHealthy < cluster.getQuorumSize()) return false;
+            return true;
+        }
+    }
+    protected void addServers(Set<Entity> serversToAdd) {
+        Preconditions.checkNotNull(serversToAdd);
+        for (Entity s : serversToAdd) {
+            addServerSeveralTimes(s, 12, Duration.TEN_SECONDS);
+        }
+    }
+
+    /** try adding in a loop because we are seeing spurious port failures in 
AWS */
+    protected void addServerSeveralTimes(Entity s, int numRetries, Duration 
delayOnFailure) {
+        try {
+            addServer(s);
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            if (numRetries<=0) throw Exceptions.propagate(e);
+            // retry once after sleep because we are getting some odd 
primary-change events
+            log.warn("Error adding "+s+" to "+this+", "+numRetries+" retries 
remaining, will retry after delay ("+e+")");
+            Time.sleep(delayOnFailure);
+            addServerSeveralTimes(s, numRetries-1, delayOnFailure);
+        }
+    }
+
+    protected void addServer(Entity serverToAdd) {
+        Preconditions.checkNotNull(serverToAdd);
+        if (serverToAdd.equals(getPrimaryNode())) {
+            // no need to add; but we pass it in anyway because it makes the 
calling logic easier
+            return;
+        }
+        if (!isMemberInCluster(serverToAdd)) {
+            HostAndPort webAdmin = 
HostAndPort.fromParts(serverToAdd.getAttribute(SoftwareProcess.SUBNET_HOSTNAME),
+                    
serverToAdd.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
+            String username = 
serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
+            String password = 
serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
+
+            if (isClusterInitialized()) {
+                Entities.invokeEffectorWithArgs(this, getPrimaryNode(), 
CouchbaseNode.SERVER_ADD_AND_REBALANCE, webAdmin.toString(), username, 
password).getUnchecked();
+            } else {
+                Entities.invokeEffectorWithArgs(this, getPrimaryNode(), 
CouchbaseNode.SERVER_ADD, webAdmin.toString(), username, 
password).getUnchecked();
+            }
+            //FIXME check feedback of whether the server was added.
+            ((EntityInternal) 
serverToAdd).setAttribute(CouchbaseNode.IS_IN_CLUSTER, true);
+        }
+    }
+
+    /** finds the cluster name specified for a node or a cluster, 
+     * using {@link CouchbaseCluster#CLUSTER_NAME} or falling back to the 
cluster (or node) ID. */
+    public static String getClusterName(Entity node) {
+        String name = node.getConfig(CLUSTER_NAME);
+        if (!Strings.isBlank(name)) return Strings.makeValidFilename(name);
+        return getClusterOrNode(node).getId();
+    }
+    
+    /** returns Couchbase cluster in ancestry, defaulting to the given node if 
none */
+    @Nonnull public static Entity getClusterOrNode(Entity node) {
+        Iterable<CouchbaseCluster> clusterNodes = 
Iterables.filter(Entities.ancestors(node), CouchbaseCluster.class);
+        return Iterables.getFirst(clusterNodes, node);
+    }
+    
+    public boolean isClusterInitialized() {
+        return 
Optional.fromNullable(getAttribute(IS_CLUSTER_INITIALIZED)).or(false);
+    }
+
+    public boolean isMemberInCluster(Entity e) {
+        return 
Optional.fromNullable(e.getAttribute(CouchbaseNode.IS_IN_CLUSTER)).or(false);
+    }
+    
+    public void createBuckets() {
+        //TODO: check for port conflicts if buckets are being created with a 
port
+        List<Map<String, Object>> bucketsToCreate = getConfig(CREATE_BUCKETS);
+        if (bucketsToCreate==null) return;
+        
+        Entity primaryNode = getPrimaryNode();
+
+        for (Map<String, Object> bucketMap : bucketsToCreate) {
+            String bucketName = bucketMap.containsKey("bucket") ? (String) 
bucketMap.get("bucket") : "default";
+            String bucketType = bucketMap.containsKey("bucket-type") ? 
(String) bucketMap.get("bucket-type") : "couchbase";
+            // default bucket must be on this port; other buckets can (must) 
specify their own (unique) port
+            Integer bucketPort = bucketMap.containsKey("bucket-port") ? 
(Integer) bucketMap.get("bucket-port") : 11211;
+            Integer bucketRamSize = bucketMap.containsKey("bucket-ramsize") ? 
(Integer) bucketMap.get("bucket-ramsize") : 100;
+            Integer bucketReplica = bucketMap.containsKey("bucket-replica") ? 
(Integer) bucketMap.get("bucket-replica") : 1;
+
+            createBucket(primaryNode, bucketName, bucketType, bucketPort, 
bucketRamSize, bucketReplica);
+        }
+    }
+
+    public void createBucket(final Entity primaryNode, final String 
bucketName, final String bucketType, final Integer bucketPort, final Integer 
bucketRamSize, final Integer bucketReplica) {
+        
DynamicTasks.queueIfPossible(TaskBuilder.<Void>builder().name("Creating bucket 
" + bucketName).body(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        
DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, 
CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
+                        if 
(CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
+                            
CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
+                        }
+                        
setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true);
+                        HostAndPort hostAndPort = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(primaryNode, 
primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
+
+                        
CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder()
+                                .entity(CouchbaseClusterImpl.this)
+                                .period(500, TimeUnit.MILLISECONDS)
+                                
.baseUri(String.format("http://%s/pools/default/buckets/%s";, hostAndPort, 
bucketName))
+                                
.credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), 
primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
+                                .poll(new 
HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS)
+                                        
.onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), 
JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() {
+                                            @Override
+                                            public Boolean apply(JsonElement 
input) {
+                                                // Wait until bucket has been 
created on all nodes and the couchApiBase element has been published 
(indicating that the bucket is useable)
+                                                JsonArray servers = 
input.getAsJsonArray();
+                                                if (servers.size() != 
CouchbaseClusterImpl.this.getMembers().size()) {
+                                                    return true;
+                                                }
+                                                for (JsonElement server : 
servers) {
+                                                    Object api = 
server.getAsJsonObject().get("couchApiBase");
+                                                    if (api == null || 
Strings.isEmpty(String.valueOf(api))) {
+                                                        return true;
+                                                    }
+                                                }
+                                                return false;
+                                            }
+                                        }))
+                                        .onFailureOrException(new 
Function<Object, Boolean>() {
+                                            @Override
+                                            public Boolean apply(Object input) 
{
+                                                if (input instanceof 
brooklyn.util.http.HttpToolResponse) {
+                                                    if 
(((brooklyn.util.http.HttpToolResponse) input).getResponseCode() == 404) {
+                                                        return true;
+                                                    }
+                                                }
+                                                if (input instanceof Throwable)
+                                                    
Exceptions.propagate((Throwable) input);
+                                                throw new 
IllegalStateException("Unexpected response when creating bucket:" + input);
+                                            }
+                                        }))
+                                .build());
+
+                        // TODO: Bail out if bucket creation fails, to allow 
next bucket to proceed
+                        
Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, 
CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, 
bucketReplica);
+                        
DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, 
CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
+                        if 
(CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
+                            
CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
+                        }
+                        return null;
+                    }
+                }
+        ).build()).orSubmitAndBlock();
+    }
+    
+    static {
+        RendererHints.register(COUCH_DOCS_DATA_SIZE_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+        RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+        RendererHints.register(MEM_USED_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+        RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+        RendererHints.register(COUCH_VIEWS_DATA_SIZE_PER_NODE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
new file mode 100644
index 0000000..16434fa
--- /dev/null
+++ 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.couchbase;
+
+import java.net.URI;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.config.render.RendererHints;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.annotation.EffectorParam;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.text.ByteSizeStrings;
+
+@Catalog(name="CouchBase Node", description="Couchbase Server is an open 
source, distributed (shared-nothing architecture) "
+        + "NoSQL document-oriented database that is optimized for interactive 
applications.")
+@ImplementedBy(CouchbaseNodeImpl.class)
+public interface CouchbaseNode extends SoftwareProcess {
+
+    @SetFromFlag("adminUsername")
+    ConfigKey<String> COUCHBASE_ADMIN_USERNAME = 
ConfigKeys.newStringConfigKey("couchbase.adminUsername", "Username for the 
admin user on the node", "Administrator");
+
+    @SetFromFlag("adminPassword")
+    ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = 
ConfigKeys.newStringConfigKey("couchbase.adminPassword", "Password for the 
admin user on the node", "Password");
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = 
ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION,
+            "3.0.0");
+
+    @SetFromFlag("enterprise")
+    ConfigKey<Boolean> USE_ENTERPRISE = 
ConfigKeys.newBooleanConfigKey("couchbase.enterprise.enabled",
+        "Whether to use Couchbase Enterprise; if false uses the community 
version. Defaults to true.", true);
+
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new 
BasicAttributeSensorAndConfigKey<String>(
+            SoftwareProcess.DOWNLOAD_URL, 
"http://packages.couchbase.com/releases/${version}/";
+                + 
"couchbase-server-${driver.communityOrEnterprise}${driver.downloadLinkPreVersionSeparator}${version}${driver.downloadLinkOsTagWithPrefix}");
+
+    @SetFromFlag("clusterInitRamSize")
+    BasicAttributeSensorAndConfigKey<Integer> COUCHBASE_CLUSTER_INIT_RAM_SIZE 
= new BasicAttributeSensorAndConfigKey<Integer>(
+            Integer.class, "couchbase.clusterInitRamSize", "initial ram size 
of the cluster", 300);
+
+    PortAttributeSensorAndConfigKey COUCHBASE_WEB_ADMIN_PORT = new 
PortAttributeSensorAndConfigKey("couchbase.webAdminPort", "Web Administration 
Port", "8091+");
+    PortAttributeSensorAndConfigKey COUCHBASE_API_PORT = new 
PortAttributeSensorAndConfigKey("couchbase.apiPort", "Couchbase API Port", 
"8092+");
+    PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_BUCKET_PORT = new 
PortAttributeSensorAndConfigKey("couchbase.internalBucketPort", "Internal 
Bucket Port", "11209");
+    PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_EXTERNAL_BUCKET_PORT = 
new PortAttributeSensorAndConfigKey("couchbase.internalExternalBucketPort", 
"Internal/External Bucket Port", "11210");
+    PortAttributeSensorAndConfigKey COUCHBASE_CLIENT_INTERFACE_PROXY = new 
PortAttributeSensorAndConfigKey("couchbase.clientInterfaceProxy", "Client 
interface (proxy)", "11211");
+    PortAttributeSensorAndConfigKey COUCHBASE_INCOMING_SSL_PROXY = new 
PortAttributeSensorAndConfigKey("couchbase.incomingSslProxy", "Incoming SSL 
Proxy", "11214");
+    PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_OUTGOING_SSL_PROXY = 
new PortAttributeSensorAndConfigKey("couchbase.internalOutgoingSslProxy", 
"Internal Outgoing SSL Proxy", "11215");
+    PortAttributeSensorAndConfigKey COUCHBASE_REST_HTTPS_FOR_SSL = new 
PortAttributeSensorAndConfigKey("couchbase.internalRestHttpsForSsl", "Internal 
REST HTTPS for SSL", "18091");
+    PortAttributeSensorAndConfigKey COUCHBASE_CAPI_HTTPS_FOR_SSL = new 
PortAttributeSensorAndConfigKey("couchbase.internalCapiHttpsForSsl", "Internal 
CAPI HTTPS for SSL", "18092");
+    PortAttributeSensorAndConfigKey ERLANG_PORT_MAPPER = new 
PortAttributeSensorAndConfigKey("couchbase.erlangPortMapper", "Erlang Port 
Mapper Daemon Listener Port (epmd)", "4369");
+    PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_START = new 
PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeStart", 
"Node data exchange Port Range Start", "21100+");
+    PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_END = new 
PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeEnd", "Node 
data exchange Port Range End", "21199+");
+
+    AttributeSensor<Boolean> IS_PRIMARY_NODE = 
Sensors.newBooleanSensor("couchbase.isPrimaryNode", "flag to determine if the 
current couchbase node is the primary node for the cluster");
+    AttributeSensor<Boolean> IS_IN_CLUSTER = 
Sensors.newBooleanSensor("couchbase.isInCluster", "flag to determine if the 
current couchbase node has been added to a cluster, "
+        + "including being the first / primary node");
+    AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI;
+    
+    // Interesting stats
+    AttributeSensor<Double> OPS = 
Sensors.newDoubleSensor("couchbase.stats.ops", 
+            "Retrieved from pools/nodes/<current node>/interestingStats/ops");
+    AttributeSensor<Long> COUCH_DOCS_DATA_SIZE = 
Sensors.newLongSensor("couchbase.stats.couch.docs.data.size", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/couch_docs_data_size");
+    AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE = 
Sensors.newLongSensor("couchbase.stats.couch.docs.actual.disk.size", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/couch_docs_actual_disk_size");
+    AttributeSensor<Long> EP_BG_FETCHED = 
Sensors.newLongSensor("couchbase.stats.ep.bg.fetched", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/ep_bg_fetched");
+    AttributeSensor<Long> MEM_USED = 
Sensors.newLongSensor("couchbase.stats.mem.used", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/mem_used");
+    AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE = 
Sensors.newLongSensor("couchbase.stats.couch.views.actual.disk.size", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/couch_views_actual_disk_size");
+    AttributeSensor<Long> CURR_ITEMS = 
Sensors.newLongSensor("couchbase.stats.curr.items", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/curr_items");
+    AttributeSensor<Long> VB_REPLICA_CURR_ITEMS = 
Sensors.newLongSensor("couchbase.stats.vb.replica.curr.items", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/vb_replica_curr_items");
+    AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE = 
Sensors.newLongSensor("couchbase.stats.couch.views.data.size", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/couch_views_data_size");
+    AttributeSensor<Long> GET_HITS = 
Sensors.newLongSensor("couchbase.stats.get.hits", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/get_hits");
+    AttributeSensor<Double> CMD_GET = 
Sensors.newDoubleSensor("couchbase.stats.cmd.get", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/cmd_get");
+    AttributeSensor<Long> CURR_ITEMS_TOT = 
Sensors.newLongSensor("couchbase.stats.curr.items.tot", 
+            "Retrieved from pools/nodes/<current 
node>/interestingStats/curr_items_tot");
+    AttributeSensor<String> REBALANCE_STATUS = 
Sensors.newStringSensor("couchbase.rebalance.status", 
+            "Displays the current rebalance status from 
pools/nodes/rebalanceStatus");
+    
+    class MainUri {
+        public static final AttributeSensor<URI> MAIN_URI = 
Attributes.MAIN_URI;
+        
+        static {
+            // ROOT_URL does not need init because it refers to something 
already initialized
+            RendererHints.register(COUCHBASE_WEB_ADMIN_URL, 
RendererHints.namedActionWithUrl());
+
+            RendererHints.register(COUCH_DOCS_DATA_SIZE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+            RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+            RendererHints.register(MEM_USED, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+            RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+            RendererHints.register(COUCH_VIEWS_DATA_SIZE, 
RendererHints.displayValue(ByteSizeStrings.metric()));
+        }
+    }
+    
+    // this long-winded reference is done just to trigger the initialization 
above
+    AttributeSensor<URI> MAIN_URI = MainUri.MAIN_URI;
+
+    MethodEffector<Void> SERVER_ADD = new 
MethodEffector<Void>(CouchbaseNode.class, "serverAdd");
+    MethodEffector<Void> SERVER_ADD_AND_REBALANCE = new 
MethodEffector<Void>(CouchbaseNode.class, "serverAddAndRebalance");
+    MethodEffector<Void> REBALANCE = new 
MethodEffector<Void>(CouchbaseNode.class, "rebalance");
+    MethodEffector<Void> BUCKET_CREATE = new 
MethodEffector<Void>(CouchbaseNode.class, "bucketCreate");
+    brooklyn.entity.Effector<Void> ADD_REPLICATION_RULE = 
Effectors.effector(Void.class, "addReplicationRule")
+        .description("Adds a replication rule from the indicated bucket on the 
cluster where this node is located "
+            + "to the indicated cluster and optional destination bucket")
+        .parameter(String.class, "fromBucket", "Bucket to be replicated")
+        .parameter(Object.class, "toCluster", "Entity (or ID) of the cluster 
to which this should replicate")
+        .parameter(String.class, "toBucket", "Destination bucket for 
replication in the toCluster, defaulting to the same as the fromBucket")
+        .buildAbstract();
+
+    @Effector(description = "add a server to a cluster")
+    public void serverAdd(@EffectorParam(name = "serverHostname") String 
serverToAdd, @EffectorParam(name = "username") String username, 
@EffectorParam(name = "password") String password);
+    
+    @Effector(description = "add a server to a cluster, and immediately 
rebalances")
+    public void serverAddAndRebalance(@EffectorParam(name = "serverHostname") 
String serverToAdd, @EffectorParam(name = "username") String username, 
@EffectorParam(name = "password") String password);
+
+    @Effector(description = "rebalance the couchbase cluster")
+    public void rebalance();
+    
+    @Effector(description = "create a new bucket")
+    public void bucketCreate(@EffectorParam(name = "bucketName") String 
bucketName, @EffectorParam(name = "bucketType") String bucketType, 
+            @EffectorParam(name = "bucketPort") Integer bucketPort, 
@EffectorParam(name = "bucketRamSize") Integer bucketRamSize, 
+            @EffectorParam(name = "bucketReplica") Integer bucketReplica);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
new file mode 100644
index 0000000..1ff28fd
--- /dev/null
+++ 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.couchbase;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.SoftwareProcessDriver;
+
+public interface CouchbaseNodeDriver extends SoftwareProcessDriver {
+    public String getOsTag();
+    public String getDownloadLinkPreVersionSeparator();
+    public String getDownloadLinkOsTagWithPrefix();
+    
+    public String getCommunityOrEnterprise();
+
+    public void serverAdd(String serverToAdd, String username, String 
password);
+
+    public void rebalance();
+    
+    public void bucketCreate(String bucketName, String bucketType, Integer 
bucketPort, Integer bucketRamSize, Integer bucketReplica);
+
+    public void serverAddAndRebalance(String serverToAdd, String username, 
String password);
+
+    public void addReplicationRule(Entity toCluster, String fromBucket, String 
toBucket);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
new file mode 100644
index 0000000..a0654db
--- /dev/null
+++ 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.couchbase;
+
+import static java.lang.String.format;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.event.feed.http.HttpFeed;
+import brooklyn.event.feed.http.HttpPollConfig;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
+import brooklyn.location.MachineProvisioningLocation;
+import brooklyn.location.access.BrooklynAccessUtils;
+import brooklyn.location.cloud.CloudLocationConfig;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.guava.MaybeFunctions;
+import brooklyn.util.guava.TypeTokens;
+import brooklyn.util.http.HttpTool;
+import brooklyn.util.http.HttpToolResponse;
+import brooklyn.util.net.Urls;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.net.HostAndPort;
+import com.google.common.net.HttpHeaders;
+import com.google.common.net.MediaType;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+
+public class CouchbaseNodeImpl extends SoftwareProcessImpl implements 
CouchbaseNode {
+
+    private static final Logger log = 
LoggerFactory.getLogger(CouchbaseNodeImpl.class);
+
+    private volatile HttpFeed httpFeed;
+
+    @Override
+    public Class<CouchbaseNodeDriver> getDriverInterface() {
+        return CouchbaseNodeDriver.class;
+    }
+
+    @Override
+    public CouchbaseNodeDriver getDriver() {
+        return (CouchbaseNodeDriver) super.getDriver();
+    }
+
+    @Override
+    public void init() {
+        super.init();
+
+        subscribe(this, Attributes.SERVICE_UP, new 
SensorEventListener<Boolean>() {
+            @Override
+            public void onEvent(SensorEvent<Boolean> booleanSensorEvent) {
+                if (Boolean.TRUE.equals(booleanSensorEvent.getValue())) {
+                    Integer webPort = 
getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
+                    Preconditions.checkNotNull(webPort, 
CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT+" not set for %s; is an acceptable port 
available?", this);
+                    String hostAndPort = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(CouchbaseNodeImpl.this, 
webPort).toString();
+                    setAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL, 
URI.create(format("http://%s";, hostAndPort)));
+                }
+            }
+        });
+
+        getMutableEntityType().addEffector(ADD_REPLICATION_RULE, new 
EffectorBody<Void>() {
+            @Override
+            public Void call(ConfigBag parameters) {
+                addReplicationRule(parameters);
+                return null;
+            }
+        });
+    }
+
+    protected Map<String, Object> 
obtainProvisioningFlags(@SuppressWarnings("rawtypes") 
MachineProvisioningLocation location) {
+        ConfigBag result = 
ConfigBag.newInstance(super.obtainProvisioningFlags(location));
+        result.configure(CloudLocationConfig.OS_64_BIT, true);
+        return result.getAllConfig();
+    }
+
+    @Override
+    protected Collection<Integer> getRequiredOpenPorts() {
+        // TODO this creates a huge list of inbound ports; much better to 
define on a security group using range syntax!
+        int erlangRangeStart = 
getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_START).iterator().next();
+        int erlangRangeEnd = 
getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_END).iterator().next();
+
+        Set<Integer> newPorts = 
MutableSet.<Integer>copyOf(super.getRequiredOpenPorts());
+        newPorts.remove(erlangRangeStart);
+        newPorts.remove(erlangRangeEnd);
+        for (int i = erlangRangeStart; i <= erlangRangeEnd; i++)
+            newPorts.add(i);
+        return newPorts;
+    }
+
+    @Override
+    public void serverAdd(String serverToAdd, String username, String 
password) {
+        getDriver().serverAdd(serverToAdd, username, password);
+    }
+
+    @Override
+    public void serverAddAndRebalance(String serverToAdd, String username, 
String password) {
+        getDriver().serverAddAndRebalance(serverToAdd, username, password);
+    }
+
+    @Override
+    public void rebalance() {
+        getDriver().rebalance();
+    }
+
+    protected final static Function<HttpToolResponse, JsonElement> 
GET_THIS_NODE_STATS = Functionals.chain(
+        HttpValueFunctions.jsonContents(),
+        JsonFunctions.walk("nodes"),
+        new Function<JsonElement, JsonElement>() {
+            @Override public JsonElement apply(JsonElement input) {
+                JsonArray nodes = input.getAsJsonArray();
+                for (JsonElement element : nodes) {
+                    JsonElement thisNode = 
element.getAsJsonObject().get("thisNode");
+                    if (thisNode!=null && 
Boolean.TRUE.equals(thisNode.getAsBoolean())) {
+                        return 
element.getAsJsonObject().get("interestingStats");
+                    }
+                }
+                return null;
+        }}
+    );
+
+    protected final static <T> HttpPollConfig<T> 
getSensorFromNodeStat(AttributeSensor<T> sensor, String ...jsonPath) {
+        return new HttpPollConfig<T>(sensor)
+            .onSuccess(Functionals.chain(GET_THIS_NODE_STATS,
+                MaybeFunctions.<JsonElement>wrap(),
+                JsonFunctions.walkM(jsonPath),
+                
JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null)))
+            .onFailureOrException(Functions.<T>constant(null));
+    }
+
+    @Override
+    protected void postStart() {
+        super.postStart();
+        renameServerToPublicHostname();
+    }
+
+    protected void renameServerToPublicHostname() {
+        // 
http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#couchbase-getting-started-hostnames
+        URI apiUri = null;
+        try {
+            HostAndPort accessible = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(this, 
getAttribute(COUCHBASE_WEB_ADMIN_PORT));
+            apiUri = 
URI.create(String.format("http://%s:%d/node/controller/rename";, 
accessible.getHostText(), accessible.getPort()));
+            UsernamePasswordCredentials credentials = new 
UsernamePasswordCredentials(getConfig(COUCHBASE_ADMIN_USERNAME), 
getConfig(COUCHBASE_ADMIN_PASSWORD));
+            HttpToolResponse response = HttpTool.httpPost(
+                    // the uri is required by the HttpClientBuilder in order 
to set the AuthScope of the credentials
+                    
HttpTool.httpClientBuilder().uri(apiUri).credentials(credentials).build(),
+                    apiUri,
+                    MutableMap.of(
+                            HttpHeaders.CONTENT_TYPE, 
MediaType.FORM_DATA.toString(),
+                            HttpHeaders.ACCEPT, "*/*",
+                            // this appears needed; without it we get 
org.apache.http.NoHttpResponseException !?
+                            HttpHeaders.AUTHORIZATION, 
HttpTool.toBasicAuthorizationValue(credentials)),
+                    
Charsets.UTF_8.encode("hostname="+Urls.encode(accessible.getHostText())).array());
+            log.debug("Renamed Couchbase server "+this+" via "+apiUri+": 
"+response);
+            if (!HttpTool.isStatusCodeHealthy(response.getResponseCode())) {
+                log.warn("Invalid response code, renaming "+apiUri+": 
"+response);
+            }
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            log.warn("Error renaming server, using "+apiUri+": "+e, e);
+        }
+    }
+
+    public void connectSensors() {
+        super.connectSensors();
+        connectServiceUpIsRunning();
+
+        HostAndPort hostAndPort = 
BrooklynAccessUtils.getBrooklynAccessibleAddress(this, 
this.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
+        httpFeed = HttpFeed.builder()
+            .entity(this)
+            .period(Duration.seconds(3))
+            .baseUri("http://"; + hostAndPort + "/pools/nodes/")
+            
.credentialsIfNotNull(getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), 
getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
+            .poll(getSensorFromNodeStat(CouchbaseNode.OPS, "ops"))
+            .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_DATA_SIZE, 
"couch_docs_data_size"))
+            
.poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, 
"couch_docs_actual_disk_size"))
+            .poll(getSensorFromNodeStat(CouchbaseNode.EP_BG_FETCHED, 
"ep_bg_fetched"))
+            .poll(getSensorFromNodeStat(CouchbaseNode.MEM_USED, "mem_used"))
+            
.poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, 
"couch_views_actual_disk_size"))
+            .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS, 
"curr_items"))
+            .poll(getSensorFromNodeStat(CouchbaseNode.VB_REPLICA_CURR_ITEMS, 
"vb_replica_curr_items"))
+            .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, 
"couch_views_data_size"))
+            .poll(getSensorFromNodeStat(CouchbaseNode.GET_HITS, "get_hits"))
+            .poll(getSensorFromNodeStat(CouchbaseNode.CMD_GET, "cmd_get"))
+            .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS_TOT, 
"curr_items_tot"))
+            .poll(new HttpPollConfig<String>(CouchbaseNode.REBALANCE_STATUS)
+                    
.onSuccess(HttpValueFunctions.jsonContents("rebalanceStatus", String.class))
+                    .onFailureOrException(Functions.constant("Could not 
retrieve")))
+            .build();
+    }
+
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        disconnectServiceUpIsRunning();
+        if (httpFeed != null) {
+            httpFeed.stop();
+        }
+    }
+
+    @Override
+    public void bucketCreate(String bucketName, String bucketType, Integer 
bucketPort, Integer bucketRamSize, Integer bucketReplica) {
+        if (Strings.isBlank(bucketType)) bucketType = "couchbase";
+        if (bucketRamSize==null || bucketRamSize<=0) bucketRamSize = 200;
+        if (bucketReplica==null || bucketReplica<0) bucketReplica = 1;
+
+        getDriver().bucketCreate(bucketName, bucketType, bucketPort, 
bucketRamSize, bucketReplica);
+    }
+
+    /** exposed through {@link CouchbaseNode#ADD_REPLICATION_RULE} */
+    protected void addReplicationRule(ConfigBag ruleArgs) {
+        Object toClusterO = 
Preconditions.checkNotNull(ruleArgs.getStringKey("toCluster"), "toCluster must 
not be null");
+        if (toClusterO instanceof String) {
+            toClusterO = getManagementContext().lookup((String)toClusterO);
+        }
+        Entity toCluster = Tasks.resolving(toClusterO, 
Entity.class).context(getExecutionContext()).get();
+
+        String fromBucket = Preconditions.checkNotNull( 
(String)ruleArgs.getStringKey("fromBucket"), "fromBucket must be specified" );
+
+        String toBucket = (String)ruleArgs.getStringKey("toBucket");
+        if (toBucket==null) toBucket = fromBucket;
+
+        if (!ruleArgs.getUnusedConfig().isEmpty()) {
+            throw new IllegalArgumentException("Unsupported replication rule 
data: "+ruleArgs.getUnusedConfig());
+        }
+
+        getDriver().addReplicationRule(toCluster, fromBucket, toBucket);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
new file mode 100644
index 0000000..73ed934
--- /dev/null
+++ 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.couchbase;
+
+import static brooklyn.util.ssh.BashCommands.*;
+import static java.lang.String.format;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.http.auth.UsernamePasswordCredentials;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.drivers.downloads.BasicDownloadRequirement;
+import brooklyn.entity.drivers.downloads.DownloadProducerFromUrlAttribute;
+import brooklyn.entity.software.SshEffectorTasks;
+import brooklyn.event.basic.DependentConfiguration;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.location.OsDetails;
+import brooklyn.location.access.BrooklynAccessUtils;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.management.Task;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.http.HttpTool;
+import brooklyn.util.http.HttpToolResponse;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.ssh.BashCommands;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.TaskBuilder;
+import brooklyn.util.task.TaskTags;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.text.NaturalOrderComparator;
+import brooklyn.util.text.StringEscapes.BashStringEscapes;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+
+public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver 
implements CouchbaseNodeDriver {
+
+    public CouchbaseNodeSshDriver(final CouchbaseNodeImpl entity, final 
SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    public static String couchbaseCli(String cmd) {
+        return "/opt/couchbase/bin/couchbase-cli " + cmd + " ";
+    }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(getInstallDir());
+    }
+
+    @Override
+    public void install() {
+        //for reference 
https://github.com/urbandecoder/couchbase/blob/master/recipes/server.rb
+        //installation instructions 
(http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#preparing-to-install)
+
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+
+        OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
+
+        if (osDetails.isLinux()) {
+            List<String> commands = installLinux(urls, saveAs);
+            //FIXME installation return error but the server is up and running.
+            newScript(INSTALLING)
+                    .body.append(commands).execute();
+        } else {
+            Tasks.markInessential();
+            throw new IllegalStateException("Unsupported OS for installing 
Couchbase. Will continue but may fail later.");
+        }
+    }
+
+    private List<String> installLinux(List<String> urls, String saveAs) {
+
+        log.info("Installing " + getEntity() + " using couchbase-server-{} 
{}", getCommunityOrEnterprise(), getVersion());
+
+        String apt = chainGroup(
+                installPackage(MutableMap.of("apt", "python-httplib2 
libssl0.9.8"), null),
+                sudo(format("dpkg -i %s", saveAs)));
+
+        String yum = chainGroup(
+                "which yum",
+                // The following prevents failure on RHEL AWS nodes:
+                // https://forums.aws.amazon.com/thread.jspa?threadID=100509
+                ok(sudo("sed -i.bk s/^enabled=1$/enabled=0/ 
/etc/yum/pluginconf.d/subscription-manager.conf")),
+                ok(sudo("yum check-update")),
+                sudo("yum install -y pkgconfig"),
+                // RHEL requires openssl version 098
+                sudo("[ -f /etc/redhat-release ] && (grep -i \"red hat\" 
/etc/redhat-release && sudo yum install -y openssl098e) || :"),
+                sudo(format("rpm --install %s", saveAs)));
+
+        String link = new DownloadProducerFromUrlAttribute().apply(new 
BasicDownloadRequirement(this)).getPrimaryLocations().iterator().next();
+        return ImmutableList.<String>builder()
+                .add(INSTALL_CURL)
+                .addAll(Arrays.asList(INSTALL_CURL,
+                        
BashCommands.require(BashCommands.alternatives(BashCommands.simpleDownloadUrlAs(urls,
 saveAs),
+                                        // Referer link is required for 3.0.0; 
note mis-spelling is correct, as per http://en.wikipedia.org/wiki/HTTP_referer
+                                        "curl -f -L -k " + 
BashStringEscapes.wrapBash(link)
+                                                + " -H 'Referer: 
http://www.couchbase.com/downloads'"
+                                                + " -o " + saveAs),
+                                "Could not retrieve " + saveAs + " (from " + 
urls.size() + " sites)", 9)))
+                .add(alternatives(apt, yum))
+                .build();
+    }
+
+    @Override
+    public void customize() {
+        //TODO: add linux tweaks for couchbase
+        //http://blog.couchbase.com/often-overlooked-linux-os-tweaks
+        //http://blog.couchbase.com/kirk
+
+        //turn off swappiness
+        //vm.swappiness=0
+        //sudo echo 0 > /proc/sys/vm/swappiness
+
+        //os page cache = 20%
+
+        //disable THP
+        //sudo echo never > /sys/kernel/mm/transparent_hugepage/enabled
+        //sudo echo never > /sys/kernel/mm/transparent_hugepage/defrag
+
+        //turn off transparent huge pages
+        //limit page cache disty bytes
+        //control the rate page cache is flused ... vm.dirty_*
+    }
+
+    @Override
+    public void launch() {
+        String clusterPrefix = "--cluster-" + (isPreV3() ? "init-" : "");
+        // in v30, the cluster arguments were changed, and it became mandatory 
to supply a url + password (if there is none, these are ignored)
+        newScript(LAUNCHING)
+                .body.append(
+                sudo("/etc/init.d/couchbase-server start"),
+                "for i in {0..120}\n" +
+                        "do\n" +
+                        "    if [ $i -eq 120 ]; then echo REST API unavailable 
after 120 seconds, failing; exit 1; fi;\n" +
+                        "    curl -s " + String.format("http://localhost:%s";, 
getWebPort()) + " > /dev/null && echo REST API available after $i seconds && 
break\n" +
+                        "    sleep 1\n" +
+                        "done\n" +
+                        couchbaseCli("cluster-init") +
+                        (isPreV3() ? getCouchbaseHostnameAndPort() : 
getCouchbaseHostnameAndCredentials()) +
+                        " " + clusterPrefix + "username=" + getUsername() +
+                        " " + clusterPrefix + "password=" + getPassword() +
+                        " " + clusterPrefix + "port=" + getWebPort() +
+                        " " + clusterPrefix + "ramsize=" + 
getClusterInitRamSize())
+                .execute();
+    }
+
+    @Override
+    public boolean isRunning() {
+        //TODO add a better way to check if couchbase server is running
+        return (newScript(CHECK_RUNNING)
+                .body.append(format("curl -u %s:%s 
http://localhost:%s/pools/nodes";, getUsername(), getPassword(), getWebPort()))
+                .execute() == 0);
+    }
+
+    @Override
+    public void stop() {
+        newScript(STOPPING)
+                .body.append(sudo("/etc/init.d/couchbase-server stop"))
+                .execute();
+    }
+
+    @Override
+    public String getVersion() {
+        return entity.getConfig(CouchbaseNode.SUGGESTED_VERSION);
+    }
+
+    @Override
+    public String getOsTag() {
+        return newDownloadLinkSegmentComputer().getOsTag();
+    }
+
+    protected DownloadLinkSegmentComputer newDownloadLinkSegmentComputer() {
+        return new DownloadLinkSegmentComputer(getLocation().getOsDetails(), 
!isPreV3(), Strings.toString(getEntity()));
+    }
+
+    public static class DownloadLinkSegmentComputer {
+        // links are:
+        // 
http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.rpm
+        // 
http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.deb
+        // ^^^ preV3 is _ everywhere
+        // 
http://packages.couchbase.com/releases/3.0.0/couchbase-server-community_3.0.0-ubuntu12.04_amd64.deb
+        // ^^^ most V3 is _${version}-
+        // 
http://packages.couchbase.com/releases/3.0.0/couchbase-server-community-3.0.0-centos6.x86_64.rpm
+        // ^^^ but RHEL is -${version}-
+
+        @Nullable
+        private final OsDetails os;
+        @Nonnull
+        private final boolean isV3OrLater;
+        @Nonnull
+        private final String context;
+        @Nonnull
+        private final String osName;
+        @Nonnull
+        private final boolean isRpm;
+        @Nonnull
+        private final boolean is64bit;
+
+        public DownloadLinkSegmentComputer(@Nullable OsDetails os, boolean 
isV3OrLater, @Nonnull String context) {
+            this.os = os;
+            this.isV3OrLater = isV3OrLater;
+            this.context = context;
+            if (os == null) {
+                // guess centos as RPM is sensible default
+                log.warn("No details known for OS of " + context + "; assuming 
64-bit RPM distribution of Couchbase");
+                osName = "centos";
+                isRpm = true;
+                is64bit = true;
+                return;
+            }
+            osName = os.getName().toLowerCase();
+            isRpm = !(osName.contains("deb") || osName.contains("ubuntu"));
+            is64bit = os.is64bit();
+        }
+
+        /**
+         * separator after the version number used to be _ but is - in 3.0 and 
later
+         */
+        public String getPreVersionSeparator() {
+            if (!isV3OrLater) return "_";
+            if (isRpm) return "-";
+            return "_";
+        }
+
+        public String getOsTag() {
+            // couchbase only provide certain versions; if on other platforms 
let's suck-it-and-see
+            String family;
+            if (osName.contains("debian")) family = "debian7_";
+            else if (osName.contains("ubuntu")) family = "ubuntu12.04_";
+            else if (osName.contains("centos") || osName.contains("rhel") || 
(osName.contains("red") && osName.contains("hat")))
+                family = "centos6.";
+            else {
+                log.warn("Unrecognised OS " + os + " of " + context + "; 
assuming RPM distribution of Couchbase");
+                family = "centos6.";
+            }
+
+            if (!is64bit && !isV3OrLater) {
+                // NB: 32-bit binaries aren't (yet?) available for v30
+                log.warn("32-bit binaries for Couchbase might not be 
available, when deploying " + context);
+            }
+            String arch = !is64bit ? "x86" : !isRpm && isV3OrLater ? "amd64" : 
"x86_64";
+            String fileExtension = isRpm ? ".rpm" : ".deb";
+
+            if (isV3OrLater)
+                return family + arch + fileExtension;
+            else
+                return arch + fileExtension;
+        }
+
+        public String getOsTagWithPrefix() {
+            return (!isV3OrLater ? "_" : "-") + getOsTag();
+        }
+    }
+
+    @Override
+    public String getDownloadLinkOsTagWithPrefix() {
+        return newDownloadLinkSegmentComputer().getOsTagWithPrefix();
+    }
+
+    @Override
+    public String getDownloadLinkPreVersionSeparator() {
+        return newDownloadLinkSegmentComputer().getPreVersionSeparator();
+    }
+
+    private boolean isPreV3() {
+        return 
NaturalOrderComparator.INSTANCE.compare(getEntity().getConfig(CouchbaseNode.SUGGESTED_VERSION),
 "3.0") < 0;
+    }
+
+    @Override
+    public String getCommunityOrEnterprise() {
+        Boolean isEnterprise = 
getEntity().getConfig(CouchbaseNode.USE_ENTERPRISE);
+        return isEnterprise ? "enterprise" : "community";
+    }
+
+    private String getUsername() {
+        return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
+    }
+
+    private String getPassword() {
+        return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
+    }
+
+    private String getWebPort() {
+        return "" + 
entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
+    }
+
+    private String getCouchbaseHostnameAndCredentials() {
+        return format("-c %s:%s -u %s -p %s", getSubnetHostname(), 
getWebPort(), getUsername(), getPassword());
+    }
+
+    private String getCouchbaseHostnameAndPort() {
+        return format("-c %s:%s", getSubnetHostname(), getWebPort());
+    }
+
+    private String getClusterInitRamSize() {
+        return 
entity.getConfig(CouchbaseNode.COUCHBASE_CLUSTER_INIT_RAM_SIZE).toString();
+    }
+
+    @Override
+    public void rebalance() {
+        entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "explicitly 
started");
+        newScript("rebalance")
+                .body.append(
+                couchbaseCli("rebalance") + 
getCouchbaseHostnameAndCredentials())
+                .failOnNonZeroResultCode()
+                .execute();
+
+        // wait until the re-balance is started
+        // (if it's quick, this might miss it, but it will only block for 30s 
if so)
+        Repeater.create()
+                .backoff(Repeater.DEFAULT_REAL_QUICK_PERIOD, 2, 
Duration.millis(500))
+                .limitTimeTo(Duration.THIRTY_SECONDS)
+                .until(new Callable<Boolean>() {
+                           @Override
+                           public Boolean call() throws Exception {
+                               for (HostAndPort nodeHostAndPort : 
getNodesHostAndPort()) {
+                                   if 
(isNodeRebalancing(nodeHostAndPort.toString())) {
+                                       return true;
+                                   }
+                               }
+                               return false;
+                           }
+                       }
+                ).run();
+
+        entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "waiting for 
completion");
+        // Wait until the Couchbase node finishes the re-balancing
+        Task<Boolean> reBalance = TaskBuilder.<Boolean>builder()
+                .name("Waiting until node is rebalancing")
+                .body(new Callable<Boolean>() {
+                    @Override
+                    public Boolean call() throws Exception {
+                        return Repeater.create()
+                                .backoff(Duration.ONE_SECOND, 1.2, 
Duration.TEN_SECONDS)
+                                .limitTimeTo(Duration.FIVE_MINUTES)
+                                .until(new Callable<Boolean>() {
+                                    @Override
+                                    public Boolean call() throws Exception {
+                                        for (HostAndPort nodeHostAndPort : 
getNodesHostAndPort()) {
+                                            if 
(isNodeRebalancing(nodeHostAndPort.toString())) {
+                                                return false;
+                                            }
+                                        }
+                                        return true;
+                                    }
+                                })
+                                .run();
+                        }
+                })
+                .build();
+        Boolean completed = DynamicTasks.queueIfPossible(reBalance)
+                .orSubmitAndBlock()
+                .andWaitForSuccess();
+        if (completed) {
+            entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "completed");
+            
ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), 
"rebalancing");
+            log.info("Rebalanced cluster via primary node {}", getEntity());
+        } else {
+            entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "timed out");
+            
ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(getEntity(), 
"rebalancing", "rebalance did not complete within time limit");
+            log.warn("Timeout rebalancing cluster via primary node {}", 
getEntity());
+        }
+    }
+
+    private Iterable<HostAndPort> getNodesHostAndPort() {
+        Group group = Iterables.getFirst(getEntity().getGroups(), null);
+        if (group == null) return Lists.newArrayList();
+        return 
Iterables.transform(group.getAttribute(CouchbaseCluster.COUCHBASE_CLUSTER_UP_NODES),
+                new Function<Entity, HostAndPort>() {
+                    @Override
+                    public HostAndPort apply(Entity input) {
+                        return 
BrooklynAccessUtils.getBrooklynAccessibleAddress(input, 
input.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
+                    }
+                });
+    }
+
+    private boolean isNodeRebalancing(String nodeHostAndPort) {
+        HttpToolResponse response = getApiResponse("http://"; + nodeHostAndPort 
+ "/pools/default/rebalanceProgress");
+        if (response.getResponseCode() != 200) {
+            throw new IllegalStateException("failed retrieving rebalance 
status: " + response);
+        }
+        return !"none".equals(HttpValueFunctions.jsonContents("status", 
String.class).apply(response));
+    }
+
+    private HttpToolResponse getApiResponse(String uri) {
+        return HttpTool.httpGet(HttpTool.httpClientBuilder()
+                        // the uri is required by the HttpClientBuilder in 
order to set the AuthScope of the credentials
+                        .uri(uri)
+                        .credentials(new 
UsernamePasswordCredentials(getUsername(), getPassword()))
+                        .build(),
+                URI.create(uri),
+                ImmutableMap.<String, String>of());
+    }
+
+    @Override
+    public void serverAdd(String serverToAdd, String username, String 
password) {
+        newScript("serverAdd").body.append(couchbaseCli("server-add")
+                + getCouchbaseHostnameAndCredentials() +
+                " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) +
+                " --server-add-username=" + 
BashStringEscapes.wrapBash(username) +
+                " --server-add-password=" + 
BashStringEscapes.wrapBash(password))
+                .failOnNonZeroResultCode()
+                .execute();
+    }
+
+    @Override
+    public void serverAddAndRebalance(String serverToAdd, String username, 
String password) {
+        
newScript("serverAddAndRebalance").body.append(couchbaseCli("rebalance")
+                + getCouchbaseHostnameAndCredentials() +
+                " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) +
+                " --server-add-username=" + 
BashStringEscapes.wrapBash(username) +
+                " --server-add-password=" + 
BashStringEscapes.wrapBash(password))
+                .failOnNonZeroResultCode()
+                .execute();
+        entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "triggered as part 
of server-add");
+    }
+
+    @Override
+    public void bucketCreate(String bucketName, String bucketType, Integer 
bucketPort, Integer bucketRamSize, Integer bucketReplica) {
+        log.info("Adding bucket: {} to cluster {} primary node: {}", new 
Object[]{bucketName, CouchbaseClusterImpl.getClusterOrNode(getEntity()), 
getEntity()});
+
+        newScript("bucketCreate").body.append(couchbaseCli("bucket-create")
+                + getCouchbaseHostnameAndCredentials() +
+                " --bucket=" + BashStringEscapes.wrapBash(bucketName) +
+                " --bucket-type=" + BashStringEscapes.wrapBash(bucketType) +
+                " --bucket-port=" + bucketPort +
+                " --bucket-ramsize=" + bucketRamSize +
+                " --bucket-replica=" + bucketReplica)
+                .failOnNonZeroResultCode()
+                .execute();
+    }
+
+    @Override
+    public void addReplicationRule(Entity toCluster, String fromBucket, String 
toBucket) {
+        
DynamicTasks.queue(DependentConfiguration.attributeWhenReady(toCluster, 
Attributes.SERVICE_UP)).getUnchecked();
+
+        String destName = CouchbaseClusterImpl.getClusterName(toCluster);
+
+        log.info("Setting up XDCR for " + fromBucket + " from " + 
CouchbaseClusterImpl.getClusterName(getEntity()) + " (via " + getEntity() + ") "
+                + "to " + destName + " (" + toCluster + ")");
+
+        Entity destPrimaryNode = 
toCluster.getAttribute(CouchbaseCluster.COUCHBASE_PRIMARY_NODE);
+        String destHostname = 
destPrimaryNode.getAttribute(Attributes.HOSTNAME);
+        String destUsername = 
toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
+        String destPassword = 
toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
+
+        // on the REST API there is mention of a 'type' 'continuous' but i 
don't see other refs to this
+
+        // PROTOCOL   Select REST protocol or memcached for replication. xmem 
indicates memcached while capi indicates REST protocol.
+        // looks like xmem is the default; leave off for now
+//        String replMode = "xmem";
+
+        DynamicTasks.queue(TaskTags.markInessential(SshEffectorTasks.ssh(
+                couchbaseCli("xdcr-setup") +
+                        getCouchbaseHostnameAndCredentials() +
+                        " --create" +
+                        " --xdcr-cluster-name=" + 
BashStringEscapes.wrapBash(destName) +
+                        " --xdcr-hostname=" + 
BashStringEscapes.wrapBash(destHostname) +
+                        " --xdcr-username=" + 
BashStringEscapes.wrapBash(destUsername) +
+                        " --xdcr-password=" + 
BashStringEscapes.wrapBash(destPassword)
+        ).summary("create xdcr destination " + destName).newTask()));
+
+        // would be nice to auto-create bucket, but we'll need to know the 
parameters; the port in particular is tedious
+//        ((CouchbaseNode)destPrimaryNode).bucketCreate(toBucket, "couchbase", 
null, 0, 0);
+
+        DynamicTasks.queue(SshEffectorTasks.ssh(
+                couchbaseCli("xdcr-replicate") +
+                        getCouchbaseHostnameAndCredentials() +
+                        " --create" +
+                        " --xdcr-cluster-name=" + 
BashStringEscapes.wrapBash(destName) +
+                        " --xdcr-from-bucket=" + 
BashStringEscapes.wrapBash(fromBucket) +
+                        " --xdcr-to-bucket=" + 
BashStringEscapes.wrapBash(toBucket)
+//            + " --xdcr-replication-mode="+replMode
+        ).summary("configure replication for " + fromBucket + " to " + 
destName + ":" + toBucket).newTask());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java
 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java
new file mode 100644
index 0000000..384434c
--- /dev/null
+++ 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.couchbase;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+
+@ImplementedBy(CouchbaseSyncGatewayImpl.class)
+public interface CouchbaseSyncGateway extends SoftwareProcess {
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = 
ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION,
+            "1.0-beta3.1");
+
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new 
BasicAttributeSensorAndConfigKey<String>(
+            SoftwareProcess.DOWNLOAD_URL, 
"http://packages.couchbase.com/releases/couchbase-sync-gateway/1.0-beta/couchbase-sync-gateway-community_${version}_${driver.osTag}";);
+    
+    @SetFromFlag("couchbaseServer")
+    ConfigKey<Entity> COUCHBASE_SERVER = ConfigKeys.newConfigKey(Entity.class, 
"couchbaseSyncGateway.couchbaseNode", 
+            "Couchbase server node or cluster the sync gateway connects to");
+
+    @SetFromFlag("serverPool")
+    ConfigKey<String> COUCHBASE_SERVER_POOL = 
ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverPool", 
+            "Couchbase Server pool name in which to find buckets", "default");
+    
+    @SetFromFlag("couchbaseServerBucket")
+    ConfigKey<String> COUCHBASE_SERVER_BUCKET = 
ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverBucket", 
+            "Name of the Couchbase bucket to use", "sync_gateway");
+
+    @SetFromFlag("pretty")
+    ConfigKey<Boolean> PRETTY = 
ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.pretty", 
+            "Pretty-print JSON responses. This is useful for debugging, but 
reduces performance.", false);
+
+    @SetFromFlag("verbose")
+    ConfigKey<Boolean> VERBOSE = 
ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.verbose", 
+            "Logs more information about requests.", false);
+
+    AttributeSensor<String> COUCHBASE_SERVER_WEB_URL = 
Sensors.newStringSensor("couchbaseSyncGateway.serverWebUrl", 
+            "The Url and web port of the couchbase server to connect to");
+    
+    AttributeSensor<String> MANAGEMENT_URL = 
Sensors.newStringSensor("coucbaseSyncGateway.managementUrl", 
+            "Management URL for Couchbase Sycn Gateway");
+
+    PortAttributeSensorAndConfigKey SYNC_REST_API_PORT = new 
PortAttributeSensorAndConfigKey("couchbaseSyncGateway.syncRestPort", 
+            "Port the Sync REST API listens on", "4984");
+    
+    PortAttributeSensorAndConfigKey ADMIN_REST_API_PORT = new 
PortAttributeSensorAndConfigKey("couchbaseSyncGateway.adminRestPort", 
+            "Port the Admin REST API listens on", "4985");
+
+}
\ No newline at end of file


Reply via email to