http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java deleted file mode 100644 index 5c47fe7..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java +++ /dev/null @@ -1,597 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.couchbase; - -import static brooklyn.util.JavaGroovyEquivalents.groovyTruth; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import javax.annotation.Nonnull; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.config.render.RendererHints; -import brooklyn.enricher.Enrichers; -import brooklyn.entity.Entity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.basic.ServiceStateLogic; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.effector.Effectors; -import brooklyn.entity.group.AbstractMembershipTrackingPolicy; -import brooklyn.entity.group.DynamicClusterImpl; -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.entity.trait.Startable; -import brooklyn.event.AttributeSensor; -import brooklyn.event.basic.DependentConfiguration; -import brooklyn.event.feed.http.HttpFeed; -import brooklyn.event.feed.http.HttpPollConfig; -import brooklyn.event.feed.http.HttpValueFunctions; -import brooklyn.event.feed.http.JsonFunctions; -import brooklyn.location.access.BrooklynAccessUtils; -import brooklyn.policy.PolicySpec; -import brooklyn.util.collections.CollectionFunctionals; -import brooklyn.util.collections.MutableSet; -import brooklyn.util.collections.QuorumCheck; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.guava.Functionals; -import brooklyn.util.guava.IfFunctions; -import brooklyn.util.math.MathPredicates; -import brooklyn.util.task.DynamicTasks; -import brooklyn.util.task.TaskBuilder; -import brooklyn.util.task.Tasks; -import brooklyn.util.text.ByteSizeStrings; -import brooklyn.util.text.StringFunctions; -import brooklyn.util.text.Strings; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.net.HostAndPort; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; - -public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster { - - /* - * Refactoring required: - * - * Currently, on start() the cluster waits for an arbitrary SERVICE_UP_TIME_OUT (3 minutes) before assuming that a quorate - * number of servers are available. The servers are then added to the cluster, and a further wait period of - * DELAY_BEFORE_ADVERTISING_CLUSTER (30 seconds) is used before advertising the cluster - * - * DELAY_BEFORE_ADVERTISING_CLUSTER: It should be possible to refactor this away by adding a repeater that will poll - * the REST API of the primary node (once established) until the API indicates that the cluster is available - * - * SERVICE_UP_TIME_OUT: The refactoring of this would be more substantial. One method would be to remove the bulk of the - * logic from the start() method, and rely entirely on the membership tracking policy and the onServerPoolMemberChanged() - * method. The addition of a RUNNING sensor on the nodes would allow the cluster to determine that a node is up and - * running but has not yet been added to the cluster. The IS_CLUSTER_INITIALIZED key could be used to determine whether - * or not the cluster should be initialized, or a node simply added to an existing cluster. A repeater could be used - * in the driver's to ensure that the method does not return until the node has been fully added - * - * There is an (incomplete) first-pass at this here: https://github.com/Nakomis/incubator-brooklyn/compare/couchbase-running-sensor - * however, there have been significant changes to the cluster initialization since that work was done so it will probably - * need to be re-done - * - * Additionally, during bucket creation, a HttpPoll is used to check that the bucket has been created. This should be - * refactored to use a Repeater in CouchbaseNodeSshDriver.bucketCreate() in a similar way to the one employed in - * CouchbaseNodeSshDriver.rebalance(). Were this done, this class could simply queue the bucket creation tasks - * - */ - - private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class); - private final Object mutex = new Object[0]; - // Used to serialize bucket creation as only one bucket can be created at a time, - // so a feed is used to determine when a bucket has finished being created - private final AtomicReference<HttpFeed> resetBucketCreation = new AtomicReference<HttpFeed>(); - - public void init() { - log.info("Initializing the Couchbase cluster..."); - super.init(); - - addEnricher( - Enrichers.builder() - .transforming(COUCHBASE_CLUSTER_UP_NODES) - .from(this) - .publishing(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES) - .computing(new ListOfHostAndPort()).build() ); - addEnricher( - Enrichers.builder() - .transforming(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES) - .from(this) - .publishing(COUCHBASE_CLUSTER_CONNECTION_URL) - .computing( - IfFunctions.<List<String>>ifPredicate( - Predicates.compose(MathPredicates.lessThan(getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE)), - CollectionFunctionals.sizeFunction(0)) ) - .value((String)null) - .defaultApply( - Functionals.chain( - CollectionFunctionals.<String,List<String>>limit(4), - StringFunctions.joiner(","), - StringFunctions.formatter("http://%s/"))) ) - .build() ); - - Map<? extends AttributeSensor<? extends Number>, ? extends AttributeSensor<? extends Number>> enricherSetup = - ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? extends Number>>builder() - .put(CouchbaseNode.OPS, CouchbaseCluster.OPS_PER_NODE) - .put(CouchbaseNode.COUCH_DOCS_DATA_SIZE, CouchbaseCluster.COUCH_DOCS_DATA_SIZE_PER_NODE) - .put(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, CouchbaseCluster.COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE) - .put(CouchbaseNode.EP_BG_FETCHED, CouchbaseCluster.EP_BG_FETCHED_PER_NODE) - .put(CouchbaseNode.MEM_USED, CouchbaseCluster.MEM_USED_PER_NODE) - .put(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, CouchbaseCluster.COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE) - .put(CouchbaseNode.CURR_ITEMS, CouchbaseCluster.CURR_ITEMS_PER_NODE) - .put(CouchbaseNode.VB_REPLICA_CURR_ITEMS, CouchbaseCluster.VB_REPLICA_CURR_ITEMS_PER_NODE) - .put(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, CouchbaseCluster.COUCH_VIEWS_DATA_SIZE_PER_NODE) - .put(CouchbaseNode.GET_HITS, CouchbaseCluster.GET_HITS_PER_NODE) - .put(CouchbaseNode.CMD_GET, CouchbaseCluster.CMD_GET_PER_NODE) - .put(CouchbaseNode.CURR_ITEMS_TOT, CouchbaseCluster.CURR_ITEMS_TOT_PER_NODE) - .build(); - - for (AttributeSensor<? extends Number> nodeSensor : enricherSetup.keySet()) { - addSummingMemberEnricher(nodeSensor); - addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor)); - } - - addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) - .from(IS_CLUSTER_INITIALIZED).computing( - IfFunctions.ifNotEquals(true).value("The cluster is not yet completely initialized") - .defaultValue(null).build()).build() ); - } - - private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor, AttributeSensor<? extends Number> toSensor) { - addEnricher(Enrichers.builder() - .aggregating(fromSensor) - .publishing(toSensor) - .fromMembers() - .computingAverage() - .build() - ); - } - - private void addSummingMemberEnricher(AttributeSensor<? extends Number> source) { - addEnricher(Enrichers.builder() - .aggregating(source) - .publishing(source) - .fromMembers() - .computingSum() - .build() - ); - } - - @Override - protected void doStart() { - setAttribute(IS_CLUSTER_INITIALIZED, false); - - super.doStart(); - - connectSensors(); - - setAttribute(BUCKET_CREATION_IN_PROGRESS, false); - - //start timeout before adding the servers - Tasks.setBlockingDetails("Pausing while Couchbase stabilizes"); - Time.sleep(getConfig(NODES_STARTED_STABILIZATION_DELAY)); - - Optional<Set<Entity>> upNodes = Optional.<Set<Entity>>fromNullable(getAttribute(COUCHBASE_CLUSTER_UP_NODES)); - if (upNodes.isPresent() && !upNodes.get().isEmpty()) { - - Tasks.setBlockingDetails("Adding servers to Couchbase"); - - //TODO: select a new primary node if this one fails - Entity primaryNode = upNodes.get().iterator().next(); - ((EntityInternal) primaryNode).setAttribute(CouchbaseNode.IS_PRIMARY_NODE, true); - setAttribute(COUCHBASE_PRIMARY_NODE, primaryNode); - - Set<Entity> serversToAdd = MutableSet.<Entity>copyOf(getUpNodes()); - - if (serversToAdd.size() >= getQuorumSize() && serversToAdd.size() > 1) { - log.info("Number of SERVICE_UP nodes:{} in cluster:{} reached Quorum:{}, adding the servers", new Object[]{serversToAdd.size(), getId(), getQuorumSize()}); - addServers(serversToAdd); - - //wait for servers to be added to the couchbase server - try { - Tasks.setBlockingDetails("Delaying before advertising cluster up"); - Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER)); - } finally { - Tasks.resetBlockingDetails(); - } - - ((CouchbaseNode)getPrimaryNode()).rebalance(); - } else { - if (getQuorumSize()>1) { - log.warn(this+" is not quorate; will likely fail later, but proceeding for now"); - } - for (Entity server: serversToAdd) { - ((EntityInternal) server).setAttribute(CouchbaseNode.IS_IN_CLUSTER, true); - } - } - - if (getConfig(CREATE_BUCKETS)!=null) { - try { - Tasks.setBlockingDetails("Creating buckets in Couchbase"); - - createBuckets(); - DependentConfiguration.waitInTaskForAttributeReady(this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); - - } finally { - Tasks.resetBlockingDetails(); - } - } - - if (getConfig(REPLICATION)!=null) { - try { - Tasks.setBlockingDetails("Configuring replication rules"); - - List<Map<String, Object>> replRules = getConfig(REPLICATION); - for (Map<String, Object> replRule: replRules) { - DynamicTasks.queue(Effectors.invocation(getPrimaryNode(), CouchbaseNode.ADD_REPLICATION_RULE, replRule)); - } - DynamicTasks.waitForLast(); - - } finally { - Tasks.resetBlockingDetails(); - } - } - - setAttribute(IS_CLUSTER_INITIALIZED, true); - - } else { - throw new IllegalStateException("No up nodes available after starting"); - } - } - - @Override - public void stop() { - if (resetBucketCreation.get() != null) { - resetBucketCreation.get().stop(); - } - super.stop(); - } - - protected void connectSensors() { - addPolicy(PolicySpec.create(MemberTrackingPolicy.class) - .displayName("Controller targets tracker") - .configure("group", this)); - } - - private final static class ListOfHostAndPort implements Function<Set<Entity>, List<String>> { - @Override public List<String> apply(Set<Entity> input) { - List<String> addresses = Lists.newArrayList(); - for (Entity entity : input) { - addresses.add(String.format("%s", - BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)))); - } - return addresses; - } - } - - public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { - @Override protected void onEntityChange(Entity member) { - ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member); - } - - @Override protected void onEntityAdded(Entity member) { - ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member); - } - - @Override protected void onEntityRemoved(Entity member) { - ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member); - } - }; - - protected synchronized void onServerPoolMemberChanged(Entity member) { - if (log.isTraceEnabled()) log.trace("For {}, considering membership of {} which is in locations {}", - new Object[]{this, member, member.getLocations()}); - - //FIXME: make use of servers to be added after cluster initialization. - synchronized (mutex) { - if (belongsInServerPool(member)) { - - Optional<Set<Entity>> upNodes = Optional.fromNullable(getUpNodes()); - if (upNodes.isPresent()) { - - if (!upNodes.get().contains(member)) { - Set<Entity> newNodes = Sets.newHashSet(getUpNodes()); - newNodes.add(member); - setAttribute(COUCHBASE_CLUSTER_UP_NODES, newNodes); - - //add to set of servers to be added. - if (isClusterInitialized()) { - addServer(member); - } - } - } else { - Set<Entity> newNodes = Sets.newHashSet(); - newNodes.add(member); - setAttribute(COUCHBASE_CLUSTER_UP_NODES, newNodes); - - if (isClusterInitialized()) { - addServer(member); - } - } - } else { - Set<Entity> upNodes = getUpNodes(); - if (upNodes != null && upNodes.contains(member)) { - upNodes.remove(member); - setAttribute(COUCHBASE_CLUSTER_UP_NODES, upNodes); - log.info("Removing couchbase node {}: {}; from cluster", new Object[]{this, member}); - } - } - if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member); - } - } - - protected boolean belongsInServerPool(Entity member) { - if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) { - if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, eliminating because not up", this, member); - return false; - } - if (!getMembers().contains(member)) { - if (log.isTraceEnabled()) - log.trace("Members of {}, checking {}, eliminating because not member", this, member); - - return false; - } - if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, approving", this, member); - - return true; - } - - - protected EntitySpec<?> getMemberSpec() { - EntitySpec<?> result = super.getMemberSpec(); - if (result != null) return result; - return EntitySpec.create(CouchbaseNode.class); - } - - @Override - public int getQuorumSize() { - Integer quorumSize = getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE); - if (quorumSize != null && quorumSize > 0) - return quorumSize; - // by default the quorum would be floor(initial_cluster_size/2) + 1 - return (int) Math.floor(getConfig(INITIAL_SIZE) / 2) + 1; - } - - protected int getActualSize() { - return Optional.fromNullable(getAttribute(CouchbaseCluster.ACTUAL_CLUSTER_SIZE)).or(-1); - } - - private Set<Entity> getUpNodes() { - return getAttribute(COUCHBASE_CLUSTER_UP_NODES); - } - - private CouchbaseNode getPrimaryNode() { - return (CouchbaseNode) getAttribute(COUCHBASE_PRIMARY_NODE); - } - - @Override - protected void initEnrichers() { - addEnricher(Enrichers.builder().updatingMap(ServiceStateLogic.SERVICE_NOT_UP_INDICATORS) - .from(COUCHBASE_CLUSTER_UP_NODES) - .computing(new Function<Set<Entity>, Object>() { - @Override - public Object apply(Set<Entity> input) { - if (input==null) return "Couchbase up nodes not set"; - if (input.isEmpty()) return "No Couchbase up nodes"; - if (input.size() < getQuorumSize()) return "Couchbase up nodes not quorate"; - return null; - } - }).build()); - - if (config().getLocalRaw(UP_QUORUM_CHECK).isAbsent()) { - // TODO Only leaving CouchbaseQuorumCheck here in case it is contained in persisted state. - // If so, need a transformer and then to delete it - @SuppressWarnings({ "unused", "hiding" }) - @Deprecated - class CouchbaseQuorumCheck implements QuorumCheck { - @Override - public boolean isQuorate(int sizeHealthy, int totalSize) { - // check members count passed in AND the sensor - if (sizeHealthy < getQuorumSize()) return false; - return true; - } - } - config().set(UP_QUORUM_CHECK, new CouchbaseClusterImpl.CouchbaseQuorumCheck(this)); - } - super.initEnrichers(); - } - - static class CouchbaseQuorumCheck implements QuorumCheck { - private final CouchbaseCluster cluster; - CouchbaseQuorumCheck(CouchbaseCluster cluster) { - this.cluster = cluster; - } - @Override - public boolean isQuorate(int sizeHealthy, int totalSize) { - // check members count passed in AND the sensor - if (sizeHealthy < cluster.getQuorumSize()) return false; - return true; - } - } - protected void addServers(Set<Entity> serversToAdd) { - Preconditions.checkNotNull(serversToAdd); - for (Entity s : serversToAdd) { - addServerSeveralTimes(s, 12, Duration.TEN_SECONDS); - } - } - - /** try adding in a loop because we are seeing spurious port failures in AWS */ - protected void addServerSeveralTimes(Entity s, int numRetries, Duration delayOnFailure) { - try { - addServer(s); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - if (numRetries<=0) throw Exceptions.propagate(e); - // retry once after sleep because we are getting some odd primary-change events - log.warn("Error adding "+s+" to "+this+", "+numRetries+" retries remaining, will retry after delay ("+e+")"); - Time.sleep(delayOnFailure); - addServerSeveralTimes(s, numRetries-1, delayOnFailure); - } - } - - protected void addServer(Entity serverToAdd) { - Preconditions.checkNotNull(serverToAdd); - if (serverToAdd.equals(getPrimaryNode())) { - // no need to add; but we pass it in anyway because it makes the calling logic easier - return; - } - if (!isMemberInCluster(serverToAdd)) { - HostAndPort webAdmin = HostAndPort.fromParts(serverToAdd.getAttribute(SoftwareProcess.SUBNET_HOSTNAME), - serverToAdd.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); - String username = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); - String password = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); - - if (isClusterInitialized()) { - Entities.invokeEffectorWithArgs(this, getPrimaryNode(), CouchbaseNode.SERVER_ADD_AND_REBALANCE, webAdmin.toString(), username, password).getUnchecked(); - } else { - Entities.invokeEffectorWithArgs(this, getPrimaryNode(), CouchbaseNode.SERVER_ADD, webAdmin.toString(), username, password).getUnchecked(); - } - //FIXME check feedback of whether the server was added. - ((EntityInternal) serverToAdd).setAttribute(CouchbaseNode.IS_IN_CLUSTER, true); - } - } - - /** finds the cluster name specified for a node or a cluster, - * using {@link CouchbaseCluster#CLUSTER_NAME} or falling back to the cluster (or node) ID. */ - public static String getClusterName(Entity node) { - String name = node.getConfig(CLUSTER_NAME); - if (!Strings.isBlank(name)) return Strings.makeValidFilename(name); - return getClusterOrNode(node).getId(); - } - - /** returns Couchbase cluster in ancestry, defaulting to the given node if none */ - @Nonnull public static Entity getClusterOrNode(Entity node) { - Iterable<CouchbaseCluster> clusterNodes = Iterables.filter(Entities.ancestors(node), CouchbaseCluster.class); - return Iterables.getFirst(clusterNodes, node); - } - - public boolean isClusterInitialized() { - return Optional.fromNullable(getAttribute(IS_CLUSTER_INITIALIZED)).or(false); - } - - public boolean isMemberInCluster(Entity e) { - return Optional.fromNullable(e.getAttribute(CouchbaseNode.IS_IN_CLUSTER)).or(false); - } - - public void createBuckets() { - //TODO: check for port conflicts if buckets are being created with a port - List<Map<String, Object>> bucketsToCreate = getConfig(CREATE_BUCKETS); - if (bucketsToCreate==null) return; - - Entity primaryNode = getPrimaryNode(); - - for (Map<String, Object> bucketMap : bucketsToCreate) { - String bucketName = bucketMap.containsKey("bucket") ? (String) bucketMap.get("bucket") : "default"; - String bucketType = bucketMap.containsKey("bucket-type") ? (String) bucketMap.get("bucket-type") : "couchbase"; - // default bucket must be on this port; other buckets can (must) specify their own (unique) port - Integer bucketPort = bucketMap.containsKey("bucket-port") ? (Integer) bucketMap.get("bucket-port") : 11211; - Integer bucketRamSize = bucketMap.containsKey("bucket-ramsize") ? (Integer) bucketMap.get("bucket-ramsize") : 100; - Integer bucketReplica = bucketMap.containsKey("bucket-replica") ? (Integer) bucketMap.get("bucket-replica") : 1; - - createBucket(primaryNode, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); - } - } - - public void createBucket(final Entity primaryNode, final String bucketName, final String bucketType, final Integer bucketPort, final Integer bucketRamSize, final Integer bucketReplica) { - DynamicTasks.queueIfPossible(TaskBuilder.<Void>builder().name("Creating bucket " + bucketName).body( - new Callable<Void>() { - @Override - public Void call() throws Exception { - DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); - if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) { - CouchbaseClusterImpl.this.resetBucketCreation.get().stop(); - } - setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true); - HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(primaryNode, primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); - - CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder() - .entity(CouchbaseClusterImpl.this) - .period(500, TimeUnit.MILLISECONDS) - .baseUri(String.format("http://%s/pools/default/buckets/%s", hostAndPort, bucketName)) - .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD)) - .poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS) - .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() { - @Override - public Boolean apply(JsonElement input) { - // Wait until bucket has been created on all nodes and the couchApiBase element has been published (indicating that the bucket is useable) - JsonArray servers = input.getAsJsonArray(); - if (servers.size() != CouchbaseClusterImpl.this.getMembers().size()) { - return true; - } - for (JsonElement server : servers) { - Object api = server.getAsJsonObject().get("couchApiBase"); - if (api == null || Strings.isEmpty(String.valueOf(api))) { - return true; - } - } - return false; - } - })) - .onFailureOrException(new Function<Object, Boolean>() { - @Override - public Boolean apply(Object input) { - if (input instanceof brooklyn.util.http.HttpToolResponse) { - if (((brooklyn.util.http.HttpToolResponse) input).getResponseCode() == 404) { - return true; - } - } - if (input instanceof Throwable) - Exceptions.propagate((Throwable) input); - throw new IllegalStateException("Unexpected response when creating bucket:" + input); - } - })) - .build()); - - // TODO: Bail out if bucket creation fails, to allow next bucket to proceed - Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); - DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); - if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) { - CouchbaseClusterImpl.this.resetBucketCreation.get().stop(); - } - return null; - } - } - ).build()).orSubmitAndBlock(); - } - - static { - RendererHints.register(COUCH_DOCS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); - RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); - RendererHints.register(MEM_USED_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); - RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); - RendererHints.register(COUCH_VIEWS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java deleted file mode 100644 index 727f942..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.couchbase; - -import java.net.URI; - -import org.apache.brooklyn.catalog.Catalog; -import brooklyn.config.ConfigKey; -import brooklyn.config.render.RendererHints; -import brooklyn.entity.annotation.Effector; -import brooklyn.entity.annotation.EffectorParam; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.MethodEffector; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.effector.Effectors; -import brooklyn.entity.proxying.ImplementedBy; -import brooklyn.event.AttributeSensor; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; -import brooklyn.event.basic.Sensors; -import brooklyn.util.flags.SetFromFlag; -import brooklyn.util.text.ByteSizeStrings; - -@Catalog(name="CouchBase Node", description="Couchbase Server is an open source, distributed (shared-nothing architecture) " - + "NoSQL document-oriented database that is optimized for interactive applications.") -@ImplementedBy(CouchbaseNodeImpl.class) -public interface CouchbaseNode extends SoftwareProcess { - - @SetFromFlag("adminUsername") - ConfigKey<String> COUCHBASE_ADMIN_USERNAME = ConfigKeys.newStringConfigKey("couchbase.adminUsername", "Username for the admin user on the node", "Administrator"); - - @SetFromFlag("adminPassword") - ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = ConfigKeys.newStringConfigKey("couchbase.adminPassword", "Password for the admin user on the node", "Password"); - - @SetFromFlag("version") - ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, - "3.0.0"); - - @SetFromFlag("enterprise") - ConfigKey<Boolean> USE_ENTERPRISE = ConfigKeys.newBooleanConfigKey("couchbase.enterprise.enabled", - "Whether to use Couchbase Enterprise; if false uses the community version. Defaults to true.", true); - - @SetFromFlag("downloadUrl") - BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( - SoftwareProcess.DOWNLOAD_URL, "http://packages.couchbase.com/releases/${version}/" - + "couchbase-server-${driver.communityOrEnterprise}${driver.downloadLinkPreVersionSeparator}${version}${driver.downloadLinkOsTagWithPrefix}"); - - @SetFromFlag("clusterInitRamSize") - BasicAttributeSensorAndConfigKey<Integer> COUCHBASE_CLUSTER_INIT_RAM_SIZE = new BasicAttributeSensorAndConfigKey<Integer>( - Integer.class, "couchbase.clusterInitRamSize", "initial ram size of the cluster", 300); - - PortAttributeSensorAndConfigKey COUCHBASE_WEB_ADMIN_PORT = new PortAttributeSensorAndConfigKey("couchbase.webAdminPort", "Web Administration Port", "8091+"); - PortAttributeSensorAndConfigKey COUCHBASE_API_PORT = new PortAttributeSensorAndConfigKey("couchbase.apiPort", "Couchbase API Port", "8092+"); - PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_BUCKET_PORT = new PortAttributeSensorAndConfigKey("couchbase.internalBucketPort", "Internal Bucket Port", "11209"); - PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_EXTERNAL_BUCKET_PORT = new PortAttributeSensorAndConfigKey("couchbase.internalExternalBucketPort", "Internal/External Bucket Port", "11210"); - PortAttributeSensorAndConfigKey COUCHBASE_CLIENT_INTERFACE_PROXY = new PortAttributeSensorAndConfigKey("couchbase.clientInterfaceProxy", "Client interface (proxy)", "11211"); - PortAttributeSensorAndConfigKey COUCHBASE_INCOMING_SSL_PROXY = new PortAttributeSensorAndConfigKey("couchbase.incomingSslProxy", "Incoming SSL Proxy", "11214"); - PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_OUTGOING_SSL_PROXY = new PortAttributeSensorAndConfigKey("couchbase.internalOutgoingSslProxy", "Internal Outgoing SSL Proxy", "11215"); - PortAttributeSensorAndConfigKey COUCHBASE_REST_HTTPS_FOR_SSL = new PortAttributeSensorAndConfigKey("couchbase.internalRestHttpsForSsl", "Internal REST HTTPS for SSL", "18091"); - PortAttributeSensorAndConfigKey COUCHBASE_CAPI_HTTPS_FOR_SSL = new PortAttributeSensorAndConfigKey("couchbase.internalCapiHttpsForSsl", "Internal CAPI HTTPS for SSL", "18092"); - PortAttributeSensorAndConfigKey ERLANG_PORT_MAPPER = new PortAttributeSensorAndConfigKey("couchbase.erlangPortMapper", "Erlang Port Mapper Daemon Listener Port (epmd)", "4369"); - PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_START = new PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeStart", "Node data exchange Port Range Start", "21100+"); - PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeEnd", "Node data exchange Port Range End", "21199+"); - - AttributeSensor<Boolean> IS_PRIMARY_NODE = Sensors.newBooleanSensor("couchbase.isPrimaryNode", "flag to determine if the current couchbase node is the primary node for the cluster"); - AttributeSensor<Boolean> IS_IN_CLUSTER = Sensors.newBooleanSensor("couchbase.isInCluster", "flag to determine if the current couchbase node has been added to a cluster, " - + "including being the first / primary node"); - AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI; - - // Interesting stats - AttributeSensor<Double> OPS = Sensors.newDoubleSensor("couchbase.stats.ops", - "Retrieved from pools/nodes/<current node>/interestingStats/ops"); - AttributeSensor<Long> COUCH_DOCS_DATA_SIZE = Sensors.newLongSensor("couchbase.stats.couch.docs.data.size", - "Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_data_size"); - AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE = Sensors.newLongSensor("couchbase.stats.couch.docs.actual.disk.size", - "Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size"); - AttributeSensor<Long> EP_BG_FETCHED = Sensors.newLongSensor("couchbase.stats.ep.bg.fetched", - "Retrieved from pools/nodes/<current node>/interestingStats/ep_bg_fetched"); - AttributeSensor<Long> MEM_USED = Sensors.newLongSensor("couchbase.stats.mem.used", - "Retrieved from pools/nodes/<current node>/interestingStats/mem_used"); - AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE = Sensors.newLongSensor("couchbase.stats.couch.views.actual.disk.size", - "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_actual_disk_size"); - AttributeSensor<Long> CURR_ITEMS = Sensors.newLongSensor("couchbase.stats.curr.items", - "Retrieved from pools/nodes/<current node>/interestingStats/curr_items"); - AttributeSensor<Long> VB_REPLICA_CURR_ITEMS = Sensors.newLongSensor("couchbase.stats.vb.replica.curr.items", - "Retrieved from pools/nodes/<current node>/interestingStats/vb_replica_curr_items"); - AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE = Sensors.newLongSensor("couchbase.stats.couch.views.data.size", - "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_data_size"); - AttributeSensor<Long> GET_HITS = Sensors.newLongSensor("couchbase.stats.get.hits", - "Retrieved from pools/nodes/<current node>/interestingStats/get_hits"); - AttributeSensor<Double> CMD_GET = Sensors.newDoubleSensor("couchbase.stats.cmd.get", - "Retrieved from pools/nodes/<current node>/interestingStats/cmd_get"); - AttributeSensor<Long> CURR_ITEMS_TOT = Sensors.newLongSensor("couchbase.stats.curr.items.tot", - "Retrieved from pools/nodes/<current node>/interestingStats/curr_items_tot"); - AttributeSensor<String> REBALANCE_STATUS = Sensors.newStringSensor("couchbase.rebalance.status", - "Displays the current rebalance status from pools/nodes/rebalanceStatus"); - - class MainUri { - public static final AttributeSensor<URI> MAIN_URI = Attributes.MAIN_URI; - - static { - // ROOT_URL does not need init because it refers to something already initialized - RendererHints.register(COUCHBASE_WEB_ADMIN_URL, RendererHints.namedActionWithUrl()); - - RendererHints.register(COUCH_DOCS_DATA_SIZE, RendererHints.displayValue(ByteSizeStrings.metric())); - RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE, RendererHints.displayValue(ByteSizeStrings.metric())); - RendererHints.register(MEM_USED, RendererHints.displayValue(ByteSizeStrings.metric())); - RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE, RendererHints.displayValue(ByteSizeStrings.metric())); - RendererHints.register(COUCH_VIEWS_DATA_SIZE, RendererHints.displayValue(ByteSizeStrings.metric())); - } - } - - // this long-winded reference is done just to trigger the initialization above - AttributeSensor<URI> MAIN_URI = MainUri.MAIN_URI; - - MethodEffector<Void> SERVER_ADD = new MethodEffector<Void>(CouchbaseNode.class, "serverAdd"); - MethodEffector<Void> SERVER_ADD_AND_REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "serverAddAndRebalance"); - MethodEffector<Void> REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "rebalance"); - MethodEffector<Void> BUCKET_CREATE = new MethodEffector<Void>(CouchbaseNode.class, "bucketCreate"); - brooklyn.entity.Effector<Void> ADD_REPLICATION_RULE = Effectors.effector(Void.class, "addReplicationRule") - .description("Adds a replication rule from the indicated bucket on the cluster where this node is located " - + "to the indicated cluster and optional destination bucket") - .parameter(String.class, "fromBucket", "Bucket to be replicated") - .parameter(Object.class, "toCluster", "Entity (or ID) of the cluster to which this should replicate") - .parameter(String.class, "toBucket", "Destination bucket for replication in the toCluster, defaulting to the same as the fromBucket") - .buildAbstract(); - - @Effector(description = "add a server to a cluster") - public void serverAdd(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password); - - @Effector(description = "add a server to a cluster, and immediately rebalances") - public void serverAddAndRebalance(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password); - - @Effector(description = "rebalance the couchbase cluster") - public void rebalance(); - - @Effector(description = "create a new bucket") - public void bucketCreate(@EffectorParam(name = "bucketName") String bucketName, @EffectorParam(name = "bucketType") String bucketType, - @EffectorParam(name = "bucketPort") Integer bucketPort, @EffectorParam(name = "bucketRamSize") Integer bucketRamSize, - @EffectorParam(name = "bucketReplica") Integer bucketReplica); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java deleted file mode 100644 index 37f2f74..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.couchbase; - -import brooklyn.entity.Entity; -import brooklyn.entity.basic.SoftwareProcessDriver; - -public interface CouchbaseNodeDriver extends SoftwareProcessDriver { - public String getOsTag(); - public String getDownloadLinkPreVersionSeparator(); - public String getDownloadLinkOsTagWithPrefix(); - - public String getCommunityOrEnterprise(); - - public void serverAdd(String serverToAdd, String username, String password); - - public void rebalance(); - - public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica); - - public void serverAddAndRebalance(String serverToAdd, String username, String password); - - public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java deleted file mode 100644 index d7439ca..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.couchbase; - -import static java.lang.String.format; - -import java.net.URI; -import java.util.Collection; -import java.util.Map; -import java.util.Set; - -import org.apache.http.auth.UsernamePasswordCredentials; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.Entity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.SoftwareProcessImpl; -import brooklyn.entity.effector.EffectorBody; -import brooklyn.event.AttributeSensor; -import brooklyn.event.SensorEvent; -import brooklyn.event.SensorEventListener; -import brooklyn.event.feed.http.HttpFeed; -import brooklyn.event.feed.http.HttpPollConfig; -import brooklyn.event.feed.http.HttpValueFunctions; -import brooklyn.event.feed.http.JsonFunctions; -import brooklyn.location.MachineProvisioningLocation; -import brooklyn.location.access.BrooklynAccessUtils; -import brooklyn.location.cloud.CloudLocationConfig; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.collections.MutableSet; -import brooklyn.util.config.ConfigBag; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.guava.Functionals; -import brooklyn.util.guava.MaybeFunctions; -import brooklyn.util.guava.TypeTokens; -import brooklyn.util.http.HttpTool; -import brooklyn.util.http.HttpToolResponse; -import brooklyn.util.net.Urls; -import brooklyn.util.task.Tasks; -import brooklyn.util.text.Strings; -import brooklyn.util.time.Duration; - -import com.google.common.base.Charsets; -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; -import com.google.common.net.HostAndPort; -import com.google.common.net.HttpHeaders; -import com.google.common.net.MediaType; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; - -public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseNode { - - private static final Logger log = LoggerFactory.getLogger(CouchbaseNodeImpl.class); - - private volatile HttpFeed httpFeed; - - @Override - public Class<CouchbaseNodeDriver> getDriverInterface() { - return CouchbaseNodeDriver.class; - } - - @Override - public CouchbaseNodeDriver getDriver() { - return (CouchbaseNodeDriver) super.getDriver(); - } - - @Override - public void init() { - super.init(); - - subscribe(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() { - @Override - public void onEvent(SensorEvent<Boolean> booleanSensorEvent) { - if (Boolean.TRUE.equals(booleanSensorEvent.getValue())) { - Integer webPort = getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT); - Preconditions.checkNotNull(webPort, CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT+" not set for %s; is an acceptable port available?", this); - String hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(CouchbaseNodeImpl.this, webPort).toString(); - setAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL, URI.create(format("http://%s", hostAndPort))); - } - } - }); - - getMutableEntityType().addEffector(ADD_REPLICATION_RULE, new EffectorBody<Void>() { - @Override - public Void call(ConfigBag parameters) { - addReplicationRule(parameters); - return null; - } - }); - } - - protected Map<String, Object> obtainProvisioningFlags(@SuppressWarnings("rawtypes") MachineProvisioningLocation location) { - ConfigBag result = ConfigBag.newInstance(super.obtainProvisioningFlags(location)); - result.configure(CloudLocationConfig.OS_64_BIT, true); - return result.getAllConfig(); - } - - @Override - protected Collection<Integer> getRequiredOpenPorts() { - // TODO this creates a huge list of inbound ports; much better to define on a security group using range syntax! - int erlangRangeStart = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_START).iterator().next(); - int erlangRangeEnd = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_END).iterator().next(); - - Set<Integer> newPorts = MutableSet.<Integer>copyOf(super.getRequiredOpenPorts()); - newPorts.remove(erlangRangeStart); - newPorts.remove(erlangRangeEnd); - for (int i = erlangRangeStart; i <= erlangRangeEnd; i++) - newPorts.add(i); - return newPorts; - } - - @Override - public void serverAdd(String serverToAdd, String username, String password) { - getDriver().serverAdd(serverToAdd, username, password); - } - - @Override - public void serverAddAndRebalance(String serverToAdd, String username, String password) { - getDriver().serverAddAndRebalance(serverToAdd, username, password); - } - - @Override - public void rebalance() { - getDriver().rebalance(); - } - - protected final static Function<HttpToolResponse, JsonElement> GET_THIS_NODE_STATS = Functionals.chain( - HttpValueFunctions.jsonContents(), - JsonFunctions.walk("nodes"), - new Function<JsonElement, JsonElement>() { - @Override public JsonElement apply(JsonElement input) { - JsonArray nodes = input.getAsJsonArray(); - for (JsonElement element : nodes) { - JsonElement thisNode = element.getAsJsonObject().get("thisNode"); - if (thisNode!=null && Boolean.TRUE.equals(thisNode.getAsBoolean())) { - return element.getAsJsonObject().get("interestingStats"); - } - } - return null; - }} - ); - - protected final static <T> HttpPollConfig<T> getSensorFromNodeStat(AttributeSensor<T> sensor, String ...jsonPath) { - return new HttpPollConfig<T>(sensor) - .onSuccess(Functionals.chain(GET_THIS_NODE_STATS, - MaybeFunctions.<JsonElement>wrap(), - JsonFunctions.walkM(jsonPath), - JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null))) - .onFailureOrException(Functions.<T>constant(null)); - } - - @Override - protected void postStart() { - super.postStart(); - renameServerToPublicHostname(); - } - - protected void renameServerToPublicHostname() { - // http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#couchbase-getting-started-hostnames - URI apiUri = null; - try { - HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(COUCHBASE_WEB_ADMIN_PORT)); - apiUri = URI.create(String.format("http://%s:%d/node/controller/rename", accessible.getHostText(), accessible.getPort())); - UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(getConfig(COUCHBASE_ADMIN_USERNAME), getConfig(COUCHBASE_ADMIN_PASSWORD)); - HttpToolResponse response = HttpTool.httpPost( - // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials - HttpTool.httpClientBuilder().uri(apiUri).credentials(credentials).build(), - apiUri, - MutableMap.of( - HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString(), - HttpHeaders.ACCEPT, "*/*", - // this appears needed; without it we get org.apache.http.NoHttpResponseException !? - HttpHeaders.AUTHORIZATION, HttpTool.toBasicAuthorizationValue(credentials)), - Charsets.UTF_8.encode("hostname="+Urls.encode(accessible.getHostText())).array()); - log.debug("Renamed Couchbase server "+this+" via "+apiUri+": "+response); - if (!HttpTool.isStatusCodeHealthy(response.getResponseCode())) { - log.warn("Invalid response code, renaming "+apiUri+": "+response); - } - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - log.warn("Error renaming server, using "+apiUri+": "+e, e); - } - } - - public void connectSensors() { - super.connectSensors(); - connectServiceUpIsRunning(); - - HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, this.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); - httpFeed = HttpFeed.builder() - .entity(this) - .period(Duration.seconds(3)) - .baseUri("http://" + hostAndPort + "/pools/nodes/") - .credentialsIfNotNull(getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD)) - .poll(getSensorFromNodeStat(CouchbaseNode.OPS, "ops")) - .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_DATA_SIZE, "couch_docs_data_size")) - .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, "couch_docs_actual_disk_size")) - .poll(getSensorFromNodeStat(CouchbaseNode.EP_BG_FETCHED, "ep_bg_fetched")) - .poll(getSensorFromNodeStat(CouchbaseNode.MEM_USED, "mem_used")) - .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, "couch_views_actual_disk_size")) - .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS, "curr_items")) - .poll(getSensorFromNodeStat(CouchbaseNode.VB_REPLICA_CURR_ITEMS, "vb_replica_curr_items")) - .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, "couch_views_data_size")) - .poll(getSensorFromNodeStat(CouchbaseNode.GET_HITS, "get_hits")) - .poll(getSensorFromNodeStat(CouchbaseNode.CMD_GET, "cmd_get")) - .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS_TOT, "curr_items_tot")) - .poll(new HttpPollConfig<String>(CouchbaseNode.REBALANCE_STATUS) - .onSuccess(HttpValueFunctions.jsonContents("rebalanceStatus", String.class)) - .onFailureOrException(Functions.constant("Could not retrieve"))) - .build(); - } - - public void disconnectSensors() { - super.disconnectSensors(); - disconnectServiceUpIsRunning(); - if (httpFeed != null) { - httpFeed.stop(); - } - } - - @Override - public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { - if (Strings.isBlank(bucketType)) bucketType = "couchbase"; - if (bucketRamSize==null || bucketRamSize<=0) bucketRamSize = 200; - if (bucketReplica==null || bucketReplica<0) bucketReplica = 1; - - getDriver().bucketCreate(bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); - } - - /** exposed through {@link CouchbaseNode#ADD_REPLICATION_RULE} */ - protected void addReplicationRule(ConfigBag ruleArgs) { - Object toClusterO = Preconditions.checkNotNull(ruleArgs.getStringKey("toCluster"), "toCluster must not be null"); - if (toClusterO instanceof String) { - toClusterO = getManagementContext().lookup((String)toClusterO); - } - Entity toCluster = Tasks.resolving(toClusterO, Entity.class).context(getExecutionContext()).get(); - - String fromBucket = Preconditions.checkNotNull( (String)ruleArgs.getStringKey("fromBucket"), "fromBucket must be specified" ); - - String toBucket = (String)ruleArgs.getStringKey("toBucket"); - if (toBucket==null) toBucket = fromBucket; - - if (!ruleArgs.getUnusedConfig().isEmpty()) { - throw new IllegalArgumentException("Unsupported replication rule data: "+ruleArgs.getUnusedConfig()); - } - - getDriver().addReplicationRule(toCluster, fromBucket, toBucket); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java deleted file mode 100644 index 6dd97d6..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java +++ /dev/null @@ -1,512 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.couchbase; - -import static brooklyn.util.ssh.BashCommands.*; -import static java.lang.String.format; - -import java.net.URI; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Callable; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import org.apache.http.auth.UsernamePasswordCredentials; - -import brooklyn.entity.Entity; -import brooklyn.entity.Group; -import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.ServiceStateLogic; -import brooklyn.entity.drivers.downloads.BasicDownloadRequirement; -import brooklyn.entity.drivers.downloads.DownloadProducerFromUrlAttribute; -import brooklyn.entity.software.SshEffectorTasks; -import brooklyn.event.basic.DependentConfiguration; -import brooklyn.event.feed.http.HttpValueFunctions; -import brooklyn.location.OsDetails; -import brooklyn.location.access.BrooklynAccessUtils; -import brooklyn.location.basic.SshMachineLocation; -import brooklyn.management.Task; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.http.HttpTool; -import brooklyn.util.http.HttpToolResponse; -import brooklyn.util.repeat.Repeater; -import brooklyn.util.ssh.BashCommands; -import brooklyn.util.task.DynamicTasks; -import brooklyn.util.task.TaskBuilder; -import brooklyn.util.task.TaskTags; -import brooklyn.util.task.Tasks; -import brooklyn.util.text.NaturalOrderComparator; -import brooklyn.util.text.StringEscapes.BashStringEscapes; -import brooklyn.util.text.Strings; -import brooklyn.util.time.Duration; - -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.net.HostAndPort; - -public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseNodeDriver { - - public CouchbaseNodeSshDriver(final CouchbaseNodeImpl entity, final SshMachineLocation machine) { - super(entity, machine); - } - - public static String couchbaseCli(String cmd) { - return "/opt/couchbase/bin/couchbase-cli " + cmd + " "; - } - - @Override - public void preInstall() { - resolver = Entities.newDownloader(this); - setExpandedInstallDir(getInstallDir()); - } - - @Override - public void install() { - //for reference https://github.com/urbandecoder/couchbase/blob/master/recipes/server.rb - //installation instructions (http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#preparing-to-install) - - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - - OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); - - if (osDetails.isLinux()) { - List<String> commands = installLinux(urls, saveAs); - //FIXME installation return error but the server is up and running. - newScript(INSTALLING) - .body.append(commands).execute(); - } else { - Tasks.markInessential(); - throw new IllegalStateException("Unsupported OS for installing Couchbase. Will continue but may fail later."); - } - } - - private List<String> installLinux(List<String> urls, String saveAs) { - - log.info("Installing " + getEntity() + " using couchbase-server-{} {}", getCommunityOrEnterprise(), getVersion()); - - String apt = chainGroup( - installPackage(MutableMap.of("apt", "python-httplib2 libssl0.9.8"), null), - sudo(format("dpkg -i %s", saveAs))); - - String yum = chainGroup( - "which yum", - // The following prevents failure on RHEL AWS nodes: - // https://forums.aws.amazon.com/thread.jspa?threadID=100509 - ok(sudo("sed -i.bk s/^enabled=1$/enabled=0/ /etc/yum/pluginconf.d/subscription-manager.conf")), - ok(sudo("yum check-update")), - sudo("yum install -y pkgconfig"), - // RHEL requires openssl version 098 - sudo("[ -f /etc/redhat-release ] && (grep -i \"red hat\" /etc/redhat-release && sudo yum install -y openssl098e) || :"), - sudo(format("rpm --install %s", saveAs))); - - String link = new DownloadProducerFromUrlAttribute().apply(new BasicDownloadRequirement(this)).getPrimaryLocations().iterator().next(); - return ImmutableList.<String>builder() - .add(INSTALL_CURL) - .addAll(Arrays.asList(INSTALL_CURL, - BashCommands.require(BashCommands.alternatives(BashCommands.simpleDownloadUrlAs(urls, saveAs), - // Referer link is required for 3.0.0; note mis-spelling is correct, as per http://en.wikipedia.org/wiki/HTTP_referer - "curl -f -L -k " + BashStringEscapes.wrapBash(link) - + " -H 'Referer: http://www.couchbase.com/downloads'" - + " -o " + saveAs), - "Could not retrieve " + saveAs + " (from " + urls.size() + " sites)", 9))) - .add(alternatives(apt, yum)) - .build(); - } - - @Override - public void customize() { - //TODO: add linux tweaks for couchbase - //http://blog.couchbase.com/often-overlooked-linux-os-tweaks - //http://blog.couchbase.com/kirk - - //turn off swappiness - //vm.swappiness=0 - //sudo echo 0 > /proc/sys/vm/swappiness - - //os page cache = 20% - - //disable THP - //sudo echo never > /sys/kernel/mm/transparent_hugepage/enabled - //sudo echo never > /sys/kernel/mm/transparent_hugepage/defrag - - //turn off transparent huge pages - //limit page cache disty bytes - //control the rate page cache is flused ... vm.dirty_* - } - - @Override - public void launch() { - String clusterPrefix = "--cluster-" + (isPreV3() ? "init-" : ""); - // in v30, the cluster arguments were changed, and it became mandatory to supply a url + password (if there is none, these are ignored) - newScript(LAUNCHING) - .body.append( - sudo("/etc/init.d/couchbase-server start"), - "for i in {0..120}\n" + - "do\n" + - " if [ $i -eq 120 ]; then echo REST API unavailable after 120 seconds, failing; exit 1; fi;\n" + - " curl -s " + String.format("http://localhost:%s", getWebPort()) + " > /dev/null && echo REST API available after $i seconds && break\n" + - " sleep 1\n" + - "done\n" + - couchbaseCli("cluster-init") + - (isPreV3() ? getCouchbaseHostnameAndPort() : getCouchbaseHostnameAndCredentials()) + - " " + clusterPrefix + "username=" + getUsername() + - " " + clusterPrefix + "password=" + getPassword() + - " " + clusterPrefix + "port=" + getWebPort() + - " " + clusterPrefix + "ramsize=" + getClusterInitRamSize()) - .execute(); - } - - @Override - public boolean isRunning() { - //TODO add a better way to check if couchbase server is running - return (newScript(CHECK_RUNNING) - .body.append(format("curl -u %s:%s http://localhost:%s/pools/nodes", getUsername(), getPassword(), getWebPort())) - .execute() == 0); - } - - @Override - public void stop() { - newScript(STOPPING) - .body.append(sudo("/etc/init.d/couchbase-server stop")) - .execute(); - } - - @Override - public String getVersion() { - return entity.getConfig(CouchbaseNode.SUGGESTED_VERSION); - } - - @Override - public String getOsTag() { - return newDownloadLinkSegmentComputer().getOsTag(); - } - - protected DownloadLinkSegmentComputer newDownloadLinkSegmentComputer() { - return new DownloadLinkSegmentComputer(getLocation().getOsDetails(), !isPreV3(), Strings.toString(getEntity())); - } - - public static class DownloadLinkSegmentComputer { - // links are: - // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.rpm - // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.deb - // ^^^ preV3 is _ everywhere - // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community_3.0.0-ubuntu12.04_amd64.deb - // ^^^ most V3 is _${version}- - // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community-3.0.0-centos6.x86_64.rpm - // ^^^ but RHEL is -${version}- - - @Nullable - private final OsDetails os; - @Nonnull - private final boolean isV3OrLater; - @Nonnull - private final String context; - @Nonnull - private final String osName; - @Nonnull - private final boolean isRpm; - @Nonnull - private final boolean is64bit; - - public DownloadLinkSegmentComputer(@Nullable OsDetails os, boolean isV3OrLater, @Nonnull String context) { - this.os = os; - this.isV3OrLater = isV3OrLater; - this.context = context; - if (os == null) { - // guess centos as RPM is sensible default - log.warn("No details known for OS of " + context + "; assuming 64-bit RPM distribution of Couchbase"); - osName = "centos"; - isRpm = true; - is64bit = true; - return; - } - osName = os.getName().toLowerCase(); - isRpm = !(osName.contains("deb") || osName.contains("ubuntu")); - is64bit = os.is64bit(); - } - - /** - * separator after the version number used to be _ but is - in 3.0 and later - */ - public String getPreVersionSeparator() { - if (!isV3OrLater) return "_"; - if (isRpm) return "-"; - return "_"; - } - - public String getOsTag() { - // couchbase only provide certain versions; if on other platforms let's suck-it-and-see - String family; - if (osName.contains("debian")) family = "debian7_"; - else if (osName.contains("ubuntu")) family = "ubuntu12.04_"; - else if (osName.contains("centos") || osName.contains("rhel") || (osName.contains("red") && osName.contains("hat"))) - family = "centos6."; - else { - log.warn("Unrecognised OS " + os + " of " + context + "; assuming RPM distribution of Couchbase"); - family = "centos6."; - } - - if (!is64bit && !isV3OrLater) { - // NB: 32-bit binaries aren't (yet?) available for v30 - log.warn("32-bit binaries for Couchbase might not be available, when deploying " + context); - } - String arch = !is64bit ? "x86" : !isRpm && isV3OrLater ? "amd64" : "x86_64"; - String fileExtension = isRpm ? ".rpm" : ".deb"; - - if (isV3OrLater) - return family + arch + fileExtension; - else - return arch + fileExtension; - } - - public String getOsTagWithPrefix() { - return (!isV3OrLater ? "_" : "-") + getOsTag(); - } - } - - @Override - public String getDownloadLinkOsTagWithPrefix() { - return newDownloadLinkSegmentComputer().getOsTagWithPrefix(); - } - - @Override - public String getDownloadLinkPreVersionSeparator() { - return newDownloadLinkSegmentComputer().getPreVersionSeparator(); - } - - private boolean isPreV3() { - return NaturalOrderComparator.INSTANCE.compare(getEntity().getConfig(CouchbaseNode.SUGGESTED_VERSION), "3.0") < 0; - } - - @Override - public String getCommunityOrEnterprise() { - Boolean isEnterprise = getEntity().getConfig(CouchbaseNode.USE_ENTERPRISE); - return isEnterprise ? "enterprise" : "community"; - } - - private String getUsername() { - return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); - } - - private String getPassword() { - return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); - } - - private String getWebPort() { - return "" + entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT); - } - - private String getCouchbaseHostnameAndCredentials() { - return format("-c %s:%s -u %s -p %s", getSubnetHostname(), getWebPort(), getUsername(), getPassword()); - } - - private String getCouchbaseHostnameAndPort() { - return format("-c %s:%s", getSubnetHostname(), getWebPort()); - } - - private String getClusterInitRamSize() { - return entity.getConfig(CouchbaseNode.COUCHBASE_CLUSTER_INIT_RAM_SIZE).toString(); - } - - @Override - public void rebalance() { - entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "explicitly started"); - newScript("rebalance") - .body.append( - couchbaseCli("rebalance") + getCouchbaseHostnameAndCredentials()) - .failOnNonZeroResultCode() - .execute(); - - // wait until the re-balance is started - // (if it's quick, this might miss it, but it will only block for 30s if so) - Repeater.create() - .backoff(Repeater.DEFAULT_REAL_QUICK_PERIOD, 2, Duration.millis(500)) - .limitTimeTo(Duration.THIRTY_SECONDS) - .until(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) { - if (isNodeRebalancing(nodeHostAndPort.toString())) { - return true; - } - } - return false; - } - } - ).run(); - - entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "waiting for completion"); - // Wait until the Couchbase node finishes the re-balancing - Task<Boolean> reBalance = TaskBuilder.<Boolean>builder() - .name("Waiting until node is rebalancing") - .body(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - return Repeater.create() - .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS) - .limitTimeTo(Duration.FIVE_MINUTES) - .until(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) { - if (isNodeRebalancing(nodeHostAndPort.toString())) { - return false; - } - } - return true; - } - }) - .run(); - } - }) - .build(); - Boolean completed = DynamicTasks.queueIfPossible(reBalance) - .orSubmitAndBlock() - .andWaitForSuccess(); - if (completed) { - entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "completed"); - ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), "rebalancing"); - log.info("Rebalanced cluster via primary node {}", getEntity()); - } else { - entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "timed out"); - ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(getEntity(), "rebalancing", "rebalance did not complete within time limit"); - log.warn("Timeout rebalancing cluster via primary node {}", getEntity()); - } - } - - private Iterable<HostAndPort> getNodesHostAndPort() { - Group group = Iterables.getFirst(getEntity().getGroups(), null); - if (group == null) return Lists.newArrayList(); - return Iterables.transform(group.getAttribute(CouchbaseCluster.COUCHBASE_CLUSTER_UP_NODES), - new Function<Entity, HostAndPort>() { - @Override - public HostAndPort apply(Entity input) { - return BrooklynAccessUtils.getBrooklynAccessibleAddress(input, input.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); - } - }); - } - - private boolean isNodeRebalancing(String nodeHostAndPort) { - HttpToolResponse response = getApiResponse("http://" + nodeHostAndPort + "/pools/default/rebalanceProgress"); - if (response.getResponseCode() != 200) { - throw new IllegalStateException("failed retrieving rebalance status: " + response); - } - return !"none".equals(HttpValueFunctions.jsonContents("status", String.class).apply(response)); - } - - private HttpToolResponse getApiResponse(String uri) { - return HttpTool.httpGet(HttpTool.httpClientBuilder() - // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials - .uri(uri) - .credentials(new UsernamePasswordCredentials(getUsername(), getPassword())) - .build(), - URI.create(uri), - ImmutableMap.<String, String>of()); - } - - @Override - public void serverAdd(String serverToAdd, String username, String password) { - newScript("serverAdd").body.append(couchbaseCli("server-add") - + getCouchbaseHostnameAndCredentials() + - " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) + - " --server-add-username=" + BashStringEscapes.wrapBash(username) + - " --server-add-password=" + BashStringEscapes.wrapBash(password)) - .failOnNonZeroResultCode() - .execute(); - } - - @Override - public void serverAddAndRebalance(String serverToAdd, String username, String password) { - newScript("serverAddAndRebalance").body.append(couchbaseCli("rebalance") - + getCouchbaseHostnameAndCredentials() + - " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) + - " --server-add-username=" + BashStringEscapes.wrapBash(username) + - " --server-add-password=" + BashStringEscapes.wrapBash(password)) - .failOnNonZeroResultCode() - .execute(); - entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "triggered as part of server-add"); - } - - @Override - public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { - log.info("Adding bucket: {} to cluster {} primary node: {}", new Object[]{bucketName, CouchbaseClusterImpl.getClusterOrNode(getEntity()), getEntity()}); - - newScript("bucketCreate").body.append(couchbaseCli("bucket-create") - + getCouchbaseHostnameAndCredentials() + - " --bucket=" + BashStringEscapes.wrapBash(bucketName) + - " --bucket-type=" + BashStringEscapes.wrapBash(bucketType) + - " --bucket-port=" + bucketPort + - " --bucket-ramsize=" + bucketRamSize + - " --bucket-replica=" + bucketReplica) - .failOnNonZeroResultCode() - .execute(); - } - - @Override - public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket) { - DynamicTasks.queue(DependentConfiguration.attributeWhenReady(toCluster, Attributes.SERVICE_UP)).getUnchecked(); - - String destName = CouchbaseClusterImpl.getClusterName(toCluster); - - log.info("Setting up XDCR for " + fromBucket + " from " + CouchbaseClusterImpl.getClusterName(getEntity()) + " (via " + getEntity() + ") " - + "to " + destName + " (" + toCluster + ")"); - - Entity destPrimaryNode = toCluster.getAttribute(CouchbaseCluster.COUCHBASE_PRIMARY_NODE); - String destHostname = destPrimaryNode.getAttribute(Attributes.HOSTNAME); - String destUsername = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); - String destPassword = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); - - // on the REST API there is mention of a 'type' 'continuous' but i don't see other refs to this - - // PROTOCOL Select REST protocol or memcached for replication. xmem indicates memcached while capi indicates REST protocol. - // looks like xmem is the default; leave off for now -// String replMode = "xmem"; - - DynamicTasks.queue(TaskTags.markInessential(SshEffectorTasks.ssh( - couchbaseCli("xdcr-setup") + - getCouchbaseHostnameAndCredentials() + - " --create" + - " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) + - " --xdcr-hostname=" + BashStringEscapes.wrapBash(destHostname) + - " --xdcr-username=" + BashStringEscapes.wrapBash(destUsername) + - " --xdcr-password=" + BashStringEscapes.wrapBash(destPassword) - ).summary("create xdcr destination " + destName).newTask())); - - // would be nice to auto-create bucket, but we'll need to know the parameters; the port in particular is tedious -// ((CouchbaseNode)destPrimaryNode).bucketCreate(toBucket, "couchbase", null, 0, 0); - - DynamicTasks.queue(SshEffectorTasks.ssh( - couchbaseCli("xdcr-replicate") + - getCouchbaseHostnameAndCredentials() + - " --create" + - " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) + - " --xdcr-from-bucket=" + BashStringEscapes.wrapBash(fromBucket) + - " --xdcr-to-bucket=" + BashStringEscapes.wrapBash(toBucket) -// + " --xdcr-replication-mode="+replMode - ).summary("configure replication for " + fromBucket + " to " + destName + ":" + toBucket).newTask()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java deleted file mode 100644 index c0740ee..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.couchbase; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.Entity; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.proxying.ImplementedBy; -import brooklyn.event.AttributeSensor; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; -import brooklyn.event.basic.Sensors; -import brooklyn.util.flags.SetFromFlag; - -@ImplementedBy(CouchbaseSyncGatewayImpl.class) -public interface CouchbaseSyncGateway extends SoftwareProcess { - - @SetFromFlag("version") - ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, - "1.0-beta3.1"); - - @SetFromFlag("downloadUrl") - BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( - SoftwareProcess.DOWNLOAD_URL, "http://packages.couchbase.com/releases/couchbase-sync-gateway/1.0-beta/couchbase-sync-gateway-community_${version}_${driver.osTag}"); - - @SetFromFlag("couchbaseServer") - ConfigKey<Entity> COUCHBASE_SERVER = ConfigKeys.newConfigKey(Entity.class, "couchbaseSyncGateway.couchbaseNode", - "Couchbase server node or cluster the sync gateway connects to"); - - @SetFromFlag("serverPool") - ConfigKey<String> COUCHBASE_SERVER_POOL = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverPool", - "Couchbase Server pool name in which to find buckets", "default"); - - @SetFromFlag("couchbaseServerBucket") - ConfigKey<String> COUCHBASE_SERVER_BUCKET = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverBucket", - "Name of the Couchbase bucket to use", "sync_gateway"); - - @SetFromFlag("pretty") - ConfigKey<Boolean> PRETTY = ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.pretty", - "Pretty-print JSON responses. This is useful for debugging, but reduces performance.", false); - - @SetFromFlag("verbose") - ConfigKey<Boolean> VERBOSE = ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.verbose", - "Logs more information about requests.", false); - - AttributeSensor<String> COUCHBASE_SERVER_WEB_URL = Sensors.newStringSensor("couchbaseSyncGateway.serverWebUrl", - "The Url and web port of the couchbase server to connect to"); - - AttributeSensor<String> MANAGEMENT_URL = Sensors.newStringSensor("coucbaseSyncGateway.managementUrl", - "Management URL for Couchbase Sycn Gateway"); - - PortAttributeSensorAndConfigKey SYNC_REST_API_PORT = new PortAttributeSensorAndConfigKey("couchbaseSyncGateway.syncRestPort", - "Port the Sync REST API listens on", "4984"); - - PortAttributeSensorAndConfigKey ADMIN_REST_API_PORT = new PortAttributeSensorAndConfigKey("couchbaseSyncGateway.adminRestPort", - "Port the Admin REST API listens on", "4985"); - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java deleted file mode 100644 index 148ec0b..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.couchbase; - -import brooklyn.entity.basic.SoftwareProcessDriver; - -public interface CouchbaseSyncGatewayDriver extends SoftwareProcessDriver { - - public String getOsTag(); - -} \ No newline at end of file
