http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java new file mode 100644 index 0000000..f279987 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.couchbase; + +import static brooklyn.util.JavaGroovyEquivalents.groovyTruth; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Nonnull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.render.RendererHints; +import brooklyn.enricher.Enrichers; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.basic.ServiceStateLogic; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.effector.Effectors; +import brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.event.feed.http.HttpFeed; +import brooklyn.event.feed.http.HttpPollConfig; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.event.feed.http.JsonFunctions; +import brooklyn.location.access.BrooklynAccessUtils; +import brooklyn.policy.PolicySpec; +import brooklyn.util.collections.CollectionFunctionals; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.collections.QuorumCheck; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Functionals; +import brooklyn.util.guava.IfFunctions; +import brooklyn.util.math.MathPredicates; +import brooklyn.util.task.DynamicTasks; +import brooklyn.util.task.TaskBuilder; +import brooklyn.util.task.Tasks; +import brooklyn.util.text.ByteSizeStrings; +import brooklyn.util.text.StringFunctions; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import 
com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; + +public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster { + + /* + * Refactoring required: + * + * Currently, on start() the cluster waits for an arbitrary SERVICE_UP_TIME_OUT (3 minutes) before assuming that a quorate + * number of servers are available. The servers are then added to the cluster, and a further wait period of + * DELAY_BEFORE_ADVERTISING_CLUSTER (30 seconds) is used before advertising the cluster + * + * DELAY_BEFORE_ADVERTISING_CLUSTER: It should be possible to refactor this away by adding a repeater that will poll + * the REST API of the primary node (once established) until the API indicates that the cluster is available + * + * SERVICE_UP_TIME_OUT: The refactoring of this would be more substantial. One method would be to remove the bulk of the + * logic from the start() method, and rely entirely on the membership tracking policy and the onServerPoolMemberChanged() + * method. The addition of a RUNNING sensor on the nodes would allow the cluster to determine that a node is up and + * running but has not yet been added to the cluster. The IS_CLUSTER_INITIALIZED key could be used to determine whether + * or not the cluster should be initialized, or a node simply added to an existing cluster. A repeater could be used + * in the driver's to ensure that the method does not return until the node has been fully added + * + * There is an (incomplete) first-pass at this here: https://github.com/Nakomis/incubator-brooklyn/compare/couchbase-running-sensor + * however, there have been significant changes to the cluster initialization since that work was done so it will probably + * need to be re-done + * + * Additionally, during bucket creation, a HttpPoll is used to check that the bucket has been created. This should be + * refactored to use a Repeater in CouchbaseNodeSshDriver.bucketCreate() in a similar way to the one employed in + * CouchbaseNodeSshDriver.rebalance(). 
Were this done, this class could simply queue the bucket creation tasks + * + */ + + private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class); + private final Object mutex = new Object[0]; + // Used to serialize bucket creation as only one bucket can be created at a time, + // so a feed is used to determine when a bucket has finished being created + private final AtomicReference<HttpFeed> resetBucketCreation = new AtomicReference<HttpFeed>(); + + public void init() { + log.info("Initializing the Couchbase cluster..."); + super.init(); + + addEnricher( + Enrichers.builder() + .transforming(COUCHBASE_CLUSTER_UP_NODES) + .from(this) + .publishing(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES) + .computing(new ListOfHostAndPort()).build() ); + addEnricher( + Enrichers.builder() + .transforming(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES) + .from(this) + .publishing(COUCHBASE_CLUSTER_CONNECTION_URL) + .computing( + IfFunctions.<List<String>>ifPredicate( + Predicates.compose(MathPredicates.lessThan(getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE)), + CollectionFunctionals.sizeFunction(0)) ) + .value((String)null) + .defaultApply( + Functionals.chain( + CollectionFunctionals.<String,List<String>>limit(4), + StringFunctions.joiner(","), + StringFunctions.formatter("http://%s/"))) ) + .build() ); + + Map<? extends AttributeSensor<? extends Number>, ? extends AttributeSensor<? extends Number>> enricherSetup = + ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? extends Number>>builder() + .put(CouchbaseNode.OPS, CouchbaseCluster.OPS_PER_NODE) + .put(CouchbaseNode.COUCH_DOCS_DATA_SIZE, CouchbaseCluster.COUCH_DOCS_DATA_SIZE_PER_NODE) + .put(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, CouchbaseCluster.COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE) + .put(CouchbaseNode.EP_BG_FETCHED, CouchbaseCluster.EP_BG_FETCHED_PER_NODE) + .put(CouchbaseNode.MEM_USED, CouchbaseCluster.MEM_USED_PER_NODE) + .put(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, CouchbaseCluster.COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE) + .put(CouchbaseNode.CURR_ITEMS, CouchbaseCluster.CURR_ITEMS_PER_NODE) + .put(CouchbaseNode.VB_REPLICA_CURR_ITEMS, CouchbaseCluster.VB_REPLICA_CURR_ITEMS_PER_NODE) + .put(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, CouchbaseCluster.COUCH_VIEWS_DATA_SIZE_PER_NODE) + .put(CouchbaseNode.GET_HITS, CouchbaseCluster.GET_HITS_PER_NODE) + .put(CouchbaseNode.CMD_GET, CouchbaseCluster.CMD_GET_PER_NODE) + .put(CouchbaseNode.CURR_ITEMS_TOT, CouchbaseCluster.CURR_ITEMS_TOT_PER_NODE) + .build(); + + for (AttributeSensor<? extends Number> nodeSensor : enricherSetup.keySet()) { + addSummingMemberEnricher(nodeSensor); + addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor)); + } + + addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) + .from(IS_CLUSTER_INITIALIZED).computing( + IfFunctions.ifNotEquals(true).value("The cluster is not yet completely initialized") + .defaultValue(null).build()).build() ); + } + + private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor, AttributeSensor<? extends Number> toSensor) { + addEnricher(Enrichers.builder() + .aggregating(fromSensor) + .publishing(toSensor) + .fromMembers() + .computingAverage() + .build() + ); + } + + private void addSummingMemberEnricher(AttributeSensor<? 
extends Number> source) { + addEnricher(Enrichers.builder() + .aggregating(source) + .publishing(source) + .fromMembers() + .computingSum() + .build() + ); + } + + @Override + protected void doStart() { + setAttribute(IS_CLUSTER_INITIALIZED, false); + + super.doStart(); + + connectSensors(); + + setAttribute(BUCKET_CREATION_IN_PROGRESS, false); + + //start timeout before adding the servers + Tasks.setBlockingDetails("Pausing while Couchbase stabilizes"); + Time.sleep(getConfig(NODES_STARTED_STABILIZATION_DELAY)); + + Optional<Set<Entity>> upNodes = Optional.<Set<Entity>>fromNullable(getAttribute(COUCHBASE_CLUSTER_UP_NODES)); + if (upNodes.isPresent() && !upNodes.get().isEmpty()) { + + Tasks.setBlockingDetails("Adding servers to Couchbase"); + + //TODO: select a new primary node if this one fails + Entity primaryNode = upNodes.get().iterator().next(); + ((EntityInternal) primaryNode).setAttribute(CouchbaseNode.IS_PRIMARY_NODE, true); + setAttribute(COUCHBASE_PRIMARY_NODE, primaryNode); + + Set<Entity> serversToAdd = MutableSet.<Entity>copyOf(getUpNodes()); + + if (serversToAdd.size() >= getQuorumSize() && serversToAdd.size() > 1) { + log.info("Number of SERVICE_UP nodes:{} in cluster:{} reached Quorum:{}, adding the servers", new Object[]{serversToAdd.size(), getId(), getQuorumSize()}); + addServers(serversToAdd); + + //wait for servers to be added to the couchbase server + try { + Tasks.setBlockingDetails("Delaying before advertising cluster up"); + Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER)); + } finally { + Tasks.resetBlockingDetails(); + } + + ((CouchbaseNode)getPrimaryNode()).rebalance(); + } else { + if (getQuorumSize()>1) { + log.warn(this+" is not quorate; will likely fail later, but proceeding for now"); + } + for (Entity server: serversToAdd) { + ((EntityInternal) server).setAttribute(CouchbaseNode.IS_IN_CLUSTER, true); + } + } + + if (getConfig(CREATE_BUCKETS)!=null) { + try { + Tasks.setBlockingDetails("Creating buckets in Couchbase"); + + createBuckets(); + DependentConfiguration.waitInTaskForAttributeReady(this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); + + } finally { + Tasks.resetBlockingDetails(); + } + } + + if (getConfig(REPLICATION)!=null) { + try { + Tasks.setBlockingDetails("Configuring replication rules"); + + List<Map<String, Object>> replRules = getConfig(REPLICATION); + for (Map<String, Object> replRule: replRules) { + DynamicTasks.queue(Effectors.invocation(getPrimaryNode(), CouchbaseNode.ADD_REPLICATION_RULE, replRule)); + } + DynamicTasks.waitForLast(); + + } finally { + Tasks.resetBlockingDetails(); + } + } + + setAttribute(IS_CLUSTER_INITIALIZED, true); + + } else { + throw new IllegalStateException("No up nodes available after starting"); + } + } + + @Override + public void stop() { + if (resetBucketCreation.get() != null) { + resetBucketCreation.get().stop(); + } + super.stop(); + } + + protected void connectSensors() { + addPolicy(PolicySpec.create(MemberTrackingPolicy.class) + .displayName("Controller targets tracker") + .configure("group", this)); + } + + private final static class ListOfHostAndPort implements Function<Set<Entity>, List<String>> { + @Override public List<String> apply(Set<Entity> input) { + List<String> addresses = Lists.newArrayList(); + for (Entity entity : input) { + addresses.add(String.format("%s", + BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)))); + } + return addresses; + } + } + + public static class 
MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { + @Override protected void onEntityChange(Entity member) { + ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member); + } + + @Override protected void onEntityAdded(Entity member) { + ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member); + } + + @Override protected void onEntityRemoved(Entity member) { + ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member); + } + }; + + protected synchronized void onServerPoolMemberChanged(Entity member) { + if (log.isTraceEnabled()) log.trace("For {}, considering membership of {} which is in locations {}", + new Object[]{this, member, member.getLocations()}); + + //FIXME: make use of servers to be added after cluster initialization. + synchronized (mutex) { + if (belongsInServerPool(member)) { + + Optional<Set<Entity>> upNodes = Optional.fromNullable(getUpNodes()); + if (upNodes.isPresent()) { + + if (!upNodes.get().contains(member)) { + Set<Entity> newNodes = Sets.newHashSet(getUpNodes()); + newNodes.add(member); + setAttribute(COUCHBASE_CLUSTER_UP_NODES, newNodes); + + //add to set of servers to be added. + if (isClusterInitialized()) { + addServer(member); + } + } + } else { + Set<Entity> newNodes = Sets.newHashSet(); + newNodes.add(member); + setAttribute(COUCHBASE_CLUSTER_UP_NODES, newNodes); + + if (isClusterInitialized()) { + addServer(member); + } + } + } else { + Set<Entity> upNodes = getUpNodes(); + if (upNodes != null && upNodes.contains(member)) { + upNodes.remove(member); + setAttribute(COUCHBASE_CLUSTER_UP_NODES, upNodes); + log.info("Removing couchbase node {}: {}; from cluster", new Object[]{this, member}); + } + } + if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member); + } + } + + protected boolean belongsInServerPool(Entity member) { + if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) { + if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, eliminating because not up", this, member); + return false; + } + if (!getMembers().contains(member)) { + if (log.isTraceEnabled()) + log.trace("Members of {}, checking {}, eliminating because not member", this, member); + + return false; + } + if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, approving", this, member); + + return true; + } + + + protected EntitySpec<?> getMemberSpec() { + EntitySpec<?> result = super.getMemberSpec(); + if (result != null) return result; + return EntitySpec.create(CouchbaseNode.class); + } + + @Override + public int getQuorumSize() { + Integer quorumSize = getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE); + if (quorumSize != null && quorumSize > 0) + return quorumSize; + // by default the quorum would be floor(initial_cluster_size/2) + 1 + return (int) Math.floor(getConfig(INITIAL_SIZE) / 2) + 1; + } + + protected int getActualSize() { + return Optional.fromNullable(getAttribute(CouchbaseCluster.ACTUAL_CLUSTER_SIZE)).or(-1); + } + + private Set<Entity> getUpNodes() { + return getAttribute(COUCHBASE_CLUSTER_UP_NODES); + } + + private CouchbaseNode getPrimaryNode() { + return (CouchbaseNode) getAttribute(COUCHBASE_PRIMARY_NODE); + } + + @Override + protected void initEnrichers() { + addEnricher(Enrichers.builder().updatingMap(ServiceStateLogic.SERVICE_NOT_UP_INDICATORS) + .from(COUCHBASE_CLUSTER_UP_NODES) + .computing(new Function<Set<Entity>, Object>() { + @Override + public Object apply(Set<Entity> input) { + if (input==null) return "Couchbase up nodes not set"; + if (input.isEmpty()) return "No Couchbase up 
nodes"; + if (input.size() < getQuorumSize()) return "Couchbase up nodes not quorate"; + return null; + } + }).build()); + + if (config().getLocalRaw(UP_QUORUM_CHECK).isAbsent()) { + // TODO Only leaving CouchbaseQuorumCheck here in case it is contained in persisted state. + // If so, need a transformer and then to delete it + @SuppressWarnings({ "unused", "hiding" }) + @Deprecated + class CouchbaseQuorumCheck implements QuorumCheck { + @Override + public boolean isQuorate(int sizeHealthy, int totalSize) { + // check members count passed in AND the sensor + if (sizeHealthy < getQuorumSize()) return false; + return true; + } + } + config().set(UP_QUORUM_CHECK, new CouchbaseClusterImpl.CouchbaseQuorumCheck(this)); + } + super.initEnrichers(); + } + + static class CouchbaseQuorumCheck implements QuorumCheck { + private final CouchbaseCluster cluster; + CouchbaseQuorumCheck(CouchbaseCluster cluster) { + this.cluster = cluster; + } + @Override + public boolean isQuorate(int sizeHealthy, int totalSize) { + // check members count passed in AND the sensor + if (sizeHealthy < cluster.getQuorumSize()) return false; + return true; + } + } + protected void addServers(Set<Entity> serversToAdd) { + Preconditions.checkNotNull(serversToAdd); + for (Entity s : serversToAdd) { + addServerSeveralTimes(s, 12, Duration.TEN_SECONDS); + } + } + + /** try adding in a loop because we are seeing spurious port failures in AWS */ + protected void addServerSeveralTimes(Entity s, int numRetries, Duration delayOnFailure) { + try { + addServer(s); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + if (numRetries<=0) throw Exceptions.propagate(e); + // retry once after sleep because we are getting some odd primary-change events + log.warn("Error adding "+s+" to "+this+", "+numRetries+" retries remaining, will retry after delay ("+e+")"); + Time.sleep(delayOnFailure); + addServerSeveralTimes(s, numRetries-1, delayOnFailure); + } + } + + protected void addServer(Entity serverToAdd) { + Preconditions.checkNotNull(serverToAdd); + if (serverToAdd.equals(getPrimaryNode())) { + // no need to add; but we pass it in anyway because it makes the calling logic easier + return; + } + if (!isMemberInCluster(serverToAdd)) { + HostAndPort webAdmin = HostAndPort.fromParts(serverToAdd.getAttribute(SoftwareProcess.SUBNET_HOSTNAME), + serverToAdd.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); + String username = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); + String password = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); + + if (isClusterInitialized()) { + Entities.invokeEffectorWithArgs(this, getPrimaryNode(), CouchbaseNode.SERVER_ADD_AND_REBALANCE, webAdmin.toString(), username, password).getUnchecked(); + } else { + Entities.invokeEffectorWithArgs(this, getPrimaryNode(), CouchbaseNode.SERVER_ADD, webAdmin.toString(), username, password).getUnchecked(); + } + //FIXME check feedback of whether the server was added. + ((EntityInternal) serverToAdd).setAttribute(CouchbaseNode.IS_IN_CLUSTER, true); + } + } + + /** finds the cluster name specified for a node or a cluster, + * using {@link CouchbaseCluster#CLUSTER_NAME} or falling back to the cluster (or node) ID. 
*/ + public static String getClusterName(Entity node) { + String name = node.getConfig(CLUSTER_NAME); + if (!Strings.isBlank(name)) return Strings.makeValidFilename(name); + return getClusterOrNode(node).getId(); + } + + /** returns Couchbase cluster in ancestry, defaulting to the given node if none */ + @Nonnull public static Entity getClusterOrNode(Entity node) { + Iterable<CouchbaseCluster> clusterNodes = Iterables.filter(Entities.ancestors(node), CouchbaseCluster.class); + return Iterables.getFirst(clusterNodes, node); + } + + public boolean isClusterInitialized() { + return Optional.fromNullable(getAttribute(IS_CLUSTER_INITIALIZED)).or(false); + } + + public boolean isMemberInCluster(Entity e) { + return Optional.fromNullable(e.getAttribute(CouchbaseNode.IS_IN_CLUSTER)).or(false); + } + + public void createBuckets() { + //TODO: check for port conflicts if buckets are being created with a port + List<Map<String, Object>> bucketsToCreate = getConfig(CREATE_BUCKETS); + if (bucketsToCreate==null) return; + + Entity primaryNode = getPrimaryNode(); + + for (Map<String, Object> bucketMap : bucketsToCreate) { + String bucketName = bucketMap.containsKey("bucket") ? (String) bucketMap.get("bucket") : "default"; + String bucketType = bucketMap.containsKey("bucket-type") ? (String) bucketMap.get("bucket-type") : "couchbase"; + // default bucket must be on this port; other buckets can (must) specify their own (unique) port + Integer bucketPort = bucketMap.containsKey("bucket-port") ? (Integer) bucketMap.get("bucket-port") : 11211; + Integer bucketRamSize = bucketMap.containsKey("bucket-ramsize") ? (Integer) bucketMap.get("bucket-ramsize") : 100; + Integer bucketReplica = bucketMap.containsKey("bucket-replica") ? (Integer) bucketMap.get("bucket-replica") : 1; + + createBucket(primaryNode, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); + } + } + + public void createBucket(final Entity primaryNode, final String bucketName, final String bucketType, final Integer bucketPort, final Integer bucketRamSize, final Integer bucketReplica) { + DynamicTasks.queueIfPossible(TaskBuilder.<Void>builder().name("Creating bucket " + bucketName).body( + new Callable<Void>() { + @Override + public Void call() throws Exception { + DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); + if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) { + CouchbaseClusterImpl.this.resetBucketCreation.get().stop(); + } + setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true); + HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(primaryNode, primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); + + CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder() + .entity(CouchbaseClusterImpl.this) + .period(500, TimeUnit.MILLISECONDS) + .baseUri(String.format("http://%s/pools/default/buckets/%s", hostAndPort, bucketName)) + .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD)) + .poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS) + .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() { + @Override + public Boolean apply(JsonElement input) { + // Wait until bucket has been created on all nodes and the couchApiBase element has been published (indicating that the bucket is useable) + 
JsonArray servers = input.getAsJsonArray(); + if (servers.size() != CouchbaseClusterImpl.this.getMembers().size()) { + return true; + } + for (JsonElement server : servers) { + Object api = server.getAsJsonObject().get("couchApiBase"); + if (api == null || Strings.isEmpty(String.valueOf(api))) { + return true; + } + } + return false; + } + })) + .onFailureOrException(new Function<Object, Boolean>() { + @Override + public Boolean apply(Object input) { + if (input instanceof brooklyn.util.http.HttpToolResponse) { + if (((brooklyn.util.http.HttpToolResponse) input).getResponseCode() == 404) { + return true; + } + } + if (input instanceof Throwable) + Exceptions.propagate((Throwable) input); + throw new IllegalStateException("Unexpected response when creating bucket:" + input); + } + })) + .build()); + + // TODO: Bail out if bucket creation fails, to allow next bucket to proceed + Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); + DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); + if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) { + CouchbaseClusterImpl.this.resetBucketCreation.get().stop(); + } + return null; + } + } + ).build()).orSubmitAndBlock(); + } + + static { + RendererHints.register(COUCH_DOCS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); + RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); + RendererHints.register(MEM_USED_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); + RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); + RendererHints.register(COUCH_VIEWS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); + } +}
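[Editor's note, not part of the commit] The refactoring note at the top of CouchbaseClusterImpl.java proposes replacing the fixed SERVICE_UP_TIME_OUT / DELAY_BEFORE_ADVERTISING_CLUSTER sleeps with a poll of the primary node's REST API, and replacing the bucket-creation HttpFeed with a Repeater. As a rough sketch only, the snippet below shows how such a wait could be written with the brooklyn.util.repeat.Repeater and brooklyn.util.http.HttpTool utilities that CouchbaseNodeImpl and CouchbaseNodeSshDriver in this patch already use; the waitForClusterReady helper name, the 5-second poll period, and the /pools/default endpoint are illustrative assumptions, not code from this commit.

import java.net.URI;
import java.util.concurrent.Callable;

import org.apache.http.auth.UsernamePasswordCredentials;

import brooklyn.util.http.HttpTool;
import brooklyn.util.http.HttpToolResponse;
import brooklyn.util.repeat.Repeater;
import brooklyn.util.time.Duration;

import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;

public class CouchbaseClusterReadyCheck {

    /** Illustrative sketch: poll the primary node's REST API until the default pool is served,
     *  instead of sleeping for a fixed period before advertising the cluster. */
    public static boolean waitForClusterReady(HostAndPort webAdmin, String username, String password, Duration timeout) {
        final URI uri = URI.create(String.format("http://%s/pools/default", webAdmin));
        final UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(username, password);
        return Repeater.create("wait for Couchbase REST API at " + uri)
                .every(Duration.seconds(5))
                .limitTimeTo(timeout)
                .until(new Callable<Boolean>() {
                    @Override public Boolean call() {
                        try {
                            HttpToolResponse response = HttpTool.httpGet(
                                    HttpTool.httpClientBuilder().uri(uri).credentials(credentials).build(),
                                    uri, ImmutableMap.<String, String>of());
                            // a healthy (2xx) response means the node is serving the cluster pool
                            return HttpTool.isStatusCodeHealthy(response.getResponseCode());
                        } catch (Exception e) {
                            // not reachable yet; keep polling until the time limit expires
                            return false;
                        }
                    }
                })
                .run();
    }
}

If doStart() called something like this in place of Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER)), the cluster would be advertised as soon as the API responds rather than after an arbitrary delay; the same pattern would also suit the bucket-creation wait that currently relies on the resetBucketCreation HttpFeed.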
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java new file mode 100644 index 0000000..16434fa --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.couchbase; + +import java.net.URI; + +import org.apache.brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.config.render.RendererHints; +import brooklyn.entity.annotation.Effector; +import brooklyn.entity.annotation.EffectorParam; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.MethodEffector; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.effector.Effectors; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.text.ByteSizeStrings; + +@Catalog(name="CouchBase Node", description="Couchbase Server is an open source, distributed (shared-nothing architecture) " + + "NoSQL document-oriented database that is optimized for interactive applications.") +@ImplementedBy(CouchbaseNodeImpl.class) +public interface CouchbaseNode extends SoftwareProcess { + + @SetFromFlag("adminUsername") + ConfigKey<String> COUCHBASE_ADMIN_USERNAME = ConfigKeys.newStringConfigKey("couchbase.adminUsername", "Username for the admin user on the node", "Administrator"); + + @SetFromFlag("adminPassword") + ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = ConfigKeys.newStringConfigKey("couchbase.adminPassword", "Password for the admin user on the node", "Password"); + + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, + "3.0.0"); + + @SetFromFlag("enterprise") + ConfigKey<Boolean> USE_ENTERPRISE = ConfigKeys.newBooleanConfigKey("couchbase.enterprise.enabled", + "Whether to use Couchbase Enterprise; if false uses the community version. 
Defaults to true.", true); + + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + SoftwareProcess.DOWNLOAD_URL, "http://packages.couchbase.com/releases/${version}/" + + "couchbase-server-${driver.communityOrEnterprise}${driver.downloadLinkPreVersionSeparator}${version}${driver.downloadLinkOsTagWithPrefix}"); + + @SetFromFlag("clusterInitRamSize") + BasicAttributeSensorAndConfigKey<Integer> COUCHBASE_CLUSTER_INIT_RAM_SIZE = new BasicAttributeSensorAndConfigKey<Integer>( + Integer.class, "couchbase.clusterInitRamSize", "initial ram size of the cluster", 300); + + PortAttributeSensorAndConfigKey COUCHBASE_WEB_ADMIN_PORT = new PortAttributeSensorAndConfigKey("couchbase.webAdminPort", "Web Administration Port", "8091+"); + PortAttributeSensorAndConfigKey COUCHBASE_API_PORT = new PortAttributeSensorAndConfigKey("couchbase.apiPort", "Couchbase API Port", "8092+"); + PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_BUCKET_PORT = new PortAttributeSensorAndConfigKey("couchbase.internalBucketPort", "Internal Bucket Port", "11209"); + PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_EXTERNAL_BUCKET_PORT = new PortAttributeSensorAndConfigKey("couchbase.internalExternalBucketPort", "Internal/External Bucket Port", "11210"); + PortAttributeSensorAndConfigKey COUCHBASE_CLIENT_INTERFACE_PROXY = new PortAttributeSensorAndConfigKey("couchbase.clientInterfaceProxy", "Client interface (proxy)", "11211"); + PortAttributeSensorAndConfigKey COUCHBASE_INCOMING_SSL_PROXY = new PortAttributeSensorAndConfigKey("couchbase.incomingSslProxy", "Incoming SSL Proxy", "11214"); + PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_OUTGOING_SSL_PROXY = new PortAttributeSensorAndConfigKey("couchbase.internalOutgoingSslProxy", "Internal Outgoing SSL Proxy", "11215"); + PortAttributeSensorAndConfigKey COUCHBASE_REST_HTTPS_FOR_SSL = new PortAttributeSensorAndConfigKey("couchbase.internalRestHttpsForSsl", "Internal REST HTTPS for SSL", "18091"); + PortAttributeSensorAndConfigKey COUCHBASE_CAPI_HTTPS_FOR_SSL = new PortAttributeSensorAndConfigKey("couchbase.internalCapiHttpsForSsl", "Internal CAPI HTTPS for SSL", "18092"); + PortAttributeSensorAndConfigKey ERLANG_PORT_MAPPER = new PortAttributeSensorAndConfigKey("couchbase.erlangPortMapper", "Erlang Port Mapper Daemon Listener Port (epmd)", "4369"); + PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_START = new PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeStart", "Node data exchange Port Range Start", "21100+"); + PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeEnd", "Node data exchange Port Range End", "21199+"); + + AttributeSensor<Boolean> IS_PRIMARY_NODE = Sensors.newBooleanSensor("couchbase.isPrimaryNode", "flag to determine if the current couchbase node is the primary node for the cluster"); + AttributeSensor<Boolean> IS_IN_CLUSTER = Sensors.newBooleanSensor("couchbase.isInCluster", "flag to determine if the current couchbase node has been added to a cluster, " + + "including being the first / primary node"); + AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI; + + // Interesting stats + AttributeSensor<Double> OPS = Sensors.newDoubleSensor("couchbase.stats.ops", + "Retrieved from pools/nodes/<current node>/interestingStats/ops"); + AttributeSensor<Long> COUCH_DOCS_DATA_SIZE = Sensors.newLongSensor("couchbase.stats.couch.docs.data.size", + 
"Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_data_size"); + AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE = Sensors.newLongSensor("couchbase.stats.couch.docs.actual.disk.size", + "Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size"); + AttributeSensor<Long> EP_BG_FETCHED = Sensors.newLongSensor("couchbase.stats.ep.bg.fetched", + "Retrieved from pools/nodes/<current node>/interestingStats/ep_bg_fetched"); + AttributeSensor<Long> MEM_USED = Sensors.newLongSensor("couchbase.stats.mem.used", + "Retrieved from pools/nodes/<current node>/interestingStats/mem_used"); + AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE = Sensors.newLongSensor("couchbase.stats.couch.views.actual.disk.size", + "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_actual_disk_size"); + AttributeSensor<Long> CURR_ITEMS = Sensors.newLongSensor("couchbase.stats.curr.items", + "Retrieved from pools/nodes/<current node>/interestingStats/curr_items"); + AttributeSensor<Long> VB_REPLICA_CURR_ITEMS = Sensors.newLongSensor("couchbase.stats.vb.replica.curr.items", + "Retrieved from pools/nodes/<current node>/interestingStats/vb_replica_curr_items"); + AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE = Sensors.newLongSensor("couchbase.stats.couch.views.data.size", + "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_data_size"); + AttributeSensor<Long> GET_HITS = Sensors.newLongSensor("couchbase.stats.get.hits", + "Retrieved from pools/nodes/<current node>/interestingStats/get_hits"); + AttributeSensor<Double> CMD_GET = Sensors.newDoubleSensor("couchbase.stats.cmd.get", + "Retrieved from pools/nodes/<current node>/interestingStats/cmd_get"); + AttributeSensor<Long> CURR_ITEMS_TOT = Sensors.newLongSensor("couchbase.stats.curr.items.tot", + "Retrieved from pools/nodes/<current node>/interestingStats/curr_items_tot"); + AttributeSensor<String> REBALANCE_STATUS = Sensors.newStringSensor("couchbase.rebalance.status", + "Displays the current rebalance status from pools/nodes/rebalanceStatus"); + + class MainUri { + public static final AttributeSensor<URI> MAIN_URI = Attributes.MAIN_URI; + + static { + // ROOT_URL does not need init because it refers to something already initialized + RendererHints.register(COUCHBASE_WEB_ADMIN_URL, RendererHints.namedActionWithUrl()); + + RendererHints.register(COUCH_DOCS_DATA_SIZE, RendererHints.displayValue(ByteSizeStrings.metric())); + RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE, RendererHints.displayValue(ByteSizeStrings.metric())); + RendererHints.register(MEM_USED, RendererHints.displayValue(ByteSizeStrings.metric())); + RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE, RendererHints.displayValue(ByteSizeStrings.metric())); + RendererHints.register(COUCH_VIEWS_DATA_SIZE, RendererHints.displayValue(ByteSizeStrings.metric())); + } + } + + // this long-winded reference is done just to trigger the initialization above + AttributeSensor<URI> MAIN_URI = MainUri.MAIN_URI; + + MethodEffector<Void> SERVER_ADD = new MethodEffector<Void>(CouchbaseNode.class, "serverAdd"); + MethodEffector<Void> SERVER_ADD_AND_REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "serverAddAndRebalance"); + MethodEffector<Void> REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "rebalance"); + MethodEffector<Void> BUCKET_CREATE = new MethodEffector<Void>(CouchbaseNode.class, "bucketCreate"); + brooklyn.entity.Effector<Void> ADD_REPLICATION_RULE = Effectors.effector(Void.class, "addReplicationRule") + 
.description("Adds a replication rule from the indicated bucket on the cluster where this node is located " + + "to the indicated cluster and optional destination bucket") + .parameter(String.class, "fromBucket", "Bucket to be replicated") + .parameter(Object.class, "toCluster", "Entity (or ID) of the cluster to which this should replicate") + .parameter(String.class, "toBucket", "Destination bucket for replication in the toCluster, defaulting to the same as the fromBucket") + .buildAbstract(); + + @Effector(description = "add a server to a cluster") + public void serverAdd(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password); + + @Effector(description = "add a server to a cluster, and immediately rebalances") + public void serverAddAndRebalance(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password); + + @Effector(description = "rebalance the couchbase cluster") + public void rebalance(); + + @Effector(description = "create a new bucket") + public void bucketCreate(@EffectorParam(name = "bucketName") String bucketName, @EffectorParam(name = "bucketType") String bucketType, + @EffectorParam(name = "bucketPort") Integer bucketPort, @EffectorParam(name = "bucketRamSize") Integer bucketRamSize, + @EffectorParam(name = "bucketReplica") Integer bucketReplica); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java new file mode 100644 index 0000000..1ff28fd --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.couchbase; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.SoftwareProcessDriver; + +public interface CouchbaseNodeDriver extends SoftwareProcessDriver { + public String getOsTag(); + public String getDownloadLinkPreVersionSeparator(); + public String getDownloadLinkOsTagWithPrefix(); + + public String getCommunityOrEnterprise(); + + public void serverAdd(String serverToAdd, String username, String password); + + public void rebalance(); + + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica); + + public void serverAddAndRebalance(String serverToAdd, String username, String password); + + public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java new file mode 100644 index 0000000..a0654db --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.couchbase; + +import static java.lang.String.format; + +import java.net.URI; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.http.auth.UsernamePasswordCredentials; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.entity.effector.EffectorBody; +import brooklyn.event.AttributeSensor; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.event.feed.http.HttpFeed; +import brooklyn.event.feed.http.HttpPollConfig; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.event.feed.http.JsonFunctions; +import brooklyn.location.MachineProvisioningLocation; +import brooklyn.location.access.BrooklynAccessUtils; +import brooklyn.location.cloud.CloudLocationConfig; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Functionals; +import brooklyn.util.guava.MaybeFunctions; +import brooklyn.util.guava.TypeTokens; +import brooklyn.util.http.HttpTool; +import brooklyn.util.http.HttpToolResponse; +import brooklyn.util.net.Urls; +import brooklyn.util.task.Tasks; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; +import com.google.common.net.HttpHeaders; +import com.google.common.net.MediaType; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; + +public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseNode { + + private static final Logger log = LoggerFactory.getLogger(CouchbaseNodeImpl.class); + + private volatile HttpFeed httpFeed; + + @Override + public Class<CouchbaseNodeDriver> getDriverInterface() { + return CouchbaseNodeDriver.class; + } + + @Override + public CouchbaseNodeDriver getDriver() { + return (CouchbaseNodeDriver) super.getDriver(); + } + + @Override + public void init() { + super.init(); + + subscribe(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() { + @Override + public void onEvent(SensorEvent<Boolean> booleanSensorEvent) { + if (Boolean.TRUE.equals(booleanSensorEvent.getValue())) { + Integer webPort = getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT); + Preconditions.checkNotNull(webPort, CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT+" not set for %s; is an acceptable port available?", this); + String hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(CouchbaseNodeImpl.this, webPort).toString(); + setAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL, URI.create(format("http://%s", hostAndPort))); + } + } + }); + + getMutableEntityType().addEffector(ADD_REPLICATION_RULE, new EffectorBody<Void>() { + @Override + public Void call(ConfigBag parameters) { + addReplicationRule(parameters); + return null; + } + }); + } + + protected Map<String, Object> obtainProvisioningFlags(@SuppressWarnings("rawtypes") MachineProvisioningLocation location) { + ConfigBag result = ConfigBag.newInstance(super.obtainProvisioningFlags(location)); + result.configure(CloudLocationConfig.OS_64_BIT, true); + return result.getAllConfig(); + } + + @Override + protected 
Collection<Integer> getRequiredOpenPorts() { + // TODO this creates a huge list of inbound ports; much better to define on a security group using range syntax! + int erlangRangeStart = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_START).iterator().next(); + int erlangRangeEnd = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_END).iterator().next(); + + Set<Integer> newPorts = MutableSet.<Integer>copyOf(super.getRequiredOpenPorts()); + newPorts.remove(erlangRangeStart); + newPorts.remove(erlangRangeEnd); + for (int i = erlangRangeStart; i <= erlangRangeEnd; i++) + newPorts.add(i); + return newPorts; + } + + @Override + public void serverAdd(String serverToAdd, String username, String password) { + getDriver().serverAdd(serverToAdd, username, password); + } + + @Override + public void serverAddAndRebalance(String serverToAdd, String username, String password) { + getDriver().serverAddAndRebalance(serverToAdd, username, password); + } + + @Override + public void rebalance() { + getDriver().rebalance(); + } + + protected final static Function<HttpToolResponse, JsonElement> GET_THIS_NODE_STATS = Functionals.chain( + HttpValueFunctions.jsonContents(), + JsonFunctions.walk("nodes"), + new Function<JsonElement, JsonElement>() { + @Override public JsonElement apply(JsonElement input) { + JsonArray nodes = input.getAsJsonArray(); + for (JsonElement element : nodes) { + JsonElement thisNode = element.getAsJsonObject().get("thisNode"); + if (thisNode!=null && Boolean.TRUE.equals(thisNode.getAsBoolean())) { + return element.getAsJsonObject().get("interestingStats"); + } + } + return null; + }} + ); + + protected final static <T> HttpPollConfig<T> getSensorFromNodeStat(AttributeSensor<T> sensor, String ...jsonPath) { + return new HttpPollConfig<T>(sensor) + .onSuccess(Functionals.chain(GET_THIS_NODE_STATS, + MaybeFunctions.<JsonElement>wrap(), + JsonFunctions.walkM(jsonPath), + JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null))) + .onFailureOrException(Functions.<T>constant(null)); + } + + @Override + protected void postStart() { + super.postStart(); + renameServerToPublicHostname(); + } + + protected void renameServerToPublicHostname() { + // http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#couchbase-getting-started-hostnames + URI apiUri = null; + try { + HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(COUCHBASE_WEB_ADMIN_PORT)); + apiUri = URI.create(String.format("http://%s:%d/node/controller/rename", accessible.getHostText(), accessible.getPort())); + UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(getConfig(COUCHBASE_ADMIN_USERNAME), getConfig(COUCHBASE_ADMIN_PASSWORD)); + HttpToolResponse response = HttpTool.httpPost( + // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials + HttpTool.httpClientBuilder().uri(apiUri).credentials(credentials).build(), + apiUri, + MutableMap.of( + HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString(), + HttpHeaders.ACCEPT, "*/*", + // this appears needed; without it we get org.apache.http.NoHttpResponseException !? 
+ HttpHeaders.AUTHORIZATION, HttpTool.toBasicAuthorizationValue(credentials)), + Charsets.UTF_8.encode("hostname="+Urls.encode(accessible.getHostText())).array()); + log.debug("Renamed Couchbase server "+this+" via "+apiUri+": "+response); + if (!HttpTool.isStatusCodeHealthy(response.getResponseCode())) { + log.warn("Invalid response code, renaming "+apiUri+": "+response); + } + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.warn("Error renaming server, using "+apiUri+": "+e, e); + } + } + + public void connectSensors() { + super.connectSensors(); + connectServiceUpIsRunning(); + + HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, this.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); + httpFeed = HttpFeed.builder() + .entity(this) + .period(Duration.seconds(3)) + .baseUri("http://" + hostAndPort + "/pools/nodes/") + .credentialsIfNotNull(getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD)) + .poll(getSensorFromNodeStat(CouchbaseNode.OPS, "ops")) + .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_DATA_SIZE, "couch_docs_data_size")) + .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, "couch_docs_actual_disk_size")) + .poll(getSensorFromNodeStat(CouchbaseNode.EP_BG_FETCHED, "ep_bg_fetched")) + .poll(getSensorFromNodeStat(CouchbaseNode.MEM_USED, "mem_used")) + .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, "couch_views_actual_disk_size")) + .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS, "curr_items")) + .poll(getSensorFromNodeStat(CouchbaseNode.VB_REPLICA_CURR_ITEMS, "vb_replica_curr_items")) + .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, "couch_views_data_size")) + .poll(getSensorFromNodeStat(CouchbaseNode.GET_HITS, "get_hits")) + .poll(getSensorFromNodeStat(CouchbaseNode.CMD_GET, "cmd_get")) + .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS_TOT, "curr_items_tot")) + .poll(new HttpPollConfig<String>(CouchbaseNode.REBALANCE_STATUS) + .onSuccess(HttpValueFunctions.jsonContents("rebalanceStatus", String.class)) + .onFailureOrException(Functions.constant("Could not retrieve"))) + .build(); + } + + public void disconnectSensors() { + super.disconnectSensors(); + disconnectServiceUpIsRunning(); + if (httpFeed != null) { + httpFeed.stop(); + } + } + + @Override + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { + if (Strings.isBlank(bucketType)) bucketType = "couchbase"; + if (bucketRamSize==null || bucketRamSize<=0) bucketRamSize = 200; + if (bucketReplica==null || bucketReplica<0) bucketReplica = 1; + + getDriver().bucketCreate(bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); + } + + /** exposed through {@link CouchbaseNode#ADD_REPLICATION_RULE} */ + protected void addReplicationRule(ConfigBag ruleArgs) { + Object toClusterO = Preconditions.checkNotNull(ruleArgs.getStringKey("toCluster"), "toCluster must not be null"); + if (toClusterO instanceof String) { + toClusterO = getManagementContext().lookup((String)toClusterO); + } + Entity toCluster = Tasks.resolving(toClusterO, Entity.class).context(getExecutionContext()).get(); + + String fromBucket = Preconditions.checkNotNull( (String)ruleArgs.getStringKey("fromBucket"), "fromBucket must be specified" ); + + String toBucket = (String)ruleArgs.getStringKey("toBucket"); + if (toBucket==null) toBucket = fromBucket; + + if (!ruleArgs.getUnusedConfig().isEmpty()) { + throw 
new IllegalArgumentException("Unsupported replication rule data: "+ruleArgs.getUnusedConfig()); + } + + getDriver().addReplicationRule(toCluster, fromBucket, toBucket); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java new file mode 100644 index 0000000..73ed934 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java @@ -0,0 +1,512 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.couchbase; + +import static brooklyn.util.ssh.BashCommands.*; +import static java.lang.String.format; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.http.auth.UsernamePasswordCredentials; + +import brooklyn.entity.Entity; +import brooklyn.entity.Group; +import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.ServiceStateLogic; +import brooklyn.entity.drivers.downloads.BasicDownloadRequirement; +import brooklyn.entity.drivers.downloads.DownloadProducerFromUrlAttribute; +import brooklyn.entity.software.SshEffectorTasks; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.location.OsDetails; +import brooklyn.location.access.BrooklynAccessUtils; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.management.Task; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.http.HttpTool; +import brooklyn.util.http.HttpToolResponse; +import brooklyn.util.repeat.Repeater; +import brooklyn.util.ssh.BashCommands; +import brooklyn.util.task.DynamicTasks; +import brooklyn.util.task.TaskBuilder; +import brooklyn.util.task.TaskTags; +import brooklyn.util.task.Tasks; +import brooklyn.util.text.NaturalOrderComparator; +import brooklyn.util.text.StringEscapes.BashStringEscapes; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import 
com.google.common.net.HostAndPort; + +public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseNodeDriver { + + public CouchbaseNodeSshDriver(final CouchbaseNodeImpl entity, final SshMachineLocation machine) { + super(entity, machine); + } + + public static String couchbaseCli(String cmd) { + return "/opt/couchbase/bin/couchbase-cli " + cmd + " "; + } + + @Override + public void preInstall() { + resolver = Entities.newDownloader(this); + setExpandedInstallDir(getInstallDir()); + } + + @Override + public void install() { + //for reference https://github.com/urbandecoder/couchbase/blob/master/recipes/server.rb + //installation instructions (http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#preparing-to-install) + + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + + OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); + + if (osDetails.isLinux()) { + List<String> commands = installLinux(urls, saveAs); + //FIXME installation return error but the server is up and running. + newScript(INSTALLING) + .body.append(commands).execute(); + } else { + Tasks.markInessential(); + throw new IllegalStateException("Unsupported OS for installing Couchbase. Will continue but may fail later."); + } + } + + private List<String> installLinux(List<String> urls, String saveAs) { + + log.info("Installing " + getEntity() + " using couchbase-server-{} {}", getCommunityOrEnterprise(), getVersion()); + + String apt = chainGroup( + installPackage(MutableMap.of("apt", "python-httplib2 libssl0.9.8"), null), + sudo(format("dpkg -i %s", saveAs))); + + String yum = chainGroup( + "which yum", + // The following prevents failure on RHEL AWS nodes: + // https://forums.aws.amazon.com/thread.jspa?threadID=100509 + ok(sudo("sed -i.bk s/^enabled=1$/enabled=0/ /etc/yum/pluginconf.d/subscription-manager.conf")), + ok(sudo("yum check-update")), + sudo("yum install -y pkgconfig"), + // RHEL requires openssl version 098 + sudo("[ -f /etc/redhat-release ] && (grep -i \"red hat\" /etc/redhat-release && sudo yum install -y openssl098e) || :"), + sudo(format("rpm --install %s", saveAs))); + + String link = new DownloadProducerFromUrlAttribute().apply(new BasicDownloadRequirement(this)).getPrimaryLocations().iterator().next(); + return ImmutableList.<String>builder() + .add(INSTALL_CURL) + .addAll(Arrays.asList(INSTALL_CURL, + BashCommands.require(BashCommands.alternatives(BashCommands.simpleDownloadUrlAs(urls, saveAs), + // Referer link is required for 3.0.0; note mis-spelling is correct, as per http://en.wikipedia.org/wiki/HTTP_referer + "curl -f -L -k " + BashStringEscapes.wrapBash(link) + + " -H 'Referer: http://www.couchbase.com/downloads'" + + " -o " + saveAs), + "Could not retrieve " + saveAs + " (from " + urls.size() + " sites)", 9))) + .add(alternatives(apt, yum)) + .build(); + } + + @Override + public void customize() { + //TODO: add linux tweaks for couchbase + //http://blog.couchbase.com/often-overlooked-linux-os-tweaks + //http://blog.couchbase.com/kirk + + //turn off swappiness + //vm.swappiness=0 + //sudo echo 0 > /proc/sys/vm/swappiness + + //os page cache = 20% + + //disable THP + //sudo echo never > /sys/kernel/mm/transparent_hugepage/enabled + //sudo echo never > /sys/kernel/mm/transparent_hugepage/defrag + + //turn off transparent huge pages + //limit page cache disty bytes + //control the rate page cache is flused ... 
vm.dirty_* + } + + @Override + public void launch() { + String clusterPrefix = "--cluster-" + (isPreV3() ? "init-" : ""); + // in v30, the cluster arguments were changed, and it became mandatory to supply a url + password (if there is none, these are ignored) + newScript(LAUNCHING) + .body.append( + sudo("/etc/init.d/couchbase-server start"), + "for i in {0..120}\n" + + "do\n" + + " if [ $i -eq 120 ]; then echo REST API unavailable after 120 seconds, failing; exit 1; fi;\n" + + " curl -s " + String.format("http://localhost:%s", getWebPort()) + " > /dev/null && echo REST API available after $i seconds && break\n" + + " sleep 1\n" + + "done\n" + + couchbaseCli("cluster-init") + + (isPreV3() ? getCouchbaseHostnameAndPort() : getCouchbaseHostnameAndCredentials()) + + " " + clusterPrefix + "username=" + getUsername() + + " " + clusterPrefix + "password=" + getPassword() + + " " + clusterPrefix + "port=" + getWebPort() + + " " + clusterPrefix + "ramsize=" + getClusterInitRamSize()) + .execute(); + } + + @Override + public boolean isRunning() { + //TODO add a better way to check if couchbase server is running + return (newScript(CHECK_RUNNING) + .body.append(format("curl -u %s:%s http://localhost:%s/pools/nodes", getUsername(), getPassword(), getWebPort())) + .execute() == 0); + } + + @Override + public void stop() { + newScript(STOPPING) + .body.append(sudo("/etc/init.d/couchbase-server stop")) + .execute(); + } + + @Override + public String getVersion() { + return entity.getConfig(CouchbaseNode.SUGGESTED_VERSION); + } + + @Override + public String getOsTag() { + return newDownloadLinkSegmentComputer().getOsTag(); + } + + protected DownloadLinkSegmentComputer newDownloadLinkSegmentComputer() { + return new DownloadLinkSegmentComputer(getLocation().getOsDetails(), !isPreV3(), Strings.toString(getEntity())); + } + + public static class DownloadLinkSegmentComputer { + // links are: + // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.rpm + // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.deb + // ^^^ preV3 is _ everywhere + // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community_3.0.0-ubuntu12.04_amd64.deb + // ^^^ most V3 is _${version}- + // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community-3.0.0-centos6.x86_64.rpm + // ^^^ but RHEL is -${version}- + + @Nullable + private final OsDetails os; + @Nonnull + private final boolean isV3OrLater; + @Nonnull + private final String context; + @Nonnull + private final String osName; + @Nonnull + private final boolean isRpm; + @Nonnull + private final boolean is64bit; + + public DownloadLinkSegmentComputer(@Nullable OsDetails os, boolean isV3OrLater, @Nonnull String context) { + this.os = os; + this.isV3OrLater = isV3OrLater; + this.context = context; + if (os == null) { + // guess centos as RPM is sensible default + log.warn("No details known for OS of " + context + "; assuming 64-bit RPM distribution of Couchbase"); + osName = "centos"; + isRpm = true; + is64bit = true; + return; + } + osName = os.getName().toLowerCase(); + isRpm = !(osName.contains("deb") || osName.contains("ubuntu")); + is64bit = os.is64bit(); + } + + /** + * separator after the version number used to be _ but is - in 3.0 and later + */ + public String getPreVersionSeparator() { + if (!isV3OrLater) return "_"; + if (isRpm) return "-"; + return "_"; + } + + public String getOsTag() { + // couchbase only provide certain versions; if on other platforms let's suck-it-and-see + 
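//Worked examples of how these segments compose, matching the sample links listed above:
//  3.0.0 on 64-bit Ubuntu:      getPreVersionSeparator()="_", getOsTagWithPrefix()="-ubuntu12.04_amd64.deb"
//                               -> couchbase-server-community_3.0.0-ubuntu12.04_amd64.deb
//  3.0.0 on 64-bit CentOS/RHEL: getPreVersionSeparator()="-", getOsTagWithPrefix()="-centos6.x86_64.rpm"
//                               -> couchbase-server-community-3.0.0-centos6.x86_64.rpm
//  2.2.0 (pre-v3), 64-bit RPM:  getPreVersionSeparator()="_", getOsTagWithPrefix()="_x86_64.rpm"
//                               -> couchbase-server-community_2.2.0_x86_64.rpm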
String family; + if (osName.contains("debian")) family = "debian7_"; + else if (osName.contains("ubuntu")) family = "ubuntu12.04_"; + else if (osName.contains("centos") || osName.contains("rhel") || (osName.contains("red") && osName.contains("hat"))) + family = "centos6."; + else { + log.warn("Unrecognised OS " + os + " of " + context + "; assuming RPM distribution of Couchbase"); + family = "centos6."; + } + + if (!is64bit && !isV3OrLater) { + // NB: 32-bit binaries aren't (yet?) available for v30 + log.warn("32-bit binaries for Couchbase might not be available, when deploying " + context); + } + String arch = !is64bit ? "x86" : !isRpm && isV3OrLater ? "amd64" : "x86_64"; + String fileExtension = isRpm ? ".rpm" : ".deb"; + + if (isV3OrLater) + return family + arch + fileExtension; + else + return arch + fileExtension; + } + + public String getOsTagWithPrefix() { + return (!isV3OrLater ? "_" : "-") + getOsTag(); + } + } + + @Override + public String getDownloadLinkOsTagWithPrefix() { + return newDownloadLinkSegmentComputer().getOsTagWithPrefix(); + } + + @Override + public String getDownloadLinkPreVersionSeparator() { + return newDownloadLinkSegmentComputer().getPreVersionSeparator(); + } + + private boolean isPreV3() { + return NaturalOrderComparator.INSTANCE.compare(getEntity().getConfig(CouchbaseNode.SUGGESTED_VERSION), "3.0") < 0; + } + + @Override + public String getCommunityOrEnterprise() { + Boolean isEnterprise = getEntity().getConfig(CouchbaseNode.USE_ENTERPRISE); + return isEnterprise ? "enterprise" : "community"; + } + + private String getUsername() { + return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); + } + + private String getPassword() { + return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); + } + + private String getWebPort() { + return "" + entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT); + } + + private String getCouchbaseHostnameAndCredentials() { + return format("-c %s:%s -u %s -p %s", getSubnetHostname(), getWebPort(), getUsername(), getPassword()); + } + + private String getCouchbaseHostnameAndPort() { + return format("-c %s:%s", getSubnetHostname(), getWebPort()); + } + + private String getClusterInitRamSize() { + return entity.getConfig(CouchbaseNode.COUCHBASE_CLUSTER_INIT_RAM_SIZE).toString(); + } + + @Override + public void rebalance() { + entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "explicitly started"); + newScript("rebalance") + .body.append( + couchbaseCli("rebalance") + getCouchbaseHostnameAndCredentials()) + .failOnNonZeroResultCode() + .execute(); + + // wait until the re-balance is started + // (if it's quick, this might miss it, but it will only block for 30s if so) + Repeater.create() + .backoff(Repeater.DEFAULT_REAL_QUICK_PERIOD, 2, Duration.millis(500)) + .limitTimeTo(Duration.THIRTY_SECONDS) + .until(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) { + if (isNodeRebalancing(nodeHostAndPort.toString())) { + return true; + } + } + return false; + } + } + ).run(); + + entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "waiting for completion"); + // Wait until the Couchbase node finishes the re-balancing + Task<Boolean> reBalance = TaskBuilder.<Boolean>builder() + .name("Waiting until node is rebalancing") + .body(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return Repeater.create() + .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS) + .limitTimeTo(Duration.FIVE_MINUTES) + 
.until(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) { + if (isNodeRebalancing(nodeHostAndPort.toString())) { + return false; + } + } + return true; + } + }) + .run(); + } + }) + .build(); + Boolean completed = DynamicTasks.queueIfPossible(reBalance) + .orSubmitAndBlock() + .andWaitForSuccess(); + if (completed) { + entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "completed"); + ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), "rebalancing"); + log.info("Rebalanced cluster via primary node {}", getEntity()); + } else { + entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "timed out"); + ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(getEntity(), "rebalancing", "rebalance did not complete within time limit"); + log.warn("Timeout rebalancing cluster via primary node {}", getEntity()); + } + } + + private Iterable<HostAndPort> getNodesHostAndPort() { + Group group = Iterables.getFirst(getEntity().getGroups(), null); + if (group == null) return Lists.newArrayList(); + return Iterables.transform(group.getAttribute(CouchbaseCluster.COUCHBASE_CLUSTER_UP_NODES), + new Function<Entity, HostAndPort>() { + @Override + public HostAndPort apply(Entity input) { + return BrooklynAccessUtils.getBrooklynAccessibleAddress(input, input.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); + } + }); + } + + private boolean isNodeRebalancing(String nodeHostAndPort) { + HttpToolResponse response = getApiResponse("http://" + nodeHostAndPort + "/pools/default/rebalanceProgress"); + if (response.getResponseCode() != 200) { + throw new IllegalStateException("failed retrieving rebalance status: " + response); + } + return !"none".equals(HttpValueFunctions.jsonContents("status", String.class).apply(response)); + } + + private HttpToolResponse getApiResponse(String uri) { + return HttpTool.httpGet(HttpTool.httpClientBuilder() + // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials + .uri(uri) + .credentials(new UsernamePasswordCredentials(getUsername(), getPassword())) + .build(), + URI.create(uri), + ImmutableMap.<String, String>of()); + } + + @Override + public void serverAdd(String serverToAdd, String username, String password) { + newScript("serverAdd").body.append(couchbaseCli("server-add") + + getCouchbaseHostnameAndCredentials() + + " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) + + " --server-add-username=" + BashStringEscapes.wrapBash(username) + + " --server-add-password=" + BashStringEscapes.wrapBash(password)) + .failOnNonZeroResultCode() + .execute(); + } + + @Override + public void serverAddAndRebalance(String serverToAdd, String username, String password) { + newScript("serverAddAndRebalance").body.append(couchbaseCli("rebalance") + + getCouchbaseHostnameAndCredentials() + + " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) + + " --server-add-username=" + BashStringEscapes.wrapBash(username) + + " --server-add-password=" + BashStringEscapes.wrapBash(password)) + .failOnNonZeroResultCode() + .execute(); + entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "triggered as part of server-add"); + } + + @Override + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { + log.info("Adding bucket: {} to cluster {} primary node: {}", new Object[]{bucketName, CouchbaseClusterImpl.getClusterOrNode(getEntity()), getEntity()}); + + 
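//For illustration only (hostname, credentials and sizes here are hypothetical), the script below
//composes a command of the form:
//  /opt/couchbase/bin/couchbase-cli bucket-create -c <hostname>:<webAdminPort> -u <adminUser> -p <adminPassword>
//      --bucket=my_bucket --bucket-type=couchbase --bucket-port=11211 --bucket-ramsize=200 --bucket-replica=1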
newScript("bucketCreate").body.append(couchbaseCli("bucket-create") + + getCouchbaseHostnameAndCredentials() + + " --bucket=" + BashStringEscapes.wrapBash(bucketName) + + " --bucket-type=" + BashStringEscapes.wrapBash(bucketType) + + " --bucket-port=" + bucketPort + + " --bucket-ramsize=" + bucketRamSize + + " --bucket-replica=" + bucketReplica) + .failOnNonZeroResultCode() + .execute(); + } + + @Override + public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket) { + DynamicTasks.queue(DependentConfiguration.attributeWhenReady(toCluster, Attributes.SERVICE_UP)).getUnchecked(); + + String destName = CouchbaseClusterImpl.getClusterName(toCluster); + + log.info("Setting up XDCR for " + fromBucket + " from " + CouchbaseClusterImpl.getClusterName(getEntity()) + " (via " + getEntity() + ") " + + "to " + destName + " (" + toCluster + ")"); + + Entity destPrimaryNode = toCluster.getAttribute(CouchbaseCluster.COUCHBASE_PRIMARY_NODE); + String destHostname = destPrimaryNode.getAttribute(Attributes.HOSTNAME); + String destUsername = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); + String destPassword = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); + + // on the REST API there is mention of a 'type' 'continuous' but i don't see other refs to this + + // PROTOCOL Select REST protocol or memcached for replication. xmem indicates memcached while capi indicates REST protocol. + // looks like xmem is the default; leave off for now +// String replMode = "xmem"; + + DynamicTasks.queue(TaskTags.markInessential(SshEffectorTasks.ssh( + couchbaseCli("xdcr-setup") + + getCouchbaseHostnameAndCredentials() + + " --create" + + " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) + + " --xdcr-hostname=" + BashStringEscapes.wrapBash(destHostname) + + " --xdcr-username=" + BashStringEscapes.wrapBash(destUsername) + + " --xdcr-password=" + BashStringEscapes.wrapBash(destPassword) + ).summary("create xdcr destination " + destName).newTask())); + + // would be nice to auto-create bucket, but we'll need to know the parameters; the port in particular is tedious +// ((CouchbaseNode)destPrimaryNode).bucketCreate(toBucket, "couchbase", null, 0, 0); + + DynamicTasks.queue(SshEffectorTasks.ssh( + couchbaseCli("xdcr-replicate") + + getCouchbaseHostnameAndCredentials() + + " --create" + + " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) + + " --xdcr-from-bucket=" + BashStringEscapes.wrapBash(fromBucket) + + " --xdcr-to-bucket=" + BashStringEscapes.wrapBash(toBucket) +// + " --xdcr-replication-mode="+replMode + ).summary("configure replication for " + fromBucket + " to " + destName + ":" + toBucket).newTask()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java new file mode 100644 index 0000000..384434c --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.couchbase; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; + +@ImplementedBy(CouchbaseSyncGatewayImpl.class) +public interface CouchbaseSyncGateway extends SoftwareProcess { + + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, + "1.0-beta3.1"); + + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + SoftwareProcess.DOWNLOAD_URL, "http://packages.couchbase.com/releases/couchbase-sync-gateway/1.0-beta/couchbase-sync-gateway-community_${version}_${driver.osTag}"); + + @SetFromFlag("couchbaseServer") + ConfigKey<Entity> COUCHBASE_SERVER = ConfigKeys.newConfigKey(Entity.class, "couchbaseSyncGateway.couchbaseNode", + "Couchbase server node or cluster the sync gateway connects to"); + + @SetFromFlag("serverPool") + ConfigKey<String> COUCHBASE_SERVER_POOL = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverPool", + "Couchbase Server pool name in which to find buckets", "default"); + + @SetFromFlag("couchbaseServerBucket") + ConfigKey<String> COUCHBASE_SERVER_BUCKET = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverBucket", + "Name of the Couchbase bucket to use", "sync_gateway"); + + @SetFromFlag("pretty") + ConfigKey<Boolean> PRETTY = ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.pretty", + "Pretty-print JSON responses. This is useful for debugging, but reduces performance.", false); + + @SetFromFlag("verbose") + ConfigKey<Boolean> VERBOSE = ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.verbose", + "Logs more information about requests.", false); + + AttributeSensor<String> COUCHBASE_SERVER_WEB_URL = Sensors.newStringSensor("couchbaseSyncGateway.serverWebUrl", + "The URL and web port of the Couchbase server to connect to"); + + AttributeSensor<String> MANAGEMENT_URL = Sensors.newStringSensor("coucbaseSyncGateway.managementUrl", + "Management URL for Couchbase Sync Gateway"); + + PortAttributeSensorAndConfigKey SYNC_REST_API_PORT = new PortAttributeSensorAndConfigKey("couchbaseSyncGateway.syncRestPort", + "Port the Sync REST API listens on", "4984"); + + PortAttributeSensorAndConfigKey ADMIN_REST_API_PORT = new PortAttributeSensorAndConfigKey("couchbaseSyncGateway.adminRestPort", + "Port the Admin REST API listens on", "4985"); + +} \ No newline at end of file
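As a point of reference, a minimal sketch of wiring these config keys together in a Brooklyn application, assuming it lives in the same package and that a Couchbase cluster or node entity has been created elsewhere (the class and method names below are hypothetical and not part of this commit):

    import brooklyn.entity.Entity;
    import brooklyn.entity.proxying.EntitySpec;

    public class SyncGatewayWiringExample {
        /** Builds a spec for a sync gateway bound to the given Couchbase cluster or node entity. */
        public static EntitySpec<CouchbaseSyncGateway> gatewayFor(Entity couchbase) {
            return EntitySpec.create(CouchbaseSyncGateway.class)
                    .configure(CouchbaseSyncGateway.COUCHBASE_SERVER, couchbase)             // cluster or single node
                    .configure(CouchbaseSyncGateway.COUCHBASE_SERVER_BUCKET, "sync_gateway") // bucket the gateway uses
                    .configure(CouchbaseSyncGateway.PRETTY, true);                           // pretty JSON, debugging only
        }
    }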
