http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java deleted file mode 100644 index 5c7b8fd..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java +++ /dev/null @@ -1,594 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.cassandra; - -import java.io.IOException; -import java.lang.reflect.Method; -import java.math.BigInteger; -import java.net.Socket; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import javax.annotation.Nullable; -import javax.management.ObjectName; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.enricher.RollingTimeWindowMeanEnricher; -import brooklyn.enricher.TimeWeightedDeltaEnricher; -import brooklyn.entity.Entity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.SoftwareProcessImpl; -import brooklyn.entity.effector.EffectorBody; -import brooklyn.entity.java.JavaAppUtils; -import brooklyn.event.AttributeSensor; -import brooklyn.event.basic.DependentConfiguration; -import brooklyn.event.basic.Sensors; -import brooklyn.event.feed.function.FunctionFeed; -import brooklyn.event.feed.function.FunctionPollConfig; -import brooklyn.event.feed.jmx.JmxAttributePollConfig; -import brooklyn.event.feed.jmx.JmxFeed; -import brooklyn.event.feed.jmx.JmxHelper; -import brooklyn.event.feed.jmx.JmxOperationPollConfig; -import brooklyn.location.MachineLocation; -import brooklyn.location.MachineProvisioningLocation; -import brooklyn.location.basic.Machines; -import brooklyn.location.cloud.CloudLocationConfig; -import brooklyn.util.collections.MutableSet; -import brooklyn.util.config.ConfigBag; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.guava.Maybe; -import brooklyn.util.text.Strings; -import brooklyn.util.text.TemplateProcessor; -import brooklyn.util.time.Duration; - -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Joiner; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import 
com.google.common.base.Splitter; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * Implementation of {@link CassandraNode}. - */ -public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraNode { - - private static final Logger log = LoggerFactory.getLogger(CassandraNodeImpl.class); - - private final AtomicReference<Boolean> detectedCloudSensors = new AtomicReference<Boolean>(false); - - public CassandraNodeImpl() { - } - - @Override - public void init() { - super.init(); - - getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() { - @Override - public String call(ConfigBag parameters) { - return executeScript((String)parameters.getStringKey("commands")); - } - }); - - Entities.checkRequiredUrl(this, getCassandraConfigTemplateUrl()); - Entities.getRequiredUrlConfig(this, CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL); - - connectEnrichers(); - } - - /** - * Some clouds (e.g. Rackspace) give us VMs that have two nics: one for private and one for public. - * If the private IP is used then it doesn't work, even for a cluster purely internal to Rackspace! - * - * TODO Ugly. Need to understand more and find a better fix. Perhaps in Cassandra itself if necessary. - * Also need to investigate further: - * - does it still fail if BroadcastAddress is set to private IP? - * - is `openIptables` opening it up for both interfaces? - * - for aws->rackspace comms between nodes (thus using the public IP), will it be listening on an accessible port? 
- * - ideally do a check, open a server on one port on the machine, see if it is contactable on the public address; - * and set that as a flag on the cloud - */ - protected void setCloudPreferredSensorNames() { - if (detectedCloudSensors.get()) return; - synchronized (detectedCloudSensors) { - if (detectedCloudSensors.get()) return; - - MachineProvisioningLocation<?> loc = getProvisioningLocation(); - if (loc != null) { - try { - Method method = loc.getClass().getMethod("getProvider"); - method.setAccessible(true); - String provider = (String) method.invoke(loc); - String result = "(nothing special)"; - if (provider!=null) { - if (provider.contains("rackspace") || provider.contains("cloudservers") || provider.contains("softlayer")) { - /* These clouds have 2 NICs and it has to be consistent, so use public IP here to allow external access; - * (TODO internal access could be configured to improve performance / lower cost, - * if we know all nodes are visible to each other) */ - if (getConfig(LISTEN_ADDRESS_SENSOR)==null) - setConfig(LISTEN_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName()); - if (getConfig(BROADCAST_ADDRESS_SENSOR)==null) - setConfig(BROADCAST_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName()); - result = "public IP for both listen and broadcast"; - } else if (provider.contains("google-compute")) { - /* Google nodes cannot reach themselves/each-other on the public IP, - * and there is no hostname, so use private IP here */ - if (getConfig(LISTEN_ADDRESS_SENSOR)==null) - setConfig(LISTEN_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName()); - if (getConfig(BROADCAST_ADDRESS_SENSOR)==null) - setConfig(BROADCAST_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName()); - result = "private IP for both listen and broadcast"; - } - } - log.debug("Cassandra NICs inferred {} for {}; using location {}, based on provider {}", new Object[] {result, this, loc, provider}); - } catch (Exception e) { - log.debug("Cassandra NICs auto-detection failed for {} in 
location {}: {}", new Object[] {this, loc, e}); - } - } - detectedCloudSensors.set(true); - } - } - - @Override - protected void preStart() { - super.preStart(); - setCloudPreferredSensorNames(); - } - - // Used for freemarker - public String getMajorMinorVersion() { - String version = getConfig(CassandraNode.SUGGESTED_VERSION); - if (Strings.isBlank(version)) return ""; - List<String> versionParts = ImmutableList.copyOf(Splitter.on(".").split(version)); - return versionParts.get(0) + (versionParts.size() > 1 ? "."+versionParts.get(1) : ""); - } - - public String getCassandraConfigTemplateUrl() { - String templatedUrl = getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL); - return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of()); - } - - @Override public Integer getGossipPort() { return getAttribute(CassandraNode.GOSSIP_PORT); } - @Override public Integer getSslGossipPort() { return getAttribute(CassandraNode.SSL_GOSSIP_PORT); } - @Override public Integer getThriftPort() { return getAttribute(CassandraNode.THRIFT_PORT); } - @Override public Integer getNativeTransportPort() { return getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); } - @Override public String getClusterName() { return getAttribute(CassandraNode.CLUSTER_NAME); } - - @Override public int getNumTokensPerNode() { - return getConfig(CassandraNode.NUM_TOKENS_PER_NODE); - } - - @Deprecated - @Override public BigInteger getToken() { - BigInteger token = getAttribute(CassandraNode.TOKEN); - if (token == null) { - token = getConfig(CassandraNode.TOKEN); - } - return token; - } - - @Override public Set<BigInteger> getTokens() { - // Prefer an already-set attribute over the config. - // Prefer TOKENS over TOKEN. 
- Set<BigInteger> tokens = getAttribute(CassandraNode.TOKENS); - if (tokens == null) { - BigInteger token = getAttribute(CassandraNode.TOKEN); - if (token != null) { - tokens = ImmutableSet.of(token); - } - } - if (tokens == null) { - tokens = getConfig(CassandraNode.TOKENS); - } - if (tokens == null) { - BigInteger token = getConfig(CassandraNode.TOKEN); - if (token != null) { - tokens = ImmutableSet.of(token); - } - } - return tokens; - } - - @Deprecated - @Override public String getTokenAsString() { - BigInteger token = getToken(); - if (token==null) return ""; - return ""+token; - } - - @Override public String getTokensAsString() { - // TODO check what is required when replacing failed node. - // with vnodes in Cassandra 2.x, don't bother supplying token - Set<BigInteger> tokens = getTokens(); - if (tokens == null) return ""; - return Joiner.on(",").join(tokens); - } - - @Override public String getListenAddress() { - String sensorName = getConfig(LISTEN_ADDRESS_SENSOR); - if (Strings.isNonBlank(sensorName)) - return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked(); - - String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS); - return Strings.isNonBlank(subnetAddress) ? 
subnetAddress : getAttribute(CassandraNode.ADDRESS); - } - @Override public String getBroadcastAddress() { - String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR); - if (Strings.isNonBlank(sensorName)) - return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked(); - - String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME); - if (snitchName.equals("Ec2MultiRegionSnitch") || snitchName.contains("MultiCloudSnitch")) { - // http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html - // describes that the listen_address is set to the private IP, and the broadcast_address is set to the public IP. - return getAttribute(CassandraNode.ADDRESS); - } else if (!getDriver().isClustered()) { - return getListenAddress(); - } else { - // In other situations, prefer the hostname, so other regions can see it - // *Unless* hostname resolves at the target to a local-only interface which is different to ADDRESS - // (workaround for issue deploying to localhost) - String hostname = getAttribute(CassandraNode.HOSTNAME); - try { - String resolvedAddress = getDriver().getResolvedAddress(hostname); - if (resolvedAddress==null) { - log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" could not be resolved at remote machine"); - return getListenAddress(); - } - if (resolvedAddress.equals("127.0.0.1")) { - log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" resolves to 127.0.0.1"); - return getListenAddress(); - } - return hostname; - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - log.warn("Error resolving hostname "+hostname+" for "+this+": "+e, e); - return hostname; - } - } - } - /** not always the private IP, if public IP has been insisted on for broadcast, e.g. 
setting up a rack topology */ - // have not confirmed this does the right thing in all clouds ... only used for rack topology however - public String getPrivateIp() { - String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR); - if (Strings.isNonBlank(sensorName)) { - return getAttribute(Sensors.newStringSensor(sensorName)); - } else { - String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS); - return Strings.isNonBlank(subnetAddress) ? subnetAddress : getAttribute(CassandraNode.ADDRESS); - } - } - public String getPublicIp() { - // may need to be something else in google - return getAttribute(CassandraNode.ADDRESS); - } - - @Override public String getRpcAddress() { - String sensorName = getConfig(RPC_ADDRESS_SENSOR); - if (Strings.isNonBlank(sensorName)) - return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked(); - return "0.0.0.0"; - } - - @Override public String getSeeds() { - Set<Entity> seeds = getConfig(CassandraNode.INITIAL_SEEDS); - if (seeds==null) { - log.warn("No seeds available when requested for "+this, new Throwable("source of no Cassandra seeds when requested")); - return null; - } - String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME); - MutableSet<String> seedsHostnames = MutableSet.of(); - for (Entity entity : seeds) { - // tried removing ourselves if there are other nodes, but that is a BAD idea! - // blows up with a "java.lang.RuntimeException: No other nodes seen!" - - if (snitchName.equals("Ec2MultiRegionSnitch") || snitchName.contains("MultiCloudSnitch")) { - // http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html - // says the seeds should be public IPs. 
- seedsHostnames.add(entity.getAttribute(CassandraNode.ADDRESS)); - } else { - String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR); - if (Strings.isNonBlank(sensorName)) { - seedsHostnames.add(entity.getAttribute(Sensors.newStringSensor(sensorName))); - } else { - Maybe<String> optionalSeedHostname = Machines.findSubnetOrPublicHostname(entity); - if (optionalSeedHostname.isPresent()) { - String seedHostname = optionalSeedHostname.get(); - seedsHostnames.add(seedHostname); - } else { - log.warn("In node {}, seed hostname missing for {}; not including in seeds list", this, entity); - } - } - } - } - - String result = Strings.join(seedsHostnames, ","); - log.info("Seeds for {}: {}", this, result); - return result; - } - - // referenced by cassandra-rackdc.properties, read by some of the cassandra snitches - public String getDatacenterName() { - String name = getAttribute(CassandraNode.DATACENTER_NAME); - if (name == null) { - MachineLocation machine = getMachineOrNull(); - MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation(); - if (machine != null) { - name = machine.getConfig(CloudLocationConfig.CLOUD_REGION_ID); - } - if (name == null && provisioningLocation != null) { - name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_REGION_ID); - } - if (name == null) { - name = "UNKNOWN_DATACENTER"; - } - setAttribute((AttributeSensor<String>)DATACENTER_NAME, name); - } - return name; - } - - public String getRackName() { - String name = getAttribute(CassandraNode.RACK_NAME); - if (name == null) { - MachineLocation machine = getMachineOrNull(); - MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation(); - if (machine != null) { - name = machine.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID); - } - if (name == null && provisioningLocation != null) { - name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID); - } - if (name == null) { - name = "UNKNOWN_RACK"; - } - 
setAttribute((AttributeSensor<String>)RACK_NAME, name); - } - return name; - } - - @Override - public Class<? extends CassandraNodeDriver> getDriverInterface() { - return CassandraNodeDriver.class; - } - - @Override - public CassandraNodeDriver getDriver() { - return (CassandraNodeDriver) super.getDriver(); - } - - private volatile JmxFeed jmxFeed; - private volatile FunctionFeed functionFeed; - private JmxFeed jmxMxBeanFeed; - private JmxHelper jmxHelper; - private ObjectName storageServiceMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=StorageService"); - private ObjectName readStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=ReadStage"); - private ObjectName mutationStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=MutationStage"); - private ObjectName snitchMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo"); - - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - protected void connectSensors() { - // "cassandra" isn't really a protocol, but okay for now - setAttribute(DATASTORE_URL, "cassandra://"+getAttribute(HOSTNAME)+":"+getAttribute(THRIFT_PORT)); - - super.connectSensors(); - - jmxHelper = new JmxHelper(this); - jmxFeed = JmxFeed.builder() - .entity(this) - .period(3000, TimeUnit.MILLISECONDS) - .helper(jmxHelper) - .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX) - .objectName(storageServiceMBean) - .attributeName("Initialized") - .onSuccess(Functions.forPredicate(Predicates.notNull())) - .onException(Functions.constant(false))) - .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS) - .objectName(storageServiceMBean) - .attributeName("TokenToEndpointMap") - .onSuccess(new Function<Object, Set<BigInteger>>() { - @Override - public Set<BigInteger> apply(@Nullable Object arg) { - Map input = (Map)arg; - if (input == null || input.isEmpty()) return null; - // FIXME does not work on aws-ec2, uses RFC1918 address - 
Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME))); - Set<String> tokens = Maps.filterValues(input, self).keySet(); - Set<BigInteger> result = Sets.newLinkedHashSet(); - for (String token : tokens) { - result.add(new BigInteger(token)); - } - return result; - }}) - .onException(Functions.<Set<BigInteger>>constant(null))) - .pollAttribute(new JmxAttributePollConfig<BigInteger>(TOKEN) - .objectName(storageServiceMBean) - .attributeName("TokenToEndpointMap") - .onSuccess(new Function<Object, BigInteger>() { - @Override - public BigInteger apply(@Nullable Object arg) { - Map input = (Map)arg; - // TODO remove duplication from setting TOKENS - if (input == null || input.isEmpty()) return null; - // FIXME does not work on aws-ec2, uses RFC1918 address - Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME))); - Set<String> tokens = Maps.filterValues(input, self).keySet(); - String token = Iterables.getFirst(tokens, null); - return (token != null) ? 
new BigInteger(token) : null; - }}) - .onException(Functions.<BigInteger>constant(null))) - .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME) - .period(60, TimeUnit.SECONDS) - .objectName(snitchMBean) - .operationName("getDatacenter") - .operationParams(ImmutableList.of(getBroadcastAddress())) - .onException(Functions.<String>constant(null))) - .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME) - .period(60, TimeUnit.SECONDS) - .objectName(snitchMBean) - .operationName("getRack") - .operationParams(ImmutableList.of(getBroadcastAddress())) - .onException(Functions.<String>constant(null))) - .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS) - .objectName(storageServiceMBean) - .attributeName("TokenToEndpointMap") - .onSuccess(new Function<Object, Integer>() { - @Override - public Integer apply(@Nullable Object arg) { - Map input = (Map)arg; - if (input == null || input.isEmpty()) return 0; - return input.size(); - } - }) - .onException(Functions.constant(-1))) - .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT) - .objectName(storageServiceMBean) - .attributeName("LiveNodes") - .onSuccess(new Function<Object, Integer>() { - @Override - public Integer apply(@Nullable Object arg) { - List input = (List)arg; - if (input == null || input.isEmpty()) return 0; - return input.size(); - } - }) - .onException(Functions.constant(-1))) - .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE) - .objectName(readStageMBean) - .attributeName("ActiveCount") - .onException(Functions.constant((Integer)null))) - .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING) - .objectName(readStageMBean) - .attributeName("PendingTasks") - .onException(Functions.constant((Long)null))) - .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED) - .objectName(readStageMBean) - .attributeName("CompletedTasks") - .onException(Functions.constant((Long)null))) - .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE) - 
.objectName(mutationStageMBean) - .attributeName("ActiveCount") - .onException(Functions.constant((Integer)null))) - .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING) - .objectName(mutationStageMBean) - .attributeName("PendingTasks") - .onException(Functions.constant((Long)null))) - .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED) - .objectName(mutationStageMBean) - .attributeName("CompletedTasks") - .onException(Functions.constant((Long)null))) - .build(); - - functionFeed = FunctionFeed.builder() - .entity(this) - .period(3000, TimeUnit.MILLISECONDS) - .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY) - .onException(Functions.constant((Long)null)) - .callable(new Callable<Long>() { - public Long call() { - try { - long start = System.currentTimeMillis(); - Socket s = new Socket(getAttribute(Attributes.HOSTNAME), getThriftPort()); - s.close(); - long latency = System.currentTimeMillis() - start; - computeServiceUp(); - return latency; - } catch (Exception e) { - if (log.isDebugEnabled()) - log.debug("Cassandra thrift port poll failure: "+e); - setAttribute(SERVICE_UP, false); - return null; - } - } - public void computeServiceUp() { - // this will wait an additional poll period after thrift port is up, - // as the caller will not have set yet, but that will help ensure it is really healthy! 
- setAttribute(SERVICE_UP, - getAttribute(THRIFT_PORT_LATENCY)!=null && getAttribute(THRIFT_PORT_LATENCY)>=0 && - Boolean.TRUE.equals(getAttribute(SERVICE_UP_JMX))); - } - })) - .build(); - - jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this); - } - - protected void connectEnrichers() { - connectEnrichers(Duration.TEN_SECONDS); - } - - protected void connectEnrichers(Duration windowPeriod) { - JavaAppUtils.connectJavaAppServerPolicies(this); - - addEnricher(TimeWeightedDeltaEnricher.<Long>getPerSecondDeltaEnricher(this, READ_COMPLETED, READS_PER_SECOND_LAST)); - addEnricher(TimeWeightedDeltaEnricher.<Long>getPerSecondDeltaEnricher(this, WRITE_COMPLETED, WRITES_PER_SECOND_LAST)); - - if (windowPeriod!=null) { - addEnricher(new RollingTimeWindowMeanEnricher<Long>(this, THRIFT_PORT_LATENCY, - THRIFT_PORT_LATENCY_IN_WINDOW, windowPeriod)); - addEnricher(new RollingTimeWindowMeanEnricher<Double>(this, READS_PER_SECOND_LAST, - READS_PER_SECOND_IN_WINDOW, windowPeriod)); - addEnricher(new RollingTimeWindowMeanEnricher<Double>(this, WRITES_PER_SECOND_LAST, - WRITES_PER_SECOND_IN_WINDOW, windowPeriod)); - } - } - - @Override - public void disconnectSensors() { - super.disconnectSensors(); - - if (jmxFeed != null) jmxFeed.stop(); - if (jmxMxBeanFeed != null) jmxMxBeanFeed.stop(); - if (jmxHelper != null) jmxHelper.terminate(); - if (functionFeed != null) functionFeed.stop(); - } - - @Override - public void setToken(String token) { - try { - if (!jmxHelper.isConnected()) jmxHelper.connect();; - jmxHelper.operation(storageServiceMBean, "move", token); - log.info("Moved server {} to token {}", getId(), token); - } catch (IOException ioe) { - Throwables.propagate(ioe); - } - } - - @Override - public String executeScript(String commands) { - return getDriver().executeScriptAsync(commands).block().getStdout(); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java deleted file mode 100644 index 44651ba..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java +++ /dev/null @@ -1,420 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.cassandra; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -import brooklyn.entity.Entity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.EntityLocal; -import brooklyn.entity.database.DatastoreMixins; -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; -import brooklyn.entity.java.UsesJmx; -import brooklyn.entity.software.SshEffectorTasks; -import brooklyn.event.basic.DependentConfiguration; -import brooklyn.location.Location; -import brooklyn.location.access.BrooklynAccessUtils; -import brooklyn.location.basic.Machines; -import brooklyn.location.basic.SshMachineLocation; -import brooklyn.management.TaskWrapper; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.collections.MutableSet; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.guava.Maybe; -import brooklyn.util.net.Networking; -import brooklyn.util.os.Os; -import brooklyn.util.ssh.BashCommands; -import brooklyn.util.stream.Streams; -import brooklyn.util.task.DynamicTasks; -import brooklyn.util.task.Tasks; -import brooklyn.util.task.system.ProcessTaskWrapper; -import brooklyn.util.text.Identifiers; -import brooklyn.util.text.Strings; -import brooklyn.util.text.TemplateProcessor; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -/** - * Start a {@link CassandraNode} in a {@link Location} accessible over ssh. 
- */ -public class CassandraNodeSshDriver extends JavaSoftwareProcessSshDriver implements CassandraNodeDriver { - - private static final Logger log = LoggerFactory.getLogger(CassandraNodeSshDriver.class); - - protected Maybe<String> resolvedAddressCache = Maybe.absent(); - - public CassandraNodeSshDriver(CassandraNodeImpl entity, SshMachineLocation machine) { - super(entity, machine); - } - - @Override - protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(),"cassandra.log"); } - - @Override - public Integer getGossipPort() { return entity.getAttribute(CassandraNode.GOSSIP_PORT); } - - @Override - public Integer getSslGossipPort() { return entity.getAttribute(CassandraNode.SSL_GOSSIP_PORT); } - - @Override - public Integer getThriftPort() { return entity.getAttribute(CassandraNode.THRIFT_PORT); } - - @Override - public Integer getNativeTransportPort() { return entity.getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); } - - @Override - public String getClusterName() { return entity.getAttribute(CassandraNode.CLUSTER_NAME); } - - @Override - public String getCassandraConfigTemplateUrl() { - String templatedUrl = entity.getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL); - return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of()); - } - - @Override - public String getCassandraConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_CONFIG_FILE_NAME); } - - public String getEndpointSnitchName() { return entity.getConfig(CassandraNode.ENDPOINT_SNITCH_NAME); } - - public String getCassandraRackdcConfigTemplateUrl() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL); } - - public String getCassandraRackdcConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_FILE_NAME); } - - public String getMirrorUrl() { return entity.getConfig(CassandraNode.MIRROR_URL); } - - protected String getDefaultUnpackedDirectoryName() { - return 
"apache-cassandra-"+getVersion(); - } - - protected boolean isV2() { - String version = getVersion(); - return version.startsWith("2."); - } - - @Override - public boolean installJava() { - if (isV2()) { - return checkForAndInstallJava("1.8"); - } else { - return super.installJava(); - } - } - - @Override - public void preInstall() { - resolver = Entities.newDownloader(this); - setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(getDefaultUnpackedDirectoryName()))); - } - - @Override - public void install() { - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - - List<String> commands = ImmutableList.<String>builder() - .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) - .add(BashCommands.INSTALL_TAR) - .add("tar xzfv " + saveAs) - .build(); - - newScript(INSTALLING) - .body.append(commands) - .execute(); - } - - @Override - public Set<Integer> getPortsUsed() { - return ImmutableSet.<Integer>builder() - .addAll(super.getPortsUsed()) - .addAll(getPortMap().values()) - .build(); - } - - protected Map<String, Integer> getPortMap() { - return ImmutableMap.<String, Integer>builder() - .put("jmxPort", entity.getAttribute(UsesJmx.JMX_PORT)) - .put("rmiPort", entity.getAttribute(UsesJmx.RMI_REGISTRY_PORT)) - .put("gossipPort", getGossipPort()) - .put("sslGossipPort", getSslGossipPort()) - .put("thriftPort", getThriftPort()) - .build(); - } - - @Override - public void customize() { - log.debug("Customizing {} (Cluster {})", entity, getClusterName()); - Networking.checkPortsValid(getPortMap()); - - customizeInitialSeeds(); - - String logFileEscaped = getLogFileLocation().replace("/", "\\/"); // escape slashes - - ImmutableList.Builder<String> commands = new ImmutableList.Builder<String>() - .add(String.format("cp -R %s/{bin,conf,lib,interface,pylib,tools} .", getExpandedInstallDir())) - .add("mkdir -p data") - .add("mkdir -p brooklyn_commands") - .add(String.format("sed -i.bk 
's/log4j.appender.R.File=.*/log4j.appender.R.File=%s/g' %s/conf/log4j-server.properties", logFileEscaped, getRunDir())) - .add(String.format("sed -i.bk '/JMX_PORT/d' %s/conf/cassandra-env.sh", getRunDir())) - // Script sets 180k on Linux which gives Java error: The stack size specified is too small, Specify at least 228k - .add(String.format("sed -i.bk 's/-Xss180k/-Xss280k/g' %s/conf/cassandra-env.sh", getRunDir())); - - newScript(CUSTOMIZING) - .body.append(commands.build()) - .failOnNonZeroResultCode() - .execute(); - - // Copy the cassandra.yaml configuration file across - String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraConfigFileName()); - copyTemplate(getCassandraConfigTemplateUrl(), destinationConfigFile); - - // Copy the cassandra-rackdc.properties configuration file across - String rackdcDestinationFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraRackdcConfigFileName()); - copyTemplate(getCassandraRackdcConfigTemplateUrl(), rackdcDestinationFile); - - customizeCopySnitch(); - } - - protected void customizeCopySnitch() { - // Copy the custom snitch jar file across - String customSnitchJarUrl = entity.getConfig(CassandraNode.CUSTOM_SNITCH_JAR_URL); - if (Strings.isNonBlank(customSnitchJarUrl)) { - int lastSlashIndex = customSnitchJarUrl.lastIndexOf("/"); - String customSnitchJarName = (lastSlashIndex > 0) ? 
customSnitchJarUrl.substring(lastSlashIndex+1) : "customBrooklynSnitch.jar"; - String jarDestinationFile = Os.mergePathsUnix(getRunDir(), "lib", customSnitchJarName); - InputStream customSnitchJarStream = checkNotNull(resource.getResourceFromUrl(customSnitchJarUrl), "%s could not be loaded", customSnitchJarUrl); - try { - getMachine().copyTo(customSnitchJarStream, jarDestinationFile); - } finally { - Streams.closeQuietly(customSnitchJarStream); - } - } - } - - protected void customizeInitialSeeds() { - if (entity.getConfig(CassandraNode.INITIAL_SEEDS)==null) { - if (isClustered()) { - entity.setConfig(CassandraNode.INITIAL_SEEDS, - DependentConfiguration.attributeWhenReady(entity.getParent(), CassandraDatacenter.CURRENT_SEEDS)); - } else { - entity.setConfig(CassandraNode.INITIAL_SEEDS, MutableSet.<Entity>of(entity)); - } - } - } - - @Override - public boolean isClustered() { - return entity.getParent() instanceof CassandraDatacenter; - } - - @Override - public void launch() { - String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get(); - Set<Entity> seeds = getEntity().getConfig(CassandraNode.INITIAL_SEEDS); - List<Entity> ancestors = getCassandraAncestors(); - log.info("Launching " + entity + ": " + - "cluster "+getClusterName()+", " + - "hostname (public) " + getEntity().getAttribute(Attributes.HOSTNAME) + ", " + - "hostname (subnet) " + subnetHostname + ", " + - "seeds "+((CassandraNode)entity).getSeeds()+" (from "+seeds+")"); - - boolean isFirst = seeds.iterator().next().equals(entity); - if (isClustered() && !isFirst && CassandraDatacenter.WAIT_FOR_FIRST) { - // wait for the first node - long firstStartTime = Entities.submit(entity, DependentConfiguration.attributeWhenReady( - ancestors.get(ancestors.size()-1), CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC)).getUnchecked(); - // optionally force a delay before starting subsequent nodes; see comment at CassandraCluster.DELAY_AFTER_FIRST - Duration toWait = Duration.millis(firstStartTime + 
CassandraDatacenter.DELAY_AFTER_FIRST.toMilliseconds() - System.currentTimeMillis()); - if (toWait.toMilliseconds()>0) { - log.info("Launching " + entity + ": delaying launch of non-first node by "+toWait+" to prevent schema disagreements"); - Tasks.setBlockingDetails("Pausing to ensure first node has time to start"); - Time.sleep(toWait); - Tasks.resetBlockingDetails(); - } - } - - List<Entity> queuedStart = null; - if (CassandraDatacenter.DELAY_BETWEEN_STARTS!=null && !ancestors.isEmpty()) { - Entity root = ancestors.get(ancestors.size()-1); - // TODO currently use the class as a semaphore; messy, and obviously will not federate; - // should develop a brooklyn framework semaphore (similar to that done on SshMachineLocation) - // and use it - note however the synch block is very very short so relatively safe at least - synchronized (CassandraNode.class) { - queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES); - if (queuedStart==null) { - queuedStart = new ArrayList<Entity>(); - ((EntityLocal)root).setAttribute(CassandraDatacenter.QUEUED_START_NODES, queuedStart); - } - queuedStart.add(getEntity()); - ((EntityLocal)root).setAttribute(CassandraDatacenter.QUEUED_START_NODES, queuedStart); - } - do { - // get it again in case it is backed by something external - queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES); - if (queuedStart.get(0).equals(getEntity())) break; - synchronized (queuedStart) { - try { - queuedStart.wait(1000); - } catch (InterruptedException e) { - Exceptions.propagate(e); - } - } - } while (true); - - // TODO should look at last start time... but instead we always wait - CassandraDatacenter.DELAY_BETWEEN_STARTS.countdownTimer().waitForExpiryUnchecked(); - } - - try { - // Relies on `bin/cassandra -p <pidfile>`, rather than us writing pid file ourselves. 
- newScript(MutableMap.of(USE_PID_FILE, false), LAUNCHING) - .body.append( - // log the date to attempt to debug occasional http://wiki.apache.org/cassandra/FAQ#schema_disagreement - // (can be caused by machines out of synch time-wise; but in our case it seems to be caused by other things!) - "echo date on cassandra server `hostname` when launching is `date`", - launchEssentialCommand(), - "echo after essential command") - .execute(); - if (!isClustered()) { - InputStream creationScript = DatastoreMixins.getDatabaseCreationScript(entity); - if (creationScript!=null) { - Tasks.setBlockingDetails("Pausing to ensure Cassandra (singleton) has started before running creation script"); - Time.sleep(Duration.seconds(20)); - Tasks.resetBlockingDetails(); - executeScriptAsync(Streams.readFullyString(creationScript)); - } - } - if (isClustered() && isFirst) { - for (Entity ancestor: getCassandraAncestors()) { - ((EntityLocal)ancestor).setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, System.currentTimeMillis()); - } - } - } finally { - if (queuedStart!=null) { - Entity head = queuedStart.remove(0); - checkArgument(head.equals(getEntity()), "first queued node was "+head+" but we are "+getEntity()); - synchronized (queuedStart) { - queuedStart.notifyAll(); - } - } - } - } - - /** returns cassandra-related ancestors (datacenter, fabric), with datacenter first and fabric last */ - protected List<Entity> getCassandraAncestors() { - List<Entity> result = new ArrayList<Entity>(); - Entity ancestor = getEntity().getParent(); - while (ancestor!=null) { - if (ancestor instanceof CassandraDatacenter || ancestor instanceof CassandraFabric) - result.add(ancestor); - ancestor = ancestor.getParent(); - } - return result; - } - - protected String launchEssentialCommand() { - if (isV2()) { - return String.format("./bin/cassandra -p %s > ./cassandra-console.log 2>&1", getPidFile()); - } else { - // TODO Could probably get rid of the nohup here, as script does equivalent itself - 
// with `exec ... <&- &` - return String.format("nohup ./bin/cassandra -p %s > ./cassandra-console.log 2>&1 &", getPidFile()); - } - } - - public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "cassandra.pid"); } - - @Override - public boolean isRunning() { - return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; - } - - @Override - public void stop() { - newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute(); - } - - @SuppressWarnings("unchecked") - @Override - protected Map<String,String> getCustomJavaSystemProperties() { - return MutableMap.<String, String>builder() - .putAll(super.getCustomJavaSystemProperties()) - .put("cassandra.config", getCassandraConfigFileName()) - .build(); - } - - @Override - public Map<String, String> getShellEnvironment() { - return MutableMap.<String, String>builder() - .putAll(super.getShellEnvironment()) - .put("CASSANDRA_HOME", getRunDir()) - .put("CASSANDRA_CONF", Os.mergePathsUnix(getRunDir(), "conf")) - .renameKey("JAVA_OPTS", "JVM_OPTS") - .build(); - } - - @Override - public ProcessTaskWrapper<Integer> executeScriptAsync(String commands) { - String fileToRun = Os.mergePathsUnix("brooklyn_commands", "cassandra-commands-"+Identifiers.makeRandomId(8)); - TaskWrapper<Void> task = SshEffectorTasks.put(Os.mergePathsUnix(getRunDir(), fileToRun)) - .machine(getMachine()) - .contents(commands) - .summary("copying cassandra script to execute "+fileToRun) - .newTask(); - DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity()).andWaitForSuccess(); - return executeScriptFromInstalledFileAsync(fileToRun); - } - - public ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String fileToRun) { - ProcessTaskWrapper<Integer> task = SshEffectorTasks.ssh( - "cd "+getRunDir(), - scriptInvocationCommand(getThriftPort(), fileToRun)) - .machine(getMachine()) - .summary("executing cassandra script "+fileToRun) - .newTask(); - 
DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity()); - return task; - } - - protected String scriptInvocationCommand(Integer optionalThriftPort, String fileToRun) { - return "bin/cassandra-cli " + - (optionalThriftPort != null ? "--port " + optionalThriftPort : "") + - " --file "+fileToRun; - } - - @Override - public String getResolvedAddress(String hostname) { - return resolvedAddressCache.or(BrooklynAccessUtils.resolvedAddressSupplier(getEntity(), getMachine(), hostname)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerator.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerator.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerator.java deleted file mode 100644 index 8503ad7..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerator.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
package brooklyn.entity.nosql.cassandra;

import java.math.BigInteger;
import java.util.Set;

/**
 * Hands out ring tokens for Cassandra nodes and tracks the tokens currently in use.
 * <p>
 * The behaviour notes below describe what {@code TokenGenerators.AbstractTokenGenerator}
 * does; other implementations are only bound by the signatures.
 */
public interface TokenGenerator {

    /** Upper bound of the token space; {@code AbstractTokenGenerator} wraps values above it back into range. */
    BigInteger max();

    /** Lower bound of the token space; {@code AbstractTokenGenerator} wraps values below it back into range. */
    BigInteger min();

    /** Size of the token space; {@code AbstractTokenGenerator} validates it as {@code max - min + 1}. */
    BigInteger range();

    /**
     * Sets the offset that is added to generated tokens.
     *
     * @param shift the offset; {@code AbstractTokenGenerator} rejects {@code null}
     */
    void setOrigin(BigInteger shift);

    /**
     * Returns the next token to assign to a node.
     *
     * @return the next pre-computed token; {@code AbstractTokenGenerator} returns {@code null}
     *         when none is queued, leaving the choice to Cassandra
     */
    BigInteger newToken();

    /**
     * Returns the token a node should use when it replaces the node that held {@code oldToken}.
     *
     * @param oldToken token of the node being replaced
     * @return the replacement token ({@code AbstractTokenGenerator} uses {@code oldToken - 1}, wrapped into range)
     */
    BigInteger getTokenForReplacementNode(BigInteger oldToken);

    /**
     * Set-valued counterpart of {@link #getTokenForReplacementNode(BigInteger)}.
     *
     * @param oldTokens tokens of the node being replaced
     * @return one replacement token per entry of {@code oldTokens}
     */
    Set<BigInteger> getTokensForReplacementNode(Set<BigInteger> oldTokens);

    /**
     * Announces that the cluster is about to gain the given number of nodes,
     * so that number of consecutive calls to {@link #newToken()} should be expected.
     *
     * @param numNewNodes how many nodes are about to be added
     */
    void growingCluster(int numNewNodes);

    /**
     * Announces that the given tokens are leaving the cluster.
     *
     * @param nodesToRemove tokens of the nodes being removed
     */
    void shrinkingCluster(Set<BigInteger> nodesToRemove);

    /**
     * Replaces the generator's view of the tokens currently in use.
     *
     * @param currentNodes tokens of all nodes currently in the cluster
     */
    void refresh(Set<BigInteger> currentNodes);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerators.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerators.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerators.java deleted file mode 100644 index 70dd0f6..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/TokenGenerators.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
package brooklyn.entity.nosql.cassandra;

import static com.google.common.base.Preconditions.checkNotNull;

import java.io.Serializable;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import brooklyn.util.collections.MutableList;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

/** Holder for the {@link TokenGenerator} implementations used for Cassandra rings. */
public class TokenGenerators {

    /**
     * Base {@link TokenGenerator} that tracks the tokens in use ({@code currentTokens}) and a queue of
     * pre-computed tokens still to be handed out ({@code nextTokens}).
     * <p>
     * Sub-classes supply the bounds of the token space and are recommended to call
     * {@link #checkRangeValid()} at construction time. All state is guarded by this object's monitor.
     */
    public static abstract class AbstractTokenGenerator implements TokenGenerator, Serializable {

        private static final long serialVersionUID = -1884526356161711176L;

        public static final BigInteger TWO = BigInteger.valueOf(2);

        public abstract BigInteger max();
        public abstract BigInteger min();
        public abstract BigInteger range();

        /** Tokens believed to be in use by running nodes, kept sorted. */
        private final Set<BigInteger> currentTokens = Sets.newTreeSet();
        /** Tokens already computed by {@link #growingCluster(int)} but not yet returned by {@link #newToken()}. */
        private final List<BigInteger> nextTokens = Lists.newArrayList();
        /** Offset added to equidistant tokens; see {@link #setOrigin(BigInteger)}. */
        private BigInteger origin = BigInteger.ZERO;

        /**
         * Verifies that {@code range() == max() - min() + 1}.
         *
         * @throws IllegalStateException if the three bounds are inconsistent
         */
        protected void checkRangeValid() {
            Preconditions.checkState(range().equals(max().subtract(min()).add(BigInteger.ONE)),
                    "min=%s; max=%s; range=%s", min(), max(), range());
        }

        /**
         * Sets the offset applied to tokens produced for a brand-new cluster.
         *
         * @param shift the offset, must not be {@code null}
         * @throws NullPointerException if {@code shift} is {@code null}
         */
        @Override
        public synchronized void setOrigin(BigInteger shift) {
            // synchronized like every other accessor: origin is read under the monitor in growingCluster
            this.origin = Preconditions.checkNotNull(shift, "shift");
        }

        /**
         * Unless we're explicitly starting a new cluster or resizing by a pre-defined number of nodes, then
         * let Cassandra decide (i.e. return null).
         *
         * @return the next queued token (which is then recorded as in use), or {@code null} if none is queued
         */
        @Override
        public synchronized BigInteger newToken() {
            if (nextTokens.isEmpty()) {
                return null;
            }
            BigInteger result = nextTokens.remove(0);
            if (result != null) currentTokens.add(result);
            return result;
        }

        /**
         * @param oldToken token of the node being replaced, must not be {@code null}
         * @return {@code oldToken - 1}, wrapped into {@code [min(), max()]}
         * @throws NullPointerException if {@code oldToken} is {@code null}
         */
        @Override
        public synchronized BigInteger getTokenForReplacementNode(BigInteger oldToken) {
            checkNotNull(oldToken, "oldToken");
            return normalize(oldToken.subtract(BigInteger.ONE));
        }

        /**
         * @param oldTokens tokens of the node being replaced, must not be {@code null}
         * @return the replacement for each old token, in the iteration order of {@code oldTokens}
         * @throws NullPointerException if {@code oldTokens} or any of its elements is {@code null}
         */
        @Override
        public synchronized Set<BigInteger> getTokensForReplacementNode(Set<BigInteger> oldTokens) {
            checkNotNull(oldTokens, "oldTokens");
            Set<BigInteger> result = Sets.newLinkedHashSet();
            for (BigInteger oldToken : oldTokens) {
                result.add(getTokenForReplacementNode(oldToken));
            }
            return result;
        }

        /**
         * Queues {@code numNewNodes} tokens for subsequent {@link #newToken()} calls: evenly spaced ones
         * when no token is known or queued yet, otherwise repeated midpoints of the largest gap.
         *
         * @param numNewNodes number of tokens to queue
         */
        @Override
        public synchronized void growingCluster(int numNewNodes) {
            if (currentTokens.isEmpty() && nextTokens.isEmpty()) {
                nextTokens.addAll(generateEquidistantTokens(numNewNodes));
            } else {
                // simple strategy which iteratively finds best midpoint
                for (int i = 0; i < numNewNodes; i++) {
                    nextTokens.add(generateBestNextToken());
                }
            }
        }

        /**
         * Forgets the given tokens so later gap calculations no longer consider them.
         *
         * @param nodesToRemove tokens leaving the cluster, must not be {@code null}
         * @throws NullPointerException if {@code nodesToRemove} is {@code null}
         */
        @Override
        public synchronized void shrinkingCluster(Set<BigInteger> nodesToRemove) {
            checkNotNull(nodesToRemove, "nodesToRemove");
            // removeAll, not remove: remove(Object) would treat the whole set as one element, which
            // never matches a token and makes the sorted set try to compare a Set with a BigInteger
            currentTokens.removeAll(nodesToRemove);
        }

        /**
         * Replaces the recorded in-use tokens with {@code currentNodes}.
         *
         * @param currentNodes tokens of all nodes currently in the cluster, must not be {@code null}
         * @throws NullPointerException if {@code currentNodes} is {@code null}
         */
        @Override
        public synchronized void refresh(Set<BigInteger> currentNodes) {
            // validate before clearing so a bad argument does not wipe the existing state
            checkNotNull(currentNodes, "currentNodes");
            currentTokens.clear();
            currentTokens.addAll(currentNodes);
        }

        /** Returns {@code numTokens} tokens spaced {@code range()/numTokens} apart, shifted by the origin. */
        private List<BigInteger> generateEquidistantTokens(int numTokens) {
            List<BigInteger> result = Lists.newArrayList();
            BigInteger count = BigInteger.valueOf(numTokens);
            for (int i = 0; i < numTokens; i++) {
                BigInteger token = range().multiply(BigInteger.valueOf(i)).divide(count).add(min());
                token = normalize(token.add(origin));
                result.add(token);
            }
            return result;
        }

        /** Wraps {@code input} into {@code [min(), max()]} by adding or subtracting {@code range()}. */
        private BigInteger normalize(BigInteger input) {
            while (input.compareTo(min()) < 0)
                input = input.add(range());
            while (input.compareTo(max()) > 0)
                input = input.subtract(range());
            return input;
        }

        /**
         * Returns the midpoint of the widest gap between known (current plus queued) tokens.
         * Callers must ensure at least one token is known.
         */
        private BigInteger generateBestNextToken() {
            List<BigInteger> allTokens = MutableList.<BigInteger>of().appendAll(currentTokens).appendAll(nextTokens);
            Collections.sort(allTokens);
            Iterator<BigInteger> ti = allTokens.iterator();

            // first candidate gap is the wrap-around one: from the largest token (shifted down by
            // one full range) up to the smallest token
            BigInteger thisValue = ti.next();
            BigInteger prevValue = allTokens.get(allTokens.size()-1).subtract(range());

            // only this midpoint can fall below min(), so only it needs normalizing
            BigInteger bestNewTokenSoFar = normalize(prevValue.add(thisValue).divide(TWO));
            BigInteger biggestRangeSizeSoFar = thisValue.subtract(prevValue);

            while (ti.hasNext()) {
                prevValue = thisValue;
                thisValue = ti.next();

                BigInteger rangeHere = thisValue.subtract(prevValue);
                if (rangeHere.compareTo(biggestRangeSizeSoFar) > 0) {
                    bestNewTokenSoFar = prevValue.add(thisValue).divide(TWO);
                    biggestRangeSizeSoFar = rangeHere;
                }
            }
            return bestNewTokenSoFar;
        }

    }

    /** Token generator over the signed 64-bit space {@code [-2^63, 2^63 - 1]}. */
    public static class PosNeg63TokenGenerator extends AbstractTokenGenerator {
        private static final long serialVersionUID = 7327403957176106754L;

        public static final BigInteger MIN_TOKEN = TWO.pow(63).negate();
        public static final BigInteger MAX_TOKEN = TWO.pow(63).subtract(BigInteger.ONE);
        public static final BigInteger RANGE = TWO.pow(64);

        public PosNeg63TokenGenerator() {
            checkRangeValid();
        }

        @Override public BigInteger max() { return MAX_TOKEN; }
        @Override public BigInteger min() { return MIN_TOKEN; }
        @Override public BigInteger range() { return RANGE; }
    }

    /** token generator used by cassandra pre v1.2; covers {@code [0, 2^127 - 1]} */
    public static class NonNeg127TokenGenerator extends AbstractTokenGenerator {
        private static final long serialVersionUID = 1357426905711548198L;

        public static final BigInteger MIN_TOKEN = BigInteger.ZERO;
        public static final BigInteger MAX_TOKEN = TWO.pow(127).subtract(BigInteger.ONE);
        public static final BigInteger RANGE = TWO.pow(127);

        public NonNeg127TokenGenerator() {
            checkRangeValid();
        }

        @Override public BigInteger max() { return MAX_TOKEN; }
        @Override public BigInteger min() { return MIN_TOKEN; }
        @Override public BigInteger range() { return RANGE; }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java deleted file mode 100644 index e824b71..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
package brooklyn.entity.nosql.couchbase;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.brooklyn.catalog.Catalog;
import brooklyn.config.ConfigKey;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.ConfigKeys;
import brooklyn.entity.group.DynamicCluster;
import brooklyn.entity.proxying.ImplementedBy;
import brooklyn.event.AttributeSensor;
import brooklyn.event.basic.Sensors;
import brooklyn.util.flags.SetFromFlag;
import brooklyn.util.time.Duration;

import com.google.common.reflect.TypeToken;

/**
 * A {@link DynamicCluster} of Couchbase nodes, declaring the sensors and configuration keys
 * published for the cluster as a whole.
 */
@Catalog(name="CouchBase Cluster", description="Couchbase is an open source, distributed (shared-nothing architecture) "
        + "NoSQL document-oriented database that is optimized for interactive applications.")
@ImplementedBy(CouchbaseClusterImpl.class)
public interface CouchbaseCluster extends DynamicCluster {

    // NOTE(review): the sensor name is spelled "coucbase" (sic). It is kept byte-identical here because
    // renaming a sensor changes what consumers subscribe to; confirm before correcting.
    AttributeSensor<Integer> ACTUAL_CLUSTER_SIZE = Sensors.newIntegerSensor("coucbase.cluster.actualClusterSize", "returns the actual number of nodes in the cluster");

    @SuppressWarnings("serial")
    AttributeSensor<Set<Entity>> COUCHBASE_CLUSTER_UP_NODES = Sensors.newSensor(new TypeToken<Set<Entity>>() {
    }, "couchbase.cluster.clusterEntities", "the set of service up nodes");

    @SuppressWarnings("serial")
    AttributeSensor<List<String>> COUCHBASE_CLUSTER_BUCKETS = Sensors.newSensor(new TypeToken<List<String>>() {
    }, "couchbase.cluster.buckets", "Names of all the buckets the couchbase cluster");

    AttributeSensor<Entity> COUCHBASE_PRIMARY_NODE = Sensors.newSensor(Entity.class, "couchbase.cluster.primaryNode", "The primary couchbase node to query and issue add-server and rebalance on");

    AttributeSensor<Boolean> IS_CLUSTER_INITIALIZED = Sensors.newBooleanSensor("couchbase.cluster.isClusterInitialized", "flag to emit if the couchbase cluster was intialized");

    @SetFromFlag("clusterName")
    ConfigKey<String> CLUSTER_NAME = ConfigKeys.newStringConfigKey("couchbase.cluster.name", "Optional name for this cluster");

    // NOTE(review): both the flag and the key are spelled "intial" (sic); kept as-is because they are
    // the names callers configure against.
    @SetFromFlag("intialQuorumSize")
    ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey("couchbase.cluster.intialQuorumSize", "Initial cluster quorum size - number of initial nodes that must have been successfully started to report success (if < 0, then use value of INITIAL_SIZE)",
            -1);

    @SetFromFlag("delayBeforeAdvertisingCluster")
    ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.TEN_SECONDS);

    // TODO not sure if this is needed; previously waited 3m (SERVICE_UP_TIME_OUT) but that seems absurdly long
    @SetFromFlag("postStartStabilizationDelay")
    ConfigKey<Duration> NODES_STARTED_STABILIZATION_DELAY = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.postStartStabilizationDelay", "Delay after nodes have been started before treating it as a cluster", Duration.TEN_SECONDS);

    // The admin credentials reuse the node-level keys, so cluster and node share one definition.
    @SetFromFlag("adminUsername")
    ConfigKey<String> COUCHBASE_ADMIN_USERNAME = CouchbaseNode.COUCHBASE_ADMIN_USERNAME;

    @SetFromFlag("adminPassword")
    ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = CouchbaseNode.COUCHBASE_ADMIN_PASSWORD;

    @SuppressWarnings("serial")
    AttributeSensor<List<String>> COUCHBASE_CLUSTER_UP_NODE_ADDRESSES = Sensors.newSensor(new TypeToken<List<String>>() {},
        "couchbase.cluster.node.addresses", "List of host:port of all active nodes in the cluster (http admin port, and public hostname/IP)");
    AttributeSensor<String> COUCHBASE_CLUSTER_CONNECTION_URL = Sensors.newStringSensor(
        "couchbase.cluster.connection.url", "Couchbase-style URL to connect to the cluster (e.g. http://127.0.0.1:8091/ or couchbase://10.0.0.1,10.0.0.2/)");

    // Interesting stats
    AttributeSensor<Double> OPS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.ops",
            "Average across cluster for pools/nodes/<current node>/interestingStats/ops");
    AttributeSensor<Double> EP_BG_FETCHED_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.ep.bg.fetched",
            "Average across cluster for pools/nodes/<current node>/interestingStats/ep_bg_fetched");
    AttributeSensor<Double> CURR_ITEMS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.curr.items",
            "Average across cluster for pools/nodes/<current node>/interestingStats/curr_items");
    AttributeSensor<Double> VB_REPLICA_CURR_ITEMS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.vb.replica.curr.items",
            "Average across cluster for pools/nodes/<current node>/interestingStats/vb_replica_curr_items");
    AttributeSensor<Double> GET_HITS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.get.hits",
            "Average across cluster for pools/nodes/<current node>/interestingStats/get_hits");
    AttributeSensor<Double> CMD_GET_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.cmd.get",
            "Average across cluster for pools/nodes/<current node>/interestingStats/cmd_get");
    AttributeSensor<Double> CURR_ITEMS_TOT_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.curr.items.tot",
            "Average across cluster for pools/nodes/<current node>/interestingStats/curr_items_tot");
    // Although these are Double (after aggregation), they need to be coerced to Long for ByteSizeStrings rendering
    AttributeSensor<Long> COUCH_DOCS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.docs.data.size",
            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_docs_data_size");
    AttributeSensor<Long> MEM_USED_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.mem.used",
            "Average across cluster for pools/nodes/<current node>/interestingStats/mem_used");
    AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.actual.disk.size",
            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_views_actual_disk_size");
    AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.docs.actual.disk.size",
            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size");
    AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.data.size",
            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_views_data_size");

    // The first fragment ends with a space so the concatenated description reads "... and further ..."
    AttributeSensor<Boolean> BUCKET_CREATION_IN_PROGRESS = Sensors.newBooleanSensor("couchbase.cluster.bucketCreationInProgress", "Indicates that a bucket is currently being created, and " +
            "further bucket creation should be deferred");

    /**
     * createBuckets is a list of all the buckets to be created on the couchbase cluster
     * the buckets will be created on the primary node of the cluster
     * each map entry for a bucket should contain the following parameters:
     * - <"bucket",(String) name of the bucket (default: default)>
     * - <"bucket-type",(String) name of bucket type (default: couchbase)>
     * - <"bucket-port",(Integer) the bucket port to connect to (default: 11222)>
     * - <"bucket-ramsize",(Integer) ram size allowed for bucket (default: 200)>
     * - <"bucket-replica",(Integer) number of replicas for the bucket (default: 1)>
     */
    @SuppressWarnings("serial")
    @SetFromFlag("createBuckets")
    ConfigKey<List<Map<String, Object>>> CREATE_BUCKETS = ConfigKeys.newConfigKey(new TypeToken<List<Map<String, Object>>>() {},
            "couchbase.cluster.createBuckets", "a list of all dedicated port buckets to be created on the couchbase cluster");

    @SuppressWarnings("serial")
    @SetFromFlag("replication")
    ConfigKey<List<Map<String,Object>>> REPLICATION = ConfigKeys.newConfigKey(new TypeToken<List<Map<String,Object>>>() {},
            "couchbase.cluster.replicationConfiguration", "List of replication rules to configure, each rule including target (id of another cluster) and mode (unidirectional or bidirectional)");

    /**
     * Returns the quorum size for this cluster.
     * NOTE(review): presumably derived from {@link #INITIAL_QUORUM_SIZE}; the implementation is not visible here.
     */
    int getQuorumSize();
}
