http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.conf ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.conf b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.conf new file mode 100644 index 0000000..125fa77 --- /dev/null +++ b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.conf @@ -0,0 +1,494 @@ +## Brooklyn note: file from 2.0.1 Ubuntu install, with erlang section added, and ports templated + +## Where to emit the default log messages (typically at 'info' +## severity): +## off: disabled +## file: the file specified by log.console.file +## console: to standard output (seen when using `riak attach-direct`) +## both: log.console.file and standard out. +## +## Default: file +## +## Acceptable values: +## - one of: off, file, console, both +log.console = file + +## The severity level of the console log, default is 'info'. +## +## Default: info +## +## Acceptable values: +## - one of: debug, info, notice, warning, error, critical, alert, emergency, none +log.console.level = info + +## When 'log.console' is set to 'file' or 'both', the file where +## console messages will be logged. +## +## Default: $(platform_log_dir)/console.log +## +## Acceptable values: +## - the path to a file +log.console.file = $(platform_log_dir)/console.log + +## The file where error messages will be logged. +## +## Default: $(platform_log_dir)/error.log +## +## Acceptable values: +## - the path to a file +log.error.file = $(platform_log_dir)/error.log + +## When set to 'on', enables log output to syslog. +## +## Default: off +## +## Acceptable values: +## - on or off +log.syslog = off + +## Whether to enable the crash log. 
+## +## Default: on +## +## Acceptable values: +## - on or off +log.crash = on + +## If the crash log is enabled, the file where its messages will +## be written. +## +## Default: $(platform_log_dir)/crash.log +## +## Acceptable values: +## - the path to a file +log.crash.file = $(platform_log_dir)/crash.log + +## Maximum size in bytes of individual messages in the crash log +## +## Default: 64KB +## +## Acceptable values: +## - a byte size with units, e.g. 10GB +log.crash.maximum_message_size = 64KB + +## Maximum size of the crash log in bytes, before it is rotated +## +## Default: 10MB +## +## Acceptable values: +## - a byte size with units, e.g. 10GB +log.crash.size = 10MB + +## The schedule on which to rotate the crash log. For more +## information see: +## https://github.com/basho/lager/blob/master/README.md#internal-log-rotation +## +## Default: $D0 +## +## Acceptable values: +## - text +log.crash.rotation = $D0 + +## The number of rotated crash logs to keep. When set to +## 'current', only the current open log file is kept. +## +## Default: 5 +## +## Acceptable values: +## - an integer +## - the text "current" +log.crash.rotation.keep = 5 + +## Name of the Erlang node +## +## Default: riak@127.0.0.1 +## +## Acceptable values: +## - text +nodename = riak@${driver.subnetHostname} + +## Cookie for distributed node communication. All nodes in the +## same cluster should use the same cookie or they will not be able to +## communicate. +## +## Default: riak +## +## Acceptable values: +## - text +distributed_cookie = riak + +## Sets the number of threads in async thread pool, valid range +## is 0-1024. If thread support is available, the default is 64. 
+## More information at: http://erlang.org/doc/man/erl.html +## +## Default: 64 +## +## Acceptable values: +## - an integer +erlang.async_threads = 64 + +## The number of concurrent ports/sockets +## Valid range is 1024-134217727 +## +## Default: 65536 +## +## Acceptable values: +## - an integer +erlang.max_ports = 65536 + +## Set scheduler forced wakeup interval. All run queues will be +## scanned each Interval milliseconds. While there are sleeping +## schedulers in the system, one scheduler will be woken for each +## non-empty run queue found. An Interval of zero disables this +## feature, which also is the default. +## This feature is a workaround for lengthy executing native code, and +## native code that do not bump reductions properly. +## More information: http://www.erlang.org/doc/man/erl.html#+sfwi +## +## Acceptable values: +## - an integer +## erlang.schedulers.force_wakeup_interval = 500 + +## Enable or disable scheduler compaction of load. By default +## scheduler compaction of load is enabled. When enabled, load +## balancing will strive for a load distribution which causes as many +## scheduler threads as possible to be fully loaded (i.e., not run out +## of work). This is accomplished by migrating load (e.g. runnable +## processes) into a smaller set of schedulers when schedulers +## frequently run out of work. When disabled, the frequency with which +## schedulers run out of work will not be taken into account by the +## load balancing logic. +## More information: http://www.erlang.org/doc/man/erl.html#+scl +## +## Acceptable values: +## - one of: true, false +## erlang.schedulers.compaction_of_load = false + +## Enable or disable scheduler utilization balancing of load. By +## default scheduler utilization balancing is disabled and instead +## scheduler compaction of load is enabled which will strive for a +## load distribution which causes as many scheduler threads as +## possible to be fully loaded (i.e., not run out of work). 
When +## scheduler utilization balancing is enabled the system will instead +## try to balance scheduler utilization between schedulers. That is, +## strive for equal scheduler utilization on all schedulers. +## More information: http://www.erlang.org/doc/man/erl.html#+sub +## +## Acceptable values: +## - one of: true, false +## erlang.schedulers.utilization_balancing = true + +## Number of partitions in the cluster (only valid when first +## creating the cluster). Must be a power of 2, minimum 8 and maximum +## 1024. +## +## Default: 64 +## +## Acceptable values: +## - an integer +## ring_size = 64 + +## Number of concurrent node-to-node transfers allowed. +## +## Default: 2 +## +## Acceptable values: +## - an integer +## transfer_limit = 2 + +## Default cert location for https can be overridden +## with the ssl config variable, for example: +## +## Acceptable values: +## - the path to a file +## ssl.certfile = $(platform_etc_dir)/cert.pem + +## Default key location for https can be overridden with the ssl +## config variable, for example: +## +## Acceptable values: +## - the path to a file +## ssl.keyfile = $(platform_etc_dir)/key.pem + +## Default signing authority location for https can be overridden +## with the ssl config variable, for example: +## +## Acceptable values: +## - the path to a file +## ssl.cacertfile = $(platform_etc_dir)/cacertfile.pem + +## DTrace support Do not enable 'dtrace' unless your Erlang/OTP +## runtime is compiled to support DTrace. DTrace is available in +## R15B01 (supported by the Erlang/OTP official source package) and in +## R14B04 via a custom source repository & branch. 
+## +## Default: off +## +## Acceptable values: +## - on or off +dtrace = off + +## Platform-specific installation paths (substituted by rebar) +## +## Default: /usr/sbin +## +## Acceptable values: +## - the path to a directory +platform_bin_dir = /usr/sbin + +## +## Default: /var/lib/riak +## +## Acceptable values: +## - the path to a directory +platform_data_dir = /var/lib/riak + +## +## Default: /etc/riak +## +## Acceptable values: +## - the path to a directory +platform_etc_dir = /etc/riak + +## +## Default: /usr/lib64/riak/lib +## +## Acceptable values: +## - the path to a directory +platform_lib_dir = /usr/lib64/riak/lib + +## +## Default: /var/log/riak +## +## Acceptable values: +## - the path to a directory +platform_log_dir = /var/log/riak + +## Enable consensus subsystem. Set to 'on' to enable the +## consensus subsystem used for strongly consistent Riak operations. +## +## Default: off +## +## Acceptable values: +## - on or off +## strong_consistency = on + +## listener.http.<name> is an IP address and TCP port that the Riak +## HTTP interface will bind. +## +## Default: 127.0.0.1:8098 +## +## Acceptable values: +## - an IP/port pair, e.g. 127.0.0.1:10011 +listener.http.internal = 0.0.0.0:${entity.riakWebPort?c} + +## listener.protobuf.<name> is an IP address and TCP port that the Riak +## Protocol Buffers interface will bind. +## +## Default: 127.0.0.1:8087 +## +## Acceptable values: +## - an IP/port pair, e.g. 127.0.0.1:10011 +listener.protobuf.internal = 0.0.0.0:${entity.riakPbPort?c} + +## The maximum length to which the queue of pending connections +## may grow. If set, it must be an integer > 0. If you anticipate a +## huge number of connections being initialized *simultaneously*, set +## this number higher. +## +## Default: 128 +## +## Acceptable values: +## - an integer +## protobuf.backlog = 128 + +## listener.https.<name> is an IP address and TCP port that the Riak +## HTTPS interface will bind. 
+## +## Acceptable values: +## - an IP/port pair, e.g. 127.0.0.1:10011 +## listener.https.internal = 127.0.0.1:8098 + +## How Riak will repair out-of-sync keys. Some features require +## this to be set to 'active', including search. +## * active: out-of-sync keys will be repaired in the background +## * passive: out-of-sync keys are only repaired on read +## * active-debug: like active, but outputs verbose debugging +## information +## +## Default: active +## +## Acceptable values: +## - one of: active, passive, active-debug +anti_entropy = active + +## Specifies the storage engine used for Riak's key-value data +## and secondary indexes (if supported). +## +## Default: bitcask +## +## Acceptable values: +## - one of: bitcask, leveldb, memory, multi +storage_backend = bitcask + +## Controls which binary representation of a riak value is stored +## on disk. +## * 0: Original erlang:term_to_binary format. Higher space overhead. +## * 1: New format for more compact storage of small values. +## +## Default: 1 +## +## Acceptable values: +## - the integer 1 +## - the integer 0 +object.format = 1 + +## Reading or writing objects bigger than this size will write a +## warning in the logs. +## +## Default: 5MB +## +## Acceptable values: +## - a byte size with units, e.g. 10GB +object.size.warning_threshold = 5MB + +## Writing an object bigger than this will send a failure to the +## client. +## +## Default: 50MB +## +## Acceptable values: +## - a byte size with units, e.g. 10GB +object.size.maximum = 50MB + +## Writing an object with more than this number of siblings will +## generate a warning in the logs. +## +## Default: 25 +## +## Acceptable values: +## - an integer +object.siblings.warning_threshold = 25 + +## Writing an object with more than this number of siblings will +## send a failure to the client. +## +## Default: 100 +## +## Acceptable values: +## - an integer +object.siblings.maximum = 100 + +## A path under which bitcask data files will be stored. 
+## +## Default: $(platform_data_dir)/bitcask +## +## Acceptable values: +## - the path to a directory +bitcask.data_root = $(platform_data_dir)/bitcask + +## Configure how Bitcask writes data to disk. +## erlang: Erlang's built-in file API +## nif: Direct calls to the POSIX C API +## The NIF mode provides higher throughput for certain +## workloads, but has the potential to negatively impact +## the Erlang VM, leading to higher worst-case latencies +## and possible throughput collapse. +## +## Default: erlang +## +## Acceptable values: +## - one of: erlang, nif +bitcask.io_mode = erlang + +## Set to 'off' to disable the admin panel. +## +## Default: off +## +## Acceptable values: +## - on or off +riak_control = on + +## Authentication mode used for access to the admin panel. +## +## Default: off +## +## Acceptable values: +## - one of: off, userlist +riak_control.auth.mode = off + +## If riak control's authentication mode (riak_control.auth.mode) +## is set to 'userlist' then this is the list of usernames and +## passwords for access to the admin panel. +## To create users with given names, add entries of the format: +## riak_control.auth.user.USERNAME.password = PASSWORD +## replacing USERNAME with the desired username and PASSWORD with the +## desired password for that user. +## +## Acceptable values: +## - text +## riak_control.auth.user.admin.password = pass + +## This parameter defines the percentage of total server memory +## to assign to LevelDB. LevelDB will dynamically adjust its internal +## cache sizes to stay within this size. The memory size can +## alternately be assigned as a byte count via leveldb.maximum_memory +## instead. +## +## Default: 70 +## +## Acceptable values: +## - an integer +leveldb.maximum_memory.percent = 70 + +## To enable Search set this 'on'. +## +## Default: off +## +## Acceptable values: +## - on or off +search = ${entity.isSearchEnabled()?string('on','off')} + +## How long Riak will wait for Solr to start. 
The start sequence +## will be tried twice. If both attempts timeout, then the Riak node +## will be shutdown. This may need to be increased as more data is +## indexed and Solr takes longer to start. Values lower than 1s will +## be rounded up to the minimum 1s. +## +## Default: 30s +## +## Acceptable values: +## - a time duration with units, e.g. '10s' for 10 seconds +search.solr.start_timeout = 30s + +## The port number which Solr binds to. +## NOTE: Binds on every interface. +## +## Default: 8093 +## +## Acceptable values: +## - an integer +search.solr.port = ${entity.searchSolrPort?c} + +## The port number which Solr JMX binds to. +## NOTE: Binds on every interface. +## +## Default: 8985 +## +## Acceptable values: +## - an integer +search.solr.jmx_port = ${entity.searchSolrJmxPort?c} + +## The options to pass to the Solr JVM. Non-standard options, +## i.e. -XX, may not be portable across JVM implementations. +## E.g. -XX:+UseCompressedStrings +## +## Default: -d64 -Xms1g -Xmx1g -XX:+UseStringCache -XX:+UseCompressedOops +## +## Acceptable values: +## - text +search.solr.jvm_options = -d64 -Xms1g -Xmx1g -XX:+UseStringCache -XX:+UseCompressedOops + +## erlang, constrain port range so we can open the internal firewall ports +erlang.distribution.port_range.minimum = ${entity.erlangPortRangeStart?c} +erlang.distribution.port_range.maximum = ${entity.erlangPortRangeEnd?c}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.md ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.md b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.md new file mode 100644 index 0000000..1523b5f --- /dev/null +++ b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.md @@ -0,0 +1,67 @@ +# Riak Examples + +Here is a selection of examples showing how to deploy Riak. + + +### A Single-Node Deployment + +``` +location: YOUR_CLOUD +services: +- type: org.apache.brooklyn.entity.nosql.riak.RiakNode +``` + + +### A Single-Node Deployment + +``` +location: YOUR_CLOUD +services: +- type: org.apache.brooklyn.entity.nosql.riak.RiakNode +``` + + +### A Cluster + +``` +services: +- type: org.apache.brooklyn.entity.nosql.riak.RiakCluster + location: YOUR_CLOUD + initialSize: 5 +``` + + +### A Cluster at a Specific Version with a Web App + +``` +services: +- type: org.apache.brooklyn.entity.nosql.riak.RiakCluster + id: cluster + brooklyn.config: + initialSize: 2 + install.version: 2.0.0 +- type: brooklyn.entity.webapp.ControlledDynamicWebAppCluster + brooklyn.config: + initialSize: 2 + wars.root: https://s3-eu-west-1.amazonaws.com/brooklyn-clocker/brooklyn-example-hello-world-sql-webapp.war + java.sysprops: + brooklyn.example.riak.nodes: $brooklyn:component("cluster").attributeWhenReady("riak.cluster.nodeList") +``` + +---- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.png ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.png b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.png new file mode 100644 index 0000000..a230b04 Binary files /dev/null and b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/riak.png differ http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/vm.args ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/vm.args b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/vm.args new file mode 100644 index 0000000..be58d78 --- /dev/null +++ b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/riak/vm.args @@ -0,0 +1,64 @@ +##### Brooklyn note: File from OSX distribution of Riak 1.4.8 + +## Name of the riak node +-name riak@${driver.subnetHostname} + +## Cookie for distributed erlang. All nodes in the same cluster +## should use the same cookie or they will not be able to communicate. +-setcookie riak + +## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive +## (Disabled by default..use with caution!) 
+##-heart + +## Enable kernel poll and a few async threads ++K true ++A 64 + +## Treat error_logger warnings as warnings ++W w + +## Increase number of concurrent ports/sockets +-env ERL_MAX_PORTS 64000 + +## Tweak GC to run more often +-env ERL_FULLSWEEP_AFTER 0 + +## Set the location of crash dumps +-env ERL_CRASH_DUMP ./log/erl_crash.dump + +## Raise the ETS table limit +-env ERL_MAX_ETS_TABLES 256000 + +## Force the erlang VM to use SMP +-smp enable + +## For nodes with many busy_dist_port events, Basho recommends +## raising the sender-side network distribution buffer size. +## 32MB may not be sufficient for some workloads and is a suggested +## starting point. +## The Erlang/OTP default is 1024 (1 megabyte). +## See: http://www.erlang.org/doc/man/erl.html#%2bzdbbl +##+zdbbl 32768 + +## Raise the default erlang process limit ++P 256000 + +## Erlang VM scheduler tuning. +## Prerequisite: a patched VM from Basho, or a VM compiled separately +## with this patch applied: +## https://gist.github.com/evanmcc/a599f4c6374338ed672e +##+sfwi 500 + +## Begin SSL distribution items, DO NOT DELETE OR EDIT THIS COMMENT + +## To enable SSL encryption of the Erlang intra-cluster communication, +## un-comment the three lines below and make certain that the paths +## point to correct PEM data files. See docs TODO for details. 
+ +## -proto_dist inet_ssl +## -ssl_dist_opt client_certfile "${driver.riakEtcDir}/erlclient.pem" +## -ssl_dist_opt server_certfile "${driver.riakEtcDir}/erlserver.pem" + +## End SSL distribution items, DO NOT DELETE OR EDIT THIS COMMENT + http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/solr/solr.xml ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/solr/solr.xml b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/solr/solr.xml new file mode 100644 index 0000000..6e12b5c --- /dev/null +++ b/software/nosql/src/main/resources/org/apache/brooklyn/entity/nosql/solr/solr.xml @@ -0,0 +1,19 @@ +[#ftl] +<?xml version="1.0" encoding="UTF-8"?> +<solr> + <int name="coreLoadThreads">4</int> + + <solrcloud> + <str name="host">${driver.hostname}</str> + <int name="hostPort">${entity.solrPort?c}</int> + <str name="hostContext">solr</str> + <int name="zkClientTimeout">15000</int> + <bool name="genericCoreNodeNames">true</bool> + </solrcloud> + + <shardHandlerFactory name="shardHandlerFactory" + class="HttpShardHandlerFactory"> + <int name="socketTimeout">0</int> + <int name="connTimeout">0</int> + </shardHandlerFactory> +</solr> http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java deleted file mode 100644 index 4cc01c1..0000000 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.cassandra; - -import org.testng.annotations.BeforeMethod; - -import brooklyn.entity.BrooklynAppLiveTestSupport; -import brooklyn.location.Location; - -/** - * Cassandra test framework for integration and live tests. 
- */ -public class AbstractCassandraNodeTest extends BrooklynAppLiveTestSupport { - - protected Location testLocation; - protected CassandraNode cassandra; - - @BeforeMethod(alwaysRun = true) - @Override - public void setUp() throws Exception { - super.setUp(); - testLocation = app.newLocalhostProvisioningLocation(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java b/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java deleted file mode 100644 index 5395705..0000000 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.cassandra; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; - -import brooklyn.entity.basic.Attributes; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.text.Identifiers; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.netflix.astyanax.AstyanaxContext; -import com.netflix.astyanax.Cluster; -import com.netflix.astyanax.Keyspace; -import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.connectionpool.NodeDiscoveryType; -import com.netflix.astyanax.connectionpool.OperationResult; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import com.netflix.astyanax.connectionpool.exceptions.SchemaDisagreementException; -import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; -import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; -import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; -import com.netflix.astyanax.model.Column; -import com.netflix.astyanax.model.ColumnFamily; -import com.netflix.astyanax.model.ColumnList; -import com.netflix.astyanax.serializers.StringSerializer; -import com.netflix.astyanax.thrift.ThriftFamilyFactory; - -/** - * Cassandra testing using Astyanax API. 
- */ -public class AstyanaxSupport { - private static final Logger log = LoggerFactory.getLogger(AstyanaxSupport.class); - - public final String clusterName; - public final String hostname; - public final int thriftPort; - - public AstyanaxSupport(CassandraNode node) { - this(node.getClusterName(), node.getAttribute(Attributes.HOSTNAME), node.getThriftPort()); - } - - public AstyanaxSupport(String clusterName, String hostname, int thriftPort) { - this.clusterName = clusterName; - this.hostname = hostname; - this.thriftPort = thriftPort; - } - - public AstyanaxContext<Keyspace> newAstyanaxContextForKeyspace(String keyspace) { - AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder() - .forCluster(clusterName) - .forKeyspace(keyspace) - .withAstyanaxConfiguration(new AstyanaxConfigurationImpl() - .setDiscoveryType(NodeDiscoveryType.NONE)) - .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("BrooklynPool") - .setPort(thriftPort) - .setMaxConnsPerHost(1) - .setConnectTimeout(5000) // 10s - .setSeeds(String.format("%s:%d", hostname, thriftPort))) - .withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) - .buildKeyspace(ThriftFamilyFactory.getInstance()); - - context.start(); - return context; - } - - public AstyanaxContext<Cluster> newAstyanaxContextForCluster() { - AstyanaxContext<Cluster> context = new AstyanaxContext.Builder() - .forCluster(clusterName) - .withAstyanaxConfiguration(new AstyanaxConfigurationImpl() - .setDiscoveryType(NodeDiscoveryType.NONE)) - .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("BrooklynPool") - .setPort(thriftPort) - .setMaxConnsPerHost(1) - .setConnectTimeout(5000) // 10s - .setSeeds(String.format("%s:%d", hostname, thriftPort))) - .withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) - .buildCluster(ThriftFamilyFactory.getInstance()); - - context.start(); - return context; - } - - public static class AstyanaxSample extends AstyanaxSupport { - - public static class 
Builder { - protected CassandraNode node; - protected String clusterName; - protected String hostname; - protected Integer thriftPort; - protected String columnFamilyName = Identifiers.makeRandomId(8); - - public Builder node(CassandraNode val) { - this.node = val; - clusterName = node.getClusterName(); - hostname = node.getAttribute(Attributes.HOSTNAME); - thriftPort = node.getThriftPort(); - return this; - } - public Builder host(String clusterName, String hostname, int thriftPort) { - this.clusterName = clusterName; - this.hostname = hostname; - this.thriftPort = thriftPort; - return this; - } - public Builder columnFamilyName(String val) { - this.columnFamilyName = val; - return this; - } - public AstyanaxSample build() { - return new AstyanaxSample(this); - } - } - - public static Builder builder() { - return new Builder(); - } - - public final String columnFamilyName; - public final ColumnFamily<String, String> sampleColumnFamily; - - public AstyanaxSample(CassandraNode node) { - this(builder().node(node)); - } - - public AstyanaxSample(String clusterName, String hostname, int thriftPort) { - this(builder().host(clusterName, hostname, thriftPort)); - } - - protected AstyanaxSample(Builder builder) { - super(builder.clusterName, builder.hostname, builder.thriftPort); - columnFamilyName = checkNotNull(builder.columnFamilyName, "columnFamilyName"); - sampleColumnFamily = new ColumnFamily<String, String>( - columnFamilyName, // Column Family Name - StringSerializer.get(), // Key Serializer - StringSerializer.get()); // Column Serializer - } - - /** - * Exercise the {@link CassandraNode} using the Astyanax API. - */ - public void astyanaxTest() throws Exception { - String keyspaceName = "BrooklynTests_"+Identifiers.makeRandomId(8); - writeData(keyspaceName); - readData(keyspaceName); - } - - /** - * Write to a {@link CassandraNode} using the Astyanax API. 
- * @throws ConnectionException - */ - public void writeData(String keyspaceName) throws ConnectionException { - // Create context - AstyanaxContext<Keyspace> context = newAstyanaxContextForKeyspace(keyspaceName); - try { - Keyspace keyspace = context.getEntity(); - try { - checkNull(keyspace.describeKeyspace().getColumnFamily(columnFamilyName), "key space for column family "+columnFamilyName); - } catch (Exception ek) { - // (Re) Create keyspace if needed (including if family name already existed, - // e.g. due to a timeout on previous attempt) - log.debug("repairing Cassandra error by re-creating keyspace "+keyspace+": "+ek); - try { - log.debug("dropping Cassandra keyspace "+keyspace); - keyspace.dropKeyspace(); - } catch (Exception e) { - /* Ignore */ - log.debug("Cassandra keyspace "+keyspace+" could not be dropped (probably did not exist): "+e); - } - try { - keyspace.createKeyspace(ImmutableMap.<String, Object>builder() - .put("strategy_options", ImmutableMap.<String, Object>of("replication_factor", "1")) - .put("strategy_class", "SimpleStrategy") - .build()); - } catch (SchemaDisagreementException e) { - // discussion (but not terribly helpful) at http://stackoverflow.com/questions/6770894/schemadisagreementexception - // let's just try again after a delay - // (seems to have no effect; trying to fix by starting first node before others) - log.warn("error creating Cassandra keyspace "+keyspace+" (retrying): "+e); - Time.sleep(Duration.FIVE_SECONDS); - keyspace.createKeyspace(ImmutableMap.<String, Object>builder() - .put("strategy_options", ImmutableMap.<String, Object>of("replication_factor", "1")) - .put("strategy_class", "SimpleStrategy") - .build()); - } - } - - assertNull(keyspace.describeKeyspace().getColumnFamily("Rabbits"), "key space for arbitrary column family Rabbits"); - assertNull(keyspace.describeKeyspace().getColumnFamily(columnFamilyName), "key space for column family "+columnFamilyName); - - // Create column family - 
keyspace.createColumnFamily(sampleColumnFamily, null); - - // Insert rows - MutationBatch m = keyspace.prepareMutationBatch(); - m.withRow(sampleColumnFamily, "one") - .putColumn("name", "Alice", null) - .putColumn("company", "Cloudsoft Corp", null); - m.withRow(sampleColumnFamily, "two") - .putColumn("name", "Bob", null) - .putColumn("company", "Cloudsoft Corp", null) - .putColumn("pet", "Cat", null); - - OperationResult<Void> insert = m.execute(); - assertEquals(insert.getHost().getHostName(), hostname); - assertTrue(insert.getLatency() > 0L); - } finally { - context.shutdown(); - } - } - - /** - * Read from a {@link CassandraNode} using the Astyanax API. - * @throws ConnectionException - */ - public void readData(String keyspaceName) throws ConnectionException { - // Create context - AstyanaxContext<Keyspace> context = newAstyanaxContextForKeyspace(keyspaceName); - try { - Keyspace keyspace = context.getEntity(); - - // Query data - OperationResult<ColumnList<String>> query = keyspace.prepareQuery(sampleColumnFamily) - .getKey("one") - .execute(); - assertEquals(query.getHost().getHostName(), hostname); - assertTrue(query.getLatency() > 0L); - - ColumnList<String> columns = query.getResult(); - assertEquals(columns.size(), 2); - - // Lookup columns in response by name - String name = columns.getColumnByName("name").getStringValue(); - assertEquals(name, "Alice"); - - // Iterate through the columns - for (Column<String> c : columns) { - assertTrue(ImmutableList.of("name", "company").contains(c.getName())); - } - } finally { - context.shutdown(); - } - } - - - /** - * Returns the keyspace name to which the data has been written. If it fails the first time, - * then will increment the keyspace name. This is because the failure could be a response timeout, - * where the keyspace really has been created so subsequent attempts with the same name will - * fail (because we assert that the keyspace did not exist). 
- */ - public String writeData(String keyspacePrefix, int numRetries) throws ConnectionException { - int retryCount = 0; - while (true) { - try { - String keyspaceName = keyspacePrefix + (retryCount > 0 ? "" : "_"+retryCount); - writeData(keyspaceName); - return keyspaceName; - } catch (Exception e) { - log.warn("Error writing data - attempt "+(retryCount+1)+" of "+(numRetries+1)+": "+e, e); - if (++retryCount > numRetries) - throw Exceptions.propagate(e); - } - } - } - - /** - * Repeatedly tries to read data from the given keyspace name. Asserts that the data is the - * same as would be written by calling {@code writeData(keyspaceName)}. - */ - public void readData(String keyspaceName, int numRetries) throws ConnectionException { - int retryCount = 0; - while (true) { - try { - readData(keyspaceName); - return; - } catch (Exception e) { - log.warn("Error reading data - attempt "+(retryCount+1)+" of "+(numRetries+1)+": "+e, e); - if (++retryCount > numRetries) - throw Exceptions.propagate(e); - } - } - } - - /** - * Like {@link Assert#assertNull(Object, String)}, except throws IllegalStateException instead - */ - private void checkNull(Object obj, String msg) { - if (obj != null) { - throw new IllegalStateException("Not null: "+msg+"; obj="+obj); - } - } - } - - public static void main(String[] args) throws Exception { - AstyanaxSample support = new AstyanaxSample("ignored", "ec2-79-125-32-2.eu-west-1.compute.amazonaws.com", 9160); - AstyanaxContext<Cluster> context = support.newAstyanaxContextForCluster(); - try { - System.out.println(context.getEntity().describeSchemaVersions()); - } finally { - context.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java deleted file mode 100644 index b74d5cd..0000000 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.cassandra; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -import java.math.BigInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.BrooklynAppLiveTestSupport; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.nosql.cassandra.TokenGenerators.PosNeg63TokenGenerator; -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.entity.trait.Startable; -import brooklyn.location.Location; -import brooklyn.test.Asserts; -import brooklyn.test.EntityTestUtils; -import brooklyn.util.collections.MutableMap; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -/** - * An integration test of the {@link CassandraDatacenter} entity. - * - * Tests that a one node cluster can be started on localhost and data can be written/read, using the Astyanax API. - * - * NOTE: If these tests fail with "Timeout waiting for SERVICE_UP" and "java.lang.IllegalStateException: Unable to contact any seeds!" - * or "java.lang.RuntimeException: Unable to gossip with any seeds" appears in the log, it may be that the broadcast_address - * (set to InetAddress.getLocalHost().getHostName()) is not resolving to the value specified in listen_address - * (InetAddress.getLocalHost().getHostAddress()). You can work round this issue by ensuring that you machine has only one - * address, e.g. 
by disabling wireless if you are also using a wired connection - */ -public class CassandraDatacenterIntegrationTest extends BrooklynAppLiveTestSupport { - - private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterIntegrationTest.class); - - protected Location testLocation; - protected CassandraDatacenter cluster; - - @BeforeMethod(alwaysRun = true) - @Override - public void setUp() throws Exception { - CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually(); - super.setUp(); - testLocation = app.newLocalhostProvisioningLocation(); - } - - @AfterMethod(alwaysRun=true) - @Override - public void tearDown() throws Exception { - super.tearDown(); - CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually(); - } - - - @Test(groups = "Integration") - public void testStartAndShutdownClusterSizeOne() throws Exception { - EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) - .configure("initialSize", 1) - .configure("tokenShift", 42); - runStartAndShutdownClusterSizeOne(spec, true); - } - - /** - * Cassandra v2 needs Java >= 1.7. If you have java 6 as the defult locally, then you can use - * something like {@code .configure("shell.env", MutableMap.of("JAVA_HOME", "/Library/Java/JavaVirtualMachines/jdk1.7.0_51.jdk/Contents/Home"))} - */ - @Test(groups = "Integration") - public void testStartAndShutdownClusterSizeOneCassandraVersion2() throws Exception { - String version = "2.0.9"; - - EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) - .configure(CassandraNode.SUGGESTED_VERSION, version) - .configure("initialSize", 1); - runStartAndShutdownClusterSizeOne(spec, false); - } - - /** - * Test that a single node cluster starts up and allows access via the Astyanax API. - * Only one node because Cassandra can only run one node per VM! 
- */ - protected void runStartAndShutdownClusterSizeOne(EntitySpec<CassandraDatacenter> datacenterSpec, final boolean assertToken) throws Exception { - cluster = app.createAndManageChild(datacenterSpec); - assertEquals(cluster.getCurrentSize().intValue(), 0); - - app.start(ImmutableList.of(testLocation)); - Entities.dumpInfo(app); - - final CassandraNode node = (CassandraNode) Iterables.get(cluster.getMembers(), 0); - String nodeAddr = checkNotNull(node.getAttribute(CassandraNode.HOSTNAME), "hostname") + ":" + checkNotNull(node.getAttribute(CassandraNode.THRIFT_PORT), "thriftPort"); - - EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.GROUP_SIZE, 1); - EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CASSANDRA_CLUSTER_NODES, ImmutableList.of(nodeAddr)); - - EntityTestUtils.assertAttributeEqualsEventually(node, Startable.SERVICE_UP, true); - if (assertToken) { - PosNeg63TokenGenerator tg = new PosNeg63TokenGenerator(); - tg.growingCluster(1); - EntityTestUtils.assertAttributeEqualsEventually(node, CassandraNode.TOKEN, tg.newToken().add(BigInteger.valueOf(42))); - } - - // may take some time to be consistent (with new thrift_latency checks on the node, - // contactability should not be an issue, but consistency still might be) - Asserts.succeedsEventually(MutableMap.of("timeout", 120*1000), new Runnable() { - public void run() { - boolean open = CassandraDatacenterLiveTest.isSocketOpen(node); - Boolean consistant = open ? CassandraDatacenterLiveTest.areVersionsConsistent(node) : null; - Integer numPeers = node.getAttribute(CassandraNode.PEERS); - Integer liveNodeCount = node.getAttribute(CassandraNode.LIVE_NODE_COUNT); - String msg = "consistency: " - + (!open ? "unreachable" : consistant==null ? 
"error" : consistant)+"; " - + "peer group sizes: "+numPeers + "; live node count: " + liveNodeCount; - assertTrue(open, msg); - assertEquals(consistant, Boolean.TRUE, msg); - if (assertToken) { - assertEquals(numPeers, (Integer)1, msg); - } else { - assertTrue(numPeers != null && numPeers >= 1, msg); - } - assertEquals(liveNodeCount, (Integer)1, msg); - }}); - - CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(node)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java deleted file mode 100644 index 809a738..0000000 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.cassandra; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -import java.math.BigInteger; -import java.net.Socket; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.BrooklynAppLiveTestSupport; -import brooklyn.entity.Entity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample; -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.entity.trait.Startable; -import brooklyn.location.Location; -import brooklyn.test.Asserts; -import brooklyn.test.EntityTestUtils; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.text.Identifiers; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.netflix.astyanax.AstyanaxContext; -import com.netflix.astyanax.Cluster; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; - -/** - * A live test of the {@link CassandraDatacenter} entity. - * - * Tests that a two node cluster can be started on Amazon EC2 and data written on one {@link CassandraNode} - * can be read from another, using the Astyanax API. 
- */ -public class CassandraDatacenterLiveTest extends BrooklynAppLiveTestSupport { - - private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterLiveTest.class); - - private String provider = - "aws-ec2:eu-west-1"; -// "rackspace-cloudservers-uk"; -// "named:hpcloud-compute-at"; -// "localhost"; -// "jcloudsByon:(provider=\"aws-ec2\",region=\"us-east-1\",user=\"aled\",hosts=\"i-6f374743,i-35324219,i-1135453d\")"; - - protected Location testLocation; - protected CassandraDatacenter cluster; - - @BeforeMethod(alwaysRun = true) - @Override - public void setUp() throws Exception { - super.setUp(); - testLocation = mgmt.getLocationRegistry().resolve(provider); - } - - @AfterMethod(alwaysRun=true) - @Override - public void tearDown() throws Exception { - super.tearDown(); - } - - @Test(groups = "Live") - public void testDatacenter() throws Exception { - EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) - .configure("initialSize", 2) - .configure("clusterName", "CassandraClusterLiveTest"); - runCluster(spec, false); - } - - @Test(groups = "Live") - public void testDatacenterWithVnodes() throws Exception { - EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) - .configure("initialSize", 2) - .configure(CassandraDatacenter.USE_VNODES, true) - .configure("clusterName", "CassandraClusterLiveTest"); - runCluster(spec, true); - } - - /* - * TODO on some distros (e.g. CentOS?), it comes pre-installed with java 6. Installing java 7 - * didn't seem to be enough. I also had to set JAVA_HOME: - * .configure("shell.env", MutableMap.of("JAVA_HOME", "/etc/alternatives/java_sdk_1.7.0")) - * However, that would break other deployments such as on Ubuntu where JAVA_HOME would be different. 
- */ - @Test(groups = "Live") - public void testDatacenterWithVnodesVersion2() throws Exception { - EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) - .configure("initialSize", 2) - .configure(CassandraNode.SUGGESTED_VERSION, "2.0.9") - .configure(CassandraDatacenter.USE_VNODES, true) - .configure("clusterName", "CassandraClusterLiveTest"); - runCluster(spec, true); - } - - @Test(groups = {"Live", "Acceptance"}, invocationCount=10) - public void testManyTimes() throws Exception { - testDatacenter(); - } - - /** - * Test a Cassandra Datacenter: - * <ol> - * <li>Create two node datacenter - * <li>Confirm allows access via the Astyanax API through both nodes. - * <li>Confirm can size - * </ol> - */ - protected void runCluster(EntitySpec<CassandraDatacenter> datacenterSpec, boolean usesVnodes) throws Exception { - cluster = app.createAndManageChild(datacenterSpec); - assertEquals(cluster.getCurrentSize().intValue(), 0); - - app.start(ImmutableList.of(testLocation)); - - // Check cluster is up and healthy - EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.GROUP_SIZE, 2); - Entities.dumpInfo(app); - List<CassandraNode> members = castToCassandraNodes(cluster.getMembers()); - assertNodesConsistent(members); - - if (usesVnodes) { - assertVnodeTokensConsistent(members); - } else { - assertSingleTokenConsistent(members); - } - - // Can connect via Astyanax - checkConnectionRepeatedly(2, 5, members); - - // Resize - cluster.resize(3); - assertEquals(cluster.getMembers().size(), 3, "members="+cluster.getMembers()); - if (usesVnodes) { - assertVnodeTokensConsistent(castToCassandraNodes(cluster.getMembers())); - } else { - assertSingleTokenConsistent(castToCassandraNodes(cluster.getMembers())); - } - checkConnectionRepeatedly(2, 5, cluster.getMembers()); - } - - protected static List<CassandraNode> castToCassandraNodes(Collection<? 
extends Entity> rawnodes) { - final List<CassandraNode> nodes = Lists.newArrayList(); - for (Entity node : rawnodes) { - nodes.add((CassandraNode) node); - } - return nodes; - } - - protected static void assertNodesConsistent(final List<CassandraNode> nodes) { - final Integer expectedLiveNodeCount = nodes.size(); - // may take some time to be consistent (with new thrift_latency checks on the node, - // contactability should not be an issue, but consistency still might be) - Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() { - public void run() { - for (Entity n : nodes) { - CassandraNode node = (CassandraNode) n; - EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true); - String errmsg = "node="+node+"; hostname="+node.getAttribute(Attributes.HOSTNAME)+"; port="+node.getThriftPort(); - assertTrue(isSocketOpen(node), errmsg); - assertTrue(areVersionsConsistent(node), errmsg); - EntityTestUtils.assertAttributeEquals(node, CassandraNode.LIVE_NODE_COUNT, expectedLiveNodeCount); - } - }}); - } - - protected static void assertSingleTokenConsistent(final List<CassandraNode> nodes) { - final int numNodes = nodes.size(); - Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() { - public void run() { - Set<BigInteger> alltokens = Sets.newLinkedHashSet(); - for (Entity node : nodes) { - EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true); - EntityTestUtils.assertConfigEquals(node, CassandraNode.NUM_TOKENS_PER_NODE, 1); - EntityTestUtils.assertAttributeEquals(node, CassandraNode.PEERS, numNodes); - BigInteger token = node.getAttribute(CassandraNode.TOKEN); - Set<BigInteger> tokens = node.getAttribute(CassandraNode.TOKENS); - assertNotNull(token); - assertEquals(tokens, ImmutableSet.of(token)); - alltokens.addAll(tokens); - } - assertEquals(alltokens.size(), numNodes); - }}); - } - - protected static void assertVnodeTokensConsistent(final List<CassandraNode> nodes) 
{ - final int numNodes = nodes.size(); - final int tokensPerNode = Iterables.get(nodes, 0).getNumTokensPerNode(); - - Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() { - public void run() { - Set<BigInteger> alltokens = Sets.newLinkedHashSet(); - for (Entity node : nodes) { - EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true); - EntityTestUtils.assertAttributeEquals(node, CassandraNode.PEERS, tokensPerNode*numNodes); - EntityTestUtils.assertConfigEquals(node, CassandraNode.NUM_TOKENS_PER_NODE, 256); - BigInteger token = node.getAttribute(CassandraNode.TOKEN); - Set<BigInteger> tokens = node.getAttribute(CassandraNode.TOKENS); - assertNotNull(token); - assertEquals(tokens.size(), tokensPerNode, "tokens="+tokens); - alltokens.addAll(tokens); - } - assertEquals(alltokens.size(), tokensPerNode*numNodes); - }}); - } - - protected static void checkConnectionRepeatedly(int totalAttemptsAllowed, int numRetriesPerAttempt, Iterable<? extends Entity> nodes) throws Exception { - int attemptNum = 0; - while (true) { - try { - checkConnection(numRetriesPerAttempt, nodes); - return; - } catch (Exception e) { - attemptNum++; - if (attemptNum >= totalAttemptsAllowed) { - log.warn("Cassandra not usable, "+attemptNum+" attempts; failing: "+e, e); - throw e; - } - log.warn("Cassandra not usable (attempt "+attemptNum+" of "+totalAttemptsAllowed+"), trying again after delay: "+e, e); - Time.sleep(Duration.TEN_SECONDS); - } - } - } - - protected static void checkConnection(int numRetries, Iterable<? 
extends Entity> nodes) throws ConnectionException { - CassandraNode first = (CassandraNode) Iterables.get(nodes, 0); - - // have been seeing intermittent SchemaDisagreementException errors on AWS, probably due to Astyanax / how we are using it - // (confirmed that clocks are in sync) - String uniqueName = Identifiers.makeRandomId(8); - AstyanaxSample astyanaxFirst = AstyanaxSample.builder().node(first).columnFamilyName(uniqueName).build(); - Map<String, List<String>> versions; - AstyanaxContext<Cluster> context = astyanaxFirst.newAstyanaxContextForCluster(); - try { - versions = context.getEntity().describeSchemaVersions(); - } finally { - context.shutdown(); - } - - log.info("Cassandra schema versions are: "+versions); - if (versions.size() > 1) { - Assert.fail("Inconsistent versions on Cassandra start: "+versions); - } - String keyspacePrefix = "BrooklynTests_"+Identifiers.makeRandomId(8); - - String keyspaceName = astyanaxFirst.writeData(keyspacePrefix, numRetries); - - for (Entity node : nodes) { - AstyanaxSample astyanaxSecond = AstyanaxSample.builder().node((CassandraNode)node).columnFamilyName(uniqueName).build(); - astyanaxSecond.readData(keyspaceName, numRetries); - } - } - - protected static Boolean areVersionsConsistent(CassandraNode node) { - AstyanaxContext<Cluster> context = null; - try { - context = new AstyanaxSample(node).newAstyanaxContextForCluster(); - Map<String, List<String>> v = context.getEntity().describeSchemaVersions(); - return v.size() == 1; - } catch (Exception e) { - return null; - } finally { - if (context != null) context.shutdown(); - } - } - - protected static boolean isSocketOpen(CassandraNode node) { - try { - Socket s = new Socket(node.getAttribute(Attributes.HOSTNAME), node.getThriftPort()); - s.close(); - return true; - } catch (Exception e) { - return false; - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java deleted file mode 100644 index 8f917cb..0000000 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.cassandra; - -import static org.testng.Assert.assertNotNull; - -import java.math.BigInteger; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.proxy.nginx.NginxController; -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.entity.rebind.RebindOptions; -import brooklyn.entity.rebind.RebindTestFixtureWithApp; -import brooklyn.entity.trait.Startable; -import brooklyn.location.basic.LocalhostMachineProvisioningLocation; -import brooklyn.test.EntityTestUtils; - -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -/** - * Test the operation of the {@link NginxController} class. - */ -public class CassandraDatacenterRebindIntegrationTest extends RebindTestFixtureWithApp { - private static final Logger LOG = LoggerFactory.getLogger(CassandraDatacenterRebindIntegrationTest.class); - - private LocalhostMachineProvisioningLocation localhostProvisioningLocation; - - @BeforeMethod(alwaysRun=true) - public void setUp() throws Exception { - CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually(); - super.setUp(); - localhostProvisioningLocation = origApp.newLocalhostProvisioningLocation(); - } - - @AfterMethod(alwaysRun=true) - @Override - public void tearDown() throws Exception { - super.tearDown(); - CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually(); - } - - /** - * Test that Brooklyn can rebind to a single node datacenter. 
- */ - @Test(groups = "Integration") - public void testRebindDatacenterOfSizeOne() throws Exception { - CassandraDatacenter origDatacenter = origApp.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) - .configure("initialSize", 1)); - - origApp.start(ImmutableList.of(localhostProvisioningLocation)); - CassandraNode origNode = (CassandraNode) Iterables.get(origDatacenter.getMembers(), 0); - - EntityTestUtils.assertAttributeEqualsEventually(origDatacenter, CassandraDatacenter.GROUP_SIZE, 1); - CassandraDatacenterLiveTest.assertNodesConsistent(ImmutableList.of(origNode)); - CassandraDatacenterLiveTest.assertSingleTokenConsistent(ImmutableList.of(origNode)); - CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(origNode)); - BigInteger origToken = origNode.getAttribute(CassandraNode.TOKEN); - Set<BigInteger> origTokens = origNode.getAttribute(CassandraNode.TOKENS); - assertNotNull(origToken); - - newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true)); - final CassandraDatacenter newDatacenter = (CassandraDatacenter) Iterables.find(newApp.getChildren(), Predicates.instanceOf(CassandraDatacenter.class)); - final CassandraNode newNode = (CassandraNode) Iterables.find(newDatacenter.getMembers(), Predicates.instanceOf(CassandraNode.class)); - - EntityTestUtils.assertAttributeEqualsEventually(newDatacenter, CassandraDatacenter.GROUP_SIZE, 1); - EntityTestUtils.assertAttributeEqualsEventually(newNode, Startable.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(newNode, CassandraNode.TOKEN, origToken); - EntityTestUtils.assertAttributeEqualsEventually(newNode, CassandraNode.TOKENS, origTokens); - CassandraDatacenterLiveTest.assertNodesConsistent(ImmutableList.of(newNode)); - CassandraDatacenterLiveTest.assertSingleTokenConsistent(ImmutableList.of(newNode)); - CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(newNode)); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java deleted file mode 100644 index f902ac2..0000000 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.cassandra; - -import static org.testng.Assert.assertEquals; - -import java.math.BigInteger; -import java.util.Map; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.BrooklynAppUnitTestSupport; -import brooklyn.entity.Entity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.EmptySoftwareProcess; -import brooklyn.entity.basic.EmptySoftwareProcessSshDriver; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.location.LocationSpec; -import brooklyn.location.basic.LocalhostMachineProvisioningLocation; -import brooklyn.test.EntityTestUtils; -import brooklyn.util.ResourceUtils; -import brooklyn.util.javalang.JavaClassNames; -import brooklyn.util.text.TemplateProcessor; -import brooklyn.util.time.Duration; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; - -public class CassandraDatacenterTest extends BrooklynAppUnitTestSupport { - - private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterTest.class); - - private LocalhostMachineProvisioningLocation loc; - private CassandraDatacenter cluster; - - @BeforeMethod(alwaysRun=true) - @Override - public void setUp() throws Exception { - super.setUp(); - loc = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)); - } - - @Test - public void testPopulatesInitialSeeds() throws Exception { - cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) - .configure(CassandraDatacenter.INITIAL_SIZE, 2) - .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO) - 
.configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) - .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))); - - app.start(ImmutableList.of(loc)); - EmptySoftwareProcess e1 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 0); - EmptySoftwareProcess e2 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 1); - - EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e1, e2)); - } - - @Test(groups="Integration") // because takes approx 2 seconds - public void testUpdatesSeedsOnFailuresAndAdditions() throws Exception { - doTestUpdatesSeedsOnFailuresAndAdditions(true, false); - } - - protected void doTestUpdatesSeedsOnFailuresAndAdditions(boolean fast, boolean checkSeedsConstantOnRejoining) throws Exception { - cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) - .configure(CassandraDatacenter.INITIAL_SIZE, 2) - .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO) - .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) - .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))); - - app.start(ImmutableList.of(loc)); - EmptySoftwareProcess e1 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 0); - EmptySoftwareProcess e2 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 1); - EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e1, e2)); - log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; e1="+e1+" e2="+e2); - - // calling the driver stop for this entity will cause SERVICE_UP to become false, and stay false - // (and that's all it does, incidentally); if we just set the attribute it will become true on serviceUp sensor feed - ((EmptySoftwareProcess)e1).getDriver().stop(); - // not necessary, 
but speeds things up: - if (fast) - ((EntityInternal)e1).setAttribute(Attributes.SERVICE_UP, false); - - EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2)); - - cluster.resize(3); - EmptySoftwareProcess e3 = (EmptySoftwareProcess) Iterables.getOnlyElement(Sets.difference(ImmutableSet.copyOf(cluster.getMembers()), ImmutableSet.of(e1,e2))); - log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; e3="+e3); - try { - EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2, e3)); - } finally { - log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; seeds "+cluster.getAttribute(CassandraDatacenter.CURRENT_SEEDS)); - } - - if (!checkSeedsConstantOnRejoining) { - // cluster should not revert to e1+e2, simply because e1 has come back; but e1 should rejoin the group - // (not that important, and waits for 1s, so only done as part of integration) - ((EmptySoftwareProcessSshDriver)(((EmptySoftwareProcess)e1).getDriver())).launch(); - if (fast) - ((EntityInternal)e1).setAttribute(Attributes.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(e1, CassandraNode.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsContinually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2, e3)); - } - } - - @Test - public void testPopulatesInitialTokens() throws Exception { - cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) - .configure(CassandraDatacenter.INITIAL_SIZE, 2) - .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO) - .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) - .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))); - - app.start(ImmutableList.of(loc)); - - Set<BigInteger> tokens = 
Sets.newLinkedHashSet(); - Set<BigInteger> tokens2 = Sets.newLinkedHashSet(); - for (Entity member : cluster.getMembers()) { - BigInteger memberToken = member.getConfig(CassandraNode.TOKEN); - Set<BigInteger > memberTokens = member.getConfig(CassandraNode.TOKENS); - if (memberToken != null) tokens.add(memberToken); - if (memberTokens != null) tokens2.addAll(memberTokens); - } - assertEquals(tokens, ImmutableSet.of(new BigInteger("-9223372036854775808"), BigInteger.ZERO)); - assertEquals(tokens2, ImmutableSet.of()); - } - - @Test - public void testDoesNotPopulateInitialTokens() throws Exception { - cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) - .configure(CassandraDatacenter.INITIAL_SIZE, 2) - .configure(CassandraDatacenter.USE_VNODES, true) - .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) - .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))); - - app.start(ImmutableList.of(loc)); - - Set<BigInteger> tokens = Sets.newLinkedHashSet(); - Set<BigInteger> tokens2 = Sets.newLinkedHashSet(); - for (Entity member : cluster.getMembers()) { - BigInteger memberToken = member.getConfig(CassandraNode.TOKEN); - Set<BigInteger > memberTokens = member.getConfig(CassandraNode.TOKENS); - if (memberToken != null) tokens.add(memberToken); - if (memberTokens != null) tokens2.addAll(memberTokens); - } - assertEquals(tokens, ImmutableSet.of()); - assertEquals(tokens2, ImmutableSet.of()); - } - - public static class MockInputForTemplate { - public BigInteger getToken() { return new BigInteger("-9223372036854775808"); } - public String getTokensAsString() { return "" + getToken(); } - public int getNumTokensPerNode() { return 1; } - public String getSeeds() { return ""; } - public int getGossipPort() { return 1234; } - public int getSslGossipPort() { return 1234; } - public int getThriftPort() { return 1234; } - public int getNativeTransportPort() { return 1234; } - public String 
getClusterName() { return "Mock"; } - public String getEndpointSnitchName() { return ""; } - public String getListenAddress() { return "0"; } - public String getBroadcastAddress() { return "0"; } - public String getRpcAddress() { return "0"; } - public String getRunDir() { return "/tmp/mock"; } - } - - @Test - public void testBigIntegerFormattedCorrectly() { - Map<String, Object> substitutions = ImmutableMap.<String, Object>builder() - .put("entity", new MockInputForTemplate()) - .put("driver", new MockInputForTemplate()) - .build(); - - String templatedUrl = CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL.getDefaultValue(); - String url = TemplateProcessor.processTemplateContents(templatedUrl, ImmutableMap.of("entity", ImmutableMap.of("majorMinorVersion", "1.2"))); - String templateContents = new ResourceUtils(this).getResourceAsString(url); - String processedTemplate = TemplateProcessor.processTemplateContents(templateContents, substitutions); - Assert.assertEquals(processedTemplate.indexOf("775,808"), -1); - Assert.assertTrue(processedTemplate.indexOf("-9223372036854775808") > 0); - } - - @Test(groups="Integration") // because takes approx 30 seconds - public void testUpdatesSeedsFastishManyTimes() throws Exception { - final int COUNT = 20; - for (int i=0; i<COUNT; i++) { - log.info("Test "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT); - try { - doTestUpdatesSeedsOnFailuresAndAdditions(true, true); - tearDown(); - setUp(); - } catch (Exception e) { - log.warn("Error in "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT, e); - throw e; - } - } - } - - @Test(groups="Integration") // because takes approx 5 seconds - public void testUpdateSeedsSlowAndRejoining() throws Exception { - final int COUNT = 1; - for (int i=0; i<COUNT; i++) { - log.info("Test "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT); - doTestUpdatesSeedsOnFailuresAndAdditions(false, true); - tearDown(); - setUp(); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java deleted file mode 100644 index f4a786a..0000000 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.cassandra; - -import static org.testng.Assert.assertEquals; - -import java.util.Collection; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.BrooklynAppUnitTestSupport; -import brooklyn.entity.Entity; -import brooklyn.entity.basic.AbstractEntity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.EmptySoftwareProcess; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.basic.EntityLocal; -import brooklyn.entity.basic.Lifecycle; -import brooklyn.entity.basic.ServiceStateLogic; -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.entity.proxying.ImplementedBy; -import brooklyn.entity.trait.Startable; -import brooklyn.location.Location; -import brooklyn.location.LocationSpec; -import brooklyn.location.basic.LocalhostMachineProvisioningLocation; -import brooklyn.test.EntityTestUtils; -import brooklyn.util.time.Duration; - -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; - -public class CassandraFabricTest extends BrooklynAppUnitTestSupport { - - private static final Logger log = LoggerFactory.getLogger(CassandraFabricTest.class); - - private LocalhostMachineProvisioningLocation loc1; - private LocalhostMachineProvisioningLocation loc2; - private CassandraFabric fabric; - - @BeforeMethod(alwaysRun=true) - @Override - public void setUp() throws Exception { - super.setUp(); - loc1 = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)); - loc2 = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)); - } - - @Test - public void testPopulatesInitialSeeds() throws Exception { - 
fabric = app.createAndManageChild(EntitySpec.create(CassandraFabric.class) - .configure(CassandraFabric.INITIAL_QUORUM_SIZE, 2) - .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) - .configure(CassandraFabric.MEMBER_SPEC, EntitySpec.create(CassandraDatacenter.class) - .configure(CassandraDatacenter.INITIAL_SIZE, 2) - .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class)))); - - app.start(ImmutableList.of(loc1, loc2)); - CassandraDatacenter d1 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 0); - CassandraDatacenter d2 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 1); - - final EmptySoftwareProcess d1a = (EmptySoftwareProcess) Iterables.get(d1.getMembers(), 0); - final EmptySoftwareProcess d1b = (EmptySoftwareProcess) Iterables.get(d1.getMembers(), 1); - - final EmptySoftwareProcess d2a = (EmptySoftwareProcess) Iterables.get(d2.getMembers(), 0); - final EmptySoftwareProcess d2b = (EmptySoftwareProcess) Iterables.get(d2.getMembers(), 1); - - Predicate<Set<Entity>> predicate = new Predicate<Set<Entity>>() { - @Override public boolean apply(Set<Entity> input) { - return input != null && input.size() >= 2 && - Sets.intersection(input, ImmutableSet.of(d1a, d1b)).size() == 1 && - Sets.intersection(input, ImmutableSet.of(d2a, d2b)).size() == 1; - } - }; - EntityTestUtils.assertAttributeEventually(fabric, CassandraFabric.CURRENT_SEEDS, predicate); - EntityTestUtils.assertAttributeEventually(d1, CassandraDatacenter.CURRENT_SEEDS, predicate); - EntityTestUtils.assertAttributeEventually(d2, CassandraDatacenter.CURRENT_SEEDS, predicate); - - Set<Entity> seeds = fabric.getAttribute(CassandraFabric.CURRENT_SEEDS); - assertEquals(d1.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds); - assertEquals(d2.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds); - log.info("Seeds="+seeds); - } - - @Test - public void testPopulatesInitialSeedsWhenNodesOfOneClusterComeUpBeforeTheOtherCluster() 
throws Exception { - fabric = app.createAndManageChild(EntitySpec.create(CassandraFabric.class) - .configure(CassandraFabric.INITIAL_QUORUM_SIZE, 2) - .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) - .configure(CassandraFabric.MEMBER_SPEC, EntitySpec.create(CassandraDatacenter.class) - .configure(CassandraDatacenter.INITIAL_SIZE, 2) - .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(DummyCassandraNode.class)))); - - Thread t = new Thread() { - public void run() { - app.start(ImmutableList.of(loc1, loc2)); - } - }; - t.start(); - try { - EntityTestUtils.assertGroupSizeEqualsEventually(fabric, 2); - CassandraDatacenter d1 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 0); - CassandraDatacenter d2 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 1); - - EntityTestUtils.assertGroupSizeEqualsEventually(d1, 2); - final DummyCassandraNode d1a = (DummyCassandraNode) Iterables.get(d1.getMembers(), 0); - final DummyCassandraNode d1b = (DummyCassandraNode) Iterables.get(d1.getMembers(), 1); - - EntityTestUtils.assertGroupSizeEqualsEventually(d2, 2); - final DummyCassandraNode d2a = (DummyCassandraNode) Iterables.get(d2.getMembers(), 0); - final DummyCassandraNode d2b = (DummyCassandraNode) Iterables.get(d2.getMembers(), 1); - - d1a.setAttribute(Attributes.HOSTNAME, "d1a"); - d1b.setAttribute(Attributes.HOSTNAME, "d1b"); - - Thread.sleep(1000); - d2a.setAttribute(Attributes.HOSTNAME, "d2a"); - d2b.setAttribute(Attributes.HOSTNAME, "d2b"); - - Predicate<Set<Entity>> predicate = new Predicate<Set<Entity>>() { - @Override public boolean apply(Set<Entity> input) { - return input != null && input.size() >= 2 && - Sets.intersection(input, ImmutableSet.of(d1a, d1b)).size() == 1 && - Sets.intersection(input, ImmutableSet.of(d2a, d2b)).size() == 1; - } - }; - EntityTestUtils.assertAttributeEventually(fabric, CassandraFabric.CURRENT_SEEDS, predicate); - EntityTestUtils.assertAttributeEventually(d1, 
CassandraDatacenter.CURRENT_SEEDS, predicate); - EntityTestUtils.assertAttributeEventually(d2, CassandraDatacenter.CURRENT_SEEDS, predicate); - - Set<Entity> seeds = fabric.getAttribute(CassandraFabric.CURRENT_SEEDS); - assertEquals(d1.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds); - assertEquals(d2.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds); - log.info("Seeds="+seeds); - } finally { - log.info("Failed seeds; fabric="+fabric.getAttribute(CassandraFabric.CURRENT_SEEDS)); - t.interrupt(); - } - } - - - @ImplementedBy(DummyCassandraNodeImpl.class) - public interface DummyCassandraNode extends Entity, Startable, EntityLocal, EntityInternal { - } - - public static class DummyCassandraNodeImpl extends AbstractEntity implements DummyCassandraNode { - - @Override - public void start(Collection<? extends Location> locations) { - ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING); - } - - @Override - public void stop() { - ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING); - } - - @Override - public void restart() { - } - } -}
