http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java new file mode 100644 index 0000000..6d16c9a --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java @@ -0,0 +1,594 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.math.BigInteger; +import java.net.Socket; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Nullable; +import javax.management.ObjectName; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.enricher.RollingTimeWindowMeanEnricher; +import brooklyn.enricher.TimeWeightedDeltaEnricher; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.entity.effector.EffectorBody; +import brooklyn.entity.java.JavaAppUtils; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.event.basic.Sensors; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.event.feed.function.FunctionPollConfig; +import brooklyn.event.feed.jmx.JmxAttributePollConfig; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.event.feed.jmx.JmxHelper; +import brooklyn.event.feed.jmx.JmxOperationPollConfig; +import brooklyn.location.MachineLocation; +import brooklyn.location.MachineProvisioningLocation; +import brooklyn.location.basic.Machines; +import brooklyn.location.cloud.CloudLocationConfig; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Maybe; +import brooklyn.util.text.Strings; +import brooklyn.util.text.TemplateProcessor; +import brooklyn.util.time.Duration; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Joiner; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Splitter; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * Implementation of {@link CassandraNode}. + */ +public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraNode { + + private static final Logger log = LoggerFactory.getLogger(CassandraNodeImpl.class); + + private final AtomicReference<Boolean> detectedCloudSensors = new AtomicReference<Boolean>(false); + + public CassandraNodeImpl() { + } + + @Override + public void init() { + super.init(); + + getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() { + @Override + public String call(ConfigBag parameters) { + return executeScript((String)parameters.getStringKey("commands")); + } + }); + + Entities.checkRequiredUrl(this, getCassandraConfigTemplateUrl()); + Entities.getRequiredUrlConfig(this, CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL); + + connectEnrichers(); + } + + /** + * Some clouds (e.g. Rackspace) give us VMs that have two nics: one for private and one for public. + * If the private IP is used then it doesn't work, even for a cluster purely internal to Rackspace! + * + * TODO Ugly. Need to understand more and find a better fix. Perhaps in Cassandra itself if necessary. + * Also need to investigate further: + * - does it still fail if BroadcastAddress is set to private IP? + * - is `openIptables` opening it up for both interfaces? + * - for aws->rackspace comms between nodes (thus using the public IP), will it be listening on an accessible port? + * - ideally do a check, open a server on one port on the machine, see if it is contactable on the public address; + * and set that as a flag on the cloud + */ + protected void setCloudPreferredSensorNames() { + if (detectedCloudSensors.get()) return; + synchronized (detectedCloudSensors) { + if (detectedCloudSensors.get()) return; + + MachineProvisioningLocation<?> loc = getProvisioningLocation(); + if (loc != null) { + try { + Method method = loc.getClass().getMethod("getProvider"); + method.setAccessible(true); + String provider = (String) method.invoke(loc); + String result = "(nothing special)"; + if (provider!=null) { + if (provider.contains("rackspace") || provider.contains("cloudservers") || provider.contains("softlayer")) { + /* These clouds have 2 NICs and it has to be consistent, so use public IP here to allow external access; + * (TODO internal access could be configured to improve performance / lower cost, + * if we know all nodes are visible to each other) */ + if (getConfig(LISTEN_ADDRESS_SENSOR)==null) + setConfig(LISTEN_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName()); + if (getConfig(BROADCAST_ADDRESS_SENSOR)==null) + setConfig(BROADCAST_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName()); + result = "public IP for both listen and broadcast"; + } else if (provider.contains("google-compute")) { + /* Google nodes cannot reach themselves/each-other on the public IP, + * and there is no hostname, so use private IP here */ + if (getConfig(LISTEN_ADDRESS_SENSOR)==null) + setConfig(LISTEN_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName()); + if (getConfig(BROADCAST_ADDRESS_SENSOR)==null) + setConfig(BROADCAST_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName()); + result = "private IP for both listen and broadcast"; + } + } + log.debug("Cassandra NICs inferred {} for {}; using location {}, based on provider {}", new Object[] {result, this, loc, provider}); + } catch (Exception e) { + log.debug("Cassandra NICs auto-detection failed for {} in location {}: {}", new Object[] {this, loc, e}); + } + } + detectedCloudSensors.set(true); + } + } + + @Override + protected void preStart() { + super.preStart(); + setCloudPreferredSensorNames(); + } + + // Used for freemarker + public String getMajorMinorVersion() { + String version = getConfig(CassandraNode.SUGGESTED_VERSION); + if (Strings.isBlank(version)) return ""; + List<String> versionParts = ImmutableList.copyOf(Splitter.on(".").split(version)); + return versionParts.get(0) + (versionParts.size() > 1 ? "."+versionParts.get(1) : ""); + } + + public String getCassandraConfigTemplateUrl() { + String templatedUrl = getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL); + return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of()); + } + + @Override public Integer getGossipPort() { return getAttribute(CassandraNode.GOSSIP_PORT); } + @Override public Integer getSslGossipPort() { return getAttribute(CassandraNode.SSL_GOSSIP_PORT); } + @Override public Integer getThriftPort() { return getAttribute(CassandraNode.THRIFT_PORT); } + @Override public Integer getNativeTransportPort() { return getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); } + @Override public String getClusterName() { return getAttribute(CassandraNode.CLUSTER_NAME); } + + @Override public int getNumTokensPerNode() { + return getConfig(CassandraNode.NUM_TOKENS_PER_NODE); + } + + @Deprecated + @Override public BigInteger getToken() { + BigInteger token = getAttribute(CassandraNode.TOKEN); + if (token == null) { + token = getConfig(CassandraNode.TOKEN); + } + return token; + } + + @Override public Set<BigInteger> getTokens() { + // Prefer an already-set attribute over the config. + // Prefer TOKENS over TOKEN. + Set<BigInteger> tokens = getAttribute(CassandraNode.TOKENS); + if (tokens == null) { + BigInteger token = getAttribute(CassandraNode.TOKEN); + if (token != null) { + tokens = ImmutableSet.of(token); + } + } + if (tokens == null) { + tokens = getConfig(CassandraNode.TOKENS); + } + if (tokens == null) { + BigInteger token = getConfig(CassandraNode.TOKEN); + if (token != null) { + tokens = ImmutableSet.of(token); + } + } + return tokens; + } + + @Deprecated + @Override public String getTokenAsString() { + BigInteger token = getToken(); + if (token==null) return ""; + return ""+token; + } + + @Override public String getTokensAsString() { + // TODO check what is required when replacing failed node. + // with vnodes in Cassandra 2.x, don't bother supplying token + Set<BigInteger> tokens = getTokens(); + if (tokens == null) return ""; + return Joiner.on(",").join(tokens); + } + + @Override public String getListenAddress() { + String sensorName = getConfig(LISTEN_ADDRESS_SENSOR); + if (Strings.isNonBlank(sensorName)) + return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked(); + + String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS); + return Strings.isNonBlank(subnetAddress) ? subnetAddress : getAttribute(CassandraNode.ADDRESS); + } + @Override public String getBroadcastAddress() { + String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR); + if (Strings.isNonBlank(sensorName)) + return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked(); + + String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME); + if (snitchName.equals("Ec2MultiRegionSnitch") || snitchName.contains("MultiCloudSnitch")) { + // http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html + // describes that the listen_address is set to the private IP, and the broadcast_address is set to the public IP. + return getAttribute(CassandraNode.ADDRESS); + } else if (!getDriver().isClustered()) { + return getListenAddress(); + } else { + // In other situations, prefer the hostname, so other regions can see it + // *Unless* hostname resolves at the target to a local-only interface which is different to ADDRESS + // (workaround for issue deploying to localhost) + String hostname = getAttribute(CassandraNode.HOSTNAME); + try { + String resolvedAddress = getDriver().getResolvedAddress(hostname); + if (resolvedAddress==null) { + log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" could not be resolved at remote machine"); + return getListenAddress(); + } + if (resolvedAddress.equals("127.0.0.1")) { + log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" resolves to 127.0.0.1"); + return getListenAddress(); + } + return hostname; + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.warn("Error resolving hostname "+hostname+" for "+this+": "+e, e); + return hostname; + } + } + } + /** not always the private IP, if public IP has been insisted on for broadcast, e.g. setting up a rack topology */ + // have not confirmed this does the right thing in all clouds ... only used for rack topology however + public String getPrivateIp() { + String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR); + if (Strings.isNonBlank(sensorName)) { + return getAttribute(Sensors.newStringSensor(sensorName)); + } else { + String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS); + return Strings.isNonBlank(subnetAddress) ? subnetAddress : getAttribute(CassandraNode.ADDRESS); + } + } + public String getPublicIp() { + // may need to be something else in google + return getAttribute(CassandraNode.ADDRESS); + } + + @Override public String getRpcAddress() { + String sensorName = getConfig(RPC_ADDRESS_SENSOR); + if (Strings.isNonBlank(sensorName)) + return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked(); + return "0.0.0.0"; + } + + @Override public String getSeeds() { + Set<Entity> seeds = getConfig(CassandraNode.INITIAL_SEEDS); + if (seeds==null) { + log.warn("No seeds available when requested for "+this, new Throwable("source of no Cassandra seeds when requested")); + return null; + } + String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME); + MutableSet<String> seedsHostnames = MutableSet.of(); + for (Entity entity : seeds) { + // tried removing ourselves if there are other nodes, but that is a BAD idea! + // blows up with a "java.lang.RuntimeException: No other nodes seen!" + + if (snitchName.equals("Ec2MultiRegionSnitch") || snitchName.contains("MultiCloudSnitch")) { + // http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html + // says the seeds should be public IPs. + seedsHostnames.add(entity.getAttribute(CassandraNode.ADDRESS)); + } else { + String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR); + if (Strings.isNonBlank(sensorName)) { + seedsHostnames.add(entity.getAttribute(Sensors.newStringSensor(sensorName))); + } else { + Maybe<String> optionalSeedHostname = Machines.findSubnetOrPublicHostname(entity); + if (optionalSeedHostname.isPresent()) { + String seedHostname = optionalSeedHostname.get(); + seedsHostnames.add(seedHostname); + } else { + log.warn("In node {}, seed hostname missing for {}; not including in seeds list", this, entity); + } + } + } + } + + String result = Strings.join(seedsHostnames, ","); + log.info("Seeds for {}: {}", this, result); + return result; + } + + // referenced by cassandra-rackdc.properties, read by some of the cassandra snitches + public String getDatacenterName() { + String name = getAttribute(CassandraNode.DATACENTER_NAME); + if (name == null) { + MachineLocation machine = getMachineOrNull(); + MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation(); + if (machine != null) { + name = machine.getConfig(CloudLocationConfig.CLOUD_REGION_ID); + } + if (name == null && provisioningLocation != null) { + name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_REGION_ID); + } + if (name == null) { + name = "UNKNOWN_DATACENTER"; + } + setAttribute((AttributeSensor<String>)DATACENTER_NAME, name); + } + return name; + } + + public String getRackName() { + String name = getAttribute(CassandraNode.RACK_NAME); + if (name == null) { + MachineLocation machine = getMachineOrNull(); + MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation(); + if (machine != null) { + name = machine.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID); + } + if (name == null && provisioningLocation != null) { + name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID); + } + if (name == null) { + name = "UNKNOWN_RACK"; + } + setAttribute((AttributeSensor<String>)RACK_NAME, name); + } + return name; + } + + @Override + public Class<? extends CassandraNodeDriver> getDriverInterface() { + return CassandraNodeDriver.class; + } + + @Override + public CassandraNodeDriver getDriver() { + return (CassandraNodeDriver) super.getDriver(); + } + + private volatile JmxFeed jmxFeed; + private volatile FunctionFeed functionFeed; + private JmxFeed jmxMxBeanFeed; + private JmxHelper jmxHelper; + private ObjectName storageServiceMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=StorageService"); + private ObjectName readStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=ReadStage"); + private ObjectName mutationStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=MutationStage"); + private ObjectName snitchMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo"); + + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + protected void connectSensors() { + // "cassandra" isn't really a protocol, but okay for now + setAttribute(DATASTORE_URL, "cassandra://"+getAttribute(HOSTNAME)+":"+getAttribute(THRIFT_PORT)); + + super.connectSensors(); + + jmxHelper = new JmxHelper(this); + jmxFeed = JmxFeed.builder() + .entity(this) + .period(3000, TimeUnit.MILLISECONDS) + .helper(jmxHelper) + .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX) + .objectName(storageServiceMBean) + .attributeName("Initialized") + .onSuccess(Functions.forPredicate(Predicates.notNull())) + .onException(Functions.constant(false))) + .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS) + .objectName(storageServiceMBean) + .attributeName("TokenToEndpointMap") + .onSuccess(new Function<Object, Set<BigInteger>>() { + @Override + public Set<BigInteger> apply(@Nullable Object arg) { + Map input = (Map)arg; + if (input == null || input.isEmpty()) return null; + // FIXME does not work on aws-ec2, uses RFC1918 address + Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME))); + Set<String> tokens = Maps.filterValues(input, self).keySet(); + Set<BigInteger> result = Sets.newLinkedHashSet(); + for (String token : tokens) { + result.add(new BigInteger(token)); + } + return result; + }}) + .onException(Functions.<Set<BigInteger>>constant(null))) + .pollAttribute(new JmxAttributePollConfig<BigInteger>(TOKEN) + .objectName(storageServiceMBean) + .attributeName("TokenToEndpointMap") + .onSuccess(new Function<Object, BigInteger>() { + @Override + public BigInteger apply(@Nullable Object arg) { + Map input = (Map)arg; + // TODO remove duplication from setting TOKENS + if (input == null || input.isEmpty()) return null; + // FIXME does not work on aws-ec2, uses RFC1918 address + Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME))); + Set<String> tokens = Maps.filterValues(input, self).keySet(); + String token = Iterables.getFirst(tokens, null); + return (token != null) ? new BigInteger(token) : null; + }}) + .onException(Functions.<BigInteger>constant(null))) + .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME) + .period(60, TimeUnit.SECONDS) + .objectName(snitchMBean) + .operationName("getDatacenter") + .operationParams(ImmutableList.of(getBroadcastAddress())) + .onException(Functions.<String>constant(null))) + .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME) + .period(60, TimeUnit.SECONDS) + .objectName(snitchMBean) + .operationName("getRack") + .operationParams(ImmutableList.of(getBroadcastAddress())) + .onException(Functions.<String>constant(null))) + .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS) + .objectName(storageServiceMBean) + .attributeName("TokenToEndpointMap") + .onSuccess(new Function<Object, Integer>() { + @Override + public Integer apply(@Nullable Object arg) { + Map input = (Map)arg; + if (input == null || input.isEmpty()) return 0; + return input.size(); + } + }) + .onException(Functions.constant(-1))) + .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT) + .objectName(storageServiceMBean) + .attributeName("LiveNodes") + .onSuccess(new Function<Object, Integer>() { + @Override + public Integer apply(@Nullable Object arg) { + List input = (List)arg; + if (input == null || input.isEmpty()) return 0; + return input.size(); + } + }) + .onException(Functions.constant(-1))) + .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE) + .objectName(readStageMBean) + .attributeName("ActiveCount") + .onException(Functions.constant((Integer)null))) + .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING) + .objectName(readStageMBean) + .attributeName("PendingTasks") + .onException(Functions.constant((Long)null))) + .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED) + .objectName(readStageMBean) + .attributeName("CompletedTasks") + .onException(Functions.constant((Long)null))) + .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE) + .objectName(mutationStageMBean) + .attributeName("ActiveCount") + .onException(Functions.constant((Integer)null))) + .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING) + .objectName(mutationStageMBean) + .attributeName("PendingTasks") + .onException(Functions.constant((Long)null))) + .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED) + .objectName(mutationStageMBean) + .attributeName("CompletedTasks") + .onException(Functions.constant((Long)null))) + .build(); + + functionFeed = FunctionFeed.builder() + .entity(this) + .period(3000, TimeUnit.MILLISECONDS) + .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY) + .onException(Functions.constant((Long)null)) + .callable(new Callable<Long>() { + public Long call() { + try { + long start = System.currentTimeMillis(); + Socket s = new Socket(getAttribute(Attributes.HOSTNAME), getThriftPort()); + s.close(); + long latency = System.currentTimeMillis() - start; + computeServiceUp(); + return latency; + } catch (Exception e) { + if (log.isDebugEnabled()) + log.debug("Cassandra thrift port poll failure: "+e); + setAttribute(SERVICE_UP, false); + return null; + } + } + public void computeServiceUp() { + // this will wait an additional poll period after thrift port is up, + // as the caller will not have set yet, but that will help ensure it is really healthy! + setAttribute(SERVICE_UP, + getAttribute(THRIFT_PORT_LATENCY)!=null && getAttribute(THRIFT_PORT_LATENCY)>=0 && + Boolean.TRUE.equals(getAttribute(SERVICE_UP_JMX))); + } + })) + .build(); + + jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this); + } + + protected void connectEnrichers() { + connectEnrichers(Duration.TEN_SECONDS); + } + + protected void connectEnrichers(Duration windowPeriod) { + JavaAppUtils.connectJavaAppServerPolicies(this); + + addEnricher(TimeWeightedDeltaEnricher.<Long>getPerSecondDeltaEnricher(this, READ_COMPLETED, READS_PER_SECOND_LAST)); + addEnricher(TimeWeightedDeltaEnricher.<Long>getPerSecondDeltaEnricher(this, WRITE_COMPLETED, WRITES_PER_SECOND_LAST)); + + if (windowPeriod!=null) { + addEnricher(new RollingTimeWindowMeanEnricher<Long>(this, THRIFT_PORT_LATENCY, + THRIFT_PORT_LATENCY_IN_WINDOW, windowPeriod)); + addEnricher(new RollingTimeWindowMeanEnricher<Double>(this, READS_PER_SECOND_LAST, + READS_PER_SECOND_IN_WINDOW, windowPeriod)); + addEnricher(new RollingTimeWindowMeanEnricher<Double>(this, WRITES_PER_SECOND_LAST, + WRITES_PER_SECOND_IN_WINDOW, windowPeriod)); + } + } + + @Override + public void disconnectSensors() { + super.disconnectSensors(); + + if (jmxFeed != null) jmxFeed.stop(); + if (jmxMxBeanFeed != null) jmxMxBeanFeed.stop(); + if (jmxHelper != null) jmxHelper.terminate(); + if (functionFeed != null) functionFeed.stop(); + } + + @Override + public void setToken(String token) { + try { + if (!jmxHelper.isConnected()) jmxHelper.connect();; + jmxHelper.operation(storageServiceMBean, "move", token); + log.info("Moved server {} to token {}", getId(), token); + } catch (IOException ioe) { + Throwables.propagate(ioe); + } + } + + @Override + public String executeScript(String commands) { + return getDriver().executeScriptAsync(commands).block().getStdout(); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java new file mode 100644 index 0000000..d9fd1c1 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.entity.database.DatastoreMixins; +import brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import brooklyn.entity.java.UsesJmx; +import brooklyn.entity.software.SshEffectorTasks; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.location.Location; +import brooklyn.location.access.BrooklynAccessUtils; +import brooklyn.location.basic.Machines; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.management.TaskWrapper; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Maybe; +import brooklyn.util.net.Networking; +import brooklyn.util.os.Os; +import brooklyn.util.ssh.BashCommands; +import brooklyn.util.stream.Streams; +import brooklyn.util.task.DynamicTasks; +import brooklyn.util.task.Tasks; +import brooklyn.util.task.system.ProcessTaskWrapper; +import brooklyn.util.text.Identifiers; +import brooklyn.util.text.Strings; +import brooklyn.util.text.TemplateProcessor; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +/** + * Start a {@link CassandraNode} in a {@link Location} accessible over ssh. + */ +public class CassandraNodeSshDriver extends JavaSoftwareProcessSshDriver implements CassandraNodeDriver { + + private static final Logger log = LoggerFactory.getLogger(CassandraNodeSshDriver.class); + + protected Maybe<String> resolvedAddressCache = Maybe.absent(); + + public CassandraNodeSshDriver(CassandraNodeImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(),"cassandra.log"); } + + @Override + public Integer getGossipPort() { return entity.getAttribute(CassandraNode.GOSSIP_PORT); } + + @Override + public Integer getSslGossipPort() { return entity.getAttribute(CassandraNode.SSL_GOSSIP_PORT); } + + @Override + public Integer getThriftPort() { return entity.getAttribute(CassandraNode.THRIFT_PORT); } + + @Override + public Integer getNativeTransportPort() { return entity.getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); } + + @Override + public String getClusterName() { return entity.getAttribute(CassandraNode.CLUSTER_NAME); } + + @Override + public String getCassandraConfigTemplateUrl() { + String templatedUrl = entity.getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL); + return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of()); + } + + @Override + public String getCassandraConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_CONFIG_FILE_NAME); } + + public String getEndpointSnitchName() { return entity.getConfig(CassandraNode.ENDPOINT_SNITCH_NAME); } + + public String getCassandraRackdcConfigTemplateUrl() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL); } + + public String getCassandraRackdcConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_FILE_NAME); } + + public String getMirrorUrl() { return entity.getConfig(CassandraNode.MIRROR_URL); } + + protected String getDefaultUnpackedDirectoryName() { + return "apache-cassandra-"+getVersion(); + } + + protected boolean isV2() { + String version = getVersion(); + return version.startsWith("2."); + } + + @Override + public boolean installJava() { + if (isV2()) { + return checkForAndInstallJava("1.8"); + } else { + return super.installJava(); + } + } + + @Override + public void preInstall() { + resolver = Entities.newDownloader(this); + setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(getDefaultUnpackedDirectoryName()))); + } + + @Override + public void install() { + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + + List<String> commands = ImmutableList.<String>builder() + .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) + .add(BashCommands.INSTALL_TAR) + .add("tar xzfv " + saveAs) + .build(); + + newScript(INSTALLING) + .body.append(commands) + .execute(); + } + + @Override + public Set<Integer> getPortsUsed() { + return ImmutableSet.<Integer>builder() + .addAll(super.getPortsUsed()) + .addAll(getPortMap().values()) + .build(); + } + + protected Map<String, Integer> getPortMap() { + return ImmutableMap.<String, Integer>builder() + .put("jmxPort", entity.getAttribute(UsesJmx.JMX_PORT)) + .put("rmiPort", entity.getAttribute(UsesJmx.RMI_REGISTRY_PORT)) + .put("gossipPort", getGossipPort()) + .put("sslGossipPort", getSslGossipPort()) + .put("thriftPort", getThriftPort()) + .build(); + } + + @Override + public void customize() { + log.debug("Customizing {} (Cluster {})", entity, getClusterName()); + Networking.checkPortsValid(getPortMap()); + + customizeInitialSeeds(); + + String logFileEscaped = getLogFileLocation().replace("/", "\\/"); // escape slashes + + ImmutableList.Builder<String> commands = new ImmutableList.Builder<String>() + .add(String.format("cp -R %s/{bin,conf,lib,interface,pylib,tools} .", getExpandedInstallDir())) + .add("mkdir -p data") + .add("mkdir -p brooklyn_commands") + .add(String.format("sed -i.bk 's/log4j.appender.R.File=.*/log4j.appender.R.File=%s/g' %s/conf/log4j-server.properties", logFileEscaped, getRunDir())) + .add(String.format("sed -i.bk '/JMX_PORT/d' %s/conf/cassandra-env.sh", getRunDir())) + // Script sets 180k on Linux which gives Java error: The stack size specified is too small, Specify at least 228k + .add(String.format("sed -i.bk 's/-Xss180k/-Xss280k/g' %s/conf/cassandra-env.sh", getRunDir())); + + newScript(CUSTOMIZING) + .body.append(commands.build()) + .failOnNonZeroResultCode() + .execute(); + + // Copy the cassandra.yaml configuration file across + String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraConfigFileName()); + copyTemplate(getCassandraConfigTemplateUrl(), destinationConfigFile); + + // Copy the cassandra-rackdc.properties configuration file across + String rackdcDestinationFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraRackdcConfigFileName()); + copyTemplate(getCassandraRackdcConfigTemplateUrl(), rackdcDestinationFile); + + customizeCopySnitch(); + } + + protected void customizeCopySnitch() { + // Copy the custom snitch jar file across + String customSnitchJarUrl = entity.getConfig(CassandraNode.CUSTOM_SNITCH_JAR_URL); + if (Strings.isNonBlank(customSnitchJarUrl)) { + int lastSlashIndex = customSnitchJarUrl.lastIndexOf("/"); + String customSnitchJarName = (lastSlashIndex > 0) ? customSnitchJarUrl.substring(lastSlashIndex+1) : "customBrooklynSnitch.jar"; + String jarDestinationFile = Os.mergePathsUnix(getRunDir(), "lib", customSnitchJarName); + InputStream customSnitchJarStream = checkNotNull(resource.getResourceFromUrl(customSnitchJarUrl), "%s could not be loaded", customSnitchJarUrl); + try { + getMachine().copyTo(customSnitchJarStream, jarDestinationFile); + } finally { + Streams.closeQuietly(customSnitchJarStream); + } + } + } + + protected void customizeInitialSeeds() { + if (entity.getConfig(CassandraNode.INITIAL_SEEDS)==null) { + if (isClustered()) { + entity.setConfig(CassandraNode.INITIAL_SEEDS, + DependentConfiguration.attributeWhenReady(entity.getParent(), CassandraDatacenter.CURRENT_SEEDS)); + } else { + entity.setConfig(CassandraNode.INITIAL_SEEDS, MutableSet.<Entity>of(entity)); + } + } + } + + @Override + public boolean isClustered() { + return entity.getParent() instanceof CassandraDatacenter; + } + + @Override + public void launch() { + String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get(); + Set<Entity> seeds = getEntity().getConfig(CassandraNode.INITIAL_SEEDS); + List<Entity> ancestors = getCassandraAncestors(); + log.info("Launching " + entity + ": " + + "cluster "+getClusterName()+", " + + "hostname (public) " + getEntity().getAttribute(Attributes.HOSTNAME) + ", " + + "hostname (subnet) " + subnetHostname + ", " + + "seeds "+((CassandraNode)entity).getSeeds()+" (from "+seeds+")"); + + boolean isFirst = seeds.iterator().next().equals(entity); + if (isClustered() && !isFirst && CassandraDatacenter.WAIT_FOR_FIRST) { + // wait for the first node + long firstStartTime = Entities.submit(entity, DependentConfiguration.attributeWhenReady( + ancestors.get(ancestors.size()-1), CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC)).getUnchecked(); + // optionally force a delay before starting subsequent nodes; see comment at CassandraCluster.DELAY_AFTER_FIRST + Duration toWait = Duration.millis(firstStartTime + CassandraDatacenter.DELAY_AFTER_FIRST.toMilliseconds() - System.currentTimeMillis()); + if (toWait.toMilliseconds()>0) { + log.info("Launching " + entity + ": delaying launch of non-first node by "+toWait+" to prevent schema disagreements"); + Tasks.setBlockingDetails("Pausing to ensure first node has time to start"); + Time.sleep(toWait); + Tasks.resetBlockingDetails(); + } + } + + List<Entity> queuedStart = null; + if (CassandraDatacenter.DELAY_BETWEEN_STARTS!=null && !ancestors.isEmpty()) { + Entity root = ancestors.get(ancestors.size()-1); + // TODO currently use the class as a semaphore; messy, and obviously will not federate; + // should develop a brooklyn framework semaphore (similar to that done on SshMachineLocation) + // and use it - note however the synch block is very very short so relatively safe at least + synchronized (CassandraNode.class) { + queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES); + if (queuedStart==null) { + queuedStart = new ArrayList<Entity>(); + ((EntityLocal)root).setAttribute(CassandraDatacenter.QUEUED_START_NODES, queuedStart); + } + queuedStart.add(getEntity()); + ((EntityLocal)root).setAttribute(CassandraDatacenter.QUEUED_START_NODES, queuedStart); + } + do { + // get it again in case it is backed by something external + queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES); + if (queuedStart.get(0).equals(getEntity())) break; + synchronized (queuedStart) { + try { + queuedStart.wait(1000); + } catch (InterruptedException e) { + Exceptions.propagate(e); + } + } + } while (true); + + // TODO should look at last start time... but instead we always wait + CassandraDatacenter.DELAY_BETWEEN_STARTS.countdownTimer().waitForExpiryUnchecked(); + } + + try { + // Relies on `bin/cassandra -p <pidfile>`, rather than us writing pid file ourselves. + newScript(MutableMap.of(USE_PID_FILE, false), LAUNCHING) + .body.append( + // log the date to attempt to debug occasional http://wiki.apache.org/cassandra/FAQ#schema_disagreement + // (can be caused by machines out of synch time-wise; but in our case it seems to be caused by other things!) + "echo date on cassandra server `hostname` when launching is `date`", + launchEssentialCommand(), + "echo after essential command") + .execute(); + if (!isClustered()) { + InputStream creationScript = DatastoreMixins.getDatabaseCreationScript(entity); + if (creationScript!=null) { + Tasks.setBlockingDetails("Pausing to ensure Cassandra (singleton) has started before running creation script"); + Time.sleep(Duration.seconds(20)); + Tasks.resetBlockingDetails(); + executeScriptAsync(Streams.readFullyString(creationScript)); + } + } + if (isClustered() && isFirst) { + for (Entity ancestor: getCassandraAncestors()) { + ((EntityLocal)ancestor).setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, System.currentTimeMillis()); + } + } + } finally { + if (queuedStart!=null) { + Entity head = queuedStart.remove(0); + checkArgument(head.equals(getEntity()), "first queued node was "+head+" but we are "+getEntity()); + synchronized (queuedStart) { + queuedStart.notifyAll(); + } + } + } + } + + /** returns cassandra-related ancestors (datacenter, fabric), with datacenter first and fabric last */ + protected List<Entity> getCassandraAncestors() { + List<Entity> result = new ArrayList<Entity>(); + Entity ancestor = getEntity().getParent(); + while (ancestor!=null) { + if (ancestor instanceof CassandraDatacenter || ancestor instanceof CassandraFabric) + result.add(ancestor); + ancestor = ancestor.getParent(); + } + return result; + } + + protected String launchEssentialCommand() { + if (isV2()) { + return String.format("./bin/cassandra -p %s > ./cassandra-console.log 2>&1", getPidFile()); + } else { + // TODO Could probably get rid of the nohup here, as script does equivalent itself + // with `exec ... <&- &` + return String.format("nohup ./bin/cassandra -p %s > ./cassandra-console.log 2>&1 &", getPidFile()); + } + } + + public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "cassandra.pid"); } + + @Override + public boolean isRunning() { + return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; + } + + @Override + public void stop() { + newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute(); + } + + @SuppressWarnings("unchecked") + @Override + protected Map<String,String> getCustomJavaSystemProperties() { + return MutableMap.<String, String>builder() + .putAll(super.getCustomJavaSystemProperties()) + .put("cassandra.config", getCassandraConfigFileName()) + .build(); + } + + @Override + public Map<String, String> getShellEnvironment() { + return MutableMap.<String, String>builder() + .putAll(super.getShellEnvironment()) + .put("CASSANDRA_HOME", getRunDir()) + .put("CASSANDRA_CONF", Os.mergePathsUnix(getRunDir(), "conf")) + .renameKey("JAVA_OPTS", "JVM_OPTS") + .build(); + } + + @Override + public ProcessTaskWrapper<Integer> executeScriptAsync(String commands) { + String fileToRun = Os.mergePathsUnix("brooklyn_commands", "cassandra-commands-"+Identifiers.makeRandomId(8)); + TaskWrapper<Void> task = SshEffectorTasks.put(Os.mergePathsUnix(getRunDir(), fileToRun)) + .machine(getMachine()) + .contents(commands) + .summary("copying cassandra script to execute "+fileToRun) + .newTask(); + DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity()).andWaitForSuccess(); + return executeScriptFromInstalledFileAsync(fileToRun); + } + + public ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String fileToRun) { + ProcessTaskWrapper<Integer> task = SshEffectorTasks.ssh( + "cd "+getRunDir(), + scriptInvocationCommand(getThriftPort(), fileToRun)) + .machine(getMachine()) + .summary("executing cassandra script "+fileToRun) + .newTask(); + DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity()); + return task; + } + + protected String scriptInvocationCommand(Integer optionalThriftPort, String fileToRun) { + return "bin/cassandra-cli " + + (optionalThriftPort != null ? "--port " + optionalThriftPort : "") + + " --file "+fileToRun; + } + + @Override + public String getResolvedAddress(String hostname) { + return resolvedAddressCache.or(BrooklynAccessUtils.resolvedAddressSupplier(getEntity(), getMachine(), hostname)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerator.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerator.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerator.java new file mode 100644 index 0000000..6401c03 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import java.math.BigInteger; +import java.util.Set; + +public interface TokenGenerator { + + BigInteger max(); + BigInteger min(); + BigInteger range(); + + void setOrigin(BigInteger shift); + + BigInteger newToken(); + + BigInteger getTokenForReplacementNode(BigInteger oldToken); + + Set<BigInteger> getTokensForReplacementNode(Set<BigInteger> oldTokens); + + /** + * Indicates that we are starting a new cluster of the given number of nodes, + * so expect that number of consecutive calls to {@link #newToken()}. + * + * @param numNewNodes + */ + void growingCluster(int numNewNodes); + + void shrinkingCluster(Set<BigInteger> nodesToRemove); + + void refresh(Set<BigInteger> currentNodes); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerators.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerators.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerators.java new file mode 100644 index 0000000..b1362d2 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerators.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import brooklyn.util.collections.MutableList; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class TokenGenerators { + + /** + * Sub-classes are recommended to call {@link #checkRangeValid()} at construction time. + */ + public static abstract class AbstractTokenGenerator implements TokenGenerator, Serializable { + + private static final long serialVersionUID = -1884526356161711176L; + + public static final BigInteger TWO = BigInteger.valueOf(2); + + public abstract BigInteger max(); + public abstract BigInteger min(); + public abstract BigInteger range(); + + private final Set<BigInteger> currentTokens = Sets.newTreeSet(); + private final List<BigInteger> nextTokens = Lists.newArrayList(); + private BigInteger origin = BigInteger.ZERO; + + protected void checkRangeValid() { + Preconditions.checkState(range().equals(max().subtract(min()).add(BigInteger.ONE)), + "min=%s; max=%s; range=%s", min(), max(), range()); + } + + @Override + public void setOrigin(BigInteger shift) { + this.origin = Preconditions.checkNotNull(shift, "shift"); + } + + /** + * Unless we're explicitly starting a new cluster or resizing by a pre-defined number of nodes, then + * let Cassandra decide (i.e. return null). + */ + @Override + public synchronized BigInteger newToken() { + BigInteger result = (nextTokens.isEmpty()) ? null : nextTokens.remove(0); + if (result != null) currentTokens.add(result); + return result; + } + + @Override + public synchronized BigInteger getTokenForReplacementNode(BigInteger oldToken) { + checkNotNull(oldToken, "oldToken"); + return normalize(oldToken.subtract(BigInteger.ONE)); + } + + @Override + public synchronized Set<BigInteger> getTokensForReplacementNode(Set<BigInteger> oldTokens) { + checkNotNull(oldTokens, "oldToken"); + Set<BigInteger> result = Sets.newLinkedHashSet(); + for (BigInteger oldToken : oldTokens) { + result.add(getTokenForReplacementNode(oldToken)); + } + return result; + } + + @Override + public synchronized void growingCluster(int numNewNodes) { + if (currentTokens.isEmpty() && nextTokens.isEmpty()) { + nextTokens.addAll(generateEquidistantTokens(numNewNodes)); + } else { + // simple strategy which iteratively finds best midpoint + for (int i=0; i<numNewNodes; i++) { + nextTokens.add(generateBestNextToken()); + } + } + } + + @Override + public synchronized void shrinkingCluster(Set<BigInteger> nodesToRemove) { + currentTokens.remove(nodesToRemove); + } + + @Override + public synchronized void refresh(Set<BigInteger> currentNodes) { + currentTokens.clear(); + currentTokens.addAll(currentNodes); + } + + private List<BigInteger> generateEquidistantTokens(int numTokens) { + List<BigInteger> result = Lists.newArrayList(); + for (int i = 0; i < numTokens; i++) { + BigInteger token = range().multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(numTokens)).add(min()); + token = normalize(token.add(origin)); + result.add(token); + } + return result; + } + + private BigInteger normalize(BigInteger input) { + while (input.compareTo(min()) < 0) + input = input.add(range()); + while (input.compareTo(max()) > 0) + input = input.subtract(range()); + return input; + } + + private BigInteger generateBestNextToken() { + List<BigInteger> allTokens = MutableList.<BigInteger>of().appendAll(currentTokens).appendAll(nextTokens); + Collections.sort(allTokens); + Iterator<BigInteger> ti = allTokens.iterator(); + + BigInteger thisValue = ti.next(); + BigInteger prevValue = allTokens.get(allTokens.size()-1).subtract(range()); + + BigInteger bestNewTokenSoFar = normalize(prevValue.add(thisValue).divide(TWO)); + BigInteger biggestRangeSizeSoFar = thisValue.subtract(prevValue); + + while (ti.hasNext()) { + prevValue = thisValue; + thisValue = ti.next(); + + BigInteger rangeHere = thisValue.subtract(prevValue); + if (rangeHere.compareTo(biggestRangeSizeSoFar) > 0) { + bestNewTokenSoFar = prevValue.add(thisValue).divide(TWO); + biggestRangeSizeSoFar = rangeHere; + } + } + return bestNewTokenSoFar; + } + + } + + public static class PosNeg63TokenGenerator extends AbstractTokenGenerator { + private static final long serialVersionUID = 7327403957176106754L; + + public static final BigInteger MIN_TOKEN = TWO.pow(63).negate(); + public static final BigInteger MAX_TOKEN = TWO.pow(63).subtract(BigInteger.ONE); + public static final BigInteger RANGE = TWO.pow(64); + + public PosNeg63TokenGenerator() { + checkRangeValid(); + } + + @Override public BigInteger max() { return MAX_TOKEN; } + @Override public BigInteger min() { return MIN_TOKEN; } + @Override public BigInteger range() { return RANGE; } + } + + /** token generator used by cassandra pre v1.2 */ + public static class NonNeg127TokenGenerator extends AbstractTokenGenerator { + private static final long serialVersionUID = 1357426905711548198L; + + public static final BigInteger MIN_TOKEN = BigInteger.ZERO; + public static final BigInteger MAX_TOKEN = TWO.pow(127).subtract(BigInteger.ONE); + public static final BigInteger RANGE = TWO.pow(127); + + public NonNeg127TokenGenerator() { + checkRangeValid(); + } + + @Override public BigInteger max() { return MAX_TOKEN; } + @Override public BigInteger min() { return MIN_TOKEN; } + @Override public BigInteger range() { return RANGE; } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java new file mode 100644 index 0000000..b009485 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.couchbase; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.time.Duration; + +import com.google.common.reflect.TypeToken; + +@Catalog(name="CouchBase Cluster", description="Couchbase is an open source, distributed (shared-nothing architecture) " + + "NoSQL document-oriented database that is optimized for interactive applications.") +@ImplementedBy(CouchbaseClusterImpl.class) +public interface CouchbaseCluster extends DynamicCluster { + + AttributeSensor<Integer> ACTUAL_CLUSTER_SIZE = Sensors.newIntegerSensor("coucbase.cluster.actualClusterSize", "returns the actual number of nodes in the cluster"); + + @SuppressWarnings("serial") + AttributeSensor<Set<Entity>> COUCHBASE_CLUSTER_UP_NODES = Sensors.newSensor(new TypeToken<Set<Entity>>() { + }, "couchbase.cluster.clusterEntities", "the set of service up nodes"); + + @SuppressWarnings("serial") + AttributeSensor<List<String>> COUCHBASE_CLUSTER_BUCKETS = Sensors.newSensor(new TypeToken<List<String>>() { + }, "couchbase.cluster.buckets", "Names of all the buckets the couchbase cluster"); + + AttributeSensor<Entity> COUCHBASE_PRIMARY_NODE = Sensors.newSensor(Entity.class, "couchbase.cluster.primaryNode", "The primary couchbase node to query and issue add-server and rebalance on"); + + AttributeSensor<Boolean> IS_CLUSTER_INITIALIZED = Sensors.newBooleanSensor("couchbase.cluster.isClusterInitialized", "flag to emit if the couchbase cluster was intialized"); + + @SetFromFlag("clusterName") + ConfigKey<String> CLUSTER_NAME = ConfigKeys.newStringConfigKey("couchbase.cluster.name", "Optional name for this cluster"); + + @SetFromFlag("intialQuorumSize") + ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey("couchbase.cluster.intialQuorumSize", "Initial cluster quorum size - number of initial nodes that must have been successfully started to report success (if < 0, then use value of INITIAL_SIZE)", + -1); + + @SetFromFlag("delayBeforeAdvertisingCluster") + ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.TEN_SECONDS); + + // TODO not sure if this is needed; previously waited 3m (SERVICE_UP_TIME_OUT) but that seems absurdly long + @SetFromFlag("postStartStabilizationDelay") + ConfigKey<Duration> NODES_STARTED_STABILIZATION_DELAY = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.postStartStabilizationDelay", "Delay after nodes have been started before treating it as a cluster", Duration.TEN_SECONDS); + + @SetFromFlag("adminUsername") + ConfigKey<String> COUCHBASE_ADMIN_USERNAME = CouchbaseNode.COUCHBASE_ADMIN_USERNAME; + + @SetFromFlag("adminPassword") + ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = CouchbaseNode.COUCHBASE_ADMIN_PASSWORD; + + @SuppressWarnings("serial") + AttributeSensor<List<String>> COUCHBASE_CLUSTER_UP_NODE_ADDRESSES = Sensors.newSensor(new TypeToken<List<String>>() {}, + "couchbase.cluster.node.addresses", "List of host:port of all active nodes in the cluster (http admin port, and public hostname/IP)"); + AttributeSensor<String> COUCHBASE_CLUSTER_CONNECTION_URL = Sensors.newStringSensor( + "couchbase.cluster.connection.url", "Couchbase-style URL to connect to the cluster (e.g. http://127.0.0.1:8091/ or couchbase://10.0.0.1,10.0.0.2/)"); + + // Interesting stats + AttributeSensor<Double> OPS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.ops", + "Average across cluster for pools/nodes/<current node>/interestingStats/ops"); + AttributeSensor<Double> EP_BG_FETCHED_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.ep.bg.fetched", + "Average across cluster for pools/nodes/<current node>/interestingStats/ep_bg_fetched"); + AttributeSensor<Double> CURR_ITEMS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.curr.items", + "Average across cluster for pools/nodes/<current node>/interestingStats/curr_items"); + AttributeSensor<Double> VB_REPLICA_CURR_ITEMS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.vb.replica.curr.items", + "Average across cluster for pools/nodes/<current node>/interestingStats/vb_replica_curr_items"); + AttributeSensor<Double> GET_HITS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.get.hits", + "Average across cluster for pools/nodes/<current node>/interestingStats/get_hits"); + AttributeSensor<Double> CMD_GET_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.cmd.get", + "Average across cluster for pools/nodes/<current node>/interestingStats/cmd_get"); + AttributeSensor<Double> CURR_ITEMS_TOT_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.curr.items.tot", + "Average across cluster for pools/nodes/<current node>/interestingStats/curr_items_tot"); + // Although these are Double (after aggregation), they need to be coerced to Long for ByteSizeStrings rendering + AttributeSensor<Long> COUCH_DOCS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.docs.data.size", + "Average across cluster for pools/nodes/<current node>/interestingStats/couch_docs_data_size"); + AttributeSensor<Long> MEM_USED_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.mem.used", + "Average across cluster for pools/nodes/<current node>/interestingStats/mem_used"); + AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.actual.disk.size", + "Average across cluster for pools/nodes/<current node>/interestingStats/couch_views_actual_disk_size"); + AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.docs.actual.disk.size", + "Average across cluster for pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size"); + AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.data.size", + "Average across cluster for pools/nodes/<current node>/interestingStats/couch_views_data_size"); + + AttributeSensor<Boolean> BUCKET_CREATION_IN_PROGRESS = Sensors.newBooleanSensor("couchbase.cluster.bucketCreationInProgress", "Indicates that a bucket is currently being created, and" + + "further bucket creation should be deferred"); + + /** + * createBuckets is a list of all the buckets to be created on the couchbase cluster + * the buckets will be created on the primary node of the cluster + * each map entry for a bucket should contain the following parameters: + * - <"bucket",(String) name of the bucket (default: default)> + * - <"bucket-type",(String) name of bucket type (default: couchbase)> + * - <"bucket-port",(Integer) the bucket port to connect to (default: 11222)> + * - <"bucket-ramsize",(Integer) ram size allowed for bucket (default: 200)> + * - <"bucket-replica",(Integer) number of replicas for the bucket (default: 1)> + */ + @SuppressWarnings("serial") + @SetFromFlag("createBuckets") + ConfigKey<List<Map<String, Object>>> CREATE_BUCKETS = ConfigKeys.newConfigKey(new TypeToken<List<Map<String, Object>>>() {}, + "couchbase.cluster.createBuckets", "a list of all dedicated port buckets to be created on the couchbase cluster"); + + @SuppressWarnings("serial") + @SetFromFlag("replication") + ConfigKey<List<Map<String,Object>>> REPLICATION = ConfigKeys.newConfigKey(new TypeToken<List<Map<String,Object>>>() {}, + "couchbase.cluster.replicationConfiguration", "List of replication rules to configure, each rule including target (id of another cluster) and mode (unidirectional or bidirectional)"); + + int getQuorumSize(); +}
