http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java
new file mode 100644
index 0000000..7036285
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+/**
+ * Empty subclass of {@link CassandraDatacenterImpl}; it declares no members of its own.
+ * @deprecated since 0.7.0; use {@link CassandraDatacenter}, which is equivalent but has a less ambiguous name (<em>Cluster</em> in Cassandra corresponds to what Brooklyn terms a <em>Fabric</em>).
+ */
+@Deprecated
+public class CassandraClusterImpl extends CassandraDatacenterImpl implements CassandraCluster {
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java new file mode 100644 index 0000000..7ef646f --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import java.math.BigInteger; +import java.util.List; +import java.util.Set; + +import org.apache.brooklyn.catalog.Catalog; +import org.apache.brooklyn.entity.nosql.cassandra.TokenGenerators.PosNeg63TokenGenerator; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.annotation.Effector; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.MethodEffector; +import brooklyn.entity.database.DatastoreMixins; +import brooklyn.entity.effector.Effectors; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.time.Duration; + +import com.google.common.base.Supplier; +import com.google.common.collect.Multimap; +import com.google.common.reflect.TypeToken; + +/** + * A group of {@link CassandraNode}s -- based on Brooklyn's {@link DynamicCluster} + * (though it is a "Datacenter" in Cassandra terms, where Cassandra's "cluster" corresponds + * to a Brooklyn Fabric, cf {@link CassandraFabric}). + * The Datacenter can be resized, manually or by policy if required. + * Tokens are selected intelligently. + * <p> + * Note that due to how Cassandra assumes ports are the same across a cluster, + * it is <em>NOT</em> possible to deploy a cluster of size larger than 1 to localhost. + * (Some exploratory work has been done to use different 127.0.0.x IP's for localhost, + * and there is evidence this could be made to work.) 
+ */
+@Catalog(name="Apache Cassandra Datacenter Cluster", description="Cassandra is a highly scalable, eventually " +
+        "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
+        "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@ImplementedBy(CassandraDatacenterImpl.class)
+public interface CassandraDatacenter extends DynamicCluster, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript {
+
+    // FIXME datacenter name -- also CASS_CLUSTER_NODES should be CASS_DC_NODES
+    /** Cluster name; both a config key and a sensor (default {@code "BrooklynCluster"}). */
+    @SetFromFlag("clusterName")
+    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, "cassandra.cluster.name", "Name of the Cassandra cluster", "BrooklynCluster");
+
+    /** Snitch type (default {@code "SimpleSnitch"}). */
+    @SetFromFlag("snitchName")
+    ConfigKey<String> ENDPOINT_SNITCH_NAME = ConfigKeys.newStringConfigKey("cassandra.cluster.snitchName", "Type of the Cassandra snitch", "SimpleSnitch");
+
+    /** Optional supplier of the seed nodes; there is no default value (the key's default is {@code null}). */
+    @SetFromFlag("seedSupplier")
+    @SuppressWarnings("serial")
+    ConfigKey<Supplier<Set<Entity>>> SEED_SUPPLIER = ConfigKeys.newConfigKey(new TypeToken<Supplier<Set<Entity>>>() { }, "cassandra.cluster.seedSupplier", "For determining the seed nodes", null);
+
+    /** Class used to generate node tokens (default {@link PosNeg63TokenGenerator}). */
+    @SuppressWarnings("serial")
+    @SetFromFlag("tokenGeneratorClass")
+    ConfigKey<Class<? extends TokenGenerator>> TOKEN_GENERATOR_CLASS = ConfigKeys.newConfigKey(
+        new TypeToken<Class<? extends TokenGenerator>>() {}, "cassandra.cluster.tokenGenerator.class", "For determining the tokens of nodes",
+        PosNeg63TokenGenerator.class);
+
+    /** Shift applied to generated tokens; the default is {@code null}, see the description text for what that means. */
+    @SetFromFlag("tokenShift")
+    ConfigKey<BigInteger> TOKEN_SHIFT = ConfigKeys.newConfigKey(BigInteger.class, "cassandra.cluster.tokenShift",
+        "Delta applied to all tokens generated for this Cassandra datacenter, " +
+        "useful when configuring multiple datacenters which should be shifted; " +
+        "if not set, a random shift is applied. (Pass 0 to prevent any shift.)", null);
+
+    /** Whether vnodes are used (default {@code false}). */
+    ConfigKey<Boolean> USE_VNODES = ConfigKeys.newBooleanConfigKey(
+            "cassandra.cluster.useVnodes",
+            "Determines whether to use vnodes; if doing so, tokens will not be explicitly assigned to nodes in the cluster",
+            false);
+
+    /**
+     * num_tokens will automatically be reset to 1 for each node if {@link #USE_VNODES} is false.
+     */
+    ConfigKey<Integer> NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode",
+            "Number of tokens per node; if using vnodes, should set this to a value like 256; will be overridden to 1 if USE_VNODES==false",
+            256);
+
+    /**
+     * Additional time after the nodes in the cluster are up when starting
+     * before announcing the cluster as up.
+     * <p>
+     * Useful to ensure nodes have synchronized.
+     * <p>
+     * On 1.2.2 this could be as much as 120s when using 2 seed nodes,
+     * or just a few seconds with 1 seed node. On 1.2.9 it seems a few
+     * seconds is sufficient even with 2 seed nodes
+     */
+    @SetFromFlag("delayBeforeAdvertisingCluster")
+    ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "cassandra.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.TEN_SECONDS);
+
+    /** Datacenter name mapped to the nodes reporting that name. */
+    @SuppressWarnings("serial")
+    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = Sensors.newSensor(new TypeToken<Multimap<String,Entity>>() { }, "cassandra.cluster.datacenterUsages", "Current set of datacenters in use, with nodes in each");
+
+    /** Names of the datacenters currently in use. */
+    @SuppressWarnings("serial")
+    AttributeSensor<Set<String>> DATACENTERS = Sensors.newSensor(new TypeToken<Set<String>>() { }, "cassandra.cluster.datacenters", "Current set of datacenters in use");
+
+    AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = Sensors.newBooleanSensor("cassandra.cluster.seeds.hasPublished", "Whether we have published any seeds");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<Set<Entity>> CURRENT_SEEDS = Sensors.newSensor(new TypeToken<Set<Entity>>() { }, "cassandra.cluster.seeds.current", "Current set of seeds to use to bootstrap the cluster");
+
+    AttributeSensor<String> HOSTNAME = Sensors.newStringSensor("cassandra.cluster.hostname", "Hostname to connect to cluster with");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<List<String>> CASSANDRA_CLUSTER_NODES = Sensors.newSensor(new TypeToken<List<String>>() {},
+        "cassandra.cluster.nodes", "List of host:port of all active nodes in the cluster (thrift port, and public hostname/IP)");
+
+    AttributeSensor<Integer> THRIFT_PORT = Sensors.newIntegerSensor("cassandra.cluster.thrift.port", "Cassandra Thrift RPC port to connect to cluster with");
+
+    AttributeSensor<Long> FIRST_NODE_STARTED_TIME_UTC = Sensors.newLongSensor("cassandra.cluster.first.node.started.utc", "Time (UTC) when the first node was started");
+    @SuppressWarnings("serial")
+    AttributeSensor<List<Entity>> QUEUED_START_NODES = Sensors.newSensor(new TypeToken<List<Entity>>() {}, "cassandra.cluster.start.nodes.queued",
+        "Nodes queued for starting (for sequential start)");
+
+    AttributeSensor<Integer> SCHEMA_VERSION_COUNT = Sensors.newIntegerSensor("cassandra.cluster.schema.versions.count",
+            "Number of different schema versions in the cluster; should be 1 for a healthy cluster, 0 when off; " +
+            "2 and above indicats a Schema Disagreement Error (and keyspace access may fail)");
+
+    // Cluster-level read/write stage counters
+    AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.cluster.read.pending", "Current pending ReadStage tasks");
+    AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.read.active", "Current active ReadStage tasks");
+    AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.cluster.write.pending", "Current pending MutationStage tasks");
+    AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.write.active", "Current active MutationStage tasks");
+
+    // Per-node averages of the latest datapoints
+    // NOTE(review): the sensor names below do not share one prefix/spelling ("cassandra.reads..." vs "cassandra.write...");
+    // they are runtime identifiers, so confirm nothing depends on them before normalising.
+    AttributeSensor<Long> THRIFT_PORT_LATENCY_PER_NODE = Sensors.newLongSensor("cassandra.cluster.thrift.latency.perNode", "Latency for thrift port connection averaged over all nodes (ms)");
+    AttributeSensor<Double> READS_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.reads.perSec.last.perNode", "Reads/sec (last datapoint) averaged over all nodes");
+    AttributeSensor<Double> WRITES_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.write.perSec.last.perNode", "Writes/sec (last datapoint) averaged over all nodes");
+    AttributeSensor<Double> PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.cluster.metrics.processCpuTime.fraction.perNode", "Fraction of CPU time used (percentage reported by JMX), averaged over all nodes");
+
+    // Per-node averages over a time window
+    AttributeSensor<Double> READS_PER_SECOND_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed.perNode", "Reads/sec (over time window) averaged over all nodes");
+    AttributeSensor<Double> WRITES_PER_SECOND_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed.perNode", "Writes/sec (over time window) averaged over all nodes");
+    AttributeSensor<Double> THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed.perNode", "Latency for thrift port (ms, over time window) averaged over all nodes");
+    AttributeSensor<Double> PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.cluster.metrics.processCpuTime.fraction.windowed", "Fraction of CPU time used (percentage, over time window), averaged over all nodes");
+
+    /** Effector bound by name to the {@code update} method of this interface. */
+    MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraDatacenter.class, "update");
+
+    /** {@link DatastoreMixins#EXECUTE_SCRIPT} re-declared with a Cassandra-specific description; built abstract, so a body must be supplied elsewhere. */
+    brooklyn.entity.Effector<String> EXECUTE_SCRIPT = Effectors.effector(DatastoreMixins.EXECUTE_SCRIPT)
+        .description("executes the given script contents using cassandra-cli")
+        .buildAbstract();
+
+    /**
+     * Sets the number of nodes used to seed the cluster.
+     * <p>
+     * Version 1.2.2 is buggy and requires a big delay for 2 nodes both seeds to reconcile,
+     * with 1.2.9 this seems fine, with just a few seconds' delay after starting.
+     *
+     * @see <a href="http://stackoverflow.com/questions/6770894/schemadisagreementexception/18639005">SchemaDisagreementException discussion on Stack Overflow</a>
+     */
+    int DEFAULT_SEED_QUORUM = 2;
+
+    /**
+     * Can insert a delay after the first node comes up.
+     * <p>
+     * Reportedly not needed with 1.2.9, but we are still seeing some seed failures so re-introducing it.
+     * (This does not seem to help with the bug in 1.2.2.)
+     */
+    Duration DELAY_AFTER_FIRST = Duration.ONE_MINUTE;
+
+    /**
+     * If set (ie non-null), this waits the indicated time after a successful launch of one node
+     * before starting the next. (If it is null, all nodes start simultaneously,
+     * possibly after the DELAY_AFTER_FIRST.)
+     * <p>
+     * When subsequent nodes start simultaneously, we occasionally see schema disagreement problems;
+     * if nodes start sequentially, we occasionally get "no sources for (tokenRange]" problems.
+     * Either way the node stops. Ideally this can be solved at the Cassandra level,
+     * but if not, we will have to introduce some restarts at the Cassandra nodes (which does seem
+     * to resolve the problems.)
+     */
+    Duration DELAY_BETWEEN_STARTS = null;
+
+    /**
+     * Whether to wait for the first node to start up
+     * <p>
+     * not sure whether this is needed or not. Need to test in env where not all nodes are seed nodes,
+     * what happens if non-seed nodes start before the seed nodes?
+     */
+    boolean WAIT_FOR_FIRST = true;
+
+    /** Effector method behind {@link #UPDATE}. */
+    @Effector(description="Updates the cluster members")
+    void update();
+
+    /**
+     * The name of the cluster.
+     */
+    String getClusterName();
+
+    /** Returns the candidate seed entities. NOTE(review): selection criteria are defined by the implementation's seed tracker -- confirm there. */
+    Set<Entity> gatherPotentialSeeds();
+
+    /** Returns the candidate seed entities that are running. NOTE(review): the exact meaning of "running" is defined by the implementation's seed tracker -- confirm there. */
+    Set<Entity> gatherPotentialRunningSeeds();
+
+    /** Executes the given script contents (see {@link #EXECUTE_SCRIPT}). NOTE(review): the meaning of the returned string is not specified here -- confirm against the implementation. */
+    String executeScript(String commands);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
new file mode 100644
index 0000000..baa9a17
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.enricher.Enrichers; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.DynamicGroup; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityPredicates; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic; +import brooklyn.entity.effector.EffectorBody; +import brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.event.AttributeSensor; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.location.Location; +import brooklyn.location.basic.Machines; +import brooklyn.policy.PolicySpec; +import brooklyn.util.ResourceUtils; +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Time; + +import com.google.common.base.Objects; +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; + +/** + * Implementation of {@link 
CassandraDatacenter}.
+ * <p>
+ * Several subtleties to note:
+ * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port
+ *   (so we wait for thrift port to be contactable)
+ * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema
+ *   (each up to 1m; often very close to the 1m)
+ */
+public class CassandraDatacenterImpl extends DynamicClusterImpl implements CassandraDatacenter {
+
+    /*
+     * TODO Seed management is hard!
+     *  - The ServiceRestarter is not doing customize(), so is not refreshing the seeds in cassandra.yaml.
+     *    If we have two nodes that were seeds for each other and they both restart at the same time, we'll have a split brain.
+     */
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterImpl.class);
+
+    // Mutex for synchronizing during re-size operations
+    private final Object mutex = new Object[0];
+
+    /**
+     * Seed supplier used when {@link #SEED_SUPPLIER} is not configured. Each call decides as follows:
+     * <ul>
+     *   <li>seeds never published and fewer potential seeds than the quorum size: returns an empty set;
+     *   <li>seeds already published and this entity is {@link Lifecycle#STARTING}: returns the current seeds unchanged;
+     *   <li>seeds already published and no potential seed is running: returns the current seeds unchanged;
+     *   <li>seeds already published otherwise: returns up to quorum-size running candidates, preferring current seeds;
+     *   <li>seeds never published and quorum reached: returns up to quorum-size candidates, preferring current seeds.
+     * </ul>
+     */
+    private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() {
+        // Mutex for (re)calculating our seeds
+        // TODO is this very dangerous?! Calling out to SeedTracker, which calls out to alien getAttribute()/getConfig(). But I think that's ok.
+        // TODO might not need mutex? previous race was being caused by something else, other than concurrent calls!
+        private final Object seedMutex = new Object();
+
+        @Override
+        public Set<Entity> get() {
+            synchronized (seedMutex) {
+                boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS));
+                int quorumSize = getSeedQuorumSize();
+                Set<Entity> potentialSeeds = gatherPotentialSeeds();
+                Set<Entity> potentialRunningSeeds = gatherPotentialRunningSeeds();
+                boolean stillWaitingForQuorum = (!hasPublishedSeeds) && (potentialSeeds.size() < quorumSize);
+
+                if (stillWaitingForQuorum) {
+                    // Log the cluster instance (was the Class object), consistent with the other messages here
+                    if (log.isDebugEnabled()) log.debug("Not refreshed seeds of cluster {}, because still waiting for quorum (need {}; have {} potentials)", new Object[] {CassandraDatacenterImpl.this, quorumSize, potentialSeeds.size()});
+                    return ImmutableSet.of();
+                } else if (hasPublishedSeeds) {
+                    Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS);
+                    if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.STARTING) {
+                        if (Sets.intersection(currentSeeds, potentialSeeds).isEmpty()) {
+                            log.warn("Cluster {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraDatacenterImpl.this, currentSeeds});
+                        }
+                        return currentSeeds;
+                    } else if (potentialRunningSeeds.isEmpty()) {
+                        // TODO Could be race where nodes have only just returned from start() and are about to
+                        // transition to serviceUp; so don't just abandon all our seeds!
+                        log.warn("Cluster {} has no running seeds (yet?); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraDatacenterImpl.this});
+                        return currentSeeds;
+                    } else {
+                        Set<Entity> result = trim(quorumSize, potentialRunningSeeds);
+                        log.debug("Cluster {} updating seeds: chosen={}; potentialRunning={}", new Object[] {CassandraDatacenterImpl.this, result, potentialRunningSeeds});
+                        return result;
+                    }
+                } else {
+                    Set<Entity> result = trim(quorumSize, potentialSeeds);
+                    if (log.isDebugEnabled()) log.debug("Cluster {} has reached seed quorum: seeds={}", new Object[] {CassandraDatacenterImpl.this, result});
+                    return result;
+                }
+            }
+        }
+        /** Returns at most {@code num} of {@code contenders}, ordered so that contenders which are already current seeds come first. */
+        private Set<Entity> trim(int num, Set<Entity> contenders) {
+            // Prefer existing seeds wherever possible; otherwise accept any other contenders
+            Set<Entity> currentSeeds = (getAttribute(CURRENT_SEEDS) != null) ? getAttribute(CURRENT_SEEDS) : ImmutableSet.<Entity>of();
+            Set<Entity> result = Sets.newLinkedHashSet();
+            result.addAll(Sets.intersection(currentSeeds, contenders));
+            result.addAll(contenders);
+            return ImmutableSet.copyOf(Iterables.limit(result, num));
+        }
+    };
+
+    protected SeedTracker seedTracker = new SeedTracker();
+    protected TokenGenerator tokenGenerator = null;
+
+    public CassandraDatacenterImpl() {
+    }
+
+    @Override
+    public void init() {
+        super.init();
+
+        /*
+         * subscribe to hostname, and keep an accurate set of current seeds in a sensor;
+         * then at nodes we set the initial seeds to be the current seeds when ready (non-empty)
+         */
+        subscribeToMembers(this, Attributes.HOSTNAME, new SensorEventListener<String>() {
+            @Override
+            public void onEvent(SensorEvent<String> event) {
+                seedTracker.onHostnameChanged(event.getSource(), event.getValue());
+            }
+        });
+        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+            @Override public void onEvent(SensorEvent<Entity> event) {
+                seedTracker.onMemberRemoved(event.getValue());
+            }
+        });
+
subscribeToMembers(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() {
+            @Override
+            public void onEvent(SensorEvent<Boolean> event) {
+                seedTracker.onServiceUpChanged(event.getSource(), event.getValue());
+            }
+        });
+        subscribeToMembers(this, Attributes.SERVICE_STATE_ACTUAL, new SensorEventListener<Lifecycle>() {
+            @Override
+            public void onEvent(SensorEvent<Lifecycle> event) {
+                // trigger a recomputation also when lifecycle state changes,
+                // because it might not have ruled a seed as inviable when service up went true
+                // because service state was not yet running
+                seedTracker.onServiceUpChanged(event.getSource(), Lifecycle.RUNNING==event.getValue());
+            }
+        });
+
+        // Track the datacenters for this cluster
+        subscribeToMembers(this, CassandraNode.DATACENTER_NAME, new SensorEventListener<String>() {
+            @Override
+            public void onEvent(SensorEvent<String> event) {
+                Entity member = event.getSource();
+                String dcName = event.getValue();
+                if (dcName != null) {
+                    Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
+                    // Work on a copy so the published sensor value is replaced rather than mutated in place
+                    Multimap<String, Entity> mutableDatacenterUsage = (datacenterUsage == null) ? LinkedHashMultimap.<String, Entity>create() : LinkedHashMultimap.create(datacenterUsage);
+                    Optional<String> oldDcName = getKeyOfVal(mutableDatacenterUsage, member);
+                    // Only publish when the member is new or has moved to a different datacenter
+                    if (!(oldDcName.isPresent() && dcName.equals(oldDcName.get()))) {
+                        mutableDatacenterUsage.values().remove(member);
+                        mutableDatacenterUsage.put(dcName, member);
+                        setAttribute(DATACENTER_USAGE, mutableDatacenterUsage);
+                        setAttribute(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
+                    }
+                }
+            }
+            /** Returns the key of the first entry whose value equals {@code val}, or absent if there is none. */
+            private <K,V> Optional<K> getKeyOfVal(Multimap<K,V> map, V val) {
+                for (Map.Entry<K,V> entry : map.entries()) {
+                    if (Objects.equal(val, entry.getValue())) {
+                        return Optional.of(entry.getKey());
+                    }
+                }
+                return Optional.absent();
+            }
+        });
+        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+            @Override public void onEvent(SensorEvent<Entity> event) {
+                // The removed member is the event's value; this subscription is on this cluster itself,
+                // and the seed-tracker MEMBER_REMOVED listener in this method likewise reads getValue().
+                Entity entity = event.getValue();
+                Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
+                if (datacenterUsage != null && datacenterUsage.containsValue(entity)) {
+                    Multimap<String, Entity> mutableDatacenterUsage = LinkedHashMultimap.create(datacenterUsage);
+                    mutableDatacenterUsage.values().remove(entity);
+                    setAttribute(DATACENTER_USAGE, mutableDatacenterUsage);
+                    setAttribute(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
+                }
+            }
+        });
+
+        // Supply the body for the abstract EXECUTE_SCRIPT effector: delegate to executeScript with the "commands" parameter
+        getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() {
+            @Override
+            public String call(ConfigBag parameters) {
+                return executeScript((String)parameters.getStringKey("commands"));
+            }
+        });
+    }
+
+    /** Returns the configured {@link #SEED_SUPPLIER}, or the built-in default supplier when none is configured. */
+    protected Supplier<Set<Entity>> getSeedSupplier() {
+        Supplier<Set<Entity>> seedSupplier = getConfig(SEED_SUPPLIER);
+        return (seedSupplier == null) ?
defaultSeedSupplier : seedSupplier;
+    }
+
+    /** Returns true only when {@link #USE_VNODES} is configured as {@code TRUE}; a null value counts as false. */
+    protected boolean useVnodes() {
+        return Boolean.TRUE.equals(getConfig(USE_VNODES));
+    }
+
+    /**
+     * Returns the token generator, creating it on first use from {@link #TOKEN_GENERATOR_CLASS}
+     * (no-arg constructor). Its origin is set to {@link #TOKEN_SHIFT}, or, when that is null, to a
+     * random fraction of the generator's range. Any failure is rethrown via {@link Throwables#propagate}.
+     */
+    protected synchronized TokenGenerator getTokenGenerator() {
+        if (tokenGenerator!=null)
+            return tokenGenerator;
+
+        try {
+            tokenGenerator = getConfig(TOKEN_GENERATOR_CLASS).newInstance();
+
+            BigInteger shift = getConfig(TOKEN_SHIFT);
+            if (shift==null)
+                // random value in [0, range), truncated to an integer
+                shift = BigDecimal.valueOf(Math.random()).multiply(
+                    new BigDecimal(tokenGenerator.range())).toBigInteger();
+            tokenGenerator.setOrigin(shift);
+
+            return tokenGenerator;
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Returns the number of seeds to wait for: {@code INITIAL_QUORUM_SIZE} when it is set and positive,
+     * otherwise {@code INITIAL_SIZE} clamped to the range 1..{@link #DEFAULT_SEED_QUORUM}.
+     */
+    protected int getSeedQuorumSize() {
+        Integer quorumSize = getConfig(INITIAL_QUORUM_SIZE);
+        if (quorumSize!=null && quorumSize>0)
+            return quorumSize;
+        // default 2 is recommended, unless initial size is smaller
+        return Math.min(Math.max(getConfig(INITIAL_SIZE), 1), DEFAULT_SEED_QUORUM);
+    }
+
+    /** Delegates to the seed tracker. */
+    @Override
+    public Set<Entity> gatherPotentialSeeds() {
+        return seedTracker.gatherPotentialSeeds();
+    }
+
+    /** Delegates to the seed tracker. */
+    @Override
+    public Set<Entity> gatherPotentialRunningSeeds() {
+        return seedTracker.gatherPotentialRunningSeeds();
+    }
+
+    /**
+     * Sets the default {@link #MEMBER_SPEC} to describe the Cassandra nodes.
+     */
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        return getConfig(MEMBER_SPEC, EntitySpec.create(CassandraNode.class));
+    }
+
+    /** Returns the value of the {@link #CLUSTER_NAME} sensor (not the config value). */
+    @Override
+    public String getClusterName() {
+        return getAttribute(CLUSTER_NAME);
+    }
+
+    /**
+     * Grows the cluster by {@code delta}. When vnodes are not in use and the cluster is currently
+     * empty, the token generator is first told how many nodes are about to be added.
+     */
+    @Override
+    public Collection<Entity> grow(int delta) {
+        if (useVnodes()) {
+            // nothing to do for token generator
+        } else {
+            if (getCurrentSize() == 0) {
+                getTokenGenerator().growingCluster(delta);
+            }
+        }
+        return super.grow(delta);
+    }
+
+    /**
+     * Creates a node after filling in token-related flags on a copy of {@code flags}:
+     * a generated {@code TOKEN} when the caller supplied no token/tokens and vnodes are off, and
+     * {@code NUM_TOKENS_PER_NODE} (configured value with vnodes, otherwise 1) when the caller did not supply it.
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    protected Entity createNode(@Nullable Location loc, Map<?,?> flags) {
+        Map<Object, Object> allflags = MutableMap.copyOf(flags);
+
+        if ((flags.containsKey(CassandraNode.TOKEN) || flags.containsKey("token")) || (flags.containsKey(CassandraNode.TOKENS) || flags.containsKey("tokens"))) {
+            // leave token config as-is
+        } else if (!useVnodes()) {
+            BigInteger token = getTokenGenerator().newToken();
+            allflags.put(CassandraNode.TOKEN, token);
+        }
+
+        if ((flags.containsKey(CassandraNode.NUM_TOKENS_PER_NODE) || flags.containsKey("numTokensPerNode"))) {
+            // leave num_tokens as-is
+        } else if (useVnodes()) {
+            Integer numTokensPerNode = getConfig(NUM_TOKENS_PER_NODE);
+            allflags.put(CassandraNode.NUM_TOKENS_PER_NODE, numTokensPerNode);
+        } else {
+            allflags.put(CassandraNode.NUM_TOKENS_PER_NODE, 1);
+        }
+
+        return super.createNode(loc, allflags);
+    }
+
+    /**
+     * Replaces {@code member}, passing the replacement a {@code TOKENS} flag computed by the token
+     * generator from the old node's tokens; the flag value is null when the old node has no tokens.
+     */
+    @Override
+    protected Entity replaceMember(Entity member, Location memberLoc, Map<?, ?> extraFlags) {
+        Set<BigInteger> oldTokens = ((CassandraNode) member).getTokens();
+        Set<BigInteger> newTokens = (oldTokens != null && oldTokens.size() > 0) ? getTokenGenerator().getTokensForReplacementNode(oldTokens) : null;
+        return super.replaceMember(member, memberLoc, MutableMap.copyOf(extraFlags).add(CassandraNode.TOKENS, newTokens));
+    }
+
+    /**
+     * Starts the cluster: publishes {@link #CLUSTER_NAME}, starts the members, wires up sensors,
+     * waits {@link #DELAY_BEFORE_ADVERTISING_CLUSTER}, runs the creation script if one is configured,
+     * and finally calls {@link #update()}.
+     */
+    @Override
+    public void start(Collection<?
extends Location> locations) {
+        Machines.warnIfLocalhost(locations, "CassandraCluster does not support multiple nodes on localhost, " +
+                "due to assumptions Cassandra makes about the use of the same port numbers used across the cluster.");
+
+        // force this to be set - even if it is using the default
+        setAttribute(CLUSTER_NAME, getConfig(CLUSTER_NAME));
+
+        super.start(locations);
+
+        connectSensors();
+
+        // TODO wait until all nodes which we think are up are consistent
+        // i.e. all known nodes use the same schema, as reported by
+        // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli");
+        // once we've done that we can revert to using 2 seed nodes.
+        // see CassandraCluster.DEFAULT_SEED_QUORUM
+        // (also ensure the cluster is ready if we are about to run a creation script)
+        Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
+
+        String scriptUrl = getConfig(CassandraNode.CREATION_SCRIPT_URL);
+        if (Strings.isNonEmpty(scriptUrl)) {
+            executeScript(new ResourceUtils(this).getResourceAsString(scriptUrl));
+        }
+
+        update();
+    }
+
+    /** Adds the aggregating enrichers and a {@link MemberTrackingPolicy} tracking member SERVICE_UP, HOSTNAME and THRIFT_PORT. */
+    protected void connectSensors() {
+        connectEnrichers();
+
+        addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName("Cassandra Cluster Tracker")
+                .configure("sensorsToTrack", ImmutableSet.of(Attributes.SERVICE_UP, Attributes.HOSTNAME, CassandraNode.THRIFT_PORT))
+                .configure("group", this));
+    }
+
+    /**
+     * Calls {@link CassandraDatacenterImpl#update()} on the owning cluster whenever a member is
+     * added, removed or changed. The debug messages name the owning cluster ({@code entity}),
+     * not the policy instance.
+     */
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override
+        protected void onEntityChange(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Node {} updated in Cluster {}", member, entity);
+            ((CassandraDatacenterImpl)entity).update();
+        }
+        @Override
+        protected void onEntityAdded(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Node {} added to Cluster {}", member, entity);
+            ((CassandraDatacenterImpl)entity).update();
+        }
+        @Override
+        protected void onEntityRemoved(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Node {} removed from Cluster {}", member, entity);
+            ((CassandraDatacenterImpl)entity).update();
+        }
+    };
+
+    /**
+     * Adds one enricher per (member sensor, cluster sensor) pair: the first table is aggregated
+     * by sum, the second by average. Both use null for unreported sensors and when no sensors report.
+     */
+    @SuppressWarnings("unchecked")
+    protected void connectEnrichers() {
+        // Each inner list is (sensor read from members, sensor published on this cluster)
+        List<? extends List<? extends AttributeSensor<? extends Number>>> summingEnricherSetup = ImmutableList.of(
+                ImmutableList.of(CassandraNode.READ_ACTIVE, READ_ACTIVE),
+                ImmutableList.of(CassandraNode.READ_PENDING, READ_PENDING),
+                ImmutableList.of(CassandraNode.WRITE_ACTIVE, WRITE_ACTIVE),
+                ImmutableList.of(CassandraNode.WRITE_PENDING, WRITE_PENDING)
+        );
+
+        List<? extends List<? extends AttributeSensor<? extends Number>>> averagingEnricherSetup = ImmutableList.of(
+                ImmutableList.of(CassandraNode.READS_PER_SECOND_LAST, READS_PER_SECOND_LAST_PER_NODE),
+                ImmutableList.of(CassandraNode.WRITES_PER_SECOND_LAST, WRITES_PER_SECOND_LAST_PER_NODE),
+                ImmutableList.of(CassandraNode.WRITES_PER_SECOND_IN_WINDOW, WRITES_PER_SECOND_IN_WINDOW_PER_NODE),
+                ImmutableList.of(CassandraNode.READS_PER_SECOND_IN_WINDOW, READS_PER_SECOND_IN_WINDOW_PER_NODE),
+                ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY, THRIFT_PORT_LATENCY_PER_NODE),
+                ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY_IN_WINDOW, THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE),
+                ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_LAST, PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE),
+                ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_IN_WINDOW, PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE)
+        );
+
+        for (List<? extends AttributeSensor<? extends Number>> es : summingEnricherSetup) {
+            AttributeSensor<? extends Number> t = es.get(0);
+            AttributeSensor<? extends Number> total = es.get(1);
+            addEnricher(Enrichers.builder()
+                    .aggregating(t)
+                    .publishing(total)
+                    .fromMembers()
+                    .computingSum()
+                    .defaultValueForUnreportedSensors(null)
+                    .valueToReportIfNoSensors(null)
+                    .build());
+        }
+
+        for (List<? extends AttributeSensor<?
extends Number>> es : averagingEnricherSetup) {
+            // Unchecked casts: each pair in the averaging table is (member sensor, Double-valued per-node sensor).
+            AttributeSensor<Number> t = (AttributeSensor<Number>) es.get(0);
+            AttributeSensor<Double> average = (AttributeSensor<Double>) es.get(1);
+            addEnricher(Enrichers.builder()
+                    .aggregating(t)
+                    .publishing(average)
+                    .fromMembers()
+                    .computingAverage()
+                    .defaultValueForUnreportedSensors(null)
+                    .valueToReportIfNoSensors(null)
+                    .build());
+
+        }
+    }
+
+    /** Calls {@link #disconnectSensors()} and then delegates to the superclass stop. */
+    @Override
+    public void stop() {
+        disconnectSensors();
+
+        super.stop();
+    }
+
+    /** Hook invoked at the start of {@link #stop()}; does nothing in this class. */
+    protected void disconnectSensors() {
+    }
+
+    /**
+     * Refreshes the seeds and then republishes HOSTNAME, THRIFT_PORT and CASSANDRA_CLUSTER_NODES
+     * from the current members, all while holding {@code mutex}.
+     * <p>
+     * HOSTNAME and THRIFT_PORT are copied from the first member whose SERVICE_UP is true;
+     * if no member is up they are set to null and the node list is set to empty.
+     */
+    @Override
+    public void update() {
+        synchronized (mutex) {
+            // Update our seeds, as necessary
+            seedTracker.refreshSeeds();
+
+            // Choose the first available cluster member to set host and port (and compute one-up)
+            Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
+
+            if (upNode.isPresent()) {
+                setAttribute(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME));
+                setAttribute(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT));
+
+                List<String> currentNodes = getAttribute(CASSANDRA_CLUSTER_NODES);
+                Set<String> oldNodes = (currentNodes != null) ? ImmutableSet.copyOf(currentNodes) : ImmutableSet.<String>of();
+                Set<String> newNodes = MutableSet.<String>of();
+                // Collect "host:port" for every CassandraNode member that is up and has both values known.
+                for (Entity member : getMembers()) {
+                    if (member instanceof CassandraNode && Boolean.TRUE.equals(member.getAttribute(SERVICE_UP))) {
+                        String hostname = member.getAttribute(Attributes.HOSTNAME);
+                        Integer thriftPort = member.getAttribute(CassandraNode.THRIFT_PORT);
+                        if (hostname != null && thriftPort != null) {
+                            newNodes.add(HostAndPort.fromParts(hostname, thriftPort).toString());
+                        }
+                    }
+                }
+                // Only republish the sensor when the set of nodes actually differs.
+                if (Sets.symmetricDifference(oldNodes, newNodes).size() > 0) {
+                    setAttribute(CASSANDRA_CLUSTER_NODES, MutableList.copyOf(newNodes));
+                }
+            } else {
+                setAttribute(HOSTNAME, null);
+                setAttribute(THRIFT_PORT, null);
+                setAttribute(CASSANDRA_CLUSTER_NODES, Collections.<String>emptyList());
+            }
+
+            ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyList(this, CASSANDRA_CLUSTER_NODES);
+        }
+    }
+
+    /**
+     * For tracking our seeds. This gets fiddly! High-level logic is:
+     * <ul>
+     *   <li>If we have never reached quorum (i.e. have never published seeds), then continue to wait for quorum;
+     *       because entity-startup may be blocking for this. This is handled by the seedSupplier.
+     *   <li>If we previously reached quorum (i.e. have previously published seeds), then always update;
+     *       we never want stale/dead entities listed in our seeds.
+     *   <li>If an existing seed looks unhealthy, then replace it.
+     *   <li>If a new potential seed becomes available (and we're in need of more), then add it.
+     * </ul>
+     *
+     * Also note that {@link CassandraFabric} can take over, because it knows about multiple sub-clusters!
+     * It will provide a different {@link CassandraDatacenter#SEED_SUPPLIER}. Each time we think that our seeds
+     * need to change, we call that. The fabric will call into {@link CassandraDatacenterImpl#gatherPotentialSeeds()}
+     * to find out what's available.
+ * + * @author aled + */ + protected class SeedTracker { + private final Map<Entity, Boolean> memberUpness = Maps.newLinkedHashMap(); + + public void onMemberRemoved(Entity member) { + Set<Entity> seeds = getSeeds(); + boolean maybeRemove = seeds.contains(member); + memberUpness.remove(member); + + if (maybeRemove) { + refreshSeeds(); + } else { + if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} removed)", new Object[] {CassandraDatacenterImpl.this, member}); + return; + } + } + public void onHostnameChanged(Entity member, String hostname) { + Set<Entity> seeds = getSeeds(); + int quorum = getSeedQuorumSize(); + boolean isViable = isViableSeed(member); + boolean maybeAdd = isViable && seeds.size() < quorum; + boolean maybeRemove = seeds.contains(member) && !isViable; + + if (maybeAdd || maybeRemove) { + refreshSeeds(); + } else { + if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed hostname {})", new Object[] {CassandraDatacenterImpl.this, member, hostname}); + return; + } + } + public void onServiceUpChanged(Entity member, Boolean serviceUp) { + Boolean oldVal = memberUpness.put(member, serviceUp); + if (Objects.equal(oldVal, serviceUp)) { + if (log.isTraceEnabled()) log.trace("Ignoring duplicate service-up in "+CassandraDatacenterImpl.this+" for "+member+", "+serviceUp); + } + Set<Entity> seeds = getSeeds(); + int quorum = getSeedQuorumSize(); + boolean isViable = isViableSeed(member); + boolean maybeAdd = isViable && seeds.size() < quorum; + boolean maybeRemove = seeds.contains(member) && !isViable; + + if (log.isDebugEnabled()) + log.debug("Considering refresh of seeds for "+CassandraDatacenterImpl.this+" because "+member+" is now "+serviceUp+" ("+isViable+" / "+maybeAdd+" / "+maybeRemove+")"); + if (maybeAdd || maybeRemove) { + refreshSeeds(); + } else { + if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed serviceUp {})", new Object[] 
{CassandraDatacenterImpl.this, member, serviceUp}); + return; + } + } + protected Set<Entity> getSeeds() { + Set<Entity> result = getAttribute(CURRENT_SEEDS); + return (result == null) ? ImmutableSet.<Entity>of() : result; + } + public void refreshSeeds() { + Set<Entity> oldseeds = getAttribute(CURRENT_SEEDS); + Set<Entity> newseeds = getSeedSupplier().get(); + if (Objects.equal(oldseeds, newseeds)) { + if (log.isTraceEnabled()) log.debug("Seed refresh no-op for cluster {}: still={}", new Object[] {CassandraDatacenterImpl.this, oldseeds}); + } else { + if (log.isDebugEnabled()) log.debug("Refreshing seeds of cluster {}: now={}; old={}", new Object[] {this, newseeds, oldseeds}); + setAttribute(CURRENT_SEEDS, newseeds); + if (newseeds != null && newseeds.size() > 0) { + setAttribute(HAS_PUBLISHED_SEEDS, true); + } + } + } + public Set<Entity> gatherPotentialSeeds() { + Set<Entity> result = Sets.newLinkedHashSet(); + for (Entity member : getMembers()) { + if (isViableSeed(member)) { + result.add(member); + } + } + if (log.isTraceEnabled()) log.trace("Viable seeds in Cluster {}: {}", new Object[] {result}); + return result; + } + public Set<Entity> gatherPotentialRunningSeeds() { + Set<Entity> result = Sets.newLinkedHashSet(); + for (Entity member : getMembers()) { + if (isRunningSeed(member)) { + result.add(member); + } + } + if (log.isTraceEnabled()) log.trace("Viable running seeds in Cluster {}: {}", new Object[] {result}); + return result; + } + public boolean isViableSeed(Entity member) { + // TODO would be good to reuse the better logic in ServiceFailureDetector + // (e.g. if that didn't just emit a notification but set a sensor as well?) 
+ boolean managed = Entities.isManaged(member); + String hostname = member.getAttribute(Attributes.HOSTNAME); + boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP)); + Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL); + boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED); + boolean result = (hostname != null && !hasFailed); + if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, this, result, hostname, serviceUp, serviceState, hasFailed}); + return result; + } + public boolean isRunningSeed(Entity member) { + boolean viableSeed = isViableSeed(member); + boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP)); + Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL); + boolean result = viableSeed && serviceUp && serviceState == Lifecycle.RUNNING; + if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: runningSeed={}; viableSeed={}; serviceUp={}; serviceState={}", new Object[] {member, this, result, viableSeed, serviceUp, serviceState}); + return result; + } + } + + @Override + public String executeScript(String commands) { + Entity someChild = Iterables.getFirst(getMembers(), null); + if (someChild==null) + throw new IllegalStateException("No Cassandra nodes available"); + // FIXME cross-etntity method-style calls such as below do not set up a queueing context (DynamicSequentialTask) +// return ((CassandraNode)someChild).executeScript(commands); + return Entities.invokeEffector(this, someChild, CassandraNode.EXECUTE_SCRIPT, MutableMap.of("commands", commands)).getUnchecked(); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java new file mode 100644 index 0000000..23db92c --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.util.Set;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.entity.group.DynamicFabric;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.location.Location;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Multimap;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A fabric of {@link CassandraNode}s, which forms a cluster spanning multiple locations.
+ * <p>
+ * Each {@link CassandraDatacenter} child instance is actually just a part of the whole cluster. It consists of the
+ * nodes in that single location (which normally corresponds to a "datacenter" in Cassandra terminology).
+ */
+@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a highly scalable, eventually " +
+        "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
+        "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@ImplementedBy(CassandraFabricImpl.class)
+public interface CassandraFabric extends DynamicFabric {
+
+    /** Seed quorum size for the fabric; the default of -1 means "derive it from the clusters' INITIAL_SIZE". */
+    ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey(
+            "fabric.initial.quorumSize",
+            "Initial fabric quorum size - number of initial nodes that must have been successfully started " +
+            "to report success (if less than 0, then use a value based on INITIAL_SIZE of clusters)",
+            -1);
+
+    /** Optional function mapping a location to the Cassandra datacenter name used for it; no default is supplied. */
+    @SuppressWarnings("serial")
+    ConfigKey<Function<Location, String>> DATA_CENTER_NAMER = ConfigKeys.newConfigKey(new TypeToken<Function<Location, String>>(){},
+            "cassandra.fabric.datacenter.namer",
+            "Function used to provide the cassandra.replication.datacenterName for a given location");
+
+    /** Upper bound on the seed quorum size that the fabric computes from its members. */
+    int DEFAULT_SEED_QUORUM = 5;
+
+    // The sensors below are the same instances as declared on CassandraDatacenter,
+    // re-exported here so they can be referenced through the fabric type.
+
+    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = CassandraDatacenter.DATACENTER_USAGE;
+
+    AttributeSensor<Set<String>> DATACENTERS = CassandraDatacenter.DATACENTERS;
+
+    AttributeSensor<Set<Entity>> CURRENT_SEEDS = CassandraDatacenter.CURRENT_SEEDS;
+
+    AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = CassandraDatacenter.HAS_PUBLISHED_SEEDS;
+
+    AttributeSensor<String> HOSTNAME = CassandraDatacenter.HOSTNAME;
+
+    AttributeSensor<Integer> THRIFT_PORT = CassandraDatacenter.THRIFT_PORT;
+
+    /** Effector handle bound by name to {@link #update()}. */
+    MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraFabric.class, "update");
+
+    @Effector(description="Updates the cluster members")
+    void update();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
new file mode 100644
index 0000000..bce7cac
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.DynamicGroup; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.basic.EntityPredicates; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import brooklyn.entity.group.DynamicFabricImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.location.Location; +import brooklyn.policy.PolicySpec; +import brooklyn.util.collections.CollectionFunctionals; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.time.Time; + +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +/** + * Implementation of {@link CassandraDatacenter}. 
+ * <p> + * Serveral subtleties to note: + * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port + * (so we wait for thrift port to be contactable) + * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema + * (each up to 1m; often very close to the 1m) + */ +public class CassandraFabricImpl extends DynamicFabricImpl implements CassandraFabric { + + private static final Logger log = LoggerFactory.getLogger(CassandraFabricImpl.class); + + // Mutex for synchronizing during re-size operations + private final Object mutex = new Object[0]; + + private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() { + @Override public Set<Entity> get() { + // TODO Remove duplication from CassandraClusterImpl.defaultSeedSupplier + Set<Entity> seeds = getAttribute(CURRENT_SEEDS); + boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS)); + int quorumSize = getSeedQuorumSize(); + + // update seeds if we're not quorate; note this may not work for dynamically adding new datacenters + // as we do not take a new seed from the new datacenter + if (seeds == null || seeds.size() < quorumSize || containsDownEntity(seeds)) { + Set<Entity> newseeds; + Map<CassandraDatacenter,Set<Entity>> potentialSeeds = MutableMap.of(); + int potentialSeedCount = 0; + for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { + Set<Entity> dcPotentialSeeds = member.gatherPotentialSeeds(); + potentialSeeds.put(member, dcPotentialSeeds); + potentialSeedCount += dcPotentialSeeds.size(); + } + + if (hasPublishedSeeds) { + Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS); + Lifecycle serviceState = getAttribute(SERVICE_STATE_ACTUAL); + if (serviceState == Lifecycle.STARTING) { + if (Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(potentialSeeds.values()))).isEmpty()) { + log.warn("Fabric {} lost all its 
seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraFabricImpl.this, currentSeeds}); + } + newseeds = currentSeeds; + } else if (serviceState == Lifecycle.STOPPING || serviceState == Lifecycle.STOPPED) { + if (log.isTraceEnabled()) log.trace("Fabric {} ignoring any potential seed-changes, because {}: seeds={}", new Object[] {CassandraFabricImpl.this, serviceState, currentSeeds}); + newseeds = currentSeeds; + } else if (potentialSeedCount == 0) { + // TODO Could be race where nodes have only just returned from start() and are about to + // transition to serviceUp; so don't just abandon all our seeds! + log.warn("Fabric {} has no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this}); + newseeds = currentSeeds; + } else if (!allNonEmpty(potentialSeeds.values())) { + log.warn("Fabric {} has datacenter with no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this}); + newseeds = currentSeeds; + } else { + Set<Entity> result = selectSeeds(quorumSize, potentialSeeds); + if (log.isDebugEnabled() && !Objects.equal(seeds, result)) { + log.debug("Fabric {} updating seeds: chosen={}; potential={}", new Object[] {CassandraFabricImpl.this, result, potentialSeeds}); + } + newseeds = result; + } + } else if (potentialSeedCount < quorumSize) { + if (log.isDebugEnabled()) log.debug("Not setting seeds of fabric {} yet, because still waiting for quorum (need {}; have {} potentials from {} members)", new Object[] {CassandraFabricImpl.this, quorumSize, potentialSeedCount, getMembers()}); + newseeds = ImmutableSet.of(); + } else if (!allNonEmpty(potentialSeeds.values())) { + if (log.isDebugEnabled()) { + Map<CassandraDatacenter, Integer> datacenterCounts = Maps.transformValues(potentialSeeds, CollectionFunctionals.sizeFunction()); + 
log.debug("Not setting seeds of fabric {} yet, because not all datacenters have seeds (sizes are {})", new Object[] {CassandraFabricImpl.this, datacenterCounts}); + } + newseeds = ImmutableSet.of(); + } else { + // yay, we're quorate + Set<Entity> result = selectSeeds(quorumSize, potentialSeeds); + log.info("Fabric {} has reached seed quorum: seeds={}", new Object[] {CassandraFabricImpl.this, result}); + newseeds = result; + } + + if (!Objects.equal(seeds, newseeds)) { + setAttribute(CURRENT_SEEDS, newseeds); + + if (newseeds != null && newseeds.size() > 0) { + setAttribute(HAS_PUBLISHED_SEEDS, true); + + // Need to tell every datacenter that seeds are ready. + // Otherwise a datacenter might get no more changes (e.g. to nodes' hostnames etc), + // and not call seedSupplier.get() again. + for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { + member.update(); + } + } + return newseeds; + } else { + return seeds; + } + } else { + if (log.isTraceEnabled()) log.trace("Not refresheed seeds of fabric {}, because have quorum {} (of {} members), and none are down: seeds={}", + new Object[] {CassandraFabricImpl.class, quorumSize, getMembers().size(), seeds}); + return seeds; + } + } + private boolean allNonEmpty(Collection<? extends Collection<Entity>> contenders) { + for (Collection<Entity> contender: contenders) + if (contender.isEmpty()) return false; + return true; + } + private Set<Entity> selectSeeds(int num, Map<CassandraDatacenter,? extends Collection<Entity>> contenders) { + // Prefer existing seeds wherever possible; + // otherwise prefer a seed from each sub-cluster; + // otherwise accept any other contenders + Set<Entity> currentSeeds = (getAttribute(CURRENT_SEEDS) != null) ? 
getAttribute(CURRENT_SEEDS) : ImmutableSet.<Entity>of(); + MutableSet<Entity> result = MutableSet.of(); + result.addAll(Sets.intersection(currentSeeds, ImmutableSet.copyOf(contenders.values()))); + for (CassandraDatacenter cluster : contenders.keySet()) { + Set<Entity> contendersInCluster = Sets.newLinkedHashSet(contenders.get(cluster)); + if (contendersInCluster.size() > 0 && Sets.intersection(result, contendersInCluster).isEmpty()) { + result.add(Iterables.getFirst(contendersInCluster, null)); + } + } + result.addAll(Iterables.concat(contenders.values())); + return ImmutableSet.copyOf(Iterables.limit(result, num)); + } + private boolean containsDownEntity(Set<Entity> seeds) { + for (Entity seed : seeds) { + if (!isViableSeed(seed)) { + return true; + } + } + return false; + } + public boolean isViableSeed(Entity member) { + // TODO remove duplication from CassandraClusterImpl.SeedTracker.isViableSeed + boolean managed = Entities.isManaged(member); + String hostname = member.getAttribute(Attributes.HOSTNAME); + boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP)); + Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL); + boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED); + boolean result = (hostname != null && !hasFailed); + if (log.isTraceEnabled()) log.trace("Node {} in Fabric {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, CassandraFabricImpl.this, result, hostname, serviceUp, serviceState, hasFailed}); + return result; + } + }; + + public CassandraFabricImpl() { + } + + @Override + public void init() { + super.init(); + + if (!getConfigRaw(CassandraDatacenter.SEED_SUPPLIER, true).isPresentAndNonNull()) + setConfig(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier()); + + // track members + addPolicy(PolicySpec.create(MemberTrackingPolicy.class) 
+                .displayName("Cassandra Fabric Tracker")
+                .configure("group", this));
+
+        // Track first node's startup
+        subscribeToMembers(this, CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, new SensorEventListener<Long>() {
+            @Override
+            public void onEvent(SensorEvent<Long> event) {
+                Long oldval = getAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC);
+                Long newval = event.getValue();
+                // Only the first non-null value is recorded; it is then copied onto every datacenter member.
+                if (oldval == null && newval != null) {
+                    setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
+                    for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+                        ((EntityInternal)member).setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
+                    }
+                }
+            }
+        });
+
+        // Track the datacenters for this cluster
+        subscribeToMembers(this, CassandraDatacenter.DATACENTER_USAGE, new SensorEventListener<Multimap<String,Entity>>() {
+            @Override
+            public void onEvent(SensorEvent<Multimap<String,Entity>> event) {
+                Multimap<String, Entity> usage = calculateDatacenterUsage();
+                setAttribute(DATACENTER_USAGE, usage);
+                setAttribute(DATACENTERS, usage.keySet());
+            }
+        });
+        // Recompute the same two sensors when a member leaves the fabric.
+        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+            @Override public void onEvent(SensorEvent<Entity> event) {
+                Multimap<String, Entity> usage = calculateDatacenterUsage();
+                setAttribute(DATACENTER_USAGE, usage);
+                setAttribute(DATACENTERS, usage.keySet());
+            }
+        });
+    }
+
+    /**
+     * Membership policy that calls {@link CassandraFabricImpl#update()} on the owning fabric
+     * whenever a member is added, removed, or changes.
+     */
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override
+        protected void onEntityChange(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Location {} updated in Fabric {}", member, entity);
+            ((CassandraFabricImpl)entity).update();
+        }
+        @Override
+        protected void onEntityAdded(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Location {} added to Fabric {}", member, entity);
+            ((CassandraFabricImpl)entity).update();
+        }
+        @Override
+        protected void onEntityRemoved(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Location {} removed from Fabric {}", member, entity);
+            ((CassandraFabricImpl)entity).update();
+        }
+    };
+
+    /**
+     * Returns the number of seeds the fabric wants.
+     * <p>
+     * A positive {@link #INITIAL_QUORUM_SIZE} is returned as-is. Otherwise the INITIAL_SIZE of all
+     * datacenter members is summed and reduced (halved above 5, minus 2 above 3, minus 1 above 2),
+     * and the result is clamped to the range 1..{@link CassandraFabric#DEFAULT_SEED_QUORUM}.
+     */
+    protected int getSeedQuorumSize() {
+        Integer quorumSize = getConfig(INITIAL_QUORUM_SIZE);
+        if (quorumSize!=null && quorumSize>0)
+            return quorumSize;
+
+        int initialSizeSum = 0;
+        for (CassandraDatacenter cluster : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+            initialSizeSum += cluster.getConfig(CassandraDatacenter.INITIAL_SIZE);
+        }
+        if (initialSizeSum>5) initialSizeSum /= 2;
+        else if (initialSizeSum>3) initialSizeSum -= 2;
+        else if (initialSizeSum>2) initialSizeSum -= 1;
+
+        return Math.min(Math.max(initialSizeSum, 1), CassandraFabric.DEFAULT_SEED_QUORUM);
+    }
+
+    /**
+     * Sets the default {@link #MEMBER_SPEC} to describe the Cassandra sub-clusters.
+     */
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        // Need to set the seedSupplier, even if the caller has overridden the CassandraCluster config
+        // (unless they've explicitly overridden the seedSupplier as well!)
+        // TODO probably don't need to anymore, as it is set on the Fabric here -- just make sure there is a default!
+ EntitySpec<?> custom = getConfig(MEMBER_SPEC); + if (custom == null) { + return EntitySpec.create(CassandraDatacenter.class) + .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier()); + } else if (custom.getConfig().containsKey(CassandraDatacenter.SEED_SUPPLIER) || custom.getFlags().containsKey("seedSupplier")) { + return custom; + } else { + return EntitySpec.create(custom) + .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier()); + } + } + + @Override + protected Entity createCluster(Location location, Map flags) { + Function<Location, String> dataCenterNamer = getConfig(DATA_CENTER_NAMER); + if (dataCenterNamer != null) { + flags = ImmutableMap.builder() + .putAll(flags) + .put(CassandraNode.DATACENTER_NAME, dataCenterNamer.apply(location)) + .build(); + } + return super.createCluster(location, flags); + } + + /** + * Prefers one node per location, and then others from anywhere. + * Then trims result down to the "quorumSize". + */ + public Supplier<Set<Entity>> getSeedSupplier() { + return defaultSeedSupplier; + } + + @Override + public void start(Collection<? extends Location> locations) { + super.start(locations); + + connectSensors(); + + // TODO wait until all nodes which we think are up are consistent + // i.e. all known nodes use the same schema, as reported by + // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli"); + // once we've done that we can revert to using 2 seed nodes. 
+ // see CassandraCluster.DEFAULT_SEED_QUORUM + Time.sleep(getConfig(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER)); + + update(); + } + + protected void connectSensors() { + connectEnrichers(); + } + + protected void connectEnrichers() { + // TODO Aggregate across sub-clusters + + subscribeToMembers(this, SERVICE_UP, new SensorEventListener<Boolean>() { + @Override public void onEvent(SensorEvent<Boolean> event) { + setAttribute(SERVICE_UP, calculateServiceUp()); + } + }); + } + + @Override + public void stop() { + disconnectSensors(); + + super.stop(); + } + + protected void disconnectSensors() { + } + + protected boolean calculateServiceUp() { + Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE)); + return upNode.isPresent(); + } + + protected Multimap<String, Entity> calculateDatacenterUsage() { + Multimap<String, Entity> result = LinkedHashMultimap.<String, Entity>create(); + for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { + Multimap<String, Entity> memberUsage = member.getAttribute(CassandraDatacenter.DATACENTER_USAGE); + if (memberUsage != null) result.putAll(memberUsage); + } + return result; + } + + @Override + public void update() { + synchronized (mutex) { + for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { + member.update(); + } + + calculateServiceUp(); + + // Choose the first available location to set host and port (and compute one-up) + Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE)); + + if (upNode.isPresent()) { + setAttribute(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME)); + setAttribute(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT)); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java new file mode 100644 index 0000000..7d0a56d --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import java.math.BigInteger; +import java.util.Set; + +import org.apache.brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.Effector; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.BrooklynConfigKeys; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.database.DatastoreMixins; +import brooklyn.entity.java.UsesJavaMXBeans; +import brooklyn.entity.java.UsesJmx; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.location.basic.PortRanges; +import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.time.Duration; + +import com.google.common.reflect.TypeToken; + +/** + * An {@link brooklyn.entity.Entity} that represents a Cassandra node in a {@link CassandraDatacenter}. Declares the node's config keys, its sensors, and the accessors that the configuration templates read. + */ +@Catalog(name="Apache Cassandra Node", description="Cassandra is a highly scalable, eventually " + + "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " + + "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg") +@ImplementedBy(CassandraNodeImpl.class) +public interface CassandraNode extends DatastoreMixins.DatastoreCommon, SoftwareProcess, UsesJmx, UsesJavaMXBeans, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript { + + /** Suggested Cassandra version; re-declares {@link SoftwareProcess#SUGGESTED_VERSION} with a default of 1.2.16. */ @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "1.2.16"); + // when this changes remember to put a copy under releng2:/var/www/developer/brooklyn/repository/ ! 
+ // TODO experiment with supporting 2.0.x + + /** Download URL; the default is a template composed from the driver's mirror URL and the version. */ @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + SoftwareProcess.DOWNLOAD_URL, "${driver.mirrorUrl}/${version}/apache-cassandra-${version}-bin.tar.gz"); + + /** download mirror, if desired */ + @SetFromFlag("mirrorUrl") + ConfigKey<String> MIRROR_URL = new BasicConfigKey<String>(String.class, "cassandra.install.mirror.url", "URL of mirror", + "http://www.mirrorservice.org/sites/ftp.apache.org/cassandra" + // for older versions, but slower: +// "http://archive.apache.org/dist/cassandra/" + ); + + /** URL of the TGZ download file; no default value is declared here. */ @SetFromFlag("tgzUrl") + ConfigKey<String> TGZ_URL = new BasicConfigKey<String>(String.class, "cassandra.install.tgzUrl", "URL of TGZ download file"); + + /* CLUSTER_NAME and ENDPOINT_SNITCH_NAME are the very same key objects as declared on CassandraDatacenter, re-exported here */ @SetFromFlag("clusterName") + BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = CassandraDatacenter.CLUSTER_NAME; + + @SetFromFlag("snitchName") + ConfigKey<String> ENDPOINT_SNITCH_NAME = CassandraDatacenter.ENDPOINT_SNITCH_NAME; + + @SetFromFlag("gossipPort") + PortAttributeSensorAndConfigKey GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.gossip.port", "Cassandra Gossip communications port", PortRanges.fromString("7000+")); + + /* NOTE(review): the flag name "sslGgossipPort" looks like a typo for "sslGossipPort"; it is left unchanged because renaming it would change the flag callers must use - confirm before fixing */ @SetFromFlag("sslGgossipPort") + PortAttributeSensorAndConfigKey SSL_GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.ssl-gossip.port", "Cassandra Gossip SSL communications port", PortRanges.fromString("7001+")); + + @SetFromFlag("thriftPort") + PortAttributeSensorAndConfigKey THRIFT_PORT = new PortAttributeSensorAndConfigKey("cassandra.thrift.port", "Cassandra Thrift RPC port", PortRanges.fromString("9160+")); + + @SetFromFlag("nativePort") + PortAttributeSensorAndConfigKey NATIVE_TRANSPORT_PORT = new PortAttributeSensorAndConfigKey("cassandra.native.port", "Cassandra Native Transport port", PortRanges.fromString("9042+")); + + @SetFromFlag("rmiRegistryPort") + // cassandra nodetool and others want 7199 - not required, but useful + 
PortAttributeSensorAndConfigKey RMI_REGISTRY_PORT = new PortAttributeSensorAndConfigKey(UsesJmx.RMI_REGISTRY_PORT, + PortRanges.fromInteger(7199)); + + // some of the cassandra tooling (eg nodetool) uses RMI, but we want JMXMP, so do both! + ConfigKey<JmxAgentModes> JMX_AGENT_MODE = ConfigKeys.newConfigKeyWithDefault(UsesJmx.JMX_AGENT_MODE, JmxAgentModes.JMXMP_AND_RMI); + + @SetFromFlag("customSnitchJarUrl") + ConfigKey<String> CUSTOM_SNITCH_JAR_URL = ConfigKeys.newStringConfigKey("cassandra.config.customSnitchUrl", + "URL for a jar file to be uploaded (e.g. \"classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-multicloud-snitch.jar\"); defaults to null which means nothing to upload", + null); + + /* the default template URL embeds ${entity.majorMinorVersion}, so the template picked depends on the entity's major.minor version */ @SetFromFlag("cassandraConfigTemplateUrl") + ConfigKey<String> CASSANDRA_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( + "cassandra.config.templateUrl", "A URL (in freemarker format) for a cassandra.yaml config file (in freemarker format)", + "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-${entity.majorMinorVersion}.yaml"); + + @SetFromFlag("cassandraConfigFileName") + ConfigKey<String> CASSANDRA_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey( + "cassandra.config.fileName", "Name for the copied config file", "cassandra.yaml"); + + @SetFromFlag("cassandraRackdcConfigTemplateUrl") + ConfigKey<String> CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( + "cassandra.config.rackdc.templateUrl", "Template file (in freemarker format) for the cassandra-rackdc.properties config file", + "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-rackdc.properties"); + + @SetFromFlag("cassandraRackdcConfigFileName") + ConfigKey<String> CASSANDRA_RACKDC_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey( + "cassandra.config.rackdc.fileName", "Name for the copied rackdc config file (used for configuring replication, when a suitable snitch is used)", "cassandra-rackdc.properties"); + + @SetFromFlag("datacenterName") + 
BasicAttributeSensorAndConfigKey<String> DATACENTER_NAME = new BasicAttributeSensorAndConfigKey<String>( + String.class, "cassandra.replication.datacenterName", "Datacenter name (used for configuring replication, when a suitable snitch is used)", + null); + + @SetFromFlag("rackName") + BasicAttributeSensorAndConfigKey<String> RACK_NAME = new BasicAttributeSensorAndConfigKey<String>( + String.class, "cassandra.replication.rackName", "Rack name (used for configuring replication, when a suitable snitch is used)", + null); + + /* declared with a default of 1; unlike most keys in this interface it carries no @SetFromFlag annotation */ ConfigKey<Integer> NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode", + "Number of tokens per node; if using vnodes, should set this to a value like 256", + 1); + + /** + * @deprecated since 0.7; use {@link #TOKENS} + */ + @SetFromFlag("token") + @Deprecated + BasicAttributeSensorAndConfigKey<BigInteger> TOKEN = new BasicAttributeSensorAndConfigKey<BigInteger>( + BigInteger.class, "cassandra.token", "Cassandra Token"); + + /** Set-valued replacement for the deprecated {@link #TOKEN}; the anonymous TypeToken carries the generic type Set&lt;BigInteger&gt;. */ @SetFromFlag("tokens") + BasicAttributeSensorAndConfigKey<Set<BigInteger>> TOKENS = new BasicAttributeSensorAndConfigKey<Set<BigInteger>>( + new TypeToken<Set<BigInteger>>() {}, "cassandra.tokens", "Cassandra Tokens"); + + AttributeSensor<Integer> PEERS = Sensors.newIntegerSensor( "cassandra.peers", "Number of peers in cluster"); + + AttributeSensor<Integer> LIVE_NODE_COUNT = Sensors.newIntegerSensor( "cassandra.liveNodeCount", "Number of live nodes in cluster"); + + /* Metrics for read/write performance. 
*/ + + AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.read.pending", "Current pending ReadStage tasks"); + AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.read.active", "Current active ReadStage tasks"); + AttributeSensor<Long> READ_COMPLETED = Sensors.newLongSensor("cassandra.read.completed", "Total completed ReadStage tasks"); + AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.write.pending", "Current pending MutationStage tasks"); + AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.write.active", "Current active MutationStage tasks"); + AttributeSensor<Long> WRITE_COMPLETED = Sensors.newLongSensor("cassandra.write.completed", "Total completed MutationStage tasks"); + + AttributeSensor<Boolean> SERVICE_UP_JMX = Sensors.newBooleanSensor("cassandra.service.jmx.up", "Whether JMX is up for this service"); + AttributeSensor<Long> THRIFT_PORT_LATENCY = Sensors.newLongSensor("cassandra.thrift.latency", "Latency for thrift port connection (ms) or null if down"); + + AttributeSensor<Double> READS_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.reads.perSec.last", "Reads/sec (last datapoint)"); + /* NOTE(review): this sensor name uses singular "write" while READS_PER_SECOND_LAST and WRITES_PER_SECOND_IN_WINDOW use the plural form; left unchanged because the name is a published identifier - confirm before renaming */ AttributeSensor<Double> WRITES_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.write.perSec.last", "Writes/sec (last datapoint)"); + + AttributeSensor<Double> THRIFT_PORT_LATENCY_IN_WINDOW = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed", "Latency for thrift port (ms, averaged over time window)"); + AttributeSensor<Double> READS_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed", "Reads/sec (over time window)"); + AttributeSensor<Double> WRITES_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed", "Writes/sec (over time window)"); + + /* the key is created from the raw Set.class and then cast to ConfigKey<Set<Entity>>, which is why rawtypes/unchecked warnings are suppressed */ @SuppressWarnings({ "rawtypes", "unchecked" }) + ConfigKey<Set<Entity>> INITIAL_SEEDS = (ConfigKey)ConfigKeys.newConfigKey(Set.class, "cassandra.cluster.seeds.initial", + 
"List of cluster nodes to seed this node"); + + /* re-declares BrooklynConfigKeys.START_TIMEOUT with a default of Duration.FIVE_MINUTES */ ConfigKey<Duration> START_TIMEOUT = ConfigKeys.newConfigKeyWithDefault(BrooklynConfigKeys.START_TIMEOUT, Duration.FIVE_MINUTES); + + ConfigKey<String> LISTEN_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.listenAddressSensor", "sensor name from which to take the listen address; default (null) is a smart lookup"); + ConfigKey<String> BROADCAST_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.broadcastAddressSensor", "sensor name from which to take the broadcast address; default (null) is a smart lookup"); + ConfigKey<String> RPC_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.rpcAddressSensor", "sensor name from which to take the RPC address; default (null) is 0.0.0.0"); + + /* same effector object as declared on CassandraDatacenter */ Effector<String> EXECUTE_SCRIPT = CassandraDatacenter.EXECUTE_SCRIPT; + + /* Accessors used from template (presumably getMajorMinorVersion is what ${entity.majorMinorVersion} in the default config template URL resolves to - confirm) */ + + String getMajorMinorVersion(); + Integer getGossipPort(); + Integer getSslGossipPort(); + Integer getThriftPort(); + Integer getNativeTransportPort(); + String getClusterName(); + String getListenAddress(); + String getBroadcastAddress(); + String getRpcAddress(); + String getSeeds(); + + String getPrivateIp(); + String getPublicIp(); + + /** + * In range 0 to (2^127)-1; or null if not yet set or known. + * Returns the first token if more than one token. + * @deprecated since 0.7; see {@link #getTokens()} + */ + @Deprecated + BigInteger getToken(); + + int getNumTokensPerNode(); + + Set<BigInteger> getTokens(); + + /** + * string value of token (with no commas, which freemarker introduces!) 
or blank if none + * @deprecated since 0.7; use {@link #getTokenAsString()} replacement {@link #getTokensAsString()} + */ + @Deprecated + String getTokenAsString(); + + /** string value of comma-separated tokens; or blank if none */ + String getTokensAsString(); + + /* For configuration */ + + /** NOTE(review): accepts the token as a String although the TOKEN key is BigInteger-typed; the parsing rules are not visible in this interface - confirm against the implementation. */ void setToken(String token); + + /* Using Cassandra */ + + /** Executes the given commands; presumably the method behind the EXECUTE_SCRIPT effector - the meaning of the returned String is not visible here, confirm. */ String executeScript(String commands); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java new file mode 100644 index 0000000..eab6672 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import brooklyn.entity.java.JavaSoftwareProcessDriver; +import brooklyn.util.task.system.ProcessTaskWrapper; + +/** Driver contract for a Cassandra node: extends JavaSoftwareProcessDriver with port, cluster-name and config-file accessors, script execution, and hostname resolution at the target. */ public interface CassandraNodeDriver extends JavaSoftwareProcessDriver { + + /* The port and cluster-name accessors below have the same names as accessors on CassandraNode; presumably they return the same resolved values - confirm against the implementation. */ Integer getGossipPort(); + + Integer getSslGossipPort(); + + Integer getThriftPort(); + + Integer getNativeTransportPort(); + + String getClusterName(); + + /** Presumably the value of CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL - confirm. */ String getCassandraConfigTemplateUrl(); + + /** Presumably the value of CassandraNode.CASSANDRA_CONFIG_FILE_NAME - confirm. */ String getCassandraConfigFileName(); + + /** NOTE(review): the criterion for "clustered" is not visible in this interface - see the implementation. */ boolean isClustered(); + + /** Executes the given commands and returns a task wrapper; presumably asynchronous, with the Integer being the process exit code - confirm. */ ProcessTaskWrapper<Integer> executeScriptAsync(String commands); + + /** returns the address that the given hostname resolves to at the target */ + String getResolvedAddress(String hostname); + +}
